Untitled
unknown
plain_text
a year ago
1.4 kB
8
Indexable
def disconnect(self, close_code) -> None:
    """
    Releases every ROS subscriber and timer registered on this consumer.

    Safe to call more than once: the handle lists are emptied before anything
    is released, so a repeated call finds nothing left to release.

    @param close_code: Not used here (presumably the close code supplied by
                       the WebSocket framework -- confirm against the caller)
    """
    # Detach the handle lists up front so stale handles never linger on the
    # instance, even if releasing one of them raises below.
    subscribers, self.subscribers = self.subscribers, []
    timers, self.timers = self.timers, []
    try:
        for subscriber in subscribers:
            subscriber.unregister()
    finally:
        # Timers must be stopped even when a subscriber fails to unregister;
        # otherwise they would keep running after the client has gone away.
        for timer in timers:
            timer.shutdown()
def forward_ros_topic(self, topic_name: str, topic_type: Type, gui_msg_type: str) -> None:
    """
    Subscribes to a ROS topic and forwards messages to the GUI as JSON.

    The subscriber handle is appended to self.subscribers.

    @param topic_name: ROS topic name
    @param topic_type: ROS message type
    @param gui_msg_type: String to identify the message type in the GUI
    """

    def callback(ros_message: Any):
        # Formatting a ROS message as a string outputs YAML.
        # Parse it back into a dictionary, so we can send it as JSON.
        # safe_load returns None for an empty document (presumably what a
        # field-less message stringifies to), so fall back to an empty dict
        # instead of crashing on `**None`.
        fields = yaml.safe_load(str(ros_message)) or {}
        payload = {"type": gui_msg_type, **fields}
        # Re-assert the discriminator: a message field that happens to be
        # named "type" must not overwrite the tag the GUI dispatches on.
        payload["type"] = gui_msg_type
        self.send_message_as_json(payload)

    self.subscribers.append(rospy.Subscriber(topic_name, topic_type, callback))
# Example usage: pass in the topic name (found in the msg folder), the topic type (defined in the message definition of that topic), and the gui_msg_type, which tags the JSON message that the ROS message is converted into; that JSON message is sent to itself so the other functions can handle it and send it to the frontend.
self.forward_ros_topic("/sa_humidity_data", RelativeHumidity, "soil_humidity")
Leave a Comment