Untitled
unknown
plain_text
a month ago
19 kB
6
Indexable
import sys import random import math from enum import Enum import rclpy from rclpy.node import Node from rclpy.signals import SignalHandlerOptions from rclpy.executors import ExternalShutdownException from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.executors import ExternalShutdownException, MultiThreadedExecutor from rclpy.qos import QoSPresetProfiles from sensor_msgs.msg import Image from sensor_msgs.msg import LaserScan from nav_msgs.msg import Odometry from geometry_msgs.msg import Twist, Pose from tf_transformations import euler_from_quaternion import angles from rclpy.publisher import Publisher from auro_interfaces.srv import ItemRequest from assessment_interfaces.msg import ItemHolders, ItemLog, ItemList, RobotList, ZoneList ''' GLOBAL VARIABLE DECLERATION ''' LINEAR_VELOCITY = 0.3 ANGULAR_VELOCITY = 0.3 TURN_LEFT = 1 TURN_RIGHT = -1 SCAN_THRESHOLD = 0.5 SCAN_FRONT = 0 SCAN_LEFT = 1 SCAN_BACK = 2 SCAN_RIGHT = 3 class State(Enum): NAVIGATE_ITEM = 0 NAVIGATE_ZONE = 1 COLLECTING = 2 OFFLOADING = 3 SEARCHING = 4 class RobotController(Node): def __init__(self): super().__init__('robot_controller') self.state = State.SEARCHING self.declare_parameter('x', 0.0) self.declare_parameter('y', 0.0) self.declare_parameter('yaw', 0.0) self.initial_x = self.get_parameter('x').get_parameter_value().double_value self.initial_y = self.get_parameter('y').get_parameter_value().double_value self.initial_yaw = self.get_parameter('yaw').get_parameter_value().double_value # For turning 90 degrees self.current_yaw = 0.0 self.desired_yaw = math.radians(90) self.scan_triggered = [False] * 4 self.SCAN_FRONT = 0 self.SCAN_LEFT = 1 self.SCAN_BACK = 2 self.SCAN_RIGHT = 3 self.obstacle_detected = False self.item_to_collect = None self.zone_to_navigate = None self.item_detected = False self.zone_detected = False self.search_for_item_state = True self.search_for_zone_state = False self.items = ItemList() self.zones = ZoneList() item_callback_group = MutuallyExclusiveCallbackGroup() timer_callback_group = MutuallyExclusiveCallbackGroup() client_callback_group = MutuallyExclusiveCallbackGroup() timer_callback_group = MutuallyExclusiveCallbackGroup() self.timer_period = 0.1 # 100 milliseconds = 10 Hz self.timer = self.create_timer(self.timer_period, self.control_loop) # Publisher and subscribers self.create_subscription(ItemHolders, '/item_holders', self.item_holders, 10, callback_group=timer_callback_group) self.create_subscription(ItemLog, '/item_log', self.item_log, 10, callback_group=timer_callback_group) self.cmd_vel_pub_1 = self.create_publisher(Twist, '/robot1/cmd_vel', 10) self.create_subscription(Odometry, '/robot1/odom', self.odom_callback, 10, callback_group=timer_callback_group) self.create_subscription(LaserScan, '/robot1/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group) self.create_subscription(ItemList, '/robot1/items', self.item_callback, 10, callback_group=timer_callback_group) self.create_subscription(RobotList, '/robot1/robots', self.robot_callback, 10, callback_group=timer_callback_group) self.create_subscription(ZoneList, '/robot1/zone', self.zone_callback, 10, callback_group=timer_callback_group) try: self.cmd_vel_pub_2 = self.create_publisher(Twist, '/robot2/cmd_vel', 10) self.create_subscription(Odometry, '/robot2/odom', self.odom_callback, 10, callback_group=timer_callback_group) self.create_subscription(LaserScan, '/robot2/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group) self.create_subscription(ItemList, '/robot2/items', self.item_callback, 10, callback_group=timer_callback_group) self.create_subscription(RobotList, '/robot2/robots', self.robots, 10, callback_group=timer_callback_group) self.create_subscription(ZoneList, '/robot2/zone', self.zone, 10, callback_group=timer_callback_group) except: pass try: self.cmd_vel_pub_3 = self.create_publisher(Twist, '/robot3/cmd_vel', 10) self.create_subscription(Odometry, '/robot3/odom', self.odom_callback, 10, callback_group=timer_callback_group) self.create_subscription(LaserScan, '/robot3/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group) self.create_subscription(ItemList, '/robot3/items', self.item_callback, 10, callback_group=timer_callback_group) self.create_subscription(RobotList, '/robot3/robots', self.robots, 10, callback_group=timer_callback_group) self.create_subscription(ZoneList, '/robot3/zone', self.zone, 10, callback_group=timer_callback_group) except: pass # Service Clients self.pick_up_service = self.create_client(ItemRequest, '/pick_up_item', callback_group=item_callback_group) self.offload_service = self.create_client(ItemRequest, '/offload_item', callback_group=item_callback_group) ''' CALLBACKS FOR SUBSCRIPTIONS ''' def item_holders(self, msg): # Detect number of robots in map for later subscriptions pass # Detect number of robots robot_amount = len(msg.data) # self.get_logger().info(f"XXXXXXXXXXXXXXX: {len(msg.data)}") if robot_amount == 1: # self.get_logger().info("There is 1 robot") pass elif robot_amount == 2: # self.get_logger().info("There are 2 robots") pass elif robot_amount == 3: # self.get_logger().info("There are 3 robot") pass else: # self.get_logger().info("ERROR") pass def odom_callback(self, msg): self.pose = msg.pose.pose (roll, pitch, yaw) = euler_from_quaternion([self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w]) self.yaw = yaw def scan_callback(self, msg): front_ranges = msg.ranges[331:359] + msg.ranges[0:30] left_ranges = msg.ranges[31:90] back_ranges = msg.ranges[91:270] right_ranges = msg.ranges[271:330] self.scan_triggered[SCAN_FRONT] = min(front_ranges) < SCAN_THRESHOLD self.scan_triggered[SCAN_LEFT] = min(left_ranges) < SCAN_THRESHOLD self.scan_triggered[SCAN_BACK] = min(back_ranges) < SCAN_THRESHOLD self.scan_triggered[SCAN_RIGHT] = min(right_ranges) < SCAN_THRESHOLD def item_callback(self, msg): self.items = msg def item_log(self, msg): self.item_log = msg def robot_callback(self, msg): self.robots = msg def zone_callback(self, msg): self.zones = msg ''' MAIN FSM LOGIC How it works: (1) Search for nearest item (2) Go to item and pick up (3) Check item is available (i.e Zone's not occupied of that colour yet) (3a) If item colour belongs to Zone, Return item to that zone -> 5 (3b) Choose nearest Zone detected to offload item (4) Move item to nearest Zone (5) Offload Item (6) Return to search for Item (7) Repeat ''' def control_loop(self): match self.state: case State.NAVIGATE_ITEM: # When we want to find an item but don't currently hold one, we will be in this state self.get_logger().info("In item navigation state") self.navigate_to_item() case State.NAVIGATE_ZONE: # When we have collected the item and want to navigate to a zone, we will be in this state self.get_logger().info("In zone navigation state") self.navigate_to_zone() case State.COLLECTING: # Collect the item and check if it was a success or not self.get_logger().info("In collecting state") request = ItemRequest.Request() request.robot_id = 'robot1' future = self.pick_up_service.call_async(request) self.executor.spin_until_future_complete(future) response = future.result() if response.success: self.get_logger().info("Item successfully picked up") self.search_for_item_state = False self.search_for_zone_state = True self.state = State.SEARCHING else: self.get_logger().info(f"Failed to pick up item: {response}") case State.OFFLOADING: # We're in a zone and now need to offload the item self.get_logger().info("In offloading state") request = ItemRequest.Request() request.robot_id = 'robot1' future = self.offload_service.call_async(request) self.executor.spin_until_future_complete(future) response = future.result() if response.success: self.get_logger().info("Item successfully offloaded") self.search_for_zone_state = False self.zone_to_navigate = False self.search_for_item_state = True self.item_detected = False self.state = State.SEARCHING else: self.get_logger.info(f"Failed to off load item: {response}") case State.SEARCHING: # Need to move appropriately depending on the given context # of what or where we need to search for # self.get_logger().info("in searching state") # (a) - items if len(self.items.data) == 0: self.item_detected = False else: self.item_detected = True if self.search_for_item_state and not self.item_detected: # Need to move until item detected msg = Twist() yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw) # Add a tollerance to our turning if abs(yaw_error) > math.radians(2): # the yaw tolerance msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY self.get_logger().info("TURNING 90 DEGREES") else: msg.angular.z = 0.0 self.get_logger().info("Completed turn") self.current_yaw = 0.0 # Reset values self.desired_yaw = math.radians(90) self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) elif self.search_for_item_state and self.item_detected: self.state = State.NAVIGATE_ITEM # (b) - zones if len(self.zones.data) == 0: self.zone_detected = False else: self.zone_detected = True if self.search_for_zone_state and not self.zone_detected: # Need to move until zone detected msg = Twist() yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw) if abs(yaw_error) > math.radians(2): # the yaw tolerances msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY self.get_logger().info("TURNING 90 DEGREES") else: msg.angular.z = 0.0 self.get_logger().info("Completed turn") self.current_yaw = 0.0 # Reset values self.desired_yaw = math.radians(90) self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) elif self.search_for_zone_state and self.zone_detected: self.state = State.NAVIGATE_ZONE def navigate_to_item(self): # Used to move robot to item detected # self.get_logger().info(f"self.items = {self.items}") if len(self.items.data) == 0: # If robot sensor doesn't detect anything, move random until detected self.item_detected = False return else: self.item_detected = True self.item_to_collect = self.items.data[0] # Use scanner to avoid collisions msg = Twist() # self.get_logger().info(f"Navigating to item: {self.item_to_collect}") estimated_distance = 32.4 * float(self.item_to_collect.diameter) ** -0.75 self.get_logger().info(f'Estimated distance {estimated_distance}') if self.scan_triggered[SCAN_FRONT] or self.scan_triggered[SCAN_LEFT] or self.scan_triggered[SCAN_RIGHT]: # Need to also check this in the detection state as otherwise ignores # close object by isolating states if estimated_distance <= 0.35: # If close enough, pick up the ITEM! msg.linear.x = 0.1 msg.angular.z = 0.0 self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) self.obstacle_detected=False self.get_logger().info("Close enough to pick up ITEM, changing state.") self.state = State.COLLECTING self.obstacle_detected = True self.get_logger().info("Obstacle in front turning to avoid") msg.linear.x = 0.01 yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw) if abs(yaw_error) > math.radians(5): # the yaw tolerance msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY else: msg.angular.z = 0.0 self.get_logger().info("Completed turn") self.current_yaw = 0.0 # Reset values self.desired_yaw = math.radians(90) self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) else: self.obstacle_detected = False if self.obstacle_detected: pass else: # Only do movement when no obstacle detected msg.linear.x = LINEAR_VELOCITY msg.angular.z = self.item_to_collect.x / 320.0 self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) if estimated_distance <= 0.35: # If close enough, pick up the ITEM! msg.linear.x = 0.1 msg.angular.z = 0.0 self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) self.get_logger().info("Close enough to pick up ITEM, changing state.") self.state = State.COLLECTING def navigate_to_zone(self): # Used to move robot to its desired zone if len(self.zones.data) == 0: self.zone_detected = False return else: self.zone_detected = True self.zone_to_navigate = self.zones.data[0] self.get_logger().info(f"Zone info: {self.zone_to_navigate}" ) msg = Twist() # estimated_distance = 32.4 * float(self.zone_to_navigate.size) ** -0.75 # self.get_logger().info(f"Estimated distance (ZONE): {estimated_distance}") if self.scan_triggered[SCAN_FRONT]: self.obstacle_detected = True self.get_logger().info("Obstacle in front turning to avoid") msg.linear.x = 0.1 yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw) if abs(yaw_error) > math.radians(2): # the yaw tolerance msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY else: msg.angular.z = 0.0 self.get_logger().info("Completed turn") self.current_yaw = 0.0 # Reset values self.desired_yaw = math.radians(90) elif self.scan_triggered[SCAN_LEFT]: self.obstacle_detected = True self.get_logger().info("Obstacle to the left") msg.angular.z = -ANGULAR_VELOCITY elif self.scan_triggered[SCAN_RIGHT]: self.obstacle_detected = True self.get_logger().info("Obstacle to the right") msg.angular.z = ANGULAR_VELOCITY else: self.obstacle_detected = False self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) msg.linear.x = LINEAR_VELOCITY msg.angular.z = self.zone_to_navigate.x / 310 self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) zone_size = self.zone_to_navigate.size if zone_size == 1.0: self.get_logger().info("IN ZONE") msg.linear.x = 0.0 msg.angular.z = 0.0 self.cmd_vel_pub_1.publish(msg) self.cmd_vel_pub_2.publish(msg) self.cmd_vel_pub_3.publish(msg) self.search_for_zone_state = False self.state = State.OFFLOADING def destroy_node(self): super().destroy_node() def main(args=None): rclpy.init(args = args, signal_handler_options = SignalHandlerOptions.NO) node = RobotController() try: rclpy.spin(node) except KeyboardInterrupt: pass except ExternalShutdownException: sys.exit(1) finally: node.destroy_node() rclpy.try_shutdown() if __name__ == '__main__': main()
Editor is loading...
Leave a Comment