Untitled

 avatar
unknown
plain_text
a year ago
5.3 kB
5
Indexable
import rclpy
import numpy as np
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import OccupancyGrid, MapMetaData, Odometry
from geometry_msgs.msg import Pose, Quaternion

def bresenham(start, end):
    """
    Bresenham's Line Algorithm for producing a list of points from start to end (inclusive).
    (original from roguebasin.com)
    """
    x1, y1 = start
    x2, y2 = end
    points = []
    dx = abs(x2 - x1)
    dy = abs(y2 - y1)
    x, y = x1, y1
    sx = -1 if x1 > x2 else 1
    sy = -1 if y1 > y2 else 1
    if dx > dy:
        err = dx / 2.0
        while x != x2:
            points.append((x, y))
            err -= dy
            if err < 0:
                y += sy
                err += dx
            x += sx
    else:
        err = dy / 2.0
        while y != y2:
            points.append((x, y))
            err -= dx
            if err < 0:
                x += sx
                err += dy
            y += sy
    points.append((x, y))
    return points

class MapUpdater(Node):
    def __init__(self):
        super().__init__('map_updater')
        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )
        self.map_pub = self.create_publisher(OccupancyGrid, '/map', qos_profile)
        self.lidar_sub = self.create_subscription(LaserScan, '/scan', self.handle_lidar_scan, 10)
        self.odom_sub = self.create_subscription(Odometry, '/odom', self.handle_odom, 10)

        self.meta = MapMetaData()
        self.meta.width = 200
        self.meta.height = 200
        self.meta.resolution = 0.05
        self.meta.origin = Pose()
        self.meta.origin.position.x = -self.meta.width * self.meta.resolution / 2
        self.meta.origin.position.y = -self.meta.height * self.meta.resolution / 2
        self.meta.origin.orientation = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)

        self.grid = -1 * np.ones((self.meta.height, self.meta.width), dtype=np.int8)
        self.robot_pose = Pose()

        self.robot_x = 0
        self.robot_y = 0

        timer_period = 1.0  # seconds (1 Hz frequency)
        self.timer = self.create_timer(timer_period, self.timer_callback)

    def handle_odom(self, msg):
        self.robot_pose = msg.pose.pose

    def handle_lidar_scan(self, scan):
        points = self.convert_scan_to_points(scan)
        self.update_map(points)

    def timer_callback(self):
        self.publish_map()

    def convert_scan_to_points(self, scan):
        angles = np.arange(scan.angle_min, scan.angle_max, scan.angle_increment)
        clean_ranges = np.nan_to_num(scan.ranges, nan=0.0, posinf=0.0, neginf=0.0)
        x_coords = clean_ranges * np.cos(angles)
        y_coords = clean_ranges * np.sin(angles)
        return np.vstack((x_coords, y_coords)).T


    def update_map(self, points):
        robot_x, robot_y = self.robot_position_in_map()
        # Increment for occupied cells
        unit = 25  

        for point in points:
            end_x = int(point[0] / self.meta.resolution) + robot_x
            end_y = int(point[1] / self.meta.resolution) + robot_y

            # Bresenham's algorithm - endpoint, considered as occupied
            for free_x, free_y in bresenham((robot_x, robot_y), (end_x, end_y))[:-1]:
                if 0 <= free_x < self.meta.width and 0 <= free_y < self.meta.height:
                    self.grid[free_y, free_x] = 0

            # Process the occupied
            if 0 <= end_x < self.meta.width and 0 <= end_y < self.meta.height:
                prev_value = self.grid[end_y, end_x]
                new_value = min(100, prev_value + unit)  # Increment and cap at 100 for accumenlation
                self.grid[end_y, end_x] = new_value

    
    def robot_position_in_map(self):
        self.robot_x = int((self.robot_pose.position.x - self.meta.origin.position.x) / self.meta.resolution)
        self.robot_y = int((self.robot_pose.position.y - self.meta.origin.position.y) / self.meta.resolution)
        return self.robot_x, self.robot_y

    def publish_map(self):
        msg = OccupancyGrid()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'map'
        msg.info = self.meta

        # Create a copy of the grid for output preparation
        output_grid = np.full((self.meta.height, self.meta.width), -1, dtype=int)

        # Set occupied and free areas based on the thresholds
        occupied_indices = self.grid >= 100  # Cells that are fully occupied
        free_indices = self.grid == 0        # Cells that are explicitly marked free

        output_grid[occupied_indices] = 100  
        output_grid[free_indices] = 0        

        msg.data = output_grid.ravel().tolist()  
        self.map_pub.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    node = MapUpdater()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
Editor is loading...
Leave a Comment