Cassie
import os

import cv2
import numpy as np
import torch
import mujoco as m
import gymnasium as gym
import gymnasium.utils as utils
from gymnasium.envs.mujoco.mujoco_env import MujocoEnv
from gymnasium.spaces import Box

import ray
from ray.rllib.agents.ppo import PPOTrainer

# First we need to define the environment.
# The constants are defined here.
THETA_LEFT = 0.5           # phase offset of the left leg
THETA_RIGHT = 0            # phase offset of the right leg
MAX_STEPS = 300            # maximum episode length in steps
OMEGA = 1                  # scaling factor used in the reward kernels
STEPS_IN_CYCLE = 30        # timesteps per gait cycle
a_swing = 0                # swing window start (as a fraction of the cycle)
b_swing = 0.5              # swing window end / stance window start
a_stance = 0.5
b_stance = 1
FORWARD_QUATERNIONS = np.array([1, 0, 0, 0])  # target pelvis orientation
KAPPA = 8                  # von Mises concentration parameter
X_VEL = 0.5                # target forward velocity
Z_VEL = 0                  # target vertical velocity
c_swing_frc = -1           # penalize foot contact force during swing
c_stance_frc = 0
c_swing_spd = 0
c_stance_spd = -1          # penalize foot speed during stance

# The camera configuration
DEFAULT_CAMERA_CONFIG = {
    "trackbodyid": 0,  # use the body id of Cassie
    "distance": 4.0,
    "lookat": np.array((0.0, 0.0, 0.85)),  # adjust the height to match Cassie's height
    "elevation": -20.0,
}


# The environment class
class CassieEnv(MujocoEnv, utils.EzPickle):
    metadata = {
        "render_modes": [
            "human",
            "rgb_array",
            "depth_array",
        ],
        "render_fps": 100,
    }

    def vm_cdf(self, x, mu, kappa, num_points=1000):
        """Computes the CDF of the von Mises distribution.

        Parameters:
            x (float or array-like): Value(s) at which to evaluate the CDF.
            mu (float): Mean parameter of the von Mises distribution.
            kappa (float): Concentration parameter of the von Mises distribution.
            num_points (int, optional): Number of points to use in the numerical
                integration. Default is 1000.

        Returns:
            float or array-like: The CDF of the von Mises distribution evaluated at `x`.
        """

        def besseli0(x):
            """Polynomial approximation of the modified Bessel function I0."""
            ax = np.abs(x)
            if ax < 3.75:
                y = x / 3.75
                y2 = y ** 2
                return (1.0 + y2 * (3.5156229 + y2 * (3.0899424 + y2 * (1.2067492
                        + y2 * (0.2659732 + y2 * (0.0360768 + y2 * 0.0045813))))))
            else:
                y = 3.75 / ax
                return ((np.exp(ax) / np.sqrt(ax)) * (0.39894228 + y * (0.01328592
                        + y * (0.00225319 + y * (-0.00157565 + y * (0.00916281
                        + y * (-0.02057706 + y * (0.02635537 + y * (-0.01647633
                        + y * 0.00392377)))))))))

        def integrand(t):
            """Integrand of the CDF."""
            return np.exp(kappa * np.cos(t - mu)) / besseli0(kappa)

        if isinstance(x, (int, float)):
            # Compute the numerical integral using the trapezoidal rule
            xvals = np.linspace(-np.pi, x, num_points)
            yvals = integrand(xvals)
            integral = np.trapz(yvals, xvals)
            # Compute the normalization constant
            zvals = integrand(np.linspace(-np.pi, np.pi, num_points))
            normalization = np.trapz(zvals, np.linspace(-np.pi, np.pi, num_points))
            # Return the CDF
            return integral / normalization
        else:
            cdf_values = []
            for xi in x:
                # Compute the numerical integral using the trapezoidal rule
                xvals = np.linspace(-np.pi, xi, num_points)
                yvals = integrand(xvals)
                integral = np.trapz(yvals, xvals)
                # Compute the normalization constant
                zvals = integrand(np.linspace(-np.pi, np.pi, num_points))
                normalization = np.trapz(zvals, np.linspace(-np.pi, np.pi, num_points))
                # Append the CDF value to the list
                cdf_values.append(integral / normalization)
            return np.array(cdf_values)
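    # Note: if SciPy is available, scipy.stats.vonmises provides an equivalent
    # vectorized CDF. A minimal sketch of a drop-in alternative (an untested
    # assumption, not used by the original code):
    #
    #     from scipy.stats import vonmises
    #     cdf_values = vonmises.cdf(x, KAPPA, loc=mu)  # ~ self.vm_cdf(x, mu, KAPPA)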

    def __init__(self, config, **kwargs):
        utils.EzPickle.__init__(self, config, **kwargs)
        self._forward_reward_weight = config.get("forward_reward_weight", 1.25)
        self._ctrl_cost_weight = config.get("ctrl_cost_weight", 0.1)
        self._healthy_reward = config.get("healthy_reward", 5.0)
        self._terminate_when_unhealthy = config.get("terminate_when_unhealthy", True)
        self._healthy_z_range = config.get("healthy_z_range", (0.5, 2.0))

        # Torque limits for each actuator
        actuator_ranges = {
            'left-hip-roll': [-4.5, 4.5],
            'left-hip-yaw': [-4.5, 4.5],
            'left-hip-pitch': [-12.2, 12.2],
            'left-knee': [-12.2, 12.2],
            'left-foot': [-0.9, 0.9],
            'right-hip-roll': [-4.5, 4.5],
            'right-hip-yaw': [-4.5, 4.5],
            'right-hip-pitch': [-12.2, 12.2],
            'right-knee': [-12.2, 12.2],
            'right-foot': [-0.9, 0.9],
        }

        # Create the action space using the actuator ranges
        low = [actuator_ranges[key][0] for key in actuator_ranges.keys()]
        high = [actuator_ranges[key][1] for key in actuator_ranges.keys()]
        self.action_space = gym.spaces.Box(np.array(low), np.array(high), dtype=np.float32)

        self._reset_noise_scale = config.get("reset_noise_scale", 1e-2)
        self.phi = 0  # normalized gait-cycle phase in [0, 1)
        self._exclude_current_positions_from_observation = config.get(
            "exclude_current_positions_from_observation", True)
        self.steps = 0
        self.previous_action = np.zeros(10)

        observation_space = Box(low=-np.inf, high=np.inf, shape=(31,), dtype=np.float64)
        MujocoEnv.__init__(
            self,
            config.get("model_path", "cassie.xml"),
            20,  # frame_skip
            render_mode=config.get("render_mode", None),
            observation_space=observation_space,
            # Set the camera settings to match the DEFAULT_CAMERA_CONFIG we defined above
            default_camera_config=DEFAULT_CAMERA_CONFIG,
            **kwargs,
        )

    @property
    def healthy_reward(self):
        return float(self.is_healthy or self._terminate_when_unhealthy) * self._healthy_reward

    @property
    def is_healthy(self):
        min_z, max_z = self._healthy_z_range
        is_healthy = min_z < self.data.qpos[2] < max_z
        return is_healthy

    @property
    def terminated(self):
        # Episode ends when the pelvis leaves the healthy height range
        # (if enabled) or when the step budget MAX_STEPS is exhausted
        terminated = (not self.is_healthy) if self._terminate_when_unhealthy else False
        return terminated or self.steps > MAX_STEPS

    def _get_obs(self):
        '''The sensor data are the following:

        left-foot-input [-2.20025499]
        left-foot-output [-0.0440051]
        left-hip-pitch-input [0.03050987]
        left-hip-roll-input [0.2013339]
        left-hip-yaw-input [0.30352121]
        left-knee-input [-12.57921603]
        left-shin-output [-0.00254359]
        left-tarsus-output [1.01621498]
        pelvis-angular-velocity [-0.66091646  0.09304743 -0.14707221]
        pelvis-linear-acceleration [  0.18789681 -11.51133484  -0.64882624]
        pelvis-magnetometer [ 3.71266894e-04 -4.99997057e-01 -1.67494055e-03]
        pelvis-orientation [ 9.99998508e-01 -1.67501875e-03  2.04083784e-04 -3.70925603e-04]
        right-foot-input [-2.18139208]
        right-foot-output [-0.04362784]
        right-hip-pitch-input [0.00453772]
        right-hip-roll-input [0.12788968]
        right-hip-yaw-input [0.08293241]
        right-knee-input [-12.53914917]
        right-shin-output [-0.00094644]
        right-tarsus-output [1.01240756]
        '''
        # Clock inputs for the left and right legs
        p = np.array([np.sin((2 * np.pi * (self.phi + THETA_LEFT)) / STEPS_IN_CYCLE),
                      np.sin((2 * np.pi * (self.phi + THETA_RIGHT)) / STEPS_IN_CYCLE)])
        # Get the raw sensor readings and concatenate them with the clock inputs
        return np.concatenate([self.data.sensordata, p])
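    # Observation size check (derived from the sensor list above): the eight
    # single-valued joint sensors per leg contribute 16 values, the pelvis
    # angular velocity, linear acceleration, and magnetometer contribute
    # 3 values each, and the pelvis orientation quaternion contributes 4,
    # for 29 sensor readings in total; adding the two clock inputs gives
    # the 31-dimensional observation declared in __init__.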

    def get_pos(self):
        # Robot state
        qpos = self.data.qpos.flat.copy()
        qvel = self.data.qvel.flat.copy()

        # Phase ratios and clock inputs:
        # p = {sin(2*pi*(phi+theta_left)/L), sin(2*pi*(phi+theta_right)/L)}
        # where L is the number of timesteps in each period
        p = (np.sin((2 * np.pi * (self.phi + THETA_LEFT)) / STEPS_IN_CYCLE),
             np.sin((2 * np.pi * (self.phi + THETA_RIGHT)) / STEPS_IN_CYCLE))

        '''
        Position
        [1], [2]            -> Pelvis y, z
        [3], [4], [5], [6]  -> Pelvis Orientation qw, qx, qy, qz
        [7], [8], [9]       -> Left Hip Roll (Motor[0]), Yaw (Motor[1]), Pitch (Motor[2])
        [14]                -> Left Knee (Motor[3])
        [15]                -> Left Shin (Joint[0])
        [16]                -> Left Tarsus (Joint[1])
        [20]                -> Left Foot (Motor[4], Joint[2])
        [21], [22], [23]    -> Right Hip Roll (Motor[5]), Yaw (Motor[6]), Pitch (Motor[7])
        [28]                -> Right Knee (Motor[8])
        [29]                -> Right Shin (Joint[3])
        [30]                -> Right Tarsus (Joint[4])
        [34]                -> Right Foot (Motor[9], Joint[5])
        '''
        pos_index = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 15, 16, 20, 21, 22, 23, 28, 29, 30, 34])

        '''
        Velocity
        [0], [1], [2]       -> Pelvis x, y, z
        [3], [4], [5]       -> Pelvis Orientation wx, wy, wz
        [6], [7], [8]       -> Left Hip Roll (Motor[0]), Yaw (Motor[1]), Pitch (Motor[2])
        [12]                -> Left Knee (Motor[3])
        [13]                -> Left Shin (Joint[0])
        [14]                -> Left Tarsus (Joint[1])
        [18]                -> Left Foot (Motor[4], Joint[2])
        [19], [20], [21]    -> Right Hip Roll (Motor[5]), Yaw (Motor[6]), Pitch (Motor[7])
        [25]                -> Right Knee (Motor[8])
        [26]                -> Right Shin (Joint[3])
        [27]                -> Right Tarsus (Joint[4])
        [31]                -> Right Foot (Motor[9], Joint[5])
        '''
        vel_index = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 13, 14, 18, 19, 20, 21, 25, 26, 27, 31])

        return np.concatenate([qpos[pos_index], qvel[vel_index], [p[0], p[1]]])

    def von_mises(self, a, kappa, phi):
        # Unused helper kept for reference; the reward uses the numerical
        # vm_cdf above instead
        vm = torch.distributions.VonMises(a, kappa)
        return vm.cdf(phi)

    # Computes the reward
    def compute_reward(self, action):
        # Extract some proxies
        qpos = self.data.qpos.flat.copy()
        qvel = self.data.qvel.flat.copy()
        pos_index = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 15, 16, 20, 21, 22, 23, 28, 29, 30, 34])
        vel_index = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 13, 14, 18, 19, 20, 21, 25, 26, 27, 31])
        qpos = qpos[pos_index]
        qvel = qvel[vel_index]

        # Feet contact forces
        contact_force_right_foot = np.zeros(6)
        m.mj_contactForce(self.model, self.data, 0, contact_force_right_foot)
        contact_force_left_foot = np.zeros(6)
        m.mj_contactForce(self.model, self.data, 1, contact_force_left_foot)

        ######## Forward (x) velocity tracking ########
        q_vx = 1 - np.exp(-2 * OMEGA * np.linalg.norm(np.array([qvel[0]]) - np.array([X_VEL])) ** 2)
        ######## Vertical (z) velocity tracking ########
        q_vy = 1 - np.exp(-2 * OMEGA * np.linalg.norm(np.array([qvel[2]]) - np.array([Z_VEL])) ** 2)

        # Foot contact-force and foot-speed kernels
        q_left_frc = 1.0 - np.exp(-OMEGA * np.linalg.norm(contact_force_left_foot) ** 2 / 4000)
        q_right_frc = 1.0 - np.exp(-OMEGA * np.linalg.norm(contact_force_right_foot) ** 2 / 4000)
        q_left_spd = 1.0 - np.exp(-OMEGA * np.linalg.norm(qvel[12]) ** 2)
        q_right_spd = 1.0 - np.exp(-OMEGA * np.linalg.norm(qvel[19]) ** 2)

        # Smoothness and effort kernels
        q_action_diff = 1 - np.exp(-5 * np.linalg.norm(action - self.previous_action))
        q_orientation = 1 - np.exp(-3 * (1 - ((self.data.sensor('pelvis-orientation').data.T) @ FORWARD_QUATERNIONS) ** 2))
        q_torque = 1 - np.exp(-0.05 * np.linalg.norm(action))
        q_pelvis_acc = 1 - np.exp(-0.10 * (np.linalg.norm(self.data.sensor('pelvis-angular-velocity').data)
                                           + np.linalg.norm(self.data.sensor('pelvis-linear-acceleration').data)))

        # Phase indicator: probability that phi lies in the window [a, b]
        I = lambda phi, a, b: self.vm_cdf(phi, a, KAPPA) * (1 - self.vm_cdf(phi, b, KAPPA))
        I_swing_frc = lambda phi: I(phi, a_swing, b_swing)
        I_swing_spd = lambda phi: I(phi, a_swing, b_swing)
        I_stance_spd = lambda phi: I(phi, a_stance, b_stance)
        I_stance_frc = lambda phi: I(phi, a_stance, b_stance)
        C_frc = lambda phi: c_swing_frc * I_swing_frc(phi) + c_stance_frc * I_stance_frc(phi)
        C_spd = lambda phi: c_swing_spd * I_swing_spd(phi) + c_stance_spd * I_stance_spd(phi)
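        # With the constants above (c_swing_frc = -1, c_stance_frc = 0,
        # c_swing_spd = 0, c_stance_spd = -1), C_frc penalizes foot contact
        # force during the swing window [a_swing, b_swing) and C_spd penalizes
        # foot speed during the stance window [a_stance, b_stance), so each
        # foot is encouraged to be airborne for half the cycle and planted for
        # the other half; THETA_LEFT = 0.5 and THETA_RIGHT = 0 keep the two
        # legs half a cycle out of phase.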

        R_cmd = -1.0 * q_vx - 1.0 * q_vy - 1.0 * q_orientation
        R_smooth = -1.0 * q_action_diff - 1.0 * q_torque - 1.0 * q_pelvis_acc
        R_biped = 0
        R_biped += C_frc(self.phi + THETA_LEFT) * q_left_frc
        R_biped += C_frc(self.phi + THETA_RIGHT) * q_right_frc
        R_biped += C_spd(self.phi + THETA_LEFT) * q_left_spd
        R_biped += C_spd(self.phi + THETA_RIGHT) * q_right_spd

        reward = 1.5 + 0.5 * R_biped + 0.375 * R_cmd + 0.125 * R_smooth

        # Store all used values with their names in a dictionary
        self.rewards = {
            'R_biped': R_biped,
            'R_cmd': R_cmd,
            'R_smooth': R_smooth,
            'q_vx': q_vx,
            'q_vy': q_vy,
            'q_orientation': q_orientation,
            'q_action_diff': q_action_diff,
            'q_torque': q_torque,
            'q_pelvis_acc': q_pelvis_acc,
            'q_left_frc': q_left_frc,
            'q_right_frc': q_right_frc,
            'q_left_spd': q_left_spd,
            'q_right_spd': q_right_spd,
            'reward': reward,
            'contact_force_left_foot': np.linalg.norm(contact_force_left_foot) ** 2 / 4000,
        }

        return reward

    # Step in time
    def step(self, action):
        # Clip the action to the ranges in action_space
        action = np.clip(action, self.action_space.low, self.action_space.high)
        self.do_simulation(action, self.frame_skip)
        observation = self._get_obs()
        reward = self.compute_reward(action)
        terminated = self.terminated
        self.steps += 1
        # Advance the gait-cycle phase and wrap it back into [0, 1)
        self.phi += 1.0 / STEPS_IN_CYCLE
        self.phi = self.phi % 1
        if self.render_mode == "human":
            self.render()
        self.previous_action = action
        return observation, reward, terminated, False, {}

    # Resets the simulation
    def reset_model(self):
        m.mj_inverse(self.model, self.data)
        noise_low = -self._reset_noise_scale
        noise_high = self._reset_noise_scale
        self.previous_action = np.zeros(10)
        self.phi = 0
        self.steps = 0
        qpos = self.init_qpos + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq)
        qvel = self.init_qvel + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nv)
        self.set_state(qpos, qvel)
        observation = self._get_obs()
        return observation
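
# A quick smoke test for the environment (a minimal sketch; it assumes
# "cassie.xml" is resolvable from the working directory and is not part of
# the training script below):
#
#     env = CassieEnv({})
#     obs, info = env.reset()
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
#     print(obs.shape, reward)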

log_dir = "/home/ajvendetta/ray_results"
sim_dir = "./sim/"

# Load the trainer from the latest checkpoint, if one exists: find the most
# recently named PPO_* directory in log_dir, then its latest checkpoint
checkpoint_path = None
if os.path.exists(log_dir):
    latest_log_directory = max([d for d in os.listdir(log_dir) if d.startswith("PPO_")], default=None)
    print(latest_log_directory)
    if latest_log_directory is not None:
        # Get the latest checkpoint in the latest log directory
        latest_checkpoint = max([d.split("_")[-1] for d in os.listdir(os.path.join(log_dir, latest_log_directory))
                                 if d.startswith("checkpoint")], default=None)
        if latest_checkpoint is not None:
            checkpoint_path = os.path.join(log_dir, latest_log_directory, "checkpoint_{}/".format(latest_checkpoint))
            print(checkpoint_path)

# Register the environment in RLlib
from ray.tune.registry import register_env

# Initialize ray and register the environment
ray.init(ignore_reinit_error=True)
register_env("cassie-v0", lambda config: CassieEnv(config))

config = {
    "framework": "torch",
    "log_level": "WARN",
    "num_gpus": 1,
    "num_cpus": 19,
    "num_workers": 6,
    "num_envs_per_worker": 1,
    "rollout_fragment_length": 300,
    "train_batch_size": 50000,
    "sgd_minibatch_size": 9000,
    "num_sgd_iter": 10,
    "optimizer": {
        "type": "Adam",
        "lr": 3e-4,
        "epsilon": 1e-5,
    },
    "model": {
        "conv_filters": None,
        "fcnet_activation": "relu",
        "fcnet_hiddens": [256, 256],
        "use_lstm": False,
        "vf_share_layers": False,
        "free_log_std": True,
    },
    "entropy_coeff": 0.01,
    "gamma": 0.99,
    "lambda": 0.95,
    "kl_coeff": 0.5,
    "clip_param": 0.2,
    "vf_clip_param": 10.0,
    "grad_clip": 0.5,
    "kl_target": 0.01,
    "batch_mode": "truncate_episodes",
    "observation_filter": "NoFilter",
    "reuse_actors": True,
    "disable_env_checking": True,
    "num_gpus_per_worker": 0.16,
    "num_cpus_per_worker": 1,
    "evaluation_config": {
        "explore": True,
        "evaluation_interval": 10,
        "evaluation_num_episodes": 20,
    },
}

import tensorflow as tf
import tensorboard

torch.cuda.empty_cache()

trainer = PPOTrainer(config=config, env="cassie-v0")
if checkpoint_path is not None:
    # Restore the checkpoint into a temporary trainer, copy its policy
    # weights into the fresh trainer, then destroy the temporary one
    temp = PPOTrainer(config=config, env="cassie-v0")
    temp.restore(checkpoint_path)
    policy_weights = temp.get_policy().get_weights()
    temp.stop()
    trainer.get_policy().set_weights(policy_weights)

# Training loop
max_test_i = 0
checkpoint_frequency = 5
simulation_frequency = 10

env = CassieEnv({"render_mode": "rgb_array"})

# Find the latest directory named test_i in the sim directory
os.makedirs(sim_dir, exist_ok=True)
latest_directory = max([int(d.split("_")[-1]) for d in os.listdir(sim_dir) if d.startswith("test_")], default=0)
max_test_i = latest_directory + 1

# Create folder for this test run
test_dir = os.path.join(sim_dir, "test_{}".format(max_test_i))
os.makedirs(test_dir, exist_ok=True)

# Define video codec and framerate
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
fps = 30

# Set initial iteration count
i = trainer.iteration if hasattr(trainer, "iteration") else 0

while True:
    # Train for one iteration
    result = trainer.train()
    i += 1
    print("Episode Reward Mean for iteration {} is {}".format(i, result["episode_reward_mean"]))

    # Save the model every checkpoint_frequency iterations
    if i % checkpoint_frequency == 0:
        checkpoint_path = trainer.save()
        print("Checkpoint saved at", checkpoint_path)

    # Record a rollout video every simulation_frequency iterations
    if i % simulation_frequency == 0:
        video_path = os.path.join(test_dir, "sim_{}.mp4".format(i))
        obs = env.reset()[0]
        done = False
        frames = []

        while not done:
            action = trainer.compute_single_action(obs)
            obs, _, done, _, _ = env.step(action)
            frame = env.render()
            # env.render() returns RGB frames; OpenCV expects BGR
            frames.append(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

        # Save frames as video
        height, width, _ = frames[0].shape
        video_writer = cv2.VideoWriter(video_path, fourcc, fps, (width, height))
        for frame in frames:
            video_writer.write(frame)
        video_writer.release()

        # Increment test index
        max_test_i += 1
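
# Training progress can be monitored with TensorBoard pointed at the RLlib
# log directory used above:
#
#     tensorboard --logdir /home/ajvendetta/ray_results
#
# Rollout videos land in ./sim/test_<n>/sim_<iteration>.mp4.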