Multiprocessing Path Planning
unknown
python
2 years ago
7.3 kB
6
Indexable
from functools import partial import gc import itertools from multiprocessing import Lock, Manager from multiprocessing.pool import ThreadPool import threading from typing import Tuple import csv import ompl.util import numpy as np from mcs.demos.dyan_work.PathPlanner import IKPlannerWithRecycling, PathNotFoundError, check_path_validity from timor import Geometry, ModuleAssembly, ModulesDB from timor.task import Obstacle from timor.task.Task import Task, TaskHeader import pandas as pd from timor.utilities import logging, spatial from timor.utilities.tolerated_pose import ToleratedPose def process_pair(pair: Tuple, task: Task, lock: Lock, file_to_write: str) -> int: thread_id = str(threading.get_ident()) print("Thread id: " + thread_id + pair.__repr__() + ".\n", flush=True) # define consts db = ModulesDB.from_name('modrob-gen2') loop_number = 50 time_limit = 30 # set base pose with 4*4 transformation matrix base_pose = [[1, 0, 0, 0], [0, 1, 0, 0.1], [0, 0, 1, 0.035], [0, 0, 0, 1]] goal_pose = ToleratedPose([[1, 0, 0, -0.9], [0, 0.7071, -0.7071, 0.3], [0, 0.7071, 0.7071, 0.5], [0, 0, 0, 1]]) stat_planning = np.array([]) stat_solving = np.array([]) modules_1 = pair[0] modules_2 = pair[1] assembly = ModuleAssembly.from_serial_modules(db, modules_1) new_assembly = ModuleAssembly.from_serial_modules(db, modules_2) robot = assembly.robot new_robot = new_assembly.robot robot.set_base_placement(base_pose) new_robot.set_base_placement(base_pose) q0 = robot.configuration if new_robot.njoints != robot.njoints: return -1 print("Initialization for the thread " + thread_id + " done, starting planning from scratch. \n", flush=True) # plan from scratch for _ in range(loop_number): robot.update_configuration(q0) ik_planner = IKPlannerWithRecycling(robot, task, time_limit=time_limit, simplify_path=True) try: _ = ik_planner.find_path(q0, goal_pose) stat_solving = np.append(stat_solving, ik_planner.solving_time) stat_planning = np.append(stat_planning, ik_planner.planning_time) except (PathNotFoundError, TimeoutError) as e: print("Error in planning from scratch for the thread " + thread_id + ". \n", flush=True) continue finally: del ik_planner gc.collect() print("Planning from scratch for the thread " + thread_id + " done, starting planning with reuse. \n", flush=True) # plan with reuse stat_planning_reuse = np.array([]) stat_solving_reuse = np.array([]) for _ in range(loop_number): robot.update_configuration(q0) new_robot.update_configuration(q0) new_ik_planner = IKPlannerWithRecycling(new_robot, task, time_limit=time_limit, simplify_path=True, save_path_state=True) try: path_to_ik = new_ik_planner.find_path(q0, goal_pose) saved_path = new_ik_planner.saved_path except (PathNotFoundError, TimeoutError): print("Error in planning with reuse for the thread " + thread_id + ". \n", flush=True) continue finally: del new_ik_planner gc.collect() if not check_path_validity(robot, task, path_to_ik): print("Path is not valid for reuse for the thread " + thread_id + ". \n", flush=True) continue robot.update_configuration(q0) ik_planner = IKPlannerWithRecycling(robot, task, time_limit=time_limit, simplify_path=True, save_path_state=True, suggested_path=saved_path, ik_candidates=path_to_ik) try: _ = ik_planner.find_path(path_to_ik.q[-1], goal_pose) time_planning = ik_planner.planning_time time_solving = ik_planner.solving_time stat_solving_reuse = np.append(stat_solving_reuse, time_solving) stat_planning_reuse = np.append(stat_planning_reuse, time_planning) except (PathNotFoundError, TimeoutError): print("Error in reuse for the thread " + thread_id + ". \n", flush=True) continue finally: del ik_planner gc.collect() if len(stat_solving_reuse) == 0: print("Length of statistics is 0 for the thread " + thread_id + ". \n", flush=True) return -1 row = [repr(assembly.original_module_ids), repr(new_assembly.original_module_ids), stat_planning_reuse.mean(), stat_solving_reuse.mean(), stat_planning.mean(), stat_solving.mean()] print("Full success for the thread " + thread_id + ". \n", flush=True) with lock: with open(file_to_write, mode='a+', newline='') as file: writer = csv.writer(file) writer.writerow(row) return 1 def get_modules(filename): """ Loads modules from csv file. """ df = pd.read_csv(filename, header=None).values result = [] for t in df: res = t[0][1:-1].replace("'", "").replace(" ", "").split(",") result.append(res) del df gc.collect() return result def create_task() -> Task: """ Creates a task for experiment purposes.""" # define obstacle box = Geometry.Box(dict(x=.3, y=.8, z=0.6), spatial.homogeneous([.7, .6, .31])) tabletop = Geometry.Box(dict(x=3.5, y=1.0, z=.05)) legs = [Geometry.Box(dict(x=.07, y=.07, z=.5), spatial.homogeneous([.5 - 1 * (i % 2), .4 - .8 * (i // 2), -.25])) for i in range(4)] table = Geometry.ComposedGeometry((box, tabletop, *legs)) obstacle = Obstacle.Obstacle('123', collision=table, name='Table') # define task header = TaskHeader('own_defined_task_for_sa') task = Task(header) task.add_obstacle(obstacle) return task def main(): file_to_read = 'data_25_07/25_07_random20.csv' file_to_write = "data_25_07/results_25_07_random_202.csv" task_to_solve = create_task() all_modules = get_modules(file_to_read) cartesian_product_modules = list(itertools.product(all_modules, all_modules)) del all_modules gc.collect() with open(file_to_write, mode='a+', newline='') as file: writer = csv.writer(file) writer.writerow( ['assembly', 'assembly_reused', 'time planning reuse', 'time solving reuse', 'time planning scratch', 'time solving scratch']) with ThreadPool(4) as pool: m = Manager() lock = m.Lock() func = partial(process_pair, task=task_to_solve, lock=lock, file_to_write=file_to_write) pool.map(func, list(cartesian_product_modules)) if __name__ == '__main__': logging.getLogger().setLevel(logging.ERROR) ompl.util.setLogLevel(ompl.util.LogLevel.LOG_ERROR) main()
Editor is loading...