Untitled
unknown
plain_text
5 months ago
15 kB
13
No Index
#!/usr/bin/env python3 """ Python node for the MonocularMode cpp node. Author: Azmyin Md. Kamal Date: 01/01/2024 Requirements * Dataset must be configured in EuRoC MAV format * Paths to dataset must be set before bulding (or running) this node * Make sure to set path to your workspace in common.hpp Command line arguments -- settings_name: EuRoC, TUM2, KITTI etc; the name of the .yaml file containing camera intrinsics and other configurations -- image_seq: MH01, V102, etc; the name of the image sequence you want to run """ # Imports #* Import Python modules import sys # System specific modules import os # Operating specific functions import glob import time # Python timing module import copy # For deepcopying arrays import shutil # High level folder operation tool from pathlib import Path # To find the "home" directory location import argparse # To accept user arguments from commandline import natsort # To ensure all images are chosen loaded in the correct order import yaml # To manipulate YAML files for reading configuration files import copy # For making deepcopies of openCV matrices, python lists, numpy arrays etc. import numpy as np # Python Linear Algebra module import cv2 # OpenCV #* ROS2 imports import ament_index_python.packages import rclpy from rclpy.node import Node from rclpy.parameter import Parameter # If you have more files in the submodules folder # from .submodules.py_utils import fn1 # Import helper functions from files in your submodules folder # Import a custom message interface # from your_custom_msg_interface.msg import CustomMsg #* Note the camel caps convention # Import ROS2 message templates from sensor_msgs.msg import Image # http://wiki.ros.org/sensor_msgs from sensor_msgs.msg import Imu from std_msgs.msg import String, Float64 # ROS2 string message template from cv_bridge import CvBridge, CvBridgeError # Library to convert image messages to numpy array #* Class definition class MonoDriver(Node): def __init__(self, node_name = "mono_py_node"): super().__init__(node_name) # Initializes the rclpy.Node class. It expects the name of the node # Initialize parameters to be passed from the command line (or launch file) self.declare_parameter("settings_name","EuRoC") self.declare_parameter("image_seq","NULL") #* Parse values sent by command line self.settings_name = str(self.get_parameter('settings_name').value) self.image_seq = str(self.get_parameter('image_seq').value) # DEBUG print(f"-------------- Received parameters --------------------------\n") print(f"self.settings_name: {self.settings_name}") print(f"self.image_seq: {self.image_seq}") print() # Global path definitions self.home_dir = str(Path.home()) + "/ros2_test/src/ros2_orb_slam3" #! Change this to match path to your workspace self.parent_dir = "TEST_DATASET" #! Change or provide path to the parent directory where data for all image sequences are stored #self.image_sequence_dir = self.home_dir + "/" + self.parent_dir + "/" + self.image_seq # Full path to the image sequence folder #print(f"self.image_sequence_dir: {self.image_sequence_dir}\n") # Global variables self.node_name = "mono_py_driver" self.image_seq_dir = "" self.imgz_seqz = [] self.time_seqz = [] # Maybe redundant self.frame = 0 self.timestamp = 0 self.acc = [] self.gyr = [] self.timestamp_imu = 0 # Define a CvBridge object self.br = CvBridge() self.create_subscription( Image, '/image_raw', # Replace with your camera topic self.image_callback, 10 # Queue size ) self.create_subscription( Imu, '/imu', # Replace with your camera topic self.imu_callback, 10 # Queue size ) self.timer = self.create_timer(0.05, self.timer_callback) # Read images from the chosen dataset, order them in ascending order and prepare timestep data as well #self.imgz_seqz_dir, self.imgz_seqz, self.time_seqz = self.get_image_dataset_asl(self.image_sequence_dir, "mav0") print(self.image_seq_dir) print(len(self.imgz_seqz)) #* ROS2 publisher/subscriber variables [HARDCODED] self.pub_exp_config_name = "/mono_py_driver/experiment_settings" self.sub_exp_ack_name = "/mono_py_driver/exp_settings_ack" self.pub_img_to_agent_name = "/mono_py_driver/img_msg" self.pub_timestep_to_agent_name = "/mono_py_driver/timestep_msg" self.pub_imu_to_agent_name = "/mono_py_driver/imu_msg" #self.pub_timestep_imu_to_agent_name = "/mono_py_driver/timestep_imu_msg" self.send_config = True # Set False once handshake is completed with the cpp node #* Setup ROS2 publishers and subscribers self.publish_exp_config_ = self.create_publisher(String, self.pub_exp_config_name, 1) # Publish configs to the ORB-SLAM3 C++ node #* Build the configuration string to be sent out #self.exp_config_msg = self.settings_name + "/" + self.image_seq # Example EuRoC/sample_euroc_MH05 self.exp_config_msg = self.settings_name # Example EuRoC print(f"Configuration to be sent: {self.exp_config_msg}") #* Subscriber to get acknowledgement from CPP node that it received experimetn settings self.subscribe_exp_ack_ = self.create_subscription(String, self.sub_exp_ack_name, self.ack_callback ,10) # Publisher to send RGB image self.publish_img_msg_ = self.create_publisher(Image, self.pub_img_to_agent_name, 1) self.publish_timestep_msg_ = self.create_publisher(Float64, self.pub_timestep_to_agent_name, 1) self.publish_imu_msg_ = self.create_publisher(Imu, self.pub_imu_to_agent_name, 1) #self.publish_timestep_imu_msg_ = self.create_publisher(Float64, self.pub_timestep_imu_to_agent_name, 1) # Initialize work variables for main logic self.start_frame = 0 # Default 0 self.end_frame = -1 # Default -1 self.frame_stop = -1 # Set -1 to use the whole sequence, some positive integer to force sequence to stop, 350 test2, 736 test3 self.show_imgz = False # Default, False, set True to see the output directly from this node self.frame_id = 0 # Integer id of an image frame self.frame_count = 0 # Ensure we are consistent with the count number of the frame self.inference_time = [] # List to compute average time print() print(f"MonoDriver initialized, attempting handshake with CPP node") # **************************************************************************************** # **************************************************************************************** # **************************************************************************************** def image_callback(self, msg): """ Callback function for live camera feed. """ try: # Convert ROS Image to OpenCV format frame = self.br.imgmsg_to_cv2(msg, "bgr8") # Generate a timestamp for SLAM timestamp = self.get_clock().now().to_msg().sec + self.get_clock().now().to_msg().nanosec * 1e-9 #self.run_py_node(frame, timestamp)#kp ob schlau hier # Debug: Display the frame (optional) #cv2.imshow("Live Camera Feed", frame) # TODO: Pass the frame and timestamp to ORB-SLAM (modify to fit your implementation) # Example: # orb_slam.TrackMonocular(frame, timestamp) if cv2.waitKey(1) & 0xFF == ord('q'): # Press 'q' to quit visualization rclpy.shutdown() except CvBridgeError as e: self.get_logger().error(f"Failed to convert image: {e}") self.frame = frame self.timestamp = timestamp return frame, timestamp def imu_callback(self, msg): try: acc = [msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z] gyr = [msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z] # Generate a timestamp for SLAM timestamp_imu3 = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9 timestamp_imu = self.get_clock().now().to_msg().sec + self.get_clock().now().to_msg().nanosec * 1e-9 print(timestamp_imu) print(timestamp_imu3, "stamp") #self.run_py_node(acc, gyr, timestamp_imu)#kp ob schlau hier # Debug: Display the frame (optional) #cv2.imshow("Live Camera Feed", frame) # TODO: Pass the frame and timestamp to ORB-SLAM (modify to fit your implementation) # Example: # orb_slam.TrackMonocular(frame, timestamp) except CvBridgeError as e: self.get_logger().error(f"Failed to convert IMU: {e}") self.acc = acc self.gyr = gyr self.timestamp_imu = timestamp_imu return acc, gyr, timestamp_imu # **************************************************************************************** def ack_callback(self, msg): """ Callback function """ print(f"Got ack: {msg.data}") if(msg.data == "ACK"): self.send_config = False # self.subscribe_exp_ack_.destory() # TODO doesn't work # **************************************************************************************** def timer_callback(self): """ Timer callback to call run_py_node at a fixed rate. """ if self.frame is not None and self.acc is not None: self.run_py_node() else: self.get_logger().warning("Data not yet available for run_py_node.") # **************************************************************************************** def handshake_with_cpp_node(self): """ Send and receive acknowledge of sent configuration settings """ if (self.send_config == True): # print(f"Sent mesasge: {self.exp_config_msg}") msg = String() msg.data = self.exp_config_msg self.publish_exp_config_.publish(msg) time.sleep(0.01) # **************************************************************************************** # **************************************************************************************** def run_py_node(self): """ Master function that sends the RGB image message to the CPP node """ # Initialize work variables img_msg = None # sensor_msgs image object imu_msg = Imu() # Path to this image """img_look_up_path = self.imgz_seqz_dir + imgz_name timestep = float(imgz_name.split(".")[0])""" # Kept if you use a custom message interface to also pass timestep value self.frame_id = self.frame_id + 1 #print(img_look_up_path) # print(f"Frame ID: {frame_id}") # Based on the tutorials img_msg = self.br.cv2_to_imgmsg(self.frame, encoding="bgr8") timestep_msg = Float64() timestep_msg.data = self.timestamp #imu_msg.header.stamp = self.get_clock().now().to_msg() imu_msg.header.stamp.sec = int(self.timestamp_imu) imu_msg.header.stamp.nanosec = int((self.timestamp_imu % 1) * 1e9) imu_msg.header.frame_id = "imu_frame" imu_msg.linear_acceleration.x = self.acc[0] imu_msg.linear_acceleration.y = self.acc[1] imu_msg.linear_acceleration.z = self.acc[2] # Populate angular velocity imu_msg.angular_velocity.x = self.gyr[0] imu_msg.angular_velocity.y = self.gyr[1] imu_msg.angular_velocity.z = self.gyr[2] # Publish RGB image and timestep, must be in the order shown below. I know not very optimum, you can use a custom message interface to send both try: self.publish_timestep_msg_.publish(timestep_msg) self.publish_img_msg_.publish(img_msg) self.publish_imu_msg_.publish(imu_msg) except CvBridgeError as e: print(e) # **************************************************************************************** # main function def main(args = None): rclpy.init(args=args) # Initialize node n = MonoDriver("mono_py_node") #* Initialize the node rate = n.create_rate(20) # https://answers.ros.org/question/358343/rate-and-sleep-function-in-rclpy-library-for-ros2/ #* Blocking loop to initialize handshake """while(n.send_config == True): n.handshake_with_cpp_node() rclpy.spin_once(n) #self.rate.sleep(10) # Potential bug, breaks code if(n.send_config == False): break print(f"Handshake complete")""" #* Blocking loop to send RGB image and timestep message """for idx, imgz_name in enumerate(n.imgz_seqz[n.start_frame:n.end_frame]): try: rclpy.spin_once(n) # Blocking we need a non blocking take care of callbacks n.run_py_node(idx, imgz_name) rate.sleep() # DEBUG, if you want to halt sending images after a certain Frame is reached if (n.frame_id>n.frame_stop and n.frame_stop != -1): print(f"BREAK!") break except KeyboardInterrupt: break""" try: if(n.send_config == True): n.handshake_with_cpp_node() print(f"Handshake complete") rclpy.spin(n) #self.rate.sleep(10) # Potential bug, breaks code print(f"after spin node") #rclpy.spin(n) # Blocking we need a non blocking take care of callbacks #n.run_py_node(n.frame, n.timestamp, n.acc, n.gyr, n.timestamp_imu) #rate.sleep() # DEBUG, if you want to halt sending images after a certain Frame is reached except KeyboardInterrupt: print("Shutting down node...") finally: cv2.destroyAllWindows() # Close all OpenCV windows n.destroy_node() # Destroy the node rclpy.shutdown() # Shutdown ROS2 # Dunders, this .py is the main file if __name__=="__main__": main()
Editor is loading...
Leave a Comment