import numpy as np
import matplotlib.pyplot as plt
import gym
import random
from gym import Env
class CarMDP(Env):
"""
Car MDP with simple stochastic dynamics.
The states are tuples with two elements:
- a position index (i, j)
- an integer from (0, 1, 2, 3) representing absolute orientation (see self.orientations in __init__ below)
For example, the state
s = (0, 1, 2)
represents the car in the cell with indices (0, 1) and oriented to face the South.
"""
def __init__(self, width, height, obstacles, goal_transition, initial_state, p_corr, base_reward=-0.01,
collision_reward=-5., goal_reward=10., stagnation_penalty=-0.01):
self.width = width
self.height = height
self.grid_map = np.ones((width, height))
for cell in obstacles:
self.grid_map[cell[0], cell[1]] = 0.
self.obstacles = obstacles
self.orientations = {0: 'North', 1: 'East', 2: 'South', 3: 'West'}
self.A = {0: 'Forward', 1: 'Left', 2: 'Right', 3: 'Brake'}
self.goal_transition = goal_transition # Tuple containing start and end state for the 'goal transition'
self.p_corr = p_corr
self.p_err = (1. - p_corr)/2.
self.base_reward = base_reward
self.collision_reward = collision_reward
self.goal_reward = goal_reward
self.stagnation_penalty = stagnation_penalty
        assert 0 <= initial_state[0] < self.width and 0 <= initial_state[1] < self.height and \
            initial_state[2] in self.orientations, "ERROR: initial state {:} is not valid.".format(initial_state)
self.state_history = [initial_state]
self.action_history = []
self.reward_history = []
        self.init_state = initial_state
    def reset(self):
        self.state_history = [self.init_state]
        self.action_history = []
        self.reward_history = []
        return self.init_state  # gym's Env.reset is expected to return the initial observation
def is_collision(self, state):
is_out_of_bounds = state[0] < 0 or state[0] >= self.width or state[1] < 0 or \
state[1] >= self.height
return is_out_of_bounds or (state[0], state[1]) in self.obstacles
def transition_dynamics(self, state, action):
        assert not self.is_collision(state), "ERROR: cannot take an action from a collision (terminal) state."
delta = 1
orientation = state[2]
if self.orientations[orientation] == 'North':
left = (state[0] - delta, state[1] - delta)
forward = (state[0], state[1] - delta)
right = (state[0] + delta, state[1] - delta)
elif self.orientations[orientation] == 'West':
left = (state[0] - delta, state[1] + delta)
forward = (state[0] - delta, state[1])
right = (state[0] - delta, state[1] - delta)
elif self.orientations[orientation] == 'South':
left = (state[0] + delta, state[1] + delta)
forward = (state[0], state[1] + delta)
right = (state[0] - delta, state[1] + delta)
elif self.orientations[orientation] == 'East':
left = (state[0] + delta, state[1] - delta)
forward = (state[0] + delta, state[1])
right = (state[0] + delta, state[1] + delta)
# p gives categorical distribution over (state, left, forward, right)
if self.A[action] == 'Forward':
p = np.array([0., self.p_err, self.p_corr, self.p_err])
elif self.A[action] == 'Right':
p = np.array([0., 0., 2.*self.p_err, self.p_corr])
elif self.A[action] == 'Left':
p = np.array([0., self.p_corr, 2. * self.p_err, 0.])
elif self.A[action] == 'Brake':
p = np.array([self.p_corr, 0., 2. * self.p_err, 0.])
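        # Interpretation of the arrays above: 'Forward' succeeds with probability p_corr and slips
        # diagonally left or right with probability p_err each; 'Left', 'Right' and 'Brake' succeed
        # with probability p_corr and otherwise the car drifts forward with probability 2 * p_err.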
candidate_next_state_positions = (state, left, forward, right)
next_state_position = candidate_next_state_positions[categorical_sample_index(p)]
# Handle orientation dynamics (deterministic)
new_orientation = orientation
if self.A[action] == 'Right':
new_orientation = (orientation + 1) % 4
elif self.A[action] == 'Left':
new_orientation = (orientation - 1) % 4
return next_state_position[0], next_state_position[1], new_orientation
def step(self, action):
assert action in self.A, f"ERROR: action {action} not permitted"
terminal = False
current_state = self.state_history[-1] # -1 means the current element
next_state = self.transition_dynamics(current_state, action)
if self.is_collision(next_state):
reward = self.collision_reward
terminal = True
elif (current_state[0], current_state[1]) == self.goal_transition[0] and \
(next_state[0], next_state[1]) == self.goal_transition[1]:
reward = self.goal_reward
terminal = True # TODO: allow multiple laps like this?
elif current_state == next_state:
reward = self.stagnation_penalty
terminal = False
else:
reward = self.base_reward
terminal = False
self.state_history.append(next_state)
self.reward_history.append(reward)
self.action_history.append(action)
        return next_state, reward, terminal, {}  # (observation, reward, done, info), following the gym step convention
def render(self, title):
self._plot_history(title)
def _plot_history(self, title):
"""
Plot the MDP's trajectory on the grid map.
:param title:
:return:
"""
fig = plt.figure()
plt.imshow(self.grid_map.T, cmap='gray')
plt.grid()
x = np.zeros(len(self.state_history))
y = np.zeros(x.shape)
for idx in range(len(x)):
x[idx] = self.state_history[idx][0]
y[idx] = self.state_history[idx][1]
if self.state_history[idx][2] == 0:
plt.arrow(x[idx], y[idx], 0., -0.25, width=0.1)
elif self.state_history[idx][2] == 1:
plt.arrow(x[idx], y[idx], 0.25, 0., width=0.1)
elif self.state_history[idx][2] == 2:
plt.arrow(x[idx], y[idx], 0., 0.25, width=0.1)
else:
plt.arrow(x[idx], y[idx], -0.25, 0., width=0.1)
plt.plot(x, y, 'b-') # Plot trajectory
plt.xlim([-0.5, self.width + 0.5])
plt.ylim([self.height + 0.5, -0.5])
plt.title(title)
plt.xlabel('x')
plt.ylabel('y')
plt.show()
return fig
def categorical_sample_index(p: np.ndarray) -> int:
"""
Sample a categorical distribution.
:param p: a categorical distribution's probability mass function (i.e., p[idx] is the probability of this function
returning idx for an integer 0 <= idx < len(p)). I.e., np.sum(p) == 1 and p[idx] >= 0 for 0<=idx<len(p).
:return: index of a sample weighted by the categorical distribution described by p
"""
    P = np.cumsum(p)
    sample = np.random.rand()
    return int(np.argmax(P > sample))  # cast so the return type matches the annotation
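# A minimal sanity check for categorical_sample_index (commented out so the script's behaviour is
# unchanged; it assumes nothing beyond numpy): with p = [0.1, 0.2, 0.7], index 2 should come back
# roughly 70% of the time.
#   draws = [categorical_sample_index(np.array([0.1, 0.2, 0.7])) for _ in range(10000)]
#   print(np.bincount(draws, minlength=3) / 10000.)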
class ReinforcementLearningAgent:
"""
Your implementation of a reinforcement learning agent.
Feel free to add additional methods and attributes.
"""
    def __init__(self, steps):
        ### STUDENT CODE GOES HERE
        # Set any parameters
        # You can add arguments to __init__, so long as they have default values (e.g., epsilon=0.1)
        self.time = 0  # keep track of the total time
        self.timeWeight = 1e-5
        self.exp_rate = 0.05
        self.epsilon = 0
        self.alpha = 0.05
        self.gamma = 1
        self.env = car_mdp  # note: relies on the module-level car_mdp created under __main__
        self.num_states = self.env.width * self.env.height * 4
        self.num_actions = 4
        self.states = {}
        self.steps = steps
self.indexes = {}
        i = 0
        for x in range(-1, self.env.width + 1):
            for y in range(-1, self.env.height + 1):
                for o in range(4):
                    self.states[i] = [x, y, o]
                    self.indexes[x, y, o] = i
                    i += 1
self.Q = np.zeros((i, self.num_actions))
self.model = {}
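        # The loops above enumerate every (x, y, orientation) triple, including a one-cell border
        # outside the map, so states reached by driving off the grid still map to a row of Q.
        # self.model maps state index -> {action: (reward, next state index, time of last visit)}
        # and is filled in by planning() below.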
def reset(self, init_state) -> int:
"""
Called at the start of each episode.
:param init_state:
:return: first action to take.
"""
### STUDENT CODE GOES HERE
# self.time= 0
# self.model = {}
        return 0  # always start the episode with action 0 ('Forward')
def next_action(self, reward: float, state: int, terminal: bool, past_action : int) -> int:
"""
Called during each time step of a reinforcement learning episode
:param reward: reward resulting from the last time step's action
:param state: state resulting from the last time step's action
:param terminal: bool indicating whether state is a terminal state
:return: next action to take
"""
### STUDENT CODE GOES HERE
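        # The code below appears to follow tabular Dyna-Q: a one-step Q-learning update from the
        # real transition, a model update via planning(), and self.steps simulated updates via
        # model_learning().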
        try:
            past_state = self.indexes[self.env.state_history[-2]]
        except (IndexError, KeyError):
            past_state = self.indexes[self.env.init_state]
if not terminal:
state = self.indexes[state]
if np.random.uniform(0, 1) < self.epsilon:
action = np.random.randint(4)
else:
action = np.argmax(self.Q[state, :])
self.Q[past_state, past_action] \
= self.Q[past_state, past_action] + self.alpha * (reward + self.gamma * self.Q[state, action] - self.Q[past_state, past_action])
planningStep = past_state, state, past_action, reward
            self.time += 1
            self.planning(planningStep)
for i in range(self.steps):
self.model_learning()
return action
        else:
            state = self.indexes[state]
            # Terminal update: there is no successor state value, so the TD target is just the reward
            self.Q[past_state, past_action] += self.alpha * (reward - self.Q[past_state, past_action])
            planningStep = past_state, state, past_action, reward
            self.time += 1
            self.planning(planningStep)
            for i in range(self.steps):
                self.model_learning()
            return past_action  # the episode has ended, so this return value is never used
# Produce the next action to take in an episode as a function of the observed reward and state
# You may find it useful to track past actions, states, and rewards
# Additionally, algorithms that learn during an episode (e.g., temporal difference) may find use for this method
# return np.random.randint(4) # Random policy (CHANGE THIS)
def planning(self, planningStep):
### STUDENT CODE GOES HERE
# Set any parameters
# You can add other arguments to planning
        state, nxtState, action, reward = planningStep
        if state not in self.model:
            self.model[state] = {}
        for a in range(4):
            # The initial model for untried actions is that they lead back to the
            # same state with a reward of 0 (the Dyna-Q+ convention)
            if a not in self.model[state]:
                self.model[state][a] = (0, state, 1)
self.model[state][action] = (reward, nxtState, self.time)
def model_learning(self):
### STUDENT CODE GOES HERE
# Set any parameters
# You can add other arguments to model_learning
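        # This appears to be the Dyna-Q planning step: sample a previously observed (state, action)
        # pair from the learned model, add a small Dyna-Q+ style bonus of
        # timeWeight * sqrt(time since the pair was last tried), and apply a simulated one-step Q update.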
rand_idx = np.random.choice(range(len(self.model.keys())))
_state = list(self.model)[rand_idx]
# randomly choose an action
rand_idx = np.random.choice(range(len(self.model[_state].keys())))
_action = list(self.model[_state])[rand_idx]
_reward, _nxtState, _time = self.model[_state][_action]
        # add the time-based exploration bonus to the modelled reward
        _reward += self.timeWeight * np.sqrt(abs(self.time - _time))
        if np.random.uniform(0, 1) < self.epsilon:
            action = np.random.randint(4)
        else:
            action = np.argmax(self.Q[_nxtState, :])
        self.Q[_state, _action] += self.alpha * (_reward + self.gamma * self.Q[_nxtState, action] - self.Q[_state, _action])
def finish_episode(self):
"""
Called at the end of each episode.
:return: nothing
"""
### STUDENT CODE GOES HERE
# Algorithms that learn from an entire episode (e.g., Monte Carlo) may find a use for this method
def test_rl_algorithm(rl_agent, car_mdp, initial_state, n_episodes=10000, n_plot=np.inf):
"""
Code that will be used to test your implementation of ReinforcementLearningAgent.
As you can see, you are responsible for implementing three methods in ReinforcementLearningAgent:
- reset (called at the start of every episode)
- next_action (called at every time step of an episode)
- finish_episode (called at the end of each episode)
:param rl_agent: an instance of your ReinforcementLearningAgent class
:param car_mdp: an instance of CarMDP
    :param initial_state: the initial state
    :param n_episodes: number of episodes to use for this test
    :param n_plot: display a plot every n_plot episodes
    :return: the list of per-episode returns
"""
returns = []
for episode in range(n_episodes):
G = 0. # Keep track of the returns for this episode (discount factor gamma=1)
# Re-initialize the MDP and the RL agent
        car_mdp.reset()
action = rl_agent.reset(initial_state)
terminal = False
while not terminal: # Loop until a terminal state is reached
            next_state, reward, terminal, _ = car_mdp.step(action)
            G += reward
            action = rl_agent.next_action(reward, next_state, terminal, action)
rl_agent.finish_episode()
returns += [G]
# Plot the trajectory every n_plot episodes
if episode % n_plot == 0 and episode > 0:
car_mdp.render('State History ' + str(episode + 1))
return returns
if __name__ == '__main__':
# Size of the CarMDP map (any cell outside of this rectangle is a terminal state)
width = 9
height = 6
    initial_state = (0, 0, 2)  # Top-left corner (0, 0), facing South (2), i.e. down in the rendered plot
obstacles = [(2, 2), (2, 3), (3, 2), (3, 3), # Cells filled with obstacles are terminal states
(5, 2), (5, 3), (6, 2), (6, 3),
(1, 1), (1, 2)]
goal_transition = ((1, 0), (0, 0)) # Transitioning from cell (1, 0) to cell (0, 0) terminates and gives a reward
p_corr = 0.95 # Probability of actions working as intended
# Create environment
car_mdp = CarMDP(width, height, obstacles, goal_transition, initial_state, p_corr=p_corr)
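    # A quick manual check of the environment API (commented out so the script's behaviour is
    # unchanged); action 0 is 'Forward':
    #   car_mdp.reset()
    #   s, r, done, info = car_mdp.step(0)
    #   print(s, r, done)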
    # Create the RL agent. You must complete the ReinforcementLearningAgent class in your solution.
    # The commented-out single agent below was only used to check whether the agent is learning.
# rl_agent = ReinforcementLearningAgent()
# returns = test_rl_algorithm(rl_agent, car_mdp, initial_state, n_episodes=10000, n_plot=1000)
# Example plot. You need to change it according to the assignment requirements.
n_runs = 1
n_episodes = 10000
    k = []  # collects one returns array per planning-step setting
    steps = [5, 10]
for s in steps:
returns = np.zeros((n_runs, n_episodes))
for run in range(n_runs):
rl_agent = ReinforcementLearningAgent(s)
returns[run, :] = test_rl_algorithm(rl_agent, car_mdp, initial_state, n_episodes=n_episodes)
k.append(returns)
plt.figure()
for i in range(len(steps)):
        returns = k[i]
# Plot one curve like this for each parameter setting - the template code for ReinforcementLearningAgent just
# returns a random action, so this example curve will just be noise. When your method is working, the mean return
# should increase as the number of episodes increases. Feel free to change the rolling average width
rolling_average_width = 100
# Compute the mean (over n_runs) for each episode
mean_return = np.mean(returns, axis=0)
# Compute the rolling average (over episodes) to smooth out the curve
rolling_average_mean_return = np.convolve(mean_return, np.ones(rolling_average_width), 'valid')/rolling_average_width
        plt.plot(rolling_average_mean_return, label='n = {}'.format(steps[i]))  # Plot the smoothed average return per episode over n_runs
plt.grid()
plt.title('Learning Curve')
plt.xlabel('Episode')
plt.ylabel('Average Return')
plt.legend(["5","10"])
plt.show()
# rolling_average_mean_return2 = np.convolve(mean_return, np.ones(rolling_average_width), 'valid')/rolling_average_width
# plt.figure()
# plt.plot(returns[4:10:5,:])
# plt.legend(["n=5","n=10"])
# plt.grid()
    # plt.title('Learning Curve for n=5 and n=10')
# plt.xlabel('Episode')
# plt.ylabel('Average Return')