Untitled
unknown
plain_text
a year ago
12 kB
2
Indexable
import rclpy import numpy as np import rclpy.node import sensor_msgs import geometry_msgs import matplotlib.pyplot import scipy.spatial.transform import sklearn.neighbors import copy #import my_turtlebot.utils.icp import tf2_ros.transform_broadcaster class localization(rclpy.node.Node): def __init__(self): super().__init__('localization_node') self.lidar_new = None self.lidar_old = None self.pose = [] self.velocity = None #self.position_new = np.array([0, 0, 0]) self.position_old = np.array([0, 0, 0]) self.runs = 100 self.time = 1 self.initial_robot_pose = None self._tf = tf2_ros.TransformBroadcaster(self) self.subscription = self.create_subscription( sensor_msgs.msg.LaserScan, '/scan', self.new_scan, 10 ) self.velocity_subscription = self.create_subscription( geometry_msgs.msg.Twist, '/cmd_vel', self.new_velocity, 10 ) self.timer = self.create_timer(self.time, self.update_and_plot) #reduce update rate to avoid crashes in rviz def new_scan(self,msg): self.lidar_new = self.point_cloud(msg) def new_velocity(self, msg): self.velocity = msg def ICP(self, current_point_cloud, reference_point_cloud, convergence_threshold=10e-6): num_iterations = 0 current_points = current_point_cloud.copy() prev_rmse = 0 translation_vector = np.zeros(3) # Creates a 1D array translation_vector = translation_vector.reshape((3, 1)) rotation_matrix = np.eye(3, 3) while True: nearest_neighbors = self.find_nearest_neighbors(current_points, reference_point_cloud) rotation_matrix, translation_vector = self.register_point_clouds(current_point_cloud, nearest_neighbors) current_points = self.apply_transform(current_point_cloud, rotation_matrix, translation_vector) rmse = self.calculate_rmse(current_points, nearest_neighbors) if np.abs(rmse - prev_rmse) < convergence_threshold: break prev_rmse = rmse num_iterations +=1 return rotation_matrix, translation_vector, num_iterations def calculate_rmse(self,point_cloud_1,point_cloud_2): squared_differences = (point_cloud_1 - point_cloud_2) ** 2 sum_of_squares = np.sum(squared_differences, axis=0) mean_of_squares = np.mean(sum_of_squares) # More efficient than dividing in the sqrt rmse = np.sqrt(mean_of_squares) return rmse def apply_transform(self, point_cloud, rotation_matrix, translation_vector): return np.dot(rotation_matrix, point_cloud) + translation_vector def apply_inverse_transform(self, point_cloud, rotation_matrix, translation_vector): return np.dot(rotation_matrix.T, point_cloud - translation_vector) def calculate_transform_errors(self, rotation_matrix_1, translation_vector_1, rotation_matrix_2, translation_vector_2): rotation_error = np.sum(np.abs(rotation_matrix_1 - rotation_matrix_2)) translation_error = np.sum(np.abs(translation_vector_1 - translation_vector_2)) return rotation_error, translation_error def register_point_clouds(self, source_point_cloud, target_point_cloud): centered_source = source_point_cloud - np.mean(source_point_cloud, axis=1, keepdims=True) centered_target = target_point_cloud - np.mean(target_point_cloud, axis=1, keepdims=True) covariance_matrix = np.dot(centered_source, centered_target.T) U, _, Vh = np.linalg.svd(covariance_matrix) rotation_matrix = np.dot(U, Vh).T # Ensure right-handed coordinate system if np.linalg.det(rotation_matrix) < 0: Vh[2, :] *= -1 rotation_matrix = np.dot(U, Vh).T translation_vector = np.mean(target_point_cloud, axis=1, keepdims=True) - np.dot(rotation_matrix, np.mean(source_point_cloud, axis=1, keepdims=True)) return rotation_matrix, translation_vector def find_nearest_neighbors(self, source_points, target_points): kdtree = sklearn.neighbors.KDTree(target_points.T, leaf_size=30, metric='euclidean') nearest_neighbor_indices = kdtree.query(source_points.T, k=1, return_distance=False).reshape(-1) return target_points[:, nearest_neighbor_indices] # def point_cloud(self, lidar_new): # angles = np.arange(lidar_new.angle_min, lidar_new.angle_max, lidar_new.angle_increment) # ranges = np.array(lidar_new.ranges) # # Filter out invalid (infinite) range readings # valid_indices = np.isfinite(ranges) # Use isfinite for better handling of NaNs # ranges = ranges[valid_indices] # angles = angles[valid_indices] # #MÅSKE SKAL DENNE LOGIK ÆNDRES # # Convert polar coordinates to Cartesian # x = ranges * np.cos(angles) # y = ranges * np.sin(angles) # # Create the 3D point cloud with homogeneous coordinates # points = np.vstack((x, y, np.zeros(x.shape), np.ones(x.shape))) # return points def point_cloud(self, scan): # Generate an array of angles based on the laser scan properties angles = np.linspace(scan.angle_min, scan.angle_max, num=len(scan.ranges)) clean_ranges = np.nan_to_num(scan.ranges, nan=0.0, posinf=0.0, neginf=0.0) # Compute Cartesian coordinates x_coords = clean_ranges * np.cos(angles) y_coords = clean_ranges * np.sin(angles) # Combine x and y coordinates into a 2D array points = np.column_stack((x_coords, y_coords)) # Convert to 3D points in homogeneous coordinates (add a z-coordinate of 0 and a homogeneous coordinate of 1) points_3d_homogeneous = np.vstack((points.T, np.zeros(points.shape[0]), np.ones(points.shape[0]))) return points_3d_homogeneous def prediction_model_position(self): if len(self.pose) < 2: return np.eye(4) # No prediction possible, return identity # Calculate relative transformation based on the last two pose previous_pose = self.pose[-2] current_pose = self.pose[-1] relative_transformation = np.linalg.inv(previous_pose) @ current_pose return relative_transformation def prediction_model_velocity(self): T = np.eye(4) # Translation component based on linear velocity T[:3, 3] = np.array([self.velocity.linear.x, 0.0, 0.0]) * self.time # Rotation component based on angular velocity rotation_z = scipy.spatial.transform.Rotation.from_euler('z', self.velocity.angular.z * self.time, degrees=False) T[:3, :3] = rotation_z.as_matrix() return T def position_new(self, lidar_new): # Calculates the prediction prediction_transform = self.prediction_model_position() # Reuse refactored function previous_pose = self.pose[-1] if self.pose else np.eye(4) if self.lidar_old is None: self.lidar_old = prediction_transform @ lidar_new return prediction_transform # Prepare data for ICP initial_guess = previous_pose @ prediction_transform source_points = initial_guess @ lidar_new target_points = self.lidar_old # # ICP Matching rotation, translation, _ = self.ICP(source_points[:3, :], target_points[:3, :], self.runs) refinement_transform = np.eye(4) refinement_transform[:3, :3] = rotation refinement_transform[:3, 3] = translation.reshape(-1) # Update pose estimate refined_pose = refinement_transform @ previous_pose @ prediction_transform self.lidar_old = refined_pose @ lidar_new self.pose.append(refined_pose) # if self.initial_robot_pose is None: # Store the initial pose # self.initial_robot_pose = refined_pose.copy() return refined_pose def visualize_point_clouds(self, transformed_points, reference_points, prev_robot_pose, current_robot_pose): fig, ax = matplotlib.pyplot.subplots(1, 1, figsize=(10, 10)) # Scatter plots ax.scatter(self.lidar_old[0, :], self.lidar_old[1, :], c='green', label=r'previous_scan') # Previous LiDAR scan ax.scatter(transformed_points[0, :], transformed_points[1, :], c='red', label=r'aligned_scan', s = 12) # Transformed points ax.scatter(reference_points[0, :], reference_points[1, :], c='blue', label=r'reference_scan', s = 12) # Reference points # Robot pose markers # ax.scatter(self.initial_robot_pose[0, 3], self.initial_robot_pose[1, 3], # c='black', label='previous_robot_pose', s = 100, marker='o') ax.scatter(current_robot_pose[0, 3], current_robot_pose[1, 3], c='black', label='current_robot_pose', s = 100, marker='X') # Formatting ax.set_xlabel('x') ax.set_ylabel('y') velocity = prev_robot_pose[0, 3] / self.time # Calculate directly #ax.set_title(f"Current speed in x direction: {velocity:.2f} m/s") # f-string formatting speed_text = ax.text(0.7, 0.95, f"Current speed in x direction: {velocity:.2f} m/s", ha='center', va='top', transform=ax.transAxes, bbox=dict(facecolor='white', edgecolor='gray', alpha=0.8)) ax.legend() ax.set_xlim(-10, 10) ax.set_ylim(-5, 5) matplotlib.pyplot.savefig('pointclouds.png') # Show the plot interactively # matplotlib.pyplot.show() def transform_broadcaster(self,position,rotation,frame_id="world"): transform_msg = geometry_msgs.msg.TransformStamped() transform_msg.header.stamp = self.get_clock().now().to_msg() transform_msg.header.frame_id = frame_id transform_msg.child_frame_id = 'lidar' transform_msg.transform.translation.x = position[0] transform_msg.transform.translation.y = position[1] transform_msg.transform.translation.z = position[2] quaternion = scipy.spatial.transform.Rotation.from_euler('z', rotation).as_quat() transform_msg.transform.rotation.x = quaternion[0] transform_msg.transform.rotation.y = quaternion[1] transform_msg.transform.rotation.z = quaternion[2] transform_msg.transform.rotation.w = quaternion[3] self._tf.sendTransform(transform_msg) def update_and_plot(self): if self.velocity is not None and self.lidar_new is not None: lidar_new = copy.deepcopy(self.lidar_new) new_pose = self.position_new(lidar_new) # Use refactored function self.position_old = new_pose[:3, 3] self.position_old[2] = scipy.spatial.transform.Rotation.from_matrix(new_pose[:3, :3]).as_euler('xyz')[-1] #if self.get_clock().now() == 0: #update every 5 seconds transformed_points = new_pose @ self.lidar_old self.visualize_point_clouds(transformed_points, self.lidar_old, new_pose, new_pose) # Broadcast Transform and Log Info self.transform_broadcaster(new_pose[:3, 3], self.position_old[2]) self.get_logger().info(f"New position: \n{self.position_old}") def main(args=None): rclpy.init(args=args) node = localization() rclpy.spin(node) rclpy.shutdown()
Editor is loading...
Leave a Comment