Untitled
unknown
plain_text
2 years ago
12 kB
6
Indexable
import rclpy
import numpy as np
import rclpy.node
import sensor_msgs
import geometry_msgs
import matplotlib.pyplot
import scipy.spatial.transform
import sklearn.neighbors
import copy
#import my_turtlebot.utils.icp
import tf2_ros.transform_broadcaster
class localization(rclpy.node.Node):
def __init__(self):
super().__init__('localization_node')
self.lidar_new = None
self.lidar_old = None
self.pose = []
self.velocity = None
#self.position_new = np.array([0, 0, 0])
self.position_old = np.array([0, 0, 0])
self.runs = 100
self.time = 1
self.initial_robot_pose = None
self._tf = tf2_ros.TransformBroadcaster(self)
self.subscription = self.create_subscription(
sensor_msgs.msg.LaserScan,
'/scan',
self.new_scan,
10
)
self.velocity_subscription = self.create_subscription(
geometry_msgs.msg.Twist,
'/cmd_vel',
self.new_velocity,
10
)
self.timer = self.create_timer(self.time, self.update_and_plot) #reduce update rate to avoid crashes in rviz
def new_scan(self,msg):
self.lidar_new = self.point_cloud(msg)
def new_velocity(self, msg):
self.velocity = msg
def ICP(self, current_point_cloud, reference_point_cloud, convergence_threshold=10e-6):
num_iterations = 0
current_points = current_point_cloud.copy()
prev_rmse = 0
translation_vector = np.zeros(3) # Creates a 1D array
translation_vector = translation_vector.reshape((3, 1))
rotation_matrix = np.eye(3, 3)
while True:
nearest_neighbors = self.find_nearest_neighbors(current_points, reference_point_cloud)
rotation_matrix, translation_vector = self.register_point_clouds(current_point_cloud, nearest_neighbors)
current_points = self.apply_transform(current_point_cloud, rotation_matrix, translation_vector)
rmse = self.calculate_rmse(current_points, nearest_neighbors)
if np.abs(rmse - prev_rmse) < convergence_threshold:
break
prev_rmse = rmse
num_iterations +=1
return rotation_matrix, translation_vector, num_iterations
def calculate_rmse(self,point_cloud_1,point_cloud_2):
squared_differences = (point_cloud_1 - point_cloud_2) ** 2
sum_of_squares = np.sum(squared_differences, axis=0)
mean_of_squares = np.mean(sum_of_squares) # More efficient than dividing in the sqrt
rmse = np.sqrt(mean_of_squares)
return rmse
def apply_transform(self, point_cloud, rotation_matrix, translation_vector):
return np.dot(rotation_matrix, point_cloud) + translation_vector
def apply_inverse_transform(self, point_cloud, rotation_matrix, translation_vector):
return np.dot(rotation_matrix.T, point_cloud - translation_vector)
def calculate_transform_errors(self, rotation_matrix_1, translation_vector_1, rotation_matrix_2, translation_vector_2):
rotation_error = np.sum(np.abs(rotation_matrix_1 - rotation_matrix_2))
translation_error = np.sum(np.abs(translation_vector_1 - translation_vector_2))
return rotation_error, translation_error
def register_point_clouds(self, source_point_cloud, target_point_cloud):
centered_source = source_point_cloud - np.mean(source_point_cloud, axis=1, keepdims=True)
centered_target = target_point_cloud - np.mean(target_point_cloud, axis=1, keepdims=True)
covariance_matrix = np.dot(centered_source, centered_target.T)
U, _, Vh = np.linalg.svd(covariance_matrix)
rotation_matrix = np.dot(U, Vh).T
# Ensure right-handed coordinate system
if np.linalg.det(rotation_matrix) < 0:
Vh[2, :] *= -1
rotation_matrix = np.dot(U, Vh).T
translation_vector = np.mean(target_point_cloud, axis=1, keepdims=True) - np.dot(rotation_matrix, np.mean(source_point_cloud, axis=1, keepdims=True))
return rotation_matrix, translation_vector
def find_nearest_neighbors(self, source_points, target_points):
kdtree = sklearn.neighbors.KDTree(target_points.T, leaf_size=30, metric='euclidean')
nearest_neighbor_indices = kdtree.query(source_points.T, k=1, return_distance=False).reshape(-1)
return target_points[:, nearest_neighbor_indices]
# def point_cloud(self, lidar_new):
# angles = np.arange(lidar_new.angle_min, lidar_new.angle_max, lidar_new.angle_increment)
# ranges = np.array(lidar_new.ranges)
# # Filter out invalid (infinite) range readings
# valid_indices = np.isfinite(ranges) # Use isfinite for better handling of NaNs
# ranges = ranges[valid_indices]
# angles = angles[valid_indices]
# #MÅSKE SKAL DENNE LOGIK ÆNDRES
# # Convert polar coordinates to Cartesian
# x = ranges * np.cos(angles)
# y = ranges * np.sin(angles)
# # Create the 3D point cloud with homogeneous coordinates
# points = np.vstack((x, y, np.zeros(x.shape), np.ones(x.shape)))
# return points
def point_cloud(self, scan):
# Generate an array of angles based on the laser scan properties
angles = np.linspace(scan.angle_min, scan.angle_max, num=len(scan.ranges))
clean_ranges = np.nan_to_num(scan.ranges, nan=0.0, posinf=0.0, neginf=0.0)
# Compute Cartesian coordinates
x_coords = clean_ranges * np.cos(angles)
y_coords = clean_ranges * np.sin(angles)
# Combine x and y coordinates into a 2D array
points = np.column_stack((x_coords, y_coords))
# Convert to 3D points in homogeneous coordinates (add a z-coordinate of 0 and a homogeneous coordinate of 1)
points_3d_homogeneous = np.vstack((points.T, np.zeros(points.shape[0]), np.ones(points.shape[0])))
return points_3d_homogeneous
def prediction_model_position(self):
if len(self.pose) < 2:
return np.eye(4) # No prediction possible, return identity
# Calculate relative transformation based on the last two pose
previous_pose = self.pose[-2]
current_pose = self.pose[-1]
relative_transformation = np.linalg.inv(previous_pose) @ current_pose
return relative_transformation
def prediction_model_velocity(self):
T = np.eye(4)
# Translation component based on linear velocity
T[:3, 3] = np.array([self.velocity.linear.x, 0.0, 0.0]) * self.time
# Rotation component based on angular velocity
rotation_z = scipy.spatial.transform.Rotation.from_euler('z', self.velocity.angular.z * self.time, degrees=False)
T[:3, :3] = rotation_z.as_matrix()
return T
def position_new(self, lidar_new):
# Calculates the prediction
prediction_transform = self.prediction_model_position() # Reuse refactored function
previous_pose = self.pose[-1] if self.pose else np.eye(4)
if self.lidar_old is None:
self.lidar_old = prediction_transform @ lidar_new
return prediction_transform
# Prepare data for ICP
initial_guess = previous_pose @ prediction_transform
source_points = initial_guess @ lidar_new
target_points = self.lidar_old
#
# ICP Matching
rotation, translation, _ = self.ICP(source_points[:3, :], target_points[:3, :], self.runs)
refinement_transform = np.eye(4)
refinement_transform[:3, :3] = rotation
refinement_transform[:3, 3] = translation.reshape(-1)
# Update pose estimate
refined_pose = refinement_transform @ previous_pose @ prediction_transform
self.lidar_old = refined_pose @ lidar_new
self.pose.append(refined_pose)
# if self.initial_robot_pose is None: # Store the initial pose
# self.initial_robot_pose = refined_pose.copy()
return refined_pose
def visualize_point_clouds(self,
transformed_points,
reference_points,
prev_robot_pose,
current_robot_pose):
fig, ax = matplotlib.pyplot.subplots(1, 1, figsize=(10, 10))
# Scatter plots
ax.scatter(self.lidar_old[0, :], self.lidar_old[1, :],
c='green', label=r'previous_scan') # Previous LiDAR scan
ax.scatter(transformed_points[0, :], transformed_points[1, :],
c='red', label=r'aligned_scan', s = 12) # Transformed points
ax.scatter(reference_points[0, :], reference_points[1, :],
c='blue', label=r'reference_scan', s = 12) # Reference points
# Robot pose markers
# ax.scatter(self.initial_robot_pose[0, 3], self.initial_robot_pose[1, 3],
# c='black', label='previous_robot_pose', s = 100, marker='o')
ax.scatter(current_robot_pose[0, 3], current_robot_pose[1, 3],
c='black', label='current_robot_pose', s = 100, marker='X')
# Formatting
ax.set_xlabel('x')
ax.set_ylabel('y')
velocity = prev_robot_pose[0, 3] / self.time # Calculate directly
#ax.set_title(f"Current speed in x direction: {velocity:.2f} m/s") # f-string formatting
speed_text = ax.text(0.7, 0.95, f"Current speed in x direction: {velocity:.2f} m/s",
ha='center', va='top', transform=ax.transAxes,
bbox=dict(facecolor='white', edgecolor='gray', alpha=0.8))
ax.legend()
ax.set_xlim(-10, 10)
ax.set_ylim(-5, 5)
matplotlib.pyplot.savefig('pointclouds.png')
# Show the plot interactively
# matplotlib.pyplot.show()
def transform_broadcaster(self,position,rotation,frame_id="world"):
transform_msg = geometry_msgs.msg.TransformStamped()
transform_msg.header.stamp = self.get_clock().now().to_msg()
transform_msg.header.frame_id = frame_id
transform_msg.child_frame_id = 'lidar'
transform_msg.transform.translation.x = position[0]
transform_msg.transform.translation.y = position[1]
transform_msg.transform.translation.z = position[2]
quaternion = scipy.spatial.transform.Rotation.from_euler('z', rotation).as_quat()
transform_msg.transform.rotation.x = quaternion[0]
transform_msg.transform.rotation.y = quaternion[1]
transform_msg.transform.rotation.z = quaternion[2]
transform_msg.transform.rotation.w = quaternion[3]
self._tf.sendTransform(transform_msg)
def update_and_plot(self):
if self.velocity is not None and self.lidar_new is not None:
lidar_new = copy.deepcopy(self.lidar_new)
new_pose = self.position_new(lidar_new) # Use refactored function
self.position_old = new_pose[:3, 3]
self.position_old[2] = scipy.spatial.transform.Rotation.from_matrix(new_pose[:3, :3]).as_euler('xyz')[-1]
#if self.get_clock().now() == 0: #update every 5 seconds
transformed_points = new_pose @ self.lidar_old
self.visualize_point_clouds(transformed_points,
self.lidar_old,
new_pose,
new_pose)
# Broadcast Transform and Log Info
self.transform_broadcaster(new_pose[:3, 3], self.position_old[2])
self.get_logger().info(f"New position: \n{self.position_old}")
def main(args=None):
rclpy.init(args=args)
node = localization()
rclpy.spin(node)
rclpy.shutdown()Editor is loading...
Leave a Comment