Untitled
unknown
plain_text
a year ago
8.6 kB
4
Indexable
"""Export ROS bag topics to CSV with bagpy, then clean and down-sample them.

Pipeline (see ``main``):

1. Export every topic in ``topic_names`` from ``bagfile_path`` to CSV.
2. Round the ``Time`` column of every exported CSV to one decimal place.
3. Run the matching ``process_*`` function on each CSV, which writes a
   cleaned copy into the ``filtered`` folder.
"""
import os

import numpy as np
import pandas as pd

## define your rosbag file path and topic name here
bagfile_path = '2024-03-08-11-32-37.bag'
topic_names = ['/imu/data', '/cmd_vel', '/jackal_velocity_controller/odom', '/diagnostics']
file_names = ['imu_data.csv', 'cmd_vel.csv', 'odom.csv', 'diagnostics.csv']

# Folder (relative to the working directory) that receives the processed CSVs.
FILTERED_DIR = 'filtered'
# Minimum spacing, in seconds, between two rows kept by the down-sampling.
MIN_SAMPLE_INTERVAL_S = 0.1
# Slack for the spacing comparison. Binary floats cannot represent 0.1
# exactly (0.3 - 0.2 == 0.09999999999999998), so a plain ``>= 0.1`` check
# silently drops rows that are exactly 100 ms apart.
_TIME_TOLERANCE_S = 1e-4
# Message-header columns that are dropped from the IMU and odometry CSVs.
_HEADER_COLUMNS = ['header.seq', 'header.stamp.nsecs', 'header.stamp.secs']


def generate_csv(topic_name, file_name, bagfile_path):
    """Export one topic of a bag file to CSV via bagpy.

    Args:
        topic_name: Topic to export, e.g. ``'/imu/data'``.
        file_name: Unused; kept so existing callers keep working. bagpy
            chooses the output file name itself.
        bagfile_path: Path of the ``.bag`` file to read.

    Returns:
        Whatever ``bagreader.message_by_topic`` returns for the topic
        (presumably the path of the written CSV, or ``None`` when the topic
        has no data -- confirm against the bagpy documentation).
    """
    # Imported lazily so the CSV post-processing functions below stay usable
    # on machines that do not have bagpy / the ROS tooling installed.
    from bagpy import bagreader

    reader = bagreader(bagfile_path)
    # The original re-read the CSV into a DataFrame that was never used;
    # that extra read is dropped and the export result is returned instead.
    return reader.message_by_topic(topic_name)


def _downsample_by_time(df, min_interval=MIN_SAMPLE_INTERVAL_S):
    """Return ``df`` sorted by ``Time`` with rows at least ``min_interval`` apart."""
    # A stable sort keeps rows with equal timestamps in their recorded order,
    # so the row that survives a tie is always the earliest recorded one.
    df = df.sort_values(by='Time', kind='mergesort')
    if df.empty:
        return df.reset_index(drop=True)

    times = df['Time'].to_numpy()
    keep = np.zeros(len(times), dtype=bool)
    keep[0] = True  # the first row is always kept
    last_kept = times[0]
    threshold = min_interval - _TIME_TOLERANCE_S
    for position, current in enumerate(times[1:], start=1):
        if current - last_kept >= threshold:
            keep[position] = True
            last_kept = current
    return df[keep].reset_index(drop=True)


def _save_filtered(df, csv_path, output_dir):
    """Write ``df`` to ``output_dir`` under the base name of ``csv_path``."""
    os.makedirs(output_dir, exist_ok=True)
    filtered_csv_path = os.path.join(output_dir, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
    return filtered_csv_path


def process_imu_data(csv_path, output_dir=FILTERED_DIR,
                     min_interval=MIN_SAMPLE_INTERVAL_S):
    """Reduce an IMU CSV to covariance traces and down-sample it.

    Adds ``orientation_covariance_trace`` and
    ``linear_acceleration_covariance_trace`` (sum of the ``_0``, ``_4`` and
    ``_8`` columns), drops every other covariance column and the header
    columns, keeps only rows at least ``min_interval`` seconds apart, and
    writes the result to ``output_dir``.

    Args:
        csv_path: Path of the IMU CSV to read.
        output_dir: Folder that receives the processed CSV.
        min_interval: Minimum spacing in seconds between kept rows.
    """
    df = pd.read_csv(csv_path)

    df['orientation_covariance_trace'] = df[[
        'orientation_covariance_0',
        'orientation_covariance_4',
        'orientation_covariance_8',
    ]].sum(axis=1)
    df['linear_acceleration_covariance_trace'] = df[[
        'linear_acceleration_covariance_0',
        'linear_acceleration_covariance_4',
        'linear_acceleration_covariance_8',
    ]].sum(axis=1)

    # Drop the raw covariance columns; only the traces are kept.
    covariance_columns = [col for col in df.columns
                          if 'covariance' in col and 'trace' not in col]
    df = df.drop(columns=covariance_columns)
    df = df.drop(columns=_HEADER_COLUMNS)

    df = _downsample_by_time(df, min_interval)
    _save_filtered(df, csv_path, output_dir)


def process_cmd_vel_data(csv_path, output_dir=FILTERED_DIR,
                         min_interval=MIN_SAMPLE_INTERVAL_S):
    """Down-sample a cmd_vel CSV and write it to ``output_dir``.

    Args:
        csv_path: Path of the cmd_vel CSV to read.
        output_dir: Folder that receives the processed CSV.
        min_interval: Minimum spacing in seconds between kept rows.
    """
    df = pd.read_csv(csv_path)
    df = _downsample_by_time(df, min_interval)
    _save_filtered(df, csv_path, output_dir)


def parse_covariance_and_calculate_trace(covariance_str):
    """Return the trace of a 6x6 covariance matrix stored as text.

    Args:
        covariance_str: The 36 matrix entries in row-major order as one
            comma-separated string, optionally wrapped in ``()`` or ``[]``.

    Returns:
        Sum of the six diagonal entries.

    Raises:
        ValueError: If an entry is not a number or there are not exactly
            36 entries.
    """
    # Split on bare commas and strip each token, so both "a, b" and "a,b"
    # parse; the original only accepted ", " separators and parentheses.
    tokens = covariance_str.strip().strip('()[]').split(',')
    values = [float(token) for token in tokens if token.strip()]
    covariance_matrix = np.array(values, dtype=float).reshape(6, 6)
    return np.trace(covariance_matrix)


def process_odom_data(csv_path, output_dir=FILTERED_DIR,
                      min_interval=MIN_SAMPLE_INTERVAL_S):
    """Reduce an odometry CSV to covariance traces and down-sample it.

    Adds ``pose_covariance_trace`` and ``twist_covariance_trace`` computed
    from the ``pose.covariance`` and ``twist.covariance`` columns, drops the
    raw covariance and header columns, keeps only rows at least
    ``min_interval`` seconds apart, and writes the result to ``output_dir``.

    Args:
        csv_path: Path of the odometry CSV to read.
        output_dir: Folder that receives the processed CSV.
        min_interval: Minimum spacing in seconds between kept rows.
    """
    df = pd.read_csv(csv_path)

    df['pose_covariance_trace'] = df['pose.covariance'].apply(
        parse_covariance_and_calculate_trace)
    df['twist_covariance_trace'] = df['twist.covariance'].apply(
        parse_covariance_and_calculate_trace)

    covariance_columns = [col for col in df.columns
                          if 'covariance' in col and 'trace' not in col]
    df = df.drop(columns=covariance_columns)
    df = df.drop(columns=_HEADER_COLUMNS)

    df = _downsample_by_time(df, min_interval)
    _save_filtered(df, csv_path, output_dir)
    print("Processing complete for odom data")


def process_diagnostic_data(csv_path, output_dir=FILTERED_DIR):
    """Copy a diagnostics CSV into ``output_dir`` without modifying rows.

    Args:
        csv_path: Path of the diagnostics CSV to read.
        output_dir: Folder that receives the copy.
    """
    df = pd.read_csv(csv_path)
    _save_filtered(df, csv_path, output_dir)
    print("Processing complete for diagnostics data")


def main():
    """Export the configured topics, round their timestamps and process them."""
    for topic_name, file_name in zip(topic_names, file_names):
        generate_csv(topic_name, file_name, bagfile_path)
        print(f"csv files generated for topic {topic_name}")

    ## Loading generated csv files for filteration ##
    # splitext only removes the final extension, so a path such as
    # "./bags/run.bag" still resolves to "./bags/run".
    csv_files_directory = os.path.splitext(bagfile_path)[0]
    csv_files = [f for f in os.listdir(csv_files_directory) if f.endswith('.csv')]
    if not csv_files:
        print("No CSV files found in the directory.")
        return
    print(csv_files)

    ## round off Time to 1 decimal place in each csv file and overwrite the file
    for file_name in csv_files:
        path = os.path.join(csv_files_directory, file_name)
        df = pd.read_csv(path)
        df['Time'] = df['Time'].round(1)
        df.to_csv(path, index=False)
        print(f"Time rounded off in {file_name} and saved.")

    # Dispatch on the base name only, so a directory whose name happens to
    # contain e.g. "imu" cannot select the wrong processor.
    for file_name in csv_files:
        path = os.path.join(csv_files_directory, file_name)
        if 'imu' in file_name:
            print(f"Processing IMU data in {path}...")
            process_imu_data(path)
        elif 'cmd_vel' in file_name:
            print(f"Processing cmd_vel data in {path}...")
            process_cmd_vel_data(path)
        elif 'odom' in file_name:
            print(f"Processing odom data in {path}...")
            process_odom_data(path)
        elif 'diagnostics' in file_name:
            print(f"Processing diagnostics data in {path}...")
            process_diagnostic_data(path)
        else:
            print(f"Unknown file: {path}")


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment