Untitled

 avatar
unknown
python
a year ago
2.9 kB
9
Indexable
import rclpy
from rclpy.node import Node
from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped

class PoseSubscriber(Node):
    def __init__(self):
        super().__init__("pose_subscriber")
        self.subscription = self.create_subscription(PoseWithCovarianceStamped, "/amcl_pose", self.listener_callback, 10)
        self.basic_navigator = BasicNavigator()
        self.current_pose = None
        self.pose_list = []  # Initialize pose_list
        self.get_logger().info("Pose subscriber node initialized")
        self.process_user_input()

    def listener_callback(self, msg):
        self.current_pose_list = msg.pose.pose.position
        self.get_logger().info(f"Got new pose {self.current_pose}")

    def process_user_input(self):
        while rclpy.ok():
            user_input = input("Enter command (q to quit, r to reset, s to save pose, f to navigate): ")
            if user_input == "q":
                self.get_logger().info("Quitting")
                break
            elif user_input == "r":
                self.reset_pose()
            elif user_input == "s":
                self.save_pose()
            elif user_input == "f":
                self.navigate_to_current_pose()
            else:
                self.get_logger().warning("Invalid command. Please enter 'q', 'r', 's', or 'f'.")
            rclpy.spin_once(self)

    def reset_pose(self):
        self.current_pose = None
        self.get_logger().info("Pose reset")

    def save_pose(self):
        if self.current_pose is not None:
            self.pose_list.append(self.current_pose)
            self.get_logger().info(f"Saved new pose {self.current_pose}")
        else:
            self.get_logger().info("No current pose available.")

    def navigate_to_current_pose(self):
        if self.pose_list:
            for coordinate in self.pose_list:
                self.basic_navigator.goToPose(coordinate)
                while not self.basic_navigator.isTaskComplete():
                    self.get_logger().info(f"Waiting to reach {coordinate}")
                self.get_logger().info(f"Reached coordinate: {coordinate}")
        else:
            self.get_logger().info("No poses to go.")


    def navigate_to_current_pose(self):
        if self.pose_list:
            kordinat = self.pose_list[0]
            self.basic_navigator.goToPose(kordinat)
            while not self.basic_navigator.isTaskComplete():
                self.get_logger().info(f"Waiting to reach {kordinat}")
            self.get_logger().info(f"Reached coordinate: {kordinat}")
        else:
            self.get_logger().info("No pose to go.")

def main(args=None):
    rclpy.init(args=args)
    node = PoseSubscriber()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment