Multiprocessing Path Planning

mail@pastecode.io avatar
unknown
python
a year ago
7.3 kB
2
Indexable
Never
from functools import partial
import gc
import itertools
from multiprocessing import Lock, Manager
from multiprocessing.pool import ThreadPool
import threading
from typing import Tuple
import csv
import ompl.util

import numpy as np

from mcs.demos.dyan_work.PathPlanner import IKPlannerWithRecycling, PathNotFoundError, check_path_validity
from timor import Geometry, ModuleAssembly, ModulesDB
from timor.task import Obstacle

from timor.task.Task import Task, TaskHeader
import pandas as pd

from timor.utilities import logging, spatial
from timor.utilities.tolerated_pose import ToleratedPose


def process_pair(pair: Tuple, task: Task, lock: Lock, file_to_write: str) -> int:
    thread_id = str(threading.get_ident())
    print("Thread id: " + thread_id + pair.__repr__() + ".\n", flush=True)
    # define consts
    db = ModulesDB.from_name('modrob-gen2')
    loop_number = 50
    time_limit = 30

    # set base pose with 4*4 transformation matrix
    base_pose = [[1, 0, 0, 0],
                 [0, 1, 0, 0.1],
                 [0, 0, 1, 0.035],
                 [0, 0, 0, 1]]

    goal_pose = ToleratedPose([[1, 0, 0, -0.9],
                               [0, 0.7071, -0.7071, 0.3],
                               [0, 0.7071, 0.7071, 0.5],
                               [0, 0, 0, 1]])

    stat_planning = np.array([])
    stat_solving = np.array([])

    modules_1 = pair[0]
    modules_2 = pair[1]

    assembly = ModuleAssembly.from_serial_modules(db, modules_1)
    new_assembly = ModuleAssembly.from_serial_modules(db, modules_2)

    robot = assembly.robot
    new_robot = new_assembly.robot

    robot.set_base_placement(base_pose)
    new_robot.set_base_placement(base_pose)

    q0 = robot.configuration

    if new_robot.njoints != robot.njoints:
        return -1

    print("Initialization for the thread " + thread_id + " done, starting planning from scratch. \n", flush=True)

    # plan from scratch
    for _ in range(loop_number):

        robot.update_configuration(q0)
        ik_planner = IKPlannerWithRecycling(robot, task,
                                            time_limit=time_limit,
                                            simplify_path=True)
        try:
            _ = ik_planner.find_path(q0, goal_pose)

            stat_solving = np.append(stat_solving, ik_planner.solving_time)
            stat_planning = np.append(stat_planning, ik_planner.planning_time)
        except (PathNotFoundError, TimeoutError) as e:
            print("Error in planning from scratch for the thread " + thread_id + ". \n", flush=True)
            continue
        finally:
            del ik_planner
            gc.collect()
    print("Planning from scratch for the thread " + thread_id + " done, starting planning with reuse. \n", flush=True)
    # plan with reuse
    stat_planning_reuse = np.array([])
    stat_solving_reuse = np.array([])
    for _ in range(loop_number):
        robot.update_configuration(q0)
        new_robot.update_configuration(q0)

        new_ik_planner = IKPlannerWithRecycling(new_robot, task,
                                                time_limit=time_limit,
                                                simplify_path=True,
                                                save_path_state=True)
        try:
            path_to_ik = new_ik_planner.find_path(q0, goal_pose)
            saved_path = new_ik_planner.saved_path
        except (PathNotFoundError, TimeoutError):
            print("Error in planning with reuse for the thread " + thread_id + ". \n", flush=True)
            continue
        finally:
            del new_ik_planner
            gc.collect()

        if not check_path_validity(robot, task, path_to_ik):
            print("Path is not valid for reuse for the thread " + thread_id + ". \n", flush=True)
            continue

        robot.update_configuration(q0)

        ik_planner = IKPlannerWithRecycling(robot, task,
                                            time_limit=time_limit,
                                            simplify_path=True,
                                            save_path_state=True,
                                            suggested_path=saved_path, ik_candidates=path_to_ik)

        try:
            _ = ik_planner.find_path(path_to_ik.q[-1], goal_pose)
            time_planning = ik_planner.planning_time
            time_solving = ik_planner.solving_time

            stat_solving_reuse = np.append(stat_solving_reuse, time_solving)
            stat_planning_reuse = np.append(stat_planning_reuse, time_planning)
        except (PathNotFoundError, TimeoutError):
            print("Error in reuse for the thread " + thread_id + ". \n", flush=True)
            continue
        finally:
            del ik_planner
            gc.collect()

    if len(stat_solving_reuse) == 0:
        print("Length of statistics is 0 for the thread " + thread_id + ". \n", flush=True)
        return -1

    row = [repr(assembly.original_module_ids), repr(new_assembly.original_module_ids),
           stat_planning_reuse.mean(),
           stat_solving_reuse.mean(), stat_planning.mean(), stat_solving.mean()]

    print("Full success for the thread " + thread_id + ". \n", flush=True)
    with lock:
        with open(file_to_write, mode='a+', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(row)
    return 1


def get_modules(filename):
    """ Loads modules from csv file. """
    df = pd.read_csv(filename, header=None).values
    result = []
    for t in df:
        res = t[0][1:-1].replace("'", "").replace(" ", "").split(",")
        result.append(res)
    del df
    gc.collect()
    return result


def create_task() -> Task:
    """ Creates a task for experiment purposes."""

    # define obstacle
    box = Geometry.Box(dict(x=.3, y=.8, z=0.6), spatial.homogeneous([.7, .6, .31]))
    tabletop = Geometry.Box(dict(x=3.5, y=1.0, z=.05))
    legs = [Geometry.Box(dict(x=.07, y=.07, z=.5), spatial.homogeneous([.5 - 1 * (i % 2), .4 - .8 * (i // 2), -.25]))
            for i in range(4)]
    table = Geometry.ComposedGeometry((box, tabletop, *legs))
    obstacle = Obstacle.Obstacle('123', collision=table, name='Table')

    # define task
    header = TaskHeader('own_defined_task_for_sa')
    task = Task(header)
    task.add_obstacle(obstacle)
    return task


def main():
    file_to_read = 'data_25_07/25_07_random20.csv'
    file_to_write = "data_25_07/results_25_07_random_202.csv"
    task_to_solve = create_task()
    all_modules = get_modules(file_to_read)
    cartesian_product_modules = list(itertools.product(all_modules, all_modules))
    del all_modules
    gc.collect()

    with open(file_to_write, mode='a+', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(
            ['assembly', 'assembly_reused', 'time planning reuse', 'time solving reuse', 'time planning scratch',
             'time solving scratch'])

    with ThreadPool(4) as pool:
        m = Manager()
        lock = m.Lock()
        func = partial(process_pair, task=task_to_solve, lock=lock, file_to_write=file_to_write)
        pool.map(func, list(cartesian_product_modules))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.ERROR)
    ompl.util.setLogLevel(ompl.util.LogLevel.LOG_ERROR)
    main()