Untitled
unknown
plain_text
a year ago
5.3 kB
5
Indexable
import rclpy import numpy as np from rclpy.node import Node from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy from sensor_msgs.msg import LaserScan from nav_msgs.msg import OccupancyGrid, MapMetaData, Odometry from geometry_msgs.msg import Pose, Quaternion def bresenham(start, end): """ Bresenham's Line Algorithm for producing a list of points from start to end (inclusive). (original from roguebasin.com) """ x1, y1 = start x2, y2 = end points = [] dx = abs(x2 - x1) dy = abs(y2 - y1) x, y = x1, y1 sx = -1 if x1 > x2 else 1 sy = -1 if y1 > y2 else 1 if dx > dy: err = dx / 2.0 while x != x2: points.append((x, y)) err -= dy if err < 0: y += sy err += dx x += sx else: err = dy / 2.0 while y != y2: points.append((x, y)) err -= dx if err < 0: x += sx err += dy y += sy points.append((x, y)) return points class MapUpdater(Node): def __init__(self): super().__init__('map_updater') qos_profile = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.map_pub = self.create_publisher(OccupancyGrid, '/map', qos_profile) self.lidar_sub = self.create_subscription(LaserScan, '/scan', self.handle_lidar_scan, 10) self.odom_sub = self.create_subscription(Odometry, '/odom', self.handle_odom, 10) self.meta = MapMetaData() self.meta.width = 200 self.meta.height = 200 self.meta.resolution = 0.05 self.meta.origin = Pose() self.meta.origin.position.x = -self.meta.width * self.meta.resolution / 2 self.meta.origin.position.y = -self.meta.height * self.meta.resolution / 2 self.meta.origin.orientation = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0) self.grid = -1 * np.ones((self.meta.height, self.meta.width), dtype=np.int8) self.robot_pose = Pose() self.robot_x = 0 self.robot_y = 0 timer_period = 1.0 # seconds (1 Hz frequency) self.timer = self.create_timer(timer_period, self.timer_callback) def handle_odom(self, msg): self.robot_pose = msg.pose.pose def handle_lidar_scan(self, scan): points = self.convert_scan_to_points(scan) self.update_map(points) def timer_callback(self): self.publish_map() def convert_scan_to_points(self, scan): angles = np.arange(scan.angle_min, scan.angle_max, scan.angle_increment) clean_ranges = np.nan_to_num(scan.ranges, nan=0.0, posinf=0.0, neginf=0.0) x_coords = clean_ranges * np.cos(angles) y_coords = clean_ranges * np.sin(angles) return np.vstack((x_coords, y_coords)).T def update_map(self, points): robot_x, robot_y = self.robot_position_in_map() # Increment for occupied cells unit = 25 for point in points: end_x = int(point[0] / self.meta.resolution) + robot_x end_y = int(point[1] / self.meta.resolution) + robot_y # Bresenham's algorithm - endpoint, considered as occupied for free_x, free_y in bresenham((robot_x, robot_y), (end_x, end_y))[:-1]: if 0 <= free_x < self.meta.width and 0 <= free_y < self.meta.height: self.grid[free_y, free_x] = 0 # Process the occupied if 0 <= end_x < self.meta.width and 0 <= end_y < self.meta.height: prev_value = self.grid[end_y, end_x] new_value = min(100, prev_value + unit) # Increment and cap at 100 for accumenlation self.grid[end_y, end_x] = new_value def robot_position_in_map(self): self.robot_x = int((self.robot_pose.position.x - self.meta.origin.position.x) / self.meta.resolution) self.robot_y = int((self.robot_pose.position.y - self.meta.origin.position.y) / self.meta.resolution) return self.robot_x, self.robot_y def publish_map(self): msg = OccupancyGrid() msg.header.stamp = self.get_clock().now().to_msg() msg.header.frame_id = 'map' msg.info = self.meta # Create a copy of the grid for output preparation output_grid = np.full((self.meta.height, self.meta.width), -1, dtype=int) # Set occupied and free areas based on the thresholds occupied_indices = self.grid >= 100 # Cells that are fully occupied free_indices = self.grid == 0 # Cells that are explicitly marked free output_grid[occupied_indices] = 100 output_grid[free_indices] = 0 msg.data = output_grid.ravel().tolist() self.map_pub.publish(msg) def main(args=None): rclpy.init(args=args) node = MapUpdater() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
Editor is loading...
Leave a Comment