Multiprocessing Path Planning
unknown
python
2 years ago
7.3 kB
10
Indexable
from functools import partial
import gc
import itertools
from multiprocessing import Lock, Manager
from multiprocessing.pool import ThreadPool
import threading
from typing import Tuple
import csv
import ompl.util
import numpy as np
from mcs.demos.dyan_work.PathPlanner import IKPlannerWithRecycling, PathNotFoundError, check_path_validity
from timor import Geometry, ModuleAssembly, ModulesDB
from timor.task import Obstacle
from timor.task.Task import Task, TaskHeader
import pandas as pd
from timor.utilities import logging, spatial
from timor.utilities.tolerated_pose import ToleratedPose
def process_pair(pair: Tuple, task: Task, lock: Lock, file_to_write: str) -> int:
thread_id = str(threading.get_ident())
print("Thread id: " + thread_id + pair.__repr__() + ".\n", flush=True)
# define consts
db = ModulesDB.from_name('modrob-gen2')
loop_number = 50
time_limit = 30
# set base pose with 4*4 transformation matrix
base_pose = [[1, 0, 0, 0],
[0, 1, 0, 0.1],
[0, 0, 1, 0.035],
[0, 0, 0, 1]]
goal_pose = ToleratedPose([[1, 0, 0, -0.9],
[0, 0.7071, -0.7071, 0.3],
[0, 0.7071, 0.7071, 0.5],
[0, 0, 0, 1]])
stat_planning = np.array([])
stat_solving = np.array([])
modules_1 = pair[0]
modules_2 = pair[1]
assembly = ModuleAssembly.from_serial_modules(db, modules_1)
new_assembly = ModuleAssembly.from_serial_modules(db, modules_2)
robot = assembly.robot
new_robot = new_assembly.robot
robot.set_base_placement(base_pose)
new_robot.set_base_placement(base_pose)
q0 = robot.configuration
if new_robot.njoints != robot.njoints:
return -1
print("Initialization for the thread " + thread_id + " done, starting planning from scratch. \n", flush=True)
# plan from scratch
for _ in range(loop_number):
robot.update_configuration(q0)
ik_planner = IKPlannerWithRecycling(robot, task,
time_limit=time_limit,
simplify_path=True)
try:
_ = ik_planner.find_path(q0, goal_pose)
stat_solving = np.append(stat_solving, ik_planner.solving_time)
stat_planning = np.append(stat_planning, ik_planner.planning_time)
except (PathNotFoundError, TimeoutError) as e:
print("Error in planning from scratch for the thread " + thread_id + ". \n", flush=True)
continue
finally:
del ik_planner
gc.collect()
print("Planning from scratch for the thread " + thread_id + " done, starting planning with reuse. \n", flush=True)
# plan with reuse
stat_planning_reuse = np.array([])
stat_solving_reuse = np.array([])
for _ in range(loop_number):
robot.update_configuration(q0)
new_robot.update_configuration(q0)
new_ik_planner = IKPlannerWithRecycling(new_robot, task,
time_limit=time_limit,
simplify_path=True,
save_path_state=True)
try:
path_to_ik = new_ik_planner.find_path(q0, goal_pose)
saved_path = new_ik_planner.saved_path
except (PathNotFoundError, TimeoutError):
print("Error in planning with reuse for the thread " + thread_id + ". \n", flush=True)
continue
finally:
del new_ik_planner
gc.collect()
if not check_path_validity(robot, task, path_to_ik):
print("Path is not valid for reuse for the thread " + thread_id + ". \n", flush=True)
continue
robot.update_configuration(q0)
ik_planner = IKPlannerWithRecycling(robot, task,
time_limit=time_limit,
simplify_path=True,
save_path_state=True,
suggested_path=saved_path, ik_candidates=path_to_ik)
try:
_ = ik_planner.find_path(path_to_ik.q[-1], goal_pose)
time_planning = ik_planner.planning_time
time_solving = ik_planner.solving_time
stat_solving_reuse = np.append(stat_solving_reuse, time_solving)
stat_planning_reuse = np.append(stat_planning_reuse, time_planning)
except (PathNotFoundError, TimeoutError):
print("Error in reuse for the thread " + thread_id + ". \n", flush=True)
continue
finally:
del ik_planner
gc.collect()
if len(stat_solving_reuse) == 0:
print("Length of statistics is 0 for the thread " + thread_id + ". \n", flush=True)
return -1
row = [repr(assembly.original_module_ids), repr(new_assembly.original_module_ids),
stat_planning_reuse.mean(),
stat_solving_reuse.mean(), stat_planning.mean(), stat_solving.mean()]
print("Full success for the thread " + thread_id + ". \n", flush=True)
with lock:
with open(file_to_write, mode='a+', newline='') as file:
writer = csv.writer(file)
writer.writerow(row)
return 1
def get_modules(filename):
""" Loads modules from csv file. """
df = pd.read_csv(filename, header=None).values
result = []
for t in df:
res = t[0][1:-1].replace("'", "").replace(" ", "").split(",")
result.append(res)
del df
gc.collect()
return result
def create_task() -> Task:
""" Creates a task for experiment purposes."""
# define obstacle
box = Geometry.Box(dict(x=.3, y=.8, z=0.6), spatial.homogeneous([.7, .6, .31]))
tabletop = Geometry.Box(dict(x=3.5, y=1.0, z=.05))
legs = [Geometry.Box(dict(x=.07, y=.07, z=.5), spatial.homogeneous([.5 - 1 * (i % 2), .4 - .8 * (i // 2), -.25]))
for i in range(4)]
table = Geometry.ComposedGeometry((box, tabletop, *legs))
obstacle = Obstacle.Obstacle('123', collision=table, name='Table')
# define task
header = TaskHeader('own_defined_task_for_sa')
task = Task(header)
task.add_obstacle(obstacle)
return task
def main():
file_to_read = 'data_25_07/25_07_random20.csv'
file_to_write = "data_25_07/results_25_07_random_202.csv"
task_to_solve = create_task()
all_modules = get_modules(file_to_read)
cartesian_product_modules = list(itertools.product(all_modules, all_modules))
del all_modules
gc.collect()
with open(file_to_write, mode='a+', newline='') as file:
writer = csv.writer(file)
writer.writerow(
['assembly', 'assembly_reused', 'time planning reuse', 'time solving reuse', 'time planning scratch',
'time solving scratch'])
with ThreadPool(4) as pool:
m = Manager()
lock = m.Lock()
func = partial(process_pair, task=task_to_solve, lock=lock, file_to_write=file_to_write)
pool.map(func, list(cartesian_product_modules))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.ERROR)
ompl.util.setLogLevel(ompl.util.LogLevel.LOG_ERROR)
main()
Editor is loading...