Untitled
unknown
plain_text
2 years ago
5.3 kB
6
Indexable
import rclpy
import numpy as np
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import OccupancyGrid, MapMetaData, Odometry
from geometry_msgs.msg import Pose, Quaternion
def bresenham(start, end):
"""
Bresenham's Line Algorithm for producing a list of points from start to end (inclusive).
(original from roguebasin.com)
"""
x1, y1 = start
x2, y2 = end
points = []
dx = abs(x2 - x1)
dy = abs(y2 - y1)
x, y = x1, y1
sx = -1 if x1 > x2 else 1
sy = -1 if y1 > y2 else 1
if dx > dy:
err = dx / 2.0
while x != x2:
points.append((x, y))
err -= dy
if err < 0:
y += sy
err += dx
x += sx
else:
err = dy / 2.0
while y != y2:
points.append((x, y))
err -= dx
if err < 0:
x += sx
err += dy
y += sy
points.append((x, y))
return points
class MapUpdater(Node):
def __init__(self):
super().__init__('map_updater')
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
)
self.map_pub = self.create_publisher(OccupancyGrid, '/map', qos_profile)
self.lidar_sub = self.create_subscription(LaserScan, '/scan', self.handle_lidar_scan, 10)
self.odom_sub = self.create_subscription(Odometry, '/odom', self.handle_odom, 10)
self.meta = MapMetaData()
self.meta.width = 200
self.meta.height = 200
self.meta.resolution = 0.05
self.meta.origin = Pose()
self.meta.origin.position.x = -self.meta.width * self.meta.resolution / 2
self.meta.origin.position.y = -self.meta.height * self.meta.resolution / 2
self.meta.origin.orientation = Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)
self.grid = -1 * np.ones((self.meta.height, self.meta.width), dtype=np.int8)
self.robot_pose = Pose()
self.robot_x = 0
self.robot_y = 0
timer_period = 1.0 # seconds (1 Hz frequency)
self.timer = self.create_timer(timer_period, self.timer_callback)
def handle_odom(self, msg):
self.robot_pose = msg.pose.pose
def handle_lidar_scan(self, scan):
points = self.convert_scan_to_points(scan)
self.update_map(points)
def timer_callback(self):
self.publish_map()
def convert_scan_to_points(self, scan):
angles = np.arange(scan.angle_min, scan.angle_max, scan.angle_increment)
clean_ranges = np.nan_to_num(scan.ranges, nan=0.0, posinf=0.0, neginf=0.0)
x_coords = clean_ranges * np.cos(angles)
y_coords = clean_ranges * np.sin(angles)
return np.vstack((x_coords, y_coords)).T
def update_map(self, points):
robot_x, robot_y = self.robot_position_in_map()
# Increment for occupied cells
unit = 25
for point in points:
end_x = int(point[0] / self.meta.resolution) + robot_x
end_y = int(point[1] / self.meta.resolution) + robot_y
# Bresenham's algorithm - endpoint, considered as occupied
for free_x, free_y in bresenham((robot_x, robot_y), (end_x, end_y))[:-1]:
if 0 <= free_x < self.meta.width and 0 <= free_y < self.meta.height:
self.grid[free_y, free_x] = 0
# Process the occupied
if 0 <= end_x < self.meta.width and 0 <= end_y < self.meta.height:
prev_value = self.grid[end_y, end_x]
new_value = min(100, prev_value + unit) # Increment and cap at 100 for accumenlation
self.grid[end_y, end_x] = new_value
def robot_position_in_map(self):
self.robot_x = int((self.robot_pose.position.x - self.meta.origin.position.x) / self.meta.resolution)
self.robot_y = int((self.robot_pose.position.y - self.meta.origin.position.y) / self.meta.resolution)
return self.robot_x, self.robot_y
def publish_map(self):
msg = OccupancyGrid()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'map'
msg.info = self.meta
# Create a copy of the grid for output preparation
output_grid = np.full((self.meta.height, self.meta.width), -1, dtype=int)
# Set occupied and free areas based on the thresholds
occupied_indices = self.grid >= 100 # Cells that are fully occupied
free_indices = self.grid == 0 # Cells that are explicitly marked free
output_grid[occupied_indices] = 100
output_grid[free_indices] = 0
msg.data = output_grid.ravel().tolist()
self.map_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = MapUpdater()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Editor is loading...
Leave a Comment