# Paste metadata: Untitled · unknown · plain_text · 10 months ago · 936 B · 6 · Indexable
def disconnect(self, close_code) -> None:
    """
    Releases the ROS resources held by this instance
    Unregisters every subscriber in self.subscribers, then shuts down every timer in self.timers
    @param close_code: Close code passed in by the caller; not used here
    """
    for subscriber in self.subscribers:
        subscriber.unregister()
    for timer in self.timers:
        timer.shutdown()
def forward_ros_topic(self, topic_name: str, topic_type: Type, gui_msg_type: str) -> None:
    """
    Subscribes to a ROS topic and forwards messages to the GUI as JSON
    Each received message is passed to self.send_message_as_json as a dictionary holding a
    "type" key set to gui_msg_type plus the message's own top-level fields.
    The new subscriber is appended to self.subscribers.
    @param topic_name: ROS topic name
    @param topic_type: ROS message type
    @param gui_msg_type: String to identify the message type in the GUI
    """

    def callback(ros_message: Any) -> None:
        # Formatting a ROS message as a string outputs YAML
        # Parse it back into a dictionary, so we can send it as JSON
        # safe_load returns None for empty YAML text (presumably what a message with no
        # fields formats to); fall back to an empty dictionary so the unpacking cannot fail
        fields = yaml.safe_load(str(ros_message)) or {}
        # NOTE: fields are unpacked after "type", so a message field named "type" overrides it
        self.send_message_as_json({"type": gui_msg_type, **fields})

    self.subscribers.append(rospy.Subscriber(topic_name, topic_type, callback))
# Leave a Comment