Untitled
unknown
plain_text
21 days ago
936 B
3
Indexable
def disconnect(self, close_code) -> None:
    """
    Releases the ROS resources tracked by this object

    Unregisters every subscriber in self.subscribers, then shuts down every timer in self.timers.

    @param close_code:  Not used by this method (presumably the connection close code passed in by the caller)
    """
    for subscriber in self.subscribers:
        subscriber.unregister()
    for timer in self.timers:
        timer.shutdown()


def forward_ros_topic(self, topic_name: str, topic_type: Type, gui_msg_type: str) -> None:
    """
    Subscribes to a ROS topic and forwards messages to the GUI as JSON

    The created subscriber is appended to self.subscribers.

    @param topic_name:      ROS topic name
    @param topic_type:      ROS message type
    @param gui_msg_type:    String to identify the message type in the GUI
    """

    def callback(ros_message: Any):
        # Formatting a ROS message as a string outputs YAML
        # Parse it back into a dictionary, so we can send it as JSON
        # yaml.safe_load returns None for an empty document (e.g. a message that formats to an
        # empty string); fall back to an empty mapping so the ** unpacking below cannot raise
        fields = yaml.safe_load(str(ros_message)) or {}
        # NOTE(review): fields are unpacked after "type", so a ROS field named "type" would
        # replace gui_msg_type - confirm no forwarded message type has such a field
        self.send_message_as_json({"type": gui_msg_type, **fields})

    self.subscribers.append(rospy.Subscriber(topic_name, topic_type, callback))
Editor is loading...
Leave a Comment