Untitled

 avatar
unknown
plain_text
17 days ago
1.4 kB
3
Indexable
def disconnect(self, close_code) -> None:
        for subscriber in self.subscribers:
            subscriber.unregister()
        for timer in self.timers:
            timer.shutdown()

    def forward_ros_topic(self, topic_name: str, topic_type: Type, gui_msg_type: str) -> None:
        """
        Subscribes to a ROS topic and forwards messages to the GUI as JSON

        @param topic_name:      ROS topic name
        @param topic_type:      ROS message type
        @param gui_msg_type:    String to identify the message type in the GUI
        """

        def callback(ros_message: Any):
            # Formatting a ROS message as a string outputs YAML
            # Parse it back into a dictionary, so we can send it as JSON
            self.send_message_as_json({"type": gui_msg_type, **yaml.safe_load(str(ros_message))})

        self.subscribers.append(rospy.Subscriber(topic_name, topic_type, callback))


# example of usage we pass in the topic name, which is found in the msg folder, the topic type (defined in the message of that topic), and the gui_msg_type, which is basically used in calling a function that will turn the message into a json message and send the message to itself (so the other functions can handle it and send it to the frontend)
self.forward_ros_topic("/sa_humidity_data", RelativeHumidity, "soil_humidity")
Leave a Comment