Untitled
def disconnect(self, close_code) -> None: for subscriber in self.subscribers: subscriber.unregister() for timer in self.timers: timer.shutdown() def forward_ros_topic(self, topic_name: str, topic_type: Type, gui_msg_type: str) -> None: """ Subscribes to a ROS topic and forwards messages to the GUI as JSON @param topic_name: ROS topic name @param topic_type: ROS message type @param gui_msg_type: String to identify the message type in the GUI """ def callback(ros_message: Any): # Formatting a ROS message as a string outputs YAML # Parse it back into a dictionary, so we can send it as JSON self.send_message_as_json({"type": gui_msg_type, **yaml.safe_load(str(ros_message))}) self.subscribers.append(rospy.Subscriber(topic_name, topic_type, callback)) # example of usage we pass in the topic name, which is found in the msg folder, the topic type (defined in the message of that topic), and the gui_msg_type, which is basically used in calling a function that will turn the message into a json message and send the message to itself (so the other functions can handle it and send it to the frontend) self.forward_ros_topic("/sa_humidity_data", RelativeHumidity, "soil_humidity")
Leave a Comment