Untitled

 avatar
unknown
plain_text
a month ago
19 kB
6
Indexable
import sys
import random
import math
from enum import Enum

import rclpy
from rclpy.node import Node
from rclpy.signals import SignalHandlerOptions
from rclpy.executors import ExternalShutdownException
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import ExternalShutdownException, MultiThreadedExecutor
from rclpy.qos import QoSPresetProfiles

from sensor_msgs.msg import Image
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist, Pose

from tf_transformations import euler_from_quaternion
import angles
from rclpy.publisher import Publisher

from auro_interfaces.srv import ItemRequest
from assessment_interfaces.msg import ItemHolders, ItemLog, ItemList, RobotList, ZoneList



''' 

GLOBAL VARIABLE DECLERATION

'''
LINEAR_VELOCITY  = 0.3
ANGULAR_VELOCITY = 0.3

TURN_LEFT = 1
TURN_RIGHT = -1

SCAN_THRESHOLD = 0.5
SCAN_FRONT = 0
SCAN_LEFT = 1
SCAN_BACK = 2
SCAN_RIGHT = 3



class State(Enum):
    NAVIGATE_ITEM = 0
    NAVIGATE_ZONE = 1
    COLLECTING = 2
    OFFLOADING = 3
    SEARCHING = 4



class RobotController(Node):

    def __init__(self):
        super().__init__('robot_controller')
        
        self.state = State.SEARCHING
        self.declare_parameter('x', 0.0)
        self.declare_parameter('y', 0.0)
        self.declare_parameter('yaw', 0.0)

        self.initial_x = self.get_parameter('x').get_parameter_value().double_value
        self.initial_y = self.get_parameter('y').get_parameter_value().double_value
        self.initial_yaw = self.get_parameter('yaw').get_parameter_value().double_value

        # For turning 90 degrees
        self.current_yaw = 0.0
        self.desired_yaw = math.radians(90)
        

        self.scan_triggered = [False] * 4
        self.SCAN_FRONT = 0
        self.SCAN_LEFT = 1
        self.SCAN_BACK = 2
        self.SCAN_RIGHT = 3
        self.obstacle_detected = False

        self.item_to_collect = None
        self.zone_to_navigate = None
        self.item_detected = False
        self.zone_detected = False
        self.search_for_item_state = True
        self.search_for_zone_state = False

        self.items = ItemList()
        self.zones = ZoneList()
        

        item_callback_group = MutuallyExclusiveCallbackGroup()
        timer_callback_group = MutuallyExclusiveCallbackGroup()
        client_callback_group = MutuallyExclusiveCallbackGroup()
        timer_callback_group = MutuallyExclusiveCallbackGroup()
    
        self.timer_period = 0.1 # 100 milliseconds = 10 Hz
        self.timer = self.create_timer(self.timer_period, self.control_loop)


        # Publisher and subscribers 
        self.create_subscription(ItemHolders, '/item_holders', self.item_holders, 10, callback_group=timer_callback_group)
        self.create_subscription(ItemLog, '/item_log', self.item_log, 10, callback_group=timer_callback_group)

        self.cmd_vel_pub_1 = self.create_publisher(Twist, '/robot1/cmd_vel', 10)
        self.create_subscription(Odometry, '/robot1/odom', self.odom_callback, 10, callback_group=timer_callback_group)
        self.create_subscription(LaserScan, '/robot1/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group)
        self.create_subscription(ItemList, '/robot1/items', self.item_callback, 10, callback_group=timer_callback_group)
        self.create_subscription(RobotList, '/robot1/robots', self.robot_callback, 10, callback_group=timer_callback_group)
        self.create_subscription(ZoneList, '/robot1/zone', self.zone_callback, 10, callback_group=timer_callback_group)
        try:
            self.cmd_vel_pub_2 = self.create_publisher(Twist, '/robot2/cmd_vel', 10)
            self.create_subscription(Odometry, '/robot2/odom', self.odom_callback, 10, callback_group=timer_callback_group)
            self.create_subscription(LaserScan, '/robot2/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group)
            self.create_subscription(ItemList, '/robot2/items', self.item_callback, 10, callback_group=timer_callback_group)
            self.create_subscription(RobotList, '/robot2/robots', self.robots, 10, callback_group=timer_callback_group)
            self.create_subscription(ZoneList, '/robot2/zone', self.zone, 10, callback_group=timer_callback_group)
        except:
            pass
        try:
            self.cmd_vel_pub_3 = self.create_publisher(Twist, '/robot3/cmd_vel', 10)
            self.create_subscription(Odometry, '/robot3/odom', self.odom_callback, 10, callback_group=timer_callback_group)
            self.create_subscription(LaserScan, '/robot3/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group)
            self.create_subscription(ItemList, '/robot3/items', self.item_callback, 10, callback_group=timer_callback_group)
            self.create_subscription(RobotList, '/robot3/robots', self.robots, 10, callback_group=timer_callback_group)
            self.create_subscription(ZoneList, '/robot3/zone', self.zone, 10, callback_group=timer_callback_group)
        except:
            pass
        

        # Service Clients
        self.pick_up_service = self.create_client(ItemRequest, '/pick_up_item', callback_group=item_callback_group)
        self.offload_service = self.create_client(ItemRequest, '/offload_item', callback_group=item_callback_group)

        

    '''
    
    CALLBACKS FOR SUBSCRIPTIONS 

    '''

    def item_holders(self, msg):
        # Detect number of robots in map for later subscriptions
        pass
        # Detect number of robots
        robot_amount = len(msg.data)
        # self.get_logger().info(f"XXXXXXXXXXXXXXX: {len(msg.data)}")
        if robot_amount == 1:
            # self.get_logger().info("There is 1 robot")
            pass
        elif robot_amount == 2:
            # self.get_logger().info("There are 2 robots")
            pass
        elif robot_amount == 3:
            # self.get_logger().info("There are 3 robot")
            pass
        else:
            # self.get_logger().info("ERROR")
            pass

    
    def odom_callback(self, msg):
        self.pose = msg.pose.pose
        (roll, pitch, yaw) = euler_from_quaternion([self.pose.orientation.x,
                                                    self.pose.orientation.y,
                                                    self.pose.orientation.z,
                                                    self.pose.orientation.w])
        self.yaw = yaw

        
    def scan_callback(self, msg):
        front_ranges = msg.ranges[331:359] + msg.ranges[0:30] 
        left_ranges  = msg.ranges[31:90] 
        back_ranges  = msg.ranges[91:270] 
        right_ranges = msg.ranges[271:330]
        self.scan_triggered[SCAN_FRONT] = min(front_ranges) < SCAN_THRESHOLD 
        self.scan_triggered[SCAN_LEFT]  = min(left_ranges)  < SCAN_THRESHOLD
        self.scan_triggered[SCAN_BACK]  = min(back_ranges)  < SCAN_THRESHOLD
        self.scan_triggered[SCAN_RIGHT] = min(right_ranges) < SCAN_THRESHOLD

    def item_callback(self, msg):
        self.items = msg

    def item_log(self, msg):
        self.item_log = msg

    def robot_callback(self, msg):
        self.robots = msg

    def zone_callback(self, msg):
        self.zones = msg
    '''
    
    MAIN FSM LOGIC
    
    How it works:
    (1) Search for nearest item
    (2) Go to item and pick up
    (3) Check item is available (i.e Zone's not occupied of that colour yet)   
        (3a) If item colour belongs to Zone, Return item to that zone -> 5
        (3b) Choose nearest Zone detected to offload item
    (4) Move item to nearest Zone
    (5) Offload Item
    (6) Return to search for Item
    (7) Repeat

    '''

    def control_loop(self):        
        match self.state:
            case State.NAVIGATE_ITEM:
                # When we want to find an item but don't currently hold one, we will be in this state
                self.get_logger().info("In item navigation state")
                self.navigate_to_item()

            case State.NAVIGATE_ZONE:
                # When we have collected the item and want to navigate to a zone, we will be in this state
                self.get_logger().info("In zone navigation state")
                self.navigate_to_zone()

            case State.COLLECTING:
                # Collect the item and check if it was a success or not
                self.get_logger().info("In collecting state")
                request =  ItemRequest.Request()
                request.robot_id = 'robot1'

                future = self.pick_up_service.call_async(request)
                self.executor.spin_until_future_complete(future)
                response = future.result()
                
                if response.success:
                    self.get_logger().info("Item successfully picked up")
                    self.search_for_item_state = False
                    self.search_for_zone_state = True
                    self.state = State.SEARCHING
                else:
                    self.get_logger().info(f"Failed to pick up item: {response}")

            case State.OFFLOADING:
                # We're in a zone and now need to offload the item
                self.get_logger().info("In offloading state")
                request = ItemRequest.Request()
                request.robot_id = 'robot1'

                future = self.offload_service.call_async(request)
                self.executor.spin_until_future_complete(future)
                response = future.result()

                if response.success:
                    self.get_logger().info("Item successfully offloaded")
                    self.search_for_zone_state = False
                    self.zone_to_navigate = False
                    self.search_for_item_state = True
                    self.item_detected = False
                    self.state = State.SEARCHING
                else:
                    self.get_logger.info(f"Failed to off load item:  {response}")


            case State.SEARCHING:
                # Need to move appropriately depending on the given context
                # of what or where we need to search for
                
                # self.get_logger().info("in searching state")
                # (a) - items
                if len(self.items.data) == 0:
                    self.item_detected = False
                else:
                    self.item_detected = True
                
                if self.search_for_item_state and not self.item_detected:
                    # Need to move until item detected
                    msg = Twist()
                    yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
                    # Add a tollerance to our turning
                    if abs(yaw_error) > math.radians(2): # the yaw tolerance

                        msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else  -ANGULAR_VELOCITY
                        self.get_logger().info("TURNING 90 DEGREES")
                    else:
                        msg.angular.z = 0.0
                        self.get_logger().info("Completed turn")
                        self.current_yaw = 0.0 # Reset values
                        self.desired_yaw = math.radians(90)
                    
                    self.cmd_vel_pub_1.publish(msg)
                    self.cmd_vel_pub_2.publish(msg)
                    self.cmd_vel_pub_3.publish(msg)
                    
                elif self.search_for_item_state and self.item_detected:
                    self.state = State.NAVIGATE_ITEM
                
                # (b) - zones
                if len(self.zones.data) == 0:
                    self.zone_detected = False
                else:
                    self.zone_detected = True
                if self.search_for_zone_state and not self.zone_detected:
                    # Need to move until zone detected
                    msg = Twist()
                    yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
                    
                    if abs(yaw_error) > math.radians(2): # the yaw tolerances

                        msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else  -ANGULAR_VELOCITY
                        self.get_logger().info("TURNING 90 DEGREES")
                    else:
                        msg.angular.z = 0.0
                        self.get_logger().info("Completed turn")
                        self.current_yaw = 0.0 # Reset values
                        self.desired_yaw = math.radians(90)
                    
                    self.cmd_vel_pub_1.publish(msg)
                    self.cmd_vel_pub_2.publish(msg)
                    self.cmd_vel_pub_3.publish(msg)


                elif self.search_for_zone_state and self.zone_detected:
                    self.state = State.NAVIGATE_ZONE


    def navigate_to_item(self):
        # Used to move robot to item detected
        # self.get_logger().info(f"self.items = {self.items}")
        if len(self.items.data) == 0:
            # If robot sensor doesn't detect anything, move random until detected
            self.item_detected = False
            return
        else:
            self.item_detected = True
            self.item_to_collect = self.items.data[0]
            
        # Use scanner to avoid collisions
        msg = Twist()
        # self.get_logger().info(f"Navigating to item: {self.item_to_collect}")
        estimated_distance = 32.4 * float(self.item_to_collect.diameter) ** -0.75
        self.get_logger().info(f'Estimated distance {estimated_distance}')
        
        if self.scan_triggered[SCAN_FRONT] or self.scan_triggered[SCAN_LEFT] or self.scan_triggered[SCAN_RIGHT]:
            
            # Need to also check this in the detection state as otherwise ignores 
            # close object by isolating states
            if estimated_distance <= 0.35:
                # If close enough, pick up the ITEM!
                msg.linear.x = 0.1
                msg.angular.z = 0.0
                self.cmd_vel_pub_1.publish(msg)
                self.cmd_vel_pub_2.publish(msg)
                self.cmd_vel_pub_3.publish(msg)
                self.obstacle_detected=False
                self.get_logger().info("Close enough to pick up ITEM, changing state.")
                self.state = State.COLLECTING
            
            
            self.obstacle_detected = True
            self.get_logger().info("Obstacle in front turning to avoid")
            msg.linear.x = 0.01
            yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
            if abs(yaw_error) > math.radians(5): # the yaw tolerance

                msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else  -ANGULAR_VELOCITY
            else:
                msg.angular.z = 0.0
                self.get_logger().info("Completed turn")
                self.current_yaw = 0.0 # Reset values
                self.desired_yaw = math.radians(90)
                self.cmd_vel_pub_1.publish(msg)
                self.cmd_vel_pub_2.publish(msg)
                self.cmd_vel_pub_3.publish(msg)

        else:
            self.obstacle_detected = False

        if self.obstacle_detected:
            pass
        else: # Only do movement when no obstacle detected
            
            msg.linear.x = LINEAR_VELOCITY
            msg.angular.z = self.item_to_collect.x / 320.0
            self.cmd_vel_pub_1.publish(msg)
            self.cmd_vel_pub_2.publish(msg)
            self.cmd_vel_pub_3.publish(msg)


            if estimated_distance <= 0.35:
                # If close enough, pick up the ITEM!
                msg.linear.x = 0.1
                msg.angular.z = 0.0
                self.cmd_vel_pub_1.publish(msg)
                self.cmd_vel_pub_2.publish(msg)
                self.cmd_vel_pub_3.publish(msg)
                self.get_logger().info("Close enough to pick up ITEM, changing state.")
                self.state = State.COLLECTING
    
    def navigate_to_zone(self):
        # Used to move robot to its desired zone

        if len(self.zones.data) == 0:
            self.zone_detected = False
            return
        else:
            self.zone_detected = True
            self.zone_to_navigate = self.zones.data[0]

            
        self.get_logger().info(f"Zone info:   {self.zone_to_navigate}" )
        msg = Twist()
        # estimated_distance = 32.4 * float(self.zone_to_navigate.size) ** -0.75
        # self.get_logger().info(f"Estimated distance (ZONE): {estimated_distance}")
        if self.scan_triggered[SCAN_FRONT]:
            self.obstacle_detected = True
            self.get_logger().info("Obstacle in front turning to avoid")
            msg.linear.x = 0.1
            yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
            if abs(yaw_error) > math.radians(2): # the yaw tolerance

                msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else  -ANGULAR_VELOCITY
            else:
                msg.angular.z = 0.0
                self.get_logger().info("Completed turn")
                self.current_yaw = 0.0 # Reset values
                self.desired_yaw = math.radians(90)
        elif self.scan_triggered[SCAN_LEFT]:
            self.obstacle_detected = True
            self.get_logger().info("Obstacle to the left")
            msg.angular.z = -ANGULAR_VELOCITY
        elif self.scan_triggered[SCAN_RIGHT]:
            self.obstacle_detected = True
            self.get_logger().info("Obstacle to the right")
            msg.angular.z = ANGULAR_VELOCITY
        else:
            self.obstacle_detected = False

      
        self.cmd_vel_pub_1.publish(msg)
        self.cmd_vel_pub_2.publish(msg)
        self.cmd_vel_pub_3.publish(msg)
        msg.linear.x = LINEAR_VELOCITY
        msg.angular.z = self.zone_to_navigate.x / 310
        self.cmd_vel_pub_1.publish(msg)
        self.cmd_vel_pub_2.publish(msg)
        self.cmd_vel_pub_3.publish(msg)
        zone_size = self.zone_to_navigate.size

        if zone_size == 1.0:
            self.get_logger().info("IN ZONE")
            msg.linear.x = 0.0
            msg.angular.z = 0.0
            self.cmd_vel_pub_1.publish(msg)
            self.cmd_vel_pub_2.publish(msg)
            self.cmd_vel_pub_3.publish(msg)
            self.search_for_zone_state = False
            self.state = State.OFFLOADING
    

    def destroy_node(self):
        super().destroy_node()


def main(args=None):

    rclpy.init(args = args, signal_handler_options = SignalHandlerOptions.NO)

    node = RobotController()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    except ExternalShutdownException:
        sys.exit(1)
    finally:
        node.destroy_node()
        rclpy.try_shutdown()


if __name__ == '__main__':
    main()
Editor is loading...
Leave a Comment