from bagpy import bagreader
import pandas as pd
import os
import numpy as np
## Define your rosbag file path and topic names here
bagfile_path = '2024-03-08-11-32-37.bag'
topic_names = ['/imu/data', '/cmd_vel', '/jackal_velocity_controller/odom', '/diagnostics']
file_names = ['imu_data.csv', 'cmd_vel.csv', 'odom.csv', 'diagnostics.csv']
def generate_csv(topic_name, file_name, bagfile_path):
    # bagpy extracts the topic into a CSV inside a folder named after the bag
    b = bagreader(bagfile_path)
    msg = b.message_by_topic(topic_name)  # returns the path of the CSV it wrote
    df_laser = pd.read_csv(msg)  # sanity check that the generated CSV is readable
    # df_laser.to_csv(file_name, index=False)
for topic_name, file_name in zip(topic_names, file_names):
    generate_csv(topic_name, file_name, bagfile_path)
    print(f"csv files generated for topic {topic_name}")
## Loading generated csv files for filtering
csv_files_directory = os.path.splitext(bagfile_path)[0]  # bagpy writes into a folder named after the bag
# Get the list of csv files in the directory
csv_files = [f for f in os.listdir(csv_files_directory) if f.endswith('.csv')]
if not csv_files:  # Check if the list is empty
    print("No CSV files found in the directory.")
else:
    print(csv_files)
# Since we now know there's at least one file, read each file found
# instead of assuming the exact number.
for file_name in csv_files:
    df = pd.read_csv(os.path.join(csv_files_directory, file_name))
    print(f"Contents of {file_name}:")
    print(df.head())
## Round off Time to 1 decimal place in each csv file and overwrite the file
for file_name in csv_files:
    df = pd.read_csv(os.path.join(csv_files_directory, file_name))
    df['Time'] = df['Time'].round(1)
    df.to_csv(os.path.join(csv_files_directory, file_name), index=False)
    print(f"Time rounded off in {file_name} and saved.")
# Access complete paths of the csv files
csv_files = [os.path.join(csv_files_directory, f) for f in csv_files]
# Process each file based on its name. The dispatch is wrapped in a function
# so it can be invoked at the bottom of the script, after the processing
# functions below have been defined (calling them here would raise NameError).
def process_all_files(csv_files):
    for file_name in csv_files:
        if 'imu' in file_name:
            print(f"Processing IMU data in {file_name}...")
            process_imu_data(file_name)
        elif 'cmd_vel' in file_name:
            print(f"Processing cmd_vel data in {file_name}...")
            process_cmd_vel_data(file_name)
        elif 'odom' in file_name:
            print(f"Processing odom data in {file_name}...")
            process_odom_data(file_name)
        elif 'diagnostics' in file_name:
            print(f"Processing diagnostics data in {file_name}...")
            process_diagnostic_data(file_name)
        else:
            print(f"Unknown file: {file_name}")
# bagpy writes the IMU topic to 'imu-data.csv' inside the bag's folder
def process_imu_data(csv_path):
    # Load the CSV file
    df = pd.read_csv(csv_path)
    # Calculate the trace of the orientation covariance matrix for each row
    df['orientation_covariance_trace'] = df[['orientation_covariance_0', 'orientation_covariance_4', 'orientation_covariance_8']].sum(axis=1)
    # Calculate the trace of the linear acceleration covariance matrix for each row
    df['linear_acceleration_covariance_trace'] = df[['linear_acceleration_covariance_0', 'linear_acceleration_covariance_4', 'linear_acceleration_covariance_8']].sum(axis=1)
    # Remove the original covariance matrix columns, keeping only the traces
    covariance_columns = [col for col in df.columns if 'covariance' in col and 'trace' not in col]
    df = df.drop(columns=covariance_columns)
    df = df.drop(columns=['header.seq', 'header.stamp.nsecs', 'header.stamp.secs'])
    # Ensure the DataFrame is sorted by Time
    df = df.sort_values(by='Time')
    # Filter rows, keeping only those at least 100 ms apart
    # (the small epsilon guards against float round-off, e.g. 0.3 - 0.2 < 0.1)
    filtered_rows = [True]  # Keep the first row
    last_time_kept = df['Time'].iloc[0]
    for current_time in df['Time'][1:]:
        if current_time - last_time_kept >= 0.1 - 1e-9:
            filtered_rows.append(True)
            last_time_kept = current_time
        else:
            filtered_rows.append(False)
    df_filtered = df[filtered_rows]
    # Reset index of the filtered DataFrame
    df_filtered = df_filtered.reset_index(drop=True)
    # Save the modified DataFrame to a 'filtered' folder, creating it if needed
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df_filtered.to_csv(filtered_csv_path, index=False)
# process_imu_data('2024-03-08-11-32-37/imu-data.csv')
def process_cmd_vel_data(csv_path):
    # Load the CSV file
    df = pd.read_csv(csv_path)
    # Ensure the DataFrame is sorted by Time
    df = df.sort_values(by='Time')
    # Filter rows, keeping only those at least 100 ms apart
    filtered_rows = [True]  # Keep the first row
    last_time_kept = df['Time'].iloc[0]
    for current_time in df['Time'][1:]:
        if current_time - last_time_kept >= 0.1 - 1e-9:  # epsilon for float round-off
            filtered_rows.append(True)
            last_time_kept = current_time
        else:
            filtered_rows.append(False)
    df = df[filtered_rows]
    # Reset index of the filtered DataFrame
    df = df.reset_index(drop=True)
    # Save the modified DataFrame to a 'filtered' folder, creating it if needed
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
# process_cmd_vel_data('2024-03-08-11-32-37/cmd_vel.csv')
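# The 100 ms filter above is repeated verbatim in each processor. A possible
# refactor (a sketch only; the processors in this script keep their inline
# copies so they stay close to the original):
def downsample_100ms(df, min_gap=0.1, eps=1e-9):
    # Keep the first row, then every row at least `min_gap` seconds after the
    # last kept row; `eps` guards against float round-off in the subtraction.
    keep = [True]
    last = df['Time'].iloc[0]
    for t in df['Time'][1:]:
        if t - last >= min_gap - eps:
            keep.append(True)
            last = t
        else:
            keep.append(False)
    return df[keep].reset_index(drop=True)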
def parse_covariance_and_calculate_trace(covariance_str):
    # Remove parentheses and split by comma
    covariance_values = covariance_str.strip("()").split(", ")
    # Convert to a numpy array of floats
    covariance_array = np.array(covariance_values, dtype=float)
    # Reshape to a 6x6 matrix
    covariance_matrix = covariance_array.reshape(6, 6)
    # Calculate the trace
    trace = np.trace(covariance_matrix)
    return trace
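# Quick sanity check on a hypothetical input: a 6x6 identity covariance
# flattened to the "(v0, v1, ..., v35)" string format the function assumes.
identity_cov = "(" + ", ".join('1.0' if i % 7 == 0 else '0.0' for i in range(36)) + ")"
assert parse_covariance_and_calculate_trace(identity_cov) == 6.0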
# bagpy writes the odometry topic to 'jackal_velocity_controller-odom.csv'
def process_odom_data(csv_path):
    df = pd.read_csv(csv_path)
    # Calculate traces for pose.covariance and twist.covariance
    df['pose_covariance_trace'] = df['pose.covariance'].apply(parse_covariance_and_calculate_trace)
    df['twist_covariance_trace'] = df['twist.covariance'].apply(parse_covariance_and_calculate_trace)
    # Remove the original covariance matrix columns, keeping only the traces
    covariance_columns = [col for col in df.columns if 'covariance' in col and 'trace' not in col]
    df = df.drop(columns=covariance_columns)
    df = df.drop(columns=['header.seq', 'header.stamp.nsecs', 'header.stamp.secs'])
    # Ensure the DataFrame is sorted by Time
    df = df.sort_values(by='Time')
    # Filter rows, keeping only those at least 100 ms apart
    filtered_rows = [True]  # Keep the first row
    last_time_kept = df['Time'].iloc[0]
    for current_time in df['Time'][1:]:
        if current_time - last_time_kept >= 0.1 - 1e-9:  # epsilon for float round-off
            filtered_rows.append(True)
            last_time_kept = current_time
        else:
            filtered_rows.append(False)
    df = df[filtered_rows]
    # Reset index of the filtered DataFrame
    df = df.reset_index(drop=True)
    # Save the modified DataFrame to a 'filtered' folder, creating it if needed
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
    print("Processing complete for odom data")
# process_odom_data('2024-03-08-11-32-37/jackal_velocity_controller-odom.csv')
def process_diagnostic_data(csv_path):
    # Diagnostics need no filtering; copy the CSV into the 'filtered' folder
    df = pd.read_csv(csv_path)
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
    print("Processing complete for diagnostics data")
# Run the full pipeline now that all processors are defined
process_all_files(csv_files)
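# A possible next step (an assumption about what the filtered CSVs are for,
# not something this script does): align two downsampled topics on the shared,
# rounded Time column.
# imu = pd.read_csv('filtered/imu-data.csv')
# odom = pd.read_csv('filtered/jackal_velocity_controller-odom.csv')
# merged = pd.merge(imu, odom, on='Time', how='inner', suffixes=('_imu', '_odom'))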