Untitled
unknown
python
2 years ago
2.9 kB
11
Indexable
import rclpy
from rclpy.node import Node
from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
class PoseSubscriber(Node):
def __init__(self):
super().__init__("pose_subscriber")
self.subscription = self.create_subscription(PoseWithCovarianceStamped, "/amcl_pose", self.listener_callback, 10)
self.basic_navigator = BasicNavigator()
self.current_pose = None
self.pose_list = [] # Initialize pose_list
self.get_logger().info("Pose subscriber node initialized")
self.process_user_input()
def listener_callback(self, msg):
self.current_pose_list = msg.pose.pose.position
self.get_logger().info(f"Got new pose {self.current_pose}")
def process_user_input(self):
while rclpy.ok():
user_input = input("Enter command (q to quit, r to reset, s to save pose, f to navigate): ")
if user_input == "q":
self.get_logger().info("Quitting")
break
elif user_input == "r":
self.reset_pose()
elif user_input == "s":
self.save_pose()
elif user_input == "f":
self.navigate_to_current_pose()
else:
self.get_logger().warning("Invalid command. Please enter 'q', 'r', 's', or 'f'.")
rclpy.spin_once(self)
def reset_pose(self):
self.current_pose = None
self.get_logger().info("Pose reset")
def save_pose(self):
if self.current_pose is not None:
self.pose_list.append(self.current_pose)
self.get_logger().info(f"Saved new pose {self.current_pose}")
else:
self.get_logger().info("No current pose available.")
def navigate_to_current_pose(self):
if self.pose_list:
for coordinate in self.pose_list:
self.basic_navigator.goToPose(coordinate)
while not self.basic_navigator.isTaskComplete():
self.get_logger().info(f"Waiting to reach {coordinate}")
self.get_logger().info(f"Reached coordinate: {coordinate}")
else:
self.get_logger().info("No poses to go.")
def navigate_to_current_pose(self):
if self.pose_list:
kordinat = self.pose_list[0]
self.basic_navigator.goToPose(kordinat)
while not self.basic_navigator.isTaskComplete():
self.get_logger().info(f"Waiting to reach {kordinat}")
self.get_logger().info(f"Reached coordinate: {kordinat}")
else:
self.get_logger().info("No pose to go.")
def main(args=None):
rclpy.init(args=args)
node = PoseSubscriber()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
Editor is loading...
Leave a Comment