Untitled

 avatar
unknown
plain_text
a year ago
8.6 kB
4
Indexable
from bagpy import bagreader
import pandas as pd
import os
import numpy as np

## define your rosbag file path and topic name here

bagfile_path='2024-03-08-11-32-37.bag'
topic_names = ['/imu/data', '/cmd_vel', '/jackal_velocity_controller/odom', '/diagnostics']
file_names = ['imu_data.csv', 'cmd_vel.csv', 'odom.csv', 'diagnostics.csv']


def generate_csv(topic_name, file_name,bagfile_path):
    b = bagreader(bagfile_path)
    msg = b.message_by_topic(topic_name)
    df_laser = pd.read_csv(msg)
    # df_laser.to_csv(file_name, index=False)

# file_names = ['imu_data.csv', 'imu_data.csv', 'imu_data.csv', 'imu_data.csv']
for i in range(len(topic_names)):
    generate_csv(topic_names[i], file_names[i],bagfile_path)
    print(f"csv files generated for topic {topic_names[i]}")

## Loading generated csv files for filteration 

## 

csv_files_directory = bagfile_path.split('.')[0]

# Get the list of csv files in the directory
csv_files = [f for f in os.listdir(csv_files_directory) if f.endswith('.csv')]

if not csv_files:  # Check if the list is empty
    print("No CSV files found in the directory.")
else:
    print(csv_files)
    
    # Since we now know there's at least one file, we can safely read them.
    # Let's also add a loop to read each file found instead of assuming the exact number.
    for i, file_name in enumerate(csv_files):
        df = pd.read_csv(os.path.join(csv_files_directory, file_name))
        print(f"Contents of {file_name}:")
        
## round off Time to 1 decimal place in each csv file and overwrite the file
for file_name in csv_files:
    df = pd.read_csv(os.path.join(csv_files_directory, file_name))
    df['Time'] = df['Time'].round(1)
    df.to_csv(os.path.join(csv_files_directory, file_name), index=False)
    print(f"Time rounded off in {file_name} and saved.")

# access complete path of the csv files
csv_files = [os.path.join(csv_files_directory, f) for f in csv_files]
# processing each file based on their names
for file_name in csv_files:
    if 'imu' in file_name:
        print(f"Processing IMU data in {file_name}...")
        process_imu_data(file_name)
    elif 'cmd_vel' in file_name:
        print(f"Processing cmd_vel data in {file_name}...")
        process_cmd_vel_data(file_name)
        

    elif 'odom' in file_name:
        print(f"Processing odom data in {file_name}...")
        # process odom data
        process_odom_data(file_name)
    elif 'diagnostics' in file_name:
        print(f"Processing diagnostics data in {file_name}...")
        # process diagnostics data
        
    else:
        print(f"Unknown file: {file_name}")
        


# Load the CSV file
csv_path = 'imu-data.csv'

def process_imu_data(csv_path):
    # Load the CSV file
        
        df = pd.read_csv(csv_path)

        # Calculate the trace of orientation covariance matrix for each row
        df['orientation_covariance_trace'] = df[['orientation_covariance_0', 'orientation_covariance_4', 'orientation_covariance_8']].sum(axis=1)

        # Calculate the trace of linear acceleration covariance matrix for each row
        df['linear_acceleration_covariance_trace'] = df[['linear_acceleration_covariance_0', 'linear_acceleration_covariance_4', 'linear_acceleration_covariance_8']].sum(axis=1)

        # List of original covariance columns to remove
        covariance_columns = [col for col in df.columns if 'covariance' in col and 'trace' not in col]

        # Remove the original covariance matrix columns
        df = df.drop(columns=covariance_columns)

        df = df.drop(columns=['header.seq', 'header.stamp.nsecs', 'header.stamp.secs'])
        #  Ensure the DataFrame is sorted by Time
        df = df.sort_values(by='Time')
        # df=df.drop(columns=['header.seq', 'header.stamp.nsecs','header.stamp.secs' ])

        # Filter rows, keeping only those 100ms apart
        filtered_rows = [True]  # Keep the first row
        last_time_kept = df['Time'].iloc[0]

        for current_time in df['Time'][1:]:
            if current_time - last_time_kept >= 0.1:
                filtered_rows.append(True)
                last_time_kept = current_time
            else:
                filtered_rows.append(False)

        df_filtered = df[filtered_rows]

        # Reset index of the filtered DataFrame
        df_filtered = df_filtered.reset_index(drop=True)

        # Save the modified DataFrame to a new folder called 'filtered' and create one if doesn't exist
        filtered_csv_folder = 'filtered'
        os.makedirs(filtered_csv_folder, exist_ok=True)
        filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
        df_filtered.to_csv(filtered_csv_path, index=False)

# modified_csv_path
        



# process_imu_data('2024-03-08-11-32-37\imu-data.csv')
def process_cmd_vel_data(csv_path):
    # Load the CSV file
    df = pd.read_csv(csv_path)
    #  Ensure the DataFrame is sorted by Time
    df = df.sort_values(by='Time')
    # Filter rows, keeping only those 100ms apart
    filtered_rows = [True]  # Keep the first row
    last_time_kept = df['Time'].iloc[0]

    for current_time in df['Time'][1:]:
        if current_time - last_time_kept >= 0.1:
            filtered_rows.append(True)
            last_time_kept = current_time
        else:
            filtered_rows.append(False)

    df = df[filtered_rows]

    # Reset index of the filtered DataFrame
    df = df.reset_index(drop=True)


    # Save the modified DataFrame to a new folder called 'filtered' and create one if doesn't exist
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
# process_cmd_vel_data("2024-03-08-11-32-37\cmd_vel.csv")


def parse_covariance_and_calculate_trace(covariance_str):
    # Remove parentheses and split by comma
    covariance_values = covariance_str.strip("()").split(", ")
    # Convert to numpy array of floats
    covariance_array = np.array(covariance_values, dtype=float)
    # Reshape to 6x6 matrix
    covariance_matrix = covariance_array.reshape(6, 6)
    # Calculate the trace
    trace = np.trace(covariance_matrix)
    return trace

# Load your CSV
csv_path = 'jackal_velocity_controller-odom.csv'  # Update this path to your CSV file

def process_odom_data(csv_path):
    df = pd.read_csv(csv_path)

    # Apply the function to calculate traces for pose.covariance and twist.covariance
    df['pose_covariance_trace'] = df['pose.covariance'].apply(parse_covariance_and_calculate_trace)
    df['twist_covariance_trace'] = df['twist.covariance'].apply(parse_covariance_and_calculate_trace)

    covariance_columns = [col for col in df.columns if 'covariance' in col and 'trace' not in col]

    # Remove the original covariance matrix columns
    df= df.drop(columns=covariance_columns)

    df = df.drop(columns=['header.seq', 'header.stamp.nsecs', 'header.stamp.secs'])
    #  Ensure the DataFrame is sorted by Time
    df = df.sort_values(by='Time')
    # df=df.drop(columns=['header.seq', 'header.stamp.nsecs','header.stamp.secs' ])

    # Filter rows, keeping only those 100ms apart
    filtered_rows = [True]  # Keep the first row
    last_time_kept = df['Time'].iloc[0]

    for current_time in df['Time'][1:]:
        if current_time - last_time_kept >= 0.1:
            filtered_rows.append(True)
            last_time_kept = current_time
        else:
            filtered_rows.append(False)

    df_filtered = df[filtered_rows]

    # Reset index of the filtered DataFrame
    df = df_filtered.reset_index(drop=True)
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
    print("Processing complete for odom data")
# process_odom_data('2024-03-08-11-32-37\jackal_velocity_controller-odom.csv')
def process_diagnostic_data(csv_path):
    df = pd.read_csv(csv_path)
    filtered_csv_folder = 'filtered'
    os.makedirs(filtered_csv_folder, exist_ok=True)
    filtered_csv_path = os.path.join(filtered_csv_folder, os.path.basename(csv_path))
    df.to_csv(filtered_csv_path, index=False)
    print("Processing complete for diagnostics data")

process_diagnostic_data('2024-03-08-11-32-37\diagnostics.csv')
Editor is loading...
Leave a Comment