import numpy as np
import gymnasium.utils as utils
import mujoco as m
import gymnasium as gym
from gymnasium.envs.mujoco.mujoco_env import MujocoEnv
from gymnasium.spaces import Box
import torch
import os
import cv2
import ray
from ray.rllib.agents.ppo import PPOTrainer
# First we define the environment
# Constants used by the environment
THETA_LEFT = 0.5
THETA_RIGHT = 0
MAX_STEPS = 300
OMEGA = 1
STEPS_IN_CYCLE = 30
a_swing = 0
b_swing = 0.5
a_stance = 0.5
b_stance = 1
FORWARD_QUATERNION = np.array([1, 0, 0, 0])
KAPPA = 8
X_VEL = 0.5
Z_VEL = 0
c_swing_frc = -1
c_stance_frc = 0
c_swing_spd = 0
c_stance_spd = -1
#The camera configuration
DEFAULT_CAMERA_CONFIG = {
"trackbodyid": 0, # use the body id of Cassie
"distance": 4.0,
"lookat": np.array((0.0, 0.0, 0.85)), # adjust the height to match Cassie's height
"elevation": -20.0,
}
#The environment class
class CassieEnv(MujocoEnv, utils.EzPickle):
metadata = {
"render_modes": [
"human",
"rgb_array",
"depth_array",
],
"render_fps": 100,
}
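    # render_fps should equal 1 / (model timestep * frame_skip); 100 fps assumes
    # cassie.xml uses a 0.0005 s timestep with the frame_skip of 20 passed below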
    def vm_cdf(self, x, mu, kappa, num_points=1000):
"""Computes the CDF of the von Mises distribution.
Parameters:
x (float or array-like): Value(s) at which to evaluate the CDF.
mu (float): Mean parameter of the von Mises distribution.
kappa (float): Concentration parameter of the von Mises distribution.
num_points (int, optional): Number of points to use in the numerical integration. Default is 1000.
Returns:
float or array-like: The CDF of the von Mises distribution evaluated at `x`.
"""
def besseli0(x):
"""Approximation of the Bessel I0 function."""
ax = np.abs(x)
if ax < 3.75:
y = x / 3.75
y2 = y ** 2
return 1.0 + y2 * (3.5156229 + y2 * (3.0899424 + y2 * (1.2067492 + y2 * (0.2659732 + y2 * (0.0360768 + y2 * 0.0045813)))))
else:
y = 3.75 / ax
return (np.exp(ax) / np.sqrt(ax)) * (0.39894228 + y * (0.01328592 + y * (0.00225319 + y * (-0.00157565 + y * (0.00916281 + y * (-0.02057706 + y * (0.02635537 + y * (-0.01647633 + y * 0.00392377))))))))
def integrand(t):
"""Integrand of the CDF."""
return np.exp(kappa * np.cos(t - mu)) / besseli0(kappa)
if isinstance(x, (int, float)):
# Compute the numerical integral using the trapezoidal rule
xvals = np.linspace(-np.pi, x, num_points)
yvals = integrand(xvals)
integral = np.trapz(yvals, xvals)
# Compute the normalization constant
zvals = integrand(np.linspace(-np.pi, np.pi, num_points))
normalization = np.trapz(zvals, np.linspace(-np.pi, np.pi, num_points))
# Return the CDF
return integral / normalization
        else:
            # Compute the normalization constant once for all evaluation points
            grid = np.linspace(-np.pi, np.pi, num_points)
            normalization = np.trapz(integrand(grid), grid)
            cdf_values = []
            for xi in x:
                # Numerical integral from -pi to xi using the trapezoidal rule
                xvals = np.linspace(-np.pi, xi, num_points)
                cdf_values.append(np.trapz(integrand(xvals), xvals) / normalization)
            return np.array(cdf_values)
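    # Quick sanity check for vm_cdf (a sketch; assumes scipy is installed, which
    # this script does not otherwise require):
    #   from scipy.stats import vonmises
    #   env.vm_cdf(0.3, 0.0, KAPPA)  # should be close to vonmises.cdf(0.3, kappa=KAPPA, loc=0.0)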
    def __init__(self, config, **kwargs):
utils.EzPickle.__init__(self, config, **kwargs)
self._forward_reward_weight = config.get("forward_reward_weight", 1.25)
self._ctrl_cost_weight = config.get("ctrl_cost_weight", 0.1)
self._healthy_reward = config.get("healthy_reward", 5.0)
self._terminate_when_unhealthy = config.get("terminate_when_unhealthy", True)
self._healthy_z_range = config.get("healthy_z_range", (0.5, 2.0))
actuator_ranges = {
'left-hip-roll': [-4.5, 4.5],
'left-hip-yaw': [-4.5, 4.5],
'left-hip-pitch': [-12.2, 12.2],
'left-knee': [-12.2, 12.2],
'left-foot': [-0.9, 0.9],
'right-hip-roll': [-4.5, 4.5],
'right-hip-yaw': [-4.5, 4.5],
'right-hip-pitch': [-12.2, 12.2],
'right-knee': [-12.2, 12.2],
'right-foot': [-0.9, 0.9]
}
# create the action space using the actuator ranges
low = [actuator_ranges[key][0] for key in actuator_ranges.keys()]
high = [actuator_ranges[key][1] for key in actuator_ranges.keys()]
self.action_space = gym.spaces.Box(np.array(low), np.array(high), dtype=np.float32)
self._reset_noise_scale = config.get("reset_noise_scale", 1e-2)
self.phi = 0
self._exclude_current_positions_from_observation = config.get("exclude_current_positions_from_observation", True)
        self.steps = 0
        self.previous_action = np.zeros(10)
observation_space = Box(low=-np.inf, high=np.inf, shape=(31,), dtype=np.float64)
        # Pass DEFAULT_CAMERA_CONFIG so the viewer tracks Cassie at the right height
        MujocoEnv.__init__(self, config.get("model_path", "cassie.xml"), 20, render_mode=config.get("render_mode", None), observation_space=observation_space, default_camera_config=DEFAULT_CAMERA_CONFIG, **kwargs)
@property
def healthy_reward(self):
return float(self.is_healthy or self._terminate_when_unhealthy) * self._healthy_reward
@property
def is_healthy(self):
min_z, max_z = self._healthy_z_range
is_healthy = min_z < self.data.qpos[2] < max_z
return is_healthy
    @property
    def terminated(self):
        # Health-based termination only; the MAX_STEPS time limit is reported
        # as a truncation in step()
        return (not self.is_healthy) if self._terminate_when_unhealthy else False
def _get_obs(self):
'''The sensor data are the following
left-foot-input [-2.20025499]
left-foot-output [-0.0440051]
left-hip-pitch-input [0.03050987]
left-hip-roll-input [0.2013339]
left-hip-yaw-input [0.30352121]
left-knee-input [-12.57921603]
left-shin-output [-0.00254359]
left-tarsus-output [1.01621498]
pelvis-angular-velocity [-0.66091646 0.09304743 -0.14707221]
pelvis-linear-acceleration [ 0.18789681 -11.51133484 -0.64882624]
pelvis-magnetometer [ 3.71266894e-04 -4.99997057e-01 -1.67494055e-03]
pelvis-orientation [ 9.99998508e-01 -1.67501875e-03 2.04083784e-04 -3.70925603e-04]
right-foot-input [-2.18139208]
right-foot-output [-0.04362784]
right-hip-pitch-input [0.00453772]
right-hip-roll-input [0.12788968]
right-hip-yaw-input [0.08293241]
right-knee-input [-12.53914917]
right-shin-output [-0.00094644]
right-tarsus-output [1.01240756]
'''
        # Clock inputs: p = sin(2*pi*(phi + theta)) for each leg; self.phi is kept
        # normalized in [0, 1), so no further division by STEPS_IN_CYCLE is needed
        p = np.array([np.sin(2 * np.pi * (self.phi + THETA_LEFT)), np.sin(2 * np.pi * (self.phi + THETA_RIGHT))])
        # Concatenate the 29 raw sensor readings with the 2 clock inputs
        # (31 values, matching the declared observation_space shape)
        return np.concatenate([self.data.sensordata, p])
def get_pos(self):
#Robot State
qpos = self.data.qpos.flat.copy()
qvel = self.data.qvel.flat.copy()
#Desired velocity
#Phase ratios and clock inputs
        # p = (sin(2*pi*(phi + theta_left)), sin(2*pi*(phi + theta_right))),
        # where self.phi is the normalized cycle phase in [0, 1)
        p = (np.sin(2 * np.pi * (self.phi + THETA_LEFT)), np.sin(2 * np.pi * (self.phi + THETA_RIGHT)))
'''
Position [1], [2] -> Pelvis y, z
[3], [4], [5], [6] -> Pelvis Orientation qw, qx, qy, qz
[7], [8], [9] -> Left Hip Roll (Motor[0]), Yaw (Motor[1]), Pitch (Motor[2])
[14] -> Left Knee (Motor[3])
[15] -> Left Shin (Joint[0])
[16] -> Left Tarsus (Joint[1])
[20] -> Left Foot (Motor[4], Joint[2])
[21], [22], [23] -> Right Hip Roll (Motor[5]), Yaw (Motor[6]), Pitch (Motor[7])
                 [28] -> Right Knee (Motor[8])
                 [29] -> Right Shin (Joint[3])
                 [30] -> Right Tarsus (Joint[4])
                 [34] -> Right Foot (Motor[9], Joint[5])
'''
pos_index = np.array([1,2,3,4,5,6,7,8,9,14,15,16,20,21,22,23,28,29,30,34])
'''
Velocity [0], [1], [2] -> Pelvis x, y, z
[3], [4], [5] -> Pelvis Orientation wx, wy, wz
[6], [7], [8] -> Left Hip Roll (Motor[0]), Yaw (Motor[1]), Pitch (Motor[2])
[12] -> Left Knee (Motor[3])
[13] -> Left Shin (Joint[0])
[14] -> Left Tarsus (Joint[1])
[18] -> Left Foot (Motor[4], Joint[2])
[19], [20], [21] -> Right Hip Roll (Motor[5]), Yaw (Motor[6]), Pitch (Motor[7])
                 [25] -> Right Knee (Motor[8])
                 [26] -> Right Shin (Joint[3])
                 [27] -> Right Tarsus (Joint[4])
                 [31] -> Right Foot (Motor[9], Joint[5])
'''
vel_index = np.array([0,1,2,3,4,5,6,7,8,12,13,14,18,19,20,21,25,26,27,31])
return np.concatenate([qpos[pos_index], qvel[vel_index],[p[0],p[1]]])
    def von_mises(self, a, kappa, phi):
        # Unused analytic variant: torch.distributions.VonMises exposes the
        # distribution, but its .cdf() is not implemented in PyTorch, which is
        # why vm_cdf above integrates the density numerically instead
        vm = torch.distributions.VonMises(a, kappa)
        return vm.cdf(phi)
#computes the reward
def compute_reward(self,action):
# Extract some proxies
qpos = self.data.qpos.flat.copy()
qvel = self.data.qvel.flat.copy()
pos_index = np.array([1,2,3,4,5,6,7,8,9,14,15,16,20,21,22,23,28,29,30,34])
vel_index = np.array([0,1,2,3,4,5,6,7,8,12,13,14,18,19,20,21,25,26,27,31])
qpos = qpos[pos_index]
        qvel = qvel[vel_index]
        # Feet contact forces (assumes contacts 0 and 1 correspond to the right
        # and left foot respectively; MuJoCo does not guarantee contact ordering)
        contact_force_right_foot = np.zeros(6)
        m.mj_contactForce(self.model, self.data, 0, contact_force_right_foot)
        contact_force_left_foot = np.zeros(6)
        m.mj_contactForce(self.model, self.data, 1, contact_force_left_foot)
        ######## Velocity tracking terms (x and z) ########
        q_vx = 1 - np.exp(-2 * OMEGA * (qvel[0] - X_VEL) ** 2)
        q_vy = 1 - np.exp(-2 * OMEGA * (qvel[2] - Z_VEL) ** 2)
q_left_frc = 1.0 - np.exp(-OMEGA * np.linalg.norm(contact_force_left_foot)**2/4000)
q_right_frc = 1.0 - np.exp(-OMEGA * np.linalg.norm(contact_force_right_foot)**2/4000)
q_left_spd = 1.0 - np.exp(-OMEGA * np.linalg.norm(qvel[12])**2)
q_right_spd = 1.0 - np.exp(-OMEGA * np.linalg.norm(qvel[19])**2)
q_action_diff = 1 - np.exp(-5*np.linalg.norm(action-self.previous_action))
        q_orientation = 1 - np.exp(-3 * (1 - ((self.data.sensor('pelvis-orientation').data.T) @ FORWARD_QUATERNION) ** 2))
q_torque = 1 - np.exp(-0.05*np.linalg.norm(action))
q_pelvis_acc = 1 - np.exp(-0.10*(np.linalg.norm(self.data.sensor('pelvis-angular-velocity').data) + np.linalg.norm(self.data.sensor('pelvis-linear-acceleration').data)))
I = lambda phi,a,b : self.vm_cdf(phi,a,KAPPA)*(1-self.vm_cdf(phi,b,KAPPA))
I_swing_frc = lambda phi : I(phi,a_swing,b_swing)
I_swing_spd = lambda phi : I(phi, a_swing,b_swing)
I_stance_spd = lambda phi : I(phi, a_stance,b_stance)
I_stance_frc = lambda phi : I(phi, a_stance,b_stance)
        C_frc = lambda phi: c_swing_frc * I_swing_frc(phi) + c_stance_frc * I_stance_frc(phi)
C_spd = lambda phi : c_swing_spd * I_swing_spd(phi) + c_stance_spd * I_stance_spd(phi)
R_cmd = -1.0*q_vx-1.0*q_vy-1.0*q_orientation
R_smooth = -1.0*q_action_diff - 1.0* q_torque - 1.0*q_pelvis_acc
R_biped = 0
R_biped += C_frc(self.phi+THETA_LEFT) * q_left_frc
R_biped += C_frc(self.phi+THETA_RIGHT) * q_right_frc
R_biped += C_spd(self.phi+THETA_LEFT) * q_left_spd
R_biped += C_spd(self.phi+THETA_RIGHT) * q_right_spd
reward = 1.5 + 0.5 * R_biped + 0.375* R_cmd + 0.125* R_smooth
#store all used values with their names in a dictionary
self.rewards = {
'R_biped': R_biped,
'R_cmd': R_cmd,
'R_smooth': R_smooth,
'q_vx': q_vx,
'q_vy': q_vy,
'q_orientation': q_orientation,
'q_action_diff': q_action_diff,
'q_torque': q_torque,
'q_pelvis_acc': q_pelvis_acc,
'q_left_frc': q_left_frc,
'q_right_frc': q_right_frc,
'q_left_spd': q_left_spd,
'q_right_spd': q_right_spd,
'reward': reward,
'contact_force_left_foot': np.linalg.norm(contact_force_left_foot)**2/4000
}
return reward
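    # How the clock terms shape the gait (informal summary): with c_swing_frc = -1
    # and c_stance_spd = -1, foot contact force is penalized while a leg's phase
    # lies in the swing window [a_swing, b_swing) and foot speed is penalized in
    # the stance window [a_stance, b_stance); THETA_LEFT = 0.5 offsets the left
    # leg by half a cycle so the two legs alternate swing and stance.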
#step in time
def step(self, action):
#clip the action to the ranges in action_space
action = np.clip(action, self.action_space.low, self.action_space.high)
self.do_simulation(action, self.frame_skip)
observation = self._get_obs()
reward = self.compute_reward(action)
        terminated = self.terminated
        self.steps += 1
        self.phi += 1.0 / STEPS_IN_CYCLE
        self.phi = self.phi % 1
        # Report the MAX_STEPS time limit as a truncation, per the gymnasium API
        truncated = self.steps >= MAX_STEPS
        if self.render_mode == "human":
            self.render()
        self.previous_action = action
        return observation, reward, terminated, truncated, {}
#resets the simulation
def reset_model(self):
m.mj_inverse(self.model, self.data)
noise_low = -self._reset_noise_scale
noise_high = self._reset_noise_scale
        self.previous_action = np.zeros(10)
self.phi = 0
self.steps = 0
qpos = self.init_qpos + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nq)
qvel = self.init_qvel + self.np_random.uniform(low=noise_low, high=noise_high, size=self.model.nv)
self.set_state(qpos, qvel)
observation = self._get_obs()
return observation
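# A minimal smoke test of the environment, independent of the RLlib training
# below (a sketch: it assumes "cassie.xml" is in the working directory, and the
# function is defined here but never called by the training script):
def smoke_test_env(num_steps=100):
    env = CassieEnv({})
    obs, info = env.reset()
    assert obs.shape == (31,), obs.shape
    total_reward = 0.0
    for _ in range(num_steps):
        # Random torques; step() clips them to the actuator ranges
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        if terminated or truncated:
            obs, info = env.reset()
    print("Smoke test OK; cumulative reward:", total_reward)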
log_dir = "/home/ajvendetta/ray_results"
sim_dir = "./sim/"
# Load the trainer from the latest checkpoint, if one exists: find the most
# recent PPO_* run directory in log_dir, then the highest-numbered checkpoint
# inside it
checkpoint_path = None
if os.path.exists(log_dir):
    ppo_runs = [d for d in os.listdir(log_dir) if d.startswith("PPO_")]
    if ppo_runs:
        latest_log_directory = max(ppo_runs)
        print(latest_log_directory)
        checkpoints = [d for d in os.listdir(os.path.join(log_dir, latest_log_directory)) if d.startswith("checkpoint")]
        if checkpoints:
            # Compare checkpoint numbers as integers, not strings
            latest_checkpoint = max(checkpoints, key=lambda d: int(d.split("_")[-1]))
            checkpoint_path = os.path.join(log_dir, latest_log_directory, latest_checkpoint)
            print(checkpoint_path)
# Initialize ray and register the environment with RLlib
from ray.tune.registry import register_env
ray.init(ignore_reinit_error=True)
register_env("cassie-v0", lambda config: CassieEnv(config))
config = {
"framework": "torch",
"log_level" : "WARN",
"num_gpus": 1,
"num_cpus": 19,
"num_workers":6,
"num_envs_per_worker": 1,
"rollout_fragment_length": 300,
"train_batch_size": 50000,
"sgd_minibatch_size": 9000,
"num_sgd_iter": 10,
"optimizer": {
"type": "Adam",
"lr": 3e-4,
"epsilon": 1e-5
},
"model": {
"conv_filters": None,
"fcnet_activation": "relu",
"fcnet_hiddens": [256, 256],
"use_lstm": False,
"vf_share_layers": False,
"free_log_std": True
},
"entropy_coeff": 0.01,
"gamma": 0.99,
"lambda": 0.95,
"kl_coeff": 0.5,
"clip_param": 0.2,
"vf_clip_param": 10.0,
"grad_clip": 0.5,
"kl_target": 0.01,
"batch_mode": "truncate_episodes",
"observation_filter": "NoFilter",
"reuse_actors": True,
"disable_env_checking" : True,
"num_gpus_per_worker": 0.16,
"num_cpus_per_worker": 1,
"evaluation_config": {
"explore": True,
"evaluation_interval": 10,
"evaluation_num_episodes": 20
}
}
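# Sizing note: each sampling round yields num_workers * num_envs_per_worker *
# rollout_fragment_length = 6 * 1 * 300 = 1800 env steps; RLlib keeps sampling
# until train_batch_size (50000) steps are collected, then runs num_sgd_iter
# (10) epochs of SGD over sgd_minibatch_size (9000) minibatches.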
torch.cuda.empty_cache()
# Build a throwaway trainer, restore the checkpoint into it (if one was found),
# and copy its policy weights into a fresh trainer
temp = PPOTrainer(config=config, env="cassie-v0")
if checkpoint_path is not None:
    temp.restore(checkpoint_path)
policy_weights = temp.get_policy().get_weights()
temp.stop()
trainer = PPOTrainer(config=config, env="cassie-v0")
trainer.get_policy().set_weights(policy_weights)
# Training loop
max_test_i = 0
checkpoint_frequency = 5
simulation_frequency = 10
env = CassieEnv({"render_mode": "rgb_array"})
# Create the sim directory if needed, then find the latest test_i directory in it
os.makedirs(sim_dir, exist_ok=True)
latest_directory = max([int(d.split("_")[-1]) for d in os.listdir(sim_dir) if d.startswith("test_")], default=0)
max_test_i = latest_directory + 1
# Create folder for test
test_dir = os.path.join(sim_dir, "test_{}".format(max_test_i))
os.makedirs(test_dir, exist_ok=True)
# Define video codec and framerate
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
fps = 30
# Set initial iteration count
i = trainer.iteration if hasattr(trainer, "iteration") else 0
while True:
# Train for one iteration
result = trainer.train()
i += 1
print("Episode Reward Mean for iteration {} is {}".format(i, result["episode_reward_mean"]))
    # Save a checkpoint every checkpoint_frequency iterations
    if i % checkpoint_frequency == 0:
checkpoint_path = trainer.save()
print("Checkpoint saved at", checkpoint_path)
    # Record a simulation video every simulation_frequency iterations
    if i % simulation_frequency == 0:
        # Run a test rollout and record its frames
        video_path = os.path.join(test_dir, "sim_{}.mp4".format(i))
        obs, _ = env.reset()
        done = False
        frames = []
        while not done:
            action = trainer.compute_single_action(obs)
            obs, _, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            frames.append(env.render())
# Save frames as video
height, width, _ = frames[0].shape
video_writer = cv2.VideoWriter(video_path, fourcc, fps, (width, height))
        for frame in frames:
            # MuJoCo renders RGB frames; OpenCV expects BGR, so convert on write
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
video_writer.release()
# Increment test index
max_test_i += 1
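# Note: the loop above runs indefinitely; interrupt it to stop training.
# Checkpoints written by trainer.save() land in the trainer's log directory
# (under ~/ray_results by default), which is where the restore logic at the
# top of this script looks on the next run.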