Untitled

 avatar
unknown
plain_text
a year ago
12 kB
2
Indexable
import rclpy
import numpy as np
import rclpy.node
import sensor_msgs
import geometry_msgs
import matplotlib.pyplot
import scipy.spatial.transform 
import sklearn.neighbors
import copy
#import my_turtlebot.utils.icp 
import tf2_ros.transform_broadcaster
class localization(rclpy.node.Node):
    def __init__(self):
        super().__init__('localization_node')
        self.lidar_new = None
        self.lidar_old = None
        self.pose = []
        self.velocity = None
        #self.position_new = np.array([0, 0, 0])
        self.position_old = np.array([0, 0, 0])
        self.runs = 100
        self.time = 1
        self.initial_robot_pose = None
        self._tf = tf2_ros.TransformBroadcaster(self)
        self.subscription = self.create_subscription(
            sensor_msgs.msg.LaserScan,
            '/scan',
            self.new_scan,
            10
        )
        self.velocity_subscription = self.create_subscription(
            geometry_msgs.msg.Twist,
            '/cmd_vel',
            self.new_velocity,
            10
        )
        self.timer = self.create_timer(self.time, self.update_and_plot) #reduce update rate to avoid crashes in rviz
    
    def new_scan(self,msg):
        self.lidar_new = self.point_cloud(msg)

    def new_velocity(self, msg):
        self.velocity = msg

    def ICP(self, current_point_cloud, reference_point_cloud, convergence_threshold=10e-6):
        num_iterations = 0
        current_points = current_point_cloud.copy()
        prev_rmse = 0
        translation_vector = np.zeros(3) # Creates a 1D array
        translation_vector = translation_vector.reshape((3, 1)) 
        rotation_matrix = np.eye(3, 3)

        while True:
            nearest_neighbors = self.find_nearest_neighbors(current_points, reference_point_cloud)
            rotation_matrix, translation_vector = self.register_point_clouds(current_point_cloud, nearest_neighbors)
            current_points = self.apply_transform(current_point_cloud, rotation_matrix, translation_vector)
            rmse = self.calculate_rmse(current_points, nearest_neighbors)
            if np.abs(rmse - prev_rmse) < convergence_threshold:
                break
            prev_rmse = rmse
            num_iterations +=1

        return rotation_matrix, translation_vector, num_iterations

    def calculate_rmse(self,point_cloud_1,point_cloud_2):
        squared_differences = (point_cloud_1 - point_cloud_2) ** 2
        sum_of_squares = np.sum(squared_differences, axis=0)
        mean_of_squares = np.mean(sum_of_squares)  # More efficient than dividing in the sqrt
        rmse = np.sqrt(mean_of_squares)
        return rmse

    def apply_transform(self, point_cloud, rotation_matrix, translation_vector):
        return np.dot(rotation_matrix, point_cloud) + translation_vector

    def apply_inverse_transform(self, point_cloud, rotation_matrix, translation_vector):
        return np.dot(rotation_matrix.T, point_cloud - translation_vector)

    def calculate_transform_errors(self, rotation_matrix_1, translation_vector_1, rotation_matrix_2, translation_vector_2):

        rotation_error = np.sum(np.abs(rotation_matrix_1 - rotation_matrix_2))
        translation_error = np.sum(np.abs(translation_vector_1 - translation_vector_2))

        return rotation_error, translation_error
    
    def register_point_clouds(self, source_point_cloud, target_point_cloud):

        centered_source = source_point_cloud - np.mean(source_point_cloud, axis=1, keepdims=True)
        centered_target = target_point_cloud - np.mean(target_point_cloud, axis=1, keepdims=True)

        covariance_matrix = np.dot(centered_source, centered_target.T)
        U, _, Vh = np.linalg.svd(covariance_matrix)
        rotation_matrix = np.dot(U, Vh).T

        # Ensure right-handed coordinate system
        if np.linalg.det(rotation_matrix) < 0:
            Vh[2, :] *= -1
            rotation_matrix = np.dot(U, Vh).T

        translation_vector = np.mean(target_point_cloud, axis=1, keepdims=True) - np.dot(rotation_matrix, np.mean(source_point_cloud, axis=1, keepdims=True))

        return rotation_matrix, translation_vector
    def find_nearest_neighbors(self, source_points, target_points):

        kdtree = sklearn.neighbors.KDTree(target_points.T, leaf_size=30, metric='euclidean')  
        nearest_neighbor_indices = kdtree.query(source_points.T, k=1, return_distance=False).reshape(-1) 
        return target_points[:, nearest_neighbor_indices]

    # def point_cloud(self, lidar_new):
    #     angles = np.arange(lidar_new.angle_min, lidar_new.angle_max, lidar_new.angle_increment)
    #     ranges = np.array(lidar_new.ranges)

    #     # Filter out invalid (infinite) range readings
    #     valid_indices = np.isfinite(ranges)  # Use isfinite for better handling of NaNs
    #     ranges = ranges[valid_indices]
    #     angles = angles[valid_indices]


    #     #MÅSKE SKAL DENNE LOGIK ÆNDRES
        
        
        
    #     # Convert polar coordinates to Cartesian 
    #     x = ranges * np.cos(angles)
    #     y = ranges * np.sin(angles)

    #     # Create the 3D point cloud with homogeneous coordinates 
    #     points = np.vstack((x, y, np.zeros(x.shape), np.ones(x.shape)))

    #     return points
    def point_cloud(self, scan):
        # Generate an array of angles based on the laser scan properties
        angles = np.linspace(scan.angle_min, scan.angle_max, num=len(scan.ranges))
        clean_ranges = np.nan_to_num(scan.ranges, nan=0.0, posinf=0.0, neginf=0.0)

        # Compute Cartesian coordinates
        x_coords = clean_ranges * np.cos(angles)
        y_coords = clean_ranges * np.sin(angles)
        
        # Combine x and y coordinates into a 2D array
        points = np.column_stack((x_coords, y_coords))
        
        # Convert to 3D points in homogeneous coordinates (add a z-coordinate of 0 and a homogeneous coordinate of 1)
        points_3d_homogeneous = np.vstack((points.T, np.zeros(points.shape[0]), np.ones(points.shape[0])))
        
        return points_3d_homogeneous

    def prediction_model_position(self):
        if len(self.pose) < 2:
            return np.eye(4)  # No prediction possible, return identity

        # Calculate relative transformation based on the last two pose
        previous_pose = self.pose[-2]
        current_pose = self.pose[-1]
        relative_transformation = np.linalg.inv(previous_pose) @ current_pose

        return relative_transformation

    def prediction_model_velocity(self):
        T = np.eye(4)
        # Translation component based on linear velocity
        T[:3, 3] = np.array([self.velocity.linear.x, 0.0, 0.0]) * self.time

        # Rotation component based on angular velocity 
        rotation_z = scipy.spatial.transform.Rotation.from_euler('z', self.velocity.angular.z * self.time, degrees=False)
        T[:3, :3] = rotation_z.as_matrix()

        return T
    def position_new(self, lidar_new):
        # Calculates the prediction
        prediction_transform = self.prediction_model_position()  # Reuse refactored function

        previous_pose = self.pose[-1] if self.pose else np.eye(4)

        if self.lidar_old is None:
            self.lidar_old = prediction_transform @ lidar_new
            return prediction_transform

        # Prepare data for ICP
        initial_guess = previous_pose @ prediction_transform
        source_points = initial_guess @ lidar_new
        target_points = self.lidar_old 
        #
        

        # ICP Matching
        rotation, translation, _ = self.ICP(source_points[:3, :], target_points[:3, :], self.runs)
        refinement_transform = np.eye(4)
        refinement_transform[:3, :3] = rotation
        refinement_transform[:3, 3] = translation.reshape(-1)

        # Update pose estimate
        refined_pose = refinement_transform @ previous_pose @ prediction_transform
        self.lidar_old = refined_pose @ lidar_new
        self.pose.append(refined_pose)

        # if self.initial_robot_pose is None:  # Store the initial pose
        #     self.initial_robot_pose = refined_pose.copy()

        return refined_pose


    def visualize_point_clouds(self, 
                            transformed_points, 
                            reference_points, 
                            prev_robot_pose, 
                            current_robot_pose):

        fig, ax = matplotlib.pyplot.subplots(1, 1, figsize=(10, 10))

        # Scatter plots
        ax.scatter(self.lidar_old[0, :], self.lidar_old[1, :], 
                c='green', label=r'previous_scan')  # Previous LiDAR scan
        ax.scatter(transformed_points[0, :], transformed_points[1, :], 
                c='red', label=r'aligned_scan', s = 12)  # Transformed points
        ax.scatter(reference_points[0, :], reference_points[1, :], 
                c='blue', label=r'reference_scan', s = 12)  # Reference points

        # Robot pose markers
        # ax.scatter(self.initial_robot_pose[0, 3], self.initial_robot_pose[1, 3], 
        #         c='black', label='previous_robot_pose', s = 100, marker='o')
        ax.scatter(current_robot_pose[0, 3], current_robot_pose[1, 3], 
                c='black', label='current_robot_pose', s = 100, marker='X')

        # Formatting
        ax.set_xlabel('x')
        ax.set_ylabel('y') 
        velocity = prev_robot_pose[0, 3] / self.time  # Calculate directly
        #ax.set_title(f"Current speed in x direction: {velocity:.2f} m/s")  # f-string formatting
        speed_text = ax.text(0.7, 0.95, f"Current speed in x direction: {velocity:.2f} m/s", 
                     ha='center', va='top', transform=ax.transAxes, 
                     bbox=dict(facecolor='white', edgecolor='gray', alpha=0.8)) 
        ax.legend()
        ax.set_xlim(-10, 10)
        ax.set_ylim(-5, 5)

        matplotlib.pyplot.savefig('pointclouds.png') 

        # Show the plot interactively
        # matplotlib.pyplot.show() 
    def transform_broadcaster(self,position,rotation,frame_id="world"):
        transform_msg = geometry_msgs.msg.TransformStamped()
        transform_msg.header.stamp = self.get_clock().now().to_msg()
        transform_msg.header.frame_id = frame_id
        transform_msg.child_frame_id = 'lidar'

        transform_msg.transform.translation.x = position[0]
        transform_msg.transform.translation.y = position[1]
        transform_msg.transform.translation.z = position[2]

        quaternion = scipy.spatial.transform.Rotation.from_euler('z', rotation).as_quat()

        transform_msg.transform.rotation.x = quaternion[0]
        transform_msg.transform.rotation.y = quaternion[1]
        transform_msg.transform.rotation.z = quaternion[2]
        transform_msg.transform.rotation.w = quaternion[3]

        self._tf.sendTransform(transform_msg)
    def update_and_plot(self):
        if self.velocity is not None and self.lidar_new is not None:
            lidar_new = copy.deepcopy(self.lidar_new) 
            new_pose = self.position_new(lidar_new)  # Use refactored function
            self.position_old = new_pose[:3, 3]
            self.position_old[2] = scipy.spatial.transform.Rotation.from_matrix(new_pose[:3, :3]).as_euler('xyz')[-1]

            #if self.get_clock().now() == 0: #update every 5 seconds

            transformed_points = new_pose @ self.lidar_old
            self.visualize_point_clouds(transformed_points, 
                                        self.lidar_old, 
                                        new_pose, 
                                        new_pose) 

            # Broadcast Transform and Log Info
            self.transform_broadcaster(new_pose[:3, 3], self.position_old[2])
            self.get_logger().info(f"New position: \n{self.position_old}")

def main(args=None):
    rclpy.init(args=args)
    node = localization()
    rclpy.spin(node)
    rclpy.shutdown()
Editor is loading...
Leave a Comment