Untitled
unknown
plain_text
10 months ago
19 kB
8
Indexable
import sys
import random
import math
from enum import Enum
import rclpy
from rclpy.node import Node
from rclpy.signals import SignalHandlerOptions
from rclpy.executors import ExternalShutdownException
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import ExternalShutdownException, MultiThreadedExecutor
from rclpy.qos import QoSPresetProfiles
from sensor_msgs.msg import Image
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist, Pose
from tf_transformations import euler_from_quaternion
import angles
from rclpy.publisher import Publisher
from auro_interfaces.srv import ItemRequest
from assessment_interfaces.msg import ItemHolders, ItemLog, ItemList, RobotList, ZoneList
'''
GLOBAL VARIABLE DECLERATION
'''
LINEAR_VELOCITY = 0.3
ANGULAR_VELOCITY = 0.3
TURN_LEFT = 1
TURN_RIGHT = -1
SCAN_THRESHOLD = 0.5
SCAN_FRONT = 0
SCAN_LEFT = 1
SCAN_BACK = 2
SCAN_RIGHT = 3
class State(Enum):
NAVIGATE_ITEM = 0
NAVIGATE_ZONE = 1
COLLECTING = 2
OFFLOADING = 3
SEARCHING = 4
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
self.state = State.SEARCHING
self.declare_parameter('x', 0.0)
self.declare_parameter('y', 0.0)
self.declare_parameter('yaw', 0.0)
self.initial_x = self.get_parameter('x').get_parameter_value().double_value
self.initial_y = self.get_parameter('y').get_parameter_value().double_value
self.initial_yaw = self.get_parameter('yaw').get_parameter_value().double_value
# For turning 90 degrees
self.current_yaw = 0.0
self.desired_yaw = math.radians(90)
self.scan_triggered = [False] * 4
self.SCAN_FRONT = 0
self.SCAN_LEFT = 1
self.SCAN_BACK = 2
self.SCAN_RIGHT = 3
self.obstacle_detected = False
self.item_to_collect = None
self.zone_to_navigate = None
self.item_detected = False
self.zone_detected = False
self.search_for_item_state = True
self.search_for_zone_state = False
self.items = ItemList()
self.zones = ZoneList()
item_callback_group = MutuallyExclusiveCallbackGroup()
timer_callback_group = MutuallyExclusiveCallbackGroup()
client_callback_group = MutuallyExclusiveCallbackGroup()
timer_callback_group = MutuallyExclusiveCallbackGroup()
self.timer_period = 0.1 # 100 milliseconds = 10 Hz
self.timer = self.create_timer(self.timer_period, self.control_loop)
# Publisher and subscribers
self.create_subscription(ItemHolders, '/item_holders', self.item_holders, 10, callback_group=timer_callback_group)
self.create_subscription(ItemLog, '/item_log', self.item_log, 10, callback_group=timer_callback_group)
self.cmd_vel_pub_1 = self.create_publisher(Twist, '/robot1/cmd_vel', 10)
self.create_subscription(Odometry, '/robot1/odom', self.odom_callback, 10, callback_group=timer_callback_group)
self.create_subscription(LaserScan, '/robot1/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group)
self.create_subscription(ItemList, '/robot1/items', self.item_callback, 10, callback_group=timer_callback_group)
self.create_subscription(RobotList, '/robot1/robots', self.robot_callback, 10, callback_group=timer_callback_group)
self.create_subscription(ZoneList, '/robot1/zone', self.zone_callback, 10, callback_group=timer_callback_group)
try:
self.cmd_vel_pub_2 = self.create_publisher(Twist, '/robot2/cmd_vel', 10)
self.create_subscription(Odometry, '/robot2/odom', self.odom_callback, 10, callback_group=timer_callback_group)
self.create_subscription(LaserScan, '/robot2/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group)
self.create_subscription(ItemList, '/robot2/items', self.item_callback, 10, callback_group=timer_callback_group)
self.create_subscription(RobotList, '/robot2/robots', self.robots, 10, callback_group=timer_callback_group)
self.create_subscription(ZoneList, '/robot2/zone', self.zone, 10, callback_group=timer_callback_group)
except:
pass
try:
self.cmd_vel_pub_3 = self.create_publisher(Twist, '/robot3/cmd_vel', 10)
self.create_subscription(Odometry, '/robot3/odom', self.odom_callback, 10, callback_group=timer_callback_group)
self.create_subscription(LaserScan, '/robot3/scan', self.scan_callback, QoSPresetProfiles.SENSOR_DATA.value, callback_group=timer_callback_group)
self.create_subscription(ItemList, '/robot3/items', self.item_callback, 10, callback_group=timer_callback_group)
self.create_subscription(RobotList, '/robot3/robots', self.robots, 10, callback_group=timer_callback_group)
self.create_subscription(ZoneList, '/robot3/zone', self.zone, 10, callback_group=timer_callback_group)
except:
pass
# Service Clients
self.pick_up_service = self.create_client(ItemRequest, '/pick_up_item', callback_group=item_callback_group)
self.offload_service = self.create_client(ItemRequest, '/offload_item', callback_group=item_callback_group)
'''
CALLBACKS FOR SUBSCRIPTIONS
'''
def item_holders(self, msg):
# Detect number of robots in map for later subscriptions
pass
# Detect number of robots
robot_amount = len(msg.data)
# self.get_logger().info(f"XXXXXXXXXXXXXXX: {len(msg.data)}")
if robot_amount == 1:
# self.get_logger().info("There is 1 robot")
pass
elif robot_amount == 2:
# self.get_logger().info("There are 2 robots")
pass
elif robot_amount == 3:
# self.get_logger().info("There are 3 robot")
pass
else:
# self.get_logger().info("ERROR")
pass
def odom_callback(self, msg):
self.pose = msg.pose.pose
(roll, pitch, yaw) = euler_from_quaternion([self.pose.orientation.x,
self.pose.orientation.y,
self.pose.orientation.z,
self.pose.orientation.w])
self.yaw = yaw
def scan_callback(self, msg):
front_ranges = msg.ranges[331:359] + msg.ranges[0:30]
left_ranges = msg.ranges[31:90]
back_ranges = msg.ranges[91:270]
right_ranges = msg.ranges[271:330]
self.scan_triggered[SCAN_FRONT] = min(front_ranges) < SCAN_THRESHOLD
self.scan_triggered[SCAN_LEFT] = min(left_ranges) < SCAN_THRESHOLD
self.scan_triggered[SCAN_BACK] = min(back_ranges) < SCAN_THRESHOLD
self.scan_triggered[SCAN_RIGHT] = min(right_ranges) < SCAN_THRESHOLD
def item_callback(self, msg):
self.items = msg
def item_log(self, msg):
self.item_log = msg
def robot_callback(self, msg):
self.robots = msg
def zone_callback(self, msg):
self.zones = msg
'''
MAIN FSM LOGIC
How it works:
(1) Search for nearest item
(2) Go to item and pick up
(3) Check item is available (i.e Zone's not occupied of that colour yet)
(3a) If item colour belongs to Zone, Return item to that zone -> 5
(3b) Choose nearest Zone detected to offload item
(4) Move item to nearest Zone
(5) Offload Item
(6) Return to search for Item
(7) Repeat
'''
def control_loop(self):
match self.state:
case State.NAVIGATE_ITEM:
# When we want to find an item but don't currently hold one, we will be in this state
self.get_logger().info("In item navigation state")
self.navigate_to_item()
case State.NAVIGATE_ZONE:
# When we have collected the item and want to navigate to a zone, we will be in this state
self.get_logger().info("In zone navigation state")
self.navigate_to_zone()
case State.COLLECTING:
# Collect the item and check if it was a success or not
self.get_logger().info("In collecting state")
request = ItemRequest.Request()
request.robot_id = 'robot1'
future = self.pick_up_service.call_async(request)
self.executor.spin_until_future_complete(future)
response = future.result()
if response.success:
self.get_logger().info("Item successfully picked up")
self.search_for_item_state = False
self.search_for_zone_state = True
self.state = State.SEARCHING
else:
self.get_logger().info(f"Failed to pick up item: {response}")
case State.OFFLOADING:
# We're in a zone and now need to offload the item
self.get_logger().info("In offloading state")
request = ItemRequest.Request()
request.robot_id = 'robot1'
future = self.offload_service.call_async(request)
self.executor.spin_until_future_complete(future)
response = future.result()
if response.success:
self.get_logger().info("Item successfully offloaded")
self.search_for_zone_state = False
self.zone_to_navigate = False
self.search_for_item_state = True
self.item_detected = False
self.state = State.SEARCHING
else:
self.get_logger.info(f"Failed to off load item: {response}")
case State.SEARCHING:
# Need to move appropriately depending on the given context
# of what or where we need to search for
# self.get_logger().info("in searching state")
# (a) - items
if len(self.items.data) == 0:
self.item_detected = False
else:
self.item_detected = True
if self.search_for_item_state and not self.item_detected:
# Need to move until item detected
msg = Twist()
yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
# Add a tollerance to our turning
if abs(yaw_error) > math.radians(2): # the yaw tolerance
msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY
self.get_logger().info("TURNING 90 DEGREES")
else:
msg.angular.z = 0.0
self.get_logger().info("Completed turn")
self.current_yaw = 0.0 # Reset values
self.desired_yaw = math.radians(90)
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
elif self.search_for_item_state and self.item_detected:
self.state = State.NAVIGATE_ITEM
# (b) - zones
if len(self.zones.data) == 0:
self.zone_detected = False
else:
self.zone_detected = True
if self.search_for_zone_state and not self.zone_detected:
# Need to move until zone detected
msg = Twist()
yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
if abs(yaw_error) > math.radians(2): # the yaw tolerances
msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY
self.get_logger().info("TURNING 90 DEGREES")
else:
msg.angular.z = 0.0
self.get_logger().info("Completed turn")
self.current_yaw = 0.0 # Reset values
self.desired_yaw = math.radians(90)
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
elif self.search_for_zone_state and self.zone_detected:
self.state = State.NAVIGATE_ZONE
def navigate_to_item(self):
# Used to move robot to item detected
# self.get_logger().info(f"self.items = {self.items}")
if len(self.items.data) == 0:
# If robot sensor doesn't detect anything, move random until detected
self.item_detected = False
return
else:
self.item_detected = True
self.item_to_collect = self.items.data[0]
# Use scanner to avoid collisions
msg = Twist()
# self.get_logger().info(f"Navigating to item: {self.item_to_collect}")
estimated_distance = 32.4 * float(self.item_to_collect.diameter) ** -0.75
self.get_logger().info(f'Estimated distance {estimated_distance}')
if self.scan_triggered[SCAN_FRONT] or self.scan_triggered[SCAN_LEFT] or self.scan_triggered[SCAN_RIGHT]:
# Need to also check this in the detection state as otherwise ignores
# close object by isolating states
if estimated_distance <= 0.35:
# If close enough, pick up the ITEM!
msg.linear.x = 0.1
msg.angular.z = 0.0
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
self.obstacle_detected=False
self.get_logger().info("Close enough to pick up ITEM, changing state.")
self.state = State.COLLECTING
self.obstacle_detected = True
self.get_logger().info("Obstacle in front turning to avoid")
msg.linear.x = 0.01
yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
if abs(yaw_error) > math.radians(5): # the yaw tolerance
msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY
else:
msg.angular.z = 0.0
self.get_logger().info("Completed turn")
self.current_yaw = 0.0 # Reset values
self.desired_yaw = math.radians(90)
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
else:
self.obstacle_detected = False
if self.obstacle_detected:
pass
else: # Only do movement when no obstacle detected
msg.linear.x = LINEAR_VELOCITY
msg.angular.z = self.item_to_collect.x / 320.0
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
if estimated_distance <= 0.35:
# If close enough, pick up the ITEM!
msg.linear.x = 0.1
msg.angular.z = 0.0
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
self.get_logger().info("Close enough to pick up ITEM, changing state.")
self.state = State.COLLECTING
def navigate_to_zone(self):
# Used to move robot to its desired zone
if len(self.zones.data) == 0:
self.zone_detected = False
return
else:
self.zone_detected = True
self.zone_to_navigate = self.zones.data[0]
self.get_logger().info(f"Zone info: {self.zone_to_navigate}" )
msg = Twist()
# estimated_distance = 32.4 * float(self.zone_to_navigate.size) ** -0.75
# self.get_logger().info(f"Estimated distance (ZONE): {estimated_distance}")
if self.scan_triggered[SCAN_FRONT]:
self.obstacle_detected = True
self.get_logger().info("Obstacle in front turning to avoid")
msg.linear.x = 0.1
yaw_error = angles.shortest_angular_distance(self.current_yaw, self.desired_yaw)
if abs(yaw_error) > math.radians(2): # the yaw tolerance
msg.angular.z = ANGULAR_VELOCITY if yaw_error > 0 else -ANGULAR_VELOCITY
else:
msg.angular.z = 0.0
self.get_logger().info("Completed turn")
self.current_yaw = 0.0 # Reset values
self.desired_yaw = math.radians(90)
elif self.scan_triggered[SCAN_LEFT]:
self.obstacle_detected = True
self.get_logger().info("Obstacle to the left")
msg.angular.z = -ANGULAR_VELOCITY
elif self.scan_triggered[SCAN_RIGHT]:
self.obstacle_detected = True
self.get_logger().info("Obstacle to the right")
msg.angular.z = ANGULAR_VELOCITY
else:
self.obstacle_detected = False
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
msg.linear.x = LINEAR_VELOCITY
msg.angular.z = self.zone_to_navigate.x / 310
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
zone_size = self.zone_to_navigate.size
if zone_size == 1.0:
self.get_logger().info("IN ZONE")
msg.linear.x = 0.0
msg.angular.z = 0.0
self.cmd_vel_pub_1.publish(msg)
self.cmd_vel_pub_2.publish(msg)
self.cmd_vel_pub_3.publish(msg)
self.search_for_zone_state = False
self.state = State.OFFLOADING
def destroy_node(self):
super().destroy_node()
def main(args=None):
rclpy.init(args = args, signal_handler_options = SignalHandlerOptions.NO)
node = RobotController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
except ExternalShutdownException:
sys.exit(1)
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()Editor is loading...
Leave a Comment