Untitled

 avatar
unknown
python
3 years ago
17 kB
5
Indexable
import numpy as np 
import matplotlib.pyplot as plt
import gym
import random

from gym import Env

class CarMDP(Env):
    """
    Car MDP with simple stochastic dynamics.
    The states are tuples with two elements:
        - a position index (i, j)
        - an integer from (0, 1, 2, 3) representing absolute orientation (see self.orientations in __init__ below)
    For example, the state
        s = (0, 1, 2)
    represents the car in the cell with indices (0, 1) and oriented to face the South.
    """
    def __init__(self, width, height, obstacles, goal_transition, initial_state, p_corr, base_reward=-0.01,
                 collision_reward=-5., goal_reward=10., stagnation_penalty=-0.01):
        self.width = width
        self.height = height
        self.grid_map = np.ones((width, height))
        for cell in obstacles:
            self.grid_map[cell[0], cell[1]] = 0.
        self.obstacles = obstacles
        self.orientations = {0: 'North', 1: 'East', 2: 'South', 3: 'West'}
        self.A = {0: 'Forward', 1: 'Left', 2: 'Right', 3: 'Brake'}
        self.goal_transition = goal_transition  # Tuple containing start and end state for the 'goal transition'

        self.p_corr = p_corr
        self.p_err = (1. - p_corr)/2.

        self.base_reward = base_reward
        self.collision_reward = collision_reward
        self.goal_reward = goal_reward
        self.stagnation_penalty = stagnation_penalty
        self.state_history = []
        self.action_history = []
        self.reward_history = []
        
        assert initial_state[0] >= 0 and initial_state[1] >= 0 and initial_state[0] < self.width and initial_state[1] < self.height and \
               initial_state[2] in self.orientations, "ERROR: initial state {:} is not valid.".format(init_state)
        self.state_history = [initial_state]
        self.action_history = []
        self.reward_history = []
        self.init_state=initial_state
    
       
    def reset(self):
        self.state_history = [self.init_state]
        self.action_history = []
        self.reward_history = []

    def is_collision(self, state):
        is_out_of_bounds = state[0] < 0 or state[0] >= self.width or state[1] < 0 or \
             state[1] >= self.height
        return is_out_of_bounds or (state[0], state[1]) in self.obstacles

    def transition_dynamics(self, state, action):
        assert not self.is_collision(state), "ERROR: can't take an action from a non-state."
        delta = 1
        orientation = state[2]

        if self.orientations[orientation] == 'North':
            left = (state[0] - delta, state[1] - delta)
            forward = (state[0], state[1] - delta)
            right = (state[0] + delta, state[1] - delta)
        elif self.orientations[orientation] == 'West':
            left = (state[0] - delta, state[1] + delta)
            forward = (state[0] - delta, state[1])
            right = (state[0] - delta, state[1] - delta)
        elif self.orientations[orientation] == 'South':
            left = (state[0] + delta, state[1] + delta)
            forward = (state[0], state[1] + delta)
            right = (state[0] - delta, state[1] + delta)
        elif self.orientations[orientation] == 'East':
            left = (state[0] + delta, state[1] - delta)
            forward = (state[0] + delta, state[1])
            right = (state[0] + delta, state[1] + delta)

        # p gives categorical distribution over (state, left, forward, right)
        if self.A[action] == 'Forward':
            p = np.array([0., self.p_err, self.p_corr, self.p_err])
        elif self.A[action] == 'Right':
            p = np.array([0., 0., 2.*self.p_err, self.p_corr])
        elif self.A[action] == 'Left':
            p = np.array([0., self.p_corr, 2. * self.p_err, 0.])
        elif self.A[action] == 'Brake':
            p = np.array([self.p_corr, 0., 2. * self.p_err, 0.])

        candidate_next_state_positions = (state, left, forward, right)
        next_state_position = candidate_next_state_positions[categorical_sample_index(p)]

        # Handle orientation dynamics (deterministic)
        new_orientation = orientation
        if self.A[action] == 'Right':
            new_orientation = (orientation + 1) % 4
        elif self.A[action] == 'Left':
            new_orientation = (orientation - 1) % 4

        return next_state_position[0], next_state_position[1], new_orientation

    
    def step(self, action):
        assert action in self.A, f"ERROR: action {action} not permitted"
        terminal = False
        current_state = self.state_history[-1] # -1 means the current element
        next_state = self.transition_dynamics(current_state, action)
        if self.is_collision(next_state):
            reward = self.collision_reward
            terminal = True
        elif (current_state[0], current_state[1]) == self.goal_transition[0] and \
                (next_state[0], next_state[1]) == self.goal_transition[1]:
            reward = self.goal_reward
            terminal = True  # TODO: allow multiple laps like this?
        elif current_state == next_state:
            reward = self.stagnation_penalty
            terminal = False
        else:
            reward = self.base_reward
            terminal = False

        self.state_history.append(next_state)
        self.reward_history.append(reward)
        self.action_history.append(action)

        return next_state, reward, terminal, []

    def render(self, title):
        self._plot_history(title)
    
    def _plot_history(self, title):
        """
        Plot the MDP's trajectory on the grid map.
        :param title:
        :return:
        """
        fig = plt.figure()
        plt.imshow(self.grid_map.T, cmap='gray')
        plt.grid()
        x = np.zeros(len(self.state_history))
        y = np.zeros(x.shape)
        for idx in range(len(x)):
            x[idx] = self.state_history[idx][0]
            y[idx] = self.state_history[idx][1]
            if self.state_history[idx][2] == 0:
                plt.arrow(x[idx], y[idx], 0., -0.25, width=0.1)
            elif self.state_history[idx][2] == 1:
                plt.arrow(x[idx], y[idx], 0.25, 0., width=0.1)
            elif self.state_history[idx][2] == 2:
                plt.arrow(x[idx], y[idx], 0., 0.25, width=0.1)
            else:
                plt.arrow(x[idx], y[idx], -0.25, 0., width=0.1)

        plt.plot(x, y, 'b-')  # Plot trajectory
        plt.xlim([-0.5, self.width + 0.5])
        plt.ylim([self.height + 0.5, -0.5])
        plt.title(title)
        plt.xlabel('x')
        plt.ylabel('y')
        plt.show()
        return fig


def categorical_sample_index(p: np.ndarray) -> int:
    """
    Sample a categorical distribution.

    :param p: a categorical distribution's probability mass function (i.e., p[idx] is the probability of this function
              returning idx for an integer 0 <= idx < len(p)). I.e., np.sum(p) == 1 and p[idx] >= 0 for 0<=idx<len(p).
    :return: index of a sample weighted by the categorical distribution described by p
    """
    P = np.cumsum(p)
    sample = np.random.rand()
    return np.argmax(P > sample)








class ReinforcementLearningAgent:
    """
    Your implementation of a reinforcement learning agent.
    Feel free to add additional methods and attributes.
    """
    def __init__(self,steps):
        ### STUDENT CODE GOES HERE
        # Set any parameters
        # You can add arguments to __init__, so log as they have default values (e.g., epsilon=0.1)
        self.time = 0  # keep track of the total time
        self.timeWeight = 1e-5
        self.exp_rate = 0.05
        self.epsilon = 0
        self.alpha = 0.05
        self.gamma = 1
        self.env = car_mdp
        self.num_states = self.env.width * self.env.height * 4 
        self.num_actions = 4
        self.states = {}
        self.steps=steps
        self.indexes = {}
        i=0
        for x in range(-1,self.env.width+1):
            for y in range(-1,self.env.height+1):
                for o in range(4):
                    self.states[i] = [x,y,o]
                    self.indexes[x,y,o] = i
                    i+=1
        self.Q = np.zeros((i, self.num_actions))
        self.model = {}

    def reset(self, init_state) -> int:
        """
        Called at the start of each episode.

        :param init_state:
        :return: first action to take.
        """
        ### STUDENT CODE GOES HERE
#         self.time= 0
#         self.model = {}
        return 0 # Random policy (CHANGE THIS)

    def next_action(self, reward: float, state: int, terminal: bool, past_action : int) -> int:
        """
        Called during each time step of a reinforcement learning episode

        :param reward: reward resulting from the last time step's action
        :param state: state resulting from the last time step's action
        :param terminal: bool indicating whether state is a terminal state
        :return: next action to take
        """
        ### STUDENT CODE GOES HERE

        try:
            past_state = self.indexes[self.env.state_history[-2]]

        except:
            past_state = self.indexes[self.env.initial_state]
        if not terminal:
            state = self.indexes[state]
            if np.random.uniform(0, 1) < self.epsilon:
                action = np.random.randint(4)
            else:
                action = np.argmax(self.Q[state, :])
                
                
            self.Q[past_state, past_action] \
            = self.Q[past_state, past_action] + self.alpha * (reward + self.gamma * self.Q[state, action] - self.Q[past_state, past_action])
            planningStep = past_state, state, past_action, reward
            self.time+=1
            self.planning( planningStep)
            for i in range(self.steps):  
                
                self.model_learning()
            return action
            
        else: 
            state = self.indexes[state]
            self.Q[past_state, past_action] = self.Q[past_state, past_action] + reward
            planningStep = past_state, state, past_action, reward
            self.time+=1
            self.planning( planningStep)
            for i in range(self.steps):  
                
                self.model_learning()

        
#             print(reward)
            
        # Produce the next action to take in an episode as a function of the observed reward and state
        # You may find it useful to track past actions, states, and rewards
        # Additionally, algorithms that learn during an episode (e.g., temporal difference) may find use for this method
#         return np.random.randint(4)  # Random policy (CHANGE THIS)
    def planning(self, planningStep):
        ### STUDENT CODE GOES HERE
        # Set any parameters
        # You can add other arguments to planning
        state, nxtState, action, reward= planningStep
        if state not in self.model.keys():
            self.model[state] = {}
        for a in range(4):
#             # the initial model for such actions was that they would
#             # lead back to the same state with a reward of 0.
            if a not in self.model[state].keys():
                self.model[state][a] = (0, state, 1)

        self.model[state][action] = (reward, nxtState, self.time)
    
    def model_learning(self):
        ### STUDENT CODE GOES HERE
        # Set any parameters
        # You can add other arguments to model_learning



        rand_idx = np.random.choice(range(len(self.model.keys())))
        _state = list(self.model)[rand_idx]
        # randomly choose an action
        rand_idx = np.random.choice(range(len(self.model[_state].keys())))
        _action = list(self.model[_state])[rand_idx]

        _reward, _nxtState, _time = self.model[_state][_action]
        # update _reward
        _reward += self.timeWeight * np.sqrt(abs(self.time - _time)  )
        if np.random.uniform(0, 1) < self.epsilon:
            action = np.random.randint(4)
        else:
            action = np.argmax(self.Q[_nxtState, :])

        self.Q[_state,_action] += self.alpha * (_reward + self.gamma *(self.Q[_nxtState][action])- self.Q[_state,_action])

    
    def finish_episode(self):
        """
        Called at the end of each episode.
        :return: nothing
        """
        ### STUDENT CODE GOES HERE
        # Algorithms that learn from an entire episode (e.g., Monte Carlo) may find a use for this method








def test_rl_algorithm(rl_agent, car_mdp, initial_state, n_episodes=10000, n_plot=np.inf):
    """
    Code that will be used to test your implementation of ReinforcementLearningAgent.
    As you can see, you are responsible for implementing three methods in ReinforcementLearningAgent:
        - reset (called at the start of every episode)
        - next_action (called at every time step of an episode)
        - finish_episode (called at the end of each episode)

    :param rl_agent: an instance of your ReinforcementLearningAgent class
    :param car_mdp: an instance of CarMDP
    :param init_state: the initial state
    :param n_episodes: number of episodes to use for this test
    :param n_plot: display a plot every n_plot episodes
    :return:
    """
    returns = []
    for episode in range(n_episodes):
        G = 0.  # Keep track of the returns for this episode (discount factor gamma=1)
        # Re-initialize the MDP and the RL agent
        car_mdp.reset();
        action = rl_agent.reset(initial_state)
        terminal = False
        while not terminal:  # Loop until a terminal state is reached
            next_state, reward, terminal, [] = car_mdp.step(action)
            G += reward
            action = rl_agent.next_action(reward, next_state, terminal,action)
        rl_agent.finish_episode()
        returns += [G]

        # Plot the trajectory every n_plot episodes
        if episode % n_plot == 0 and episode > 0:
            car_mdp.render('State History ' + str(episode + 1))

    return returns


if __name__ == '__main__':

    # Size of the CarMDP map (any cell outside of this rectangle is a terminal state)
    width = 9
    height = 6
    initial_state = (0, 0, 2)  # Top left corner (0, 0), facing "Down" (2)
    obstacles = [(2, 2), (2, 3), (3, 2), (3, 3),  # Cells filled with obstacles are terminal states
                 (5, 2), (5, 3), (6, 2), (6, 3),
                 (1, 1), (1, 2)]
    goal_transition = ((1, 0), (0, 0))  # Transitioning from cell (1, 0) to cell (0, 0) terminates and gives a reward
    p_corr = 0.95  # Probability of actions working as intended
    
  # Create environment
    car_mdp = CarMDP(width, height, obstacles, goal_transition, initial_state, p_corr=p_corr)
  
 # Create RL agent. # You must complete this class in your solution, it is random policy right now
 # the first agent (rl_agent) is just to track the agent to see if it is learning 
        
#     rl_agent = ReinforcementLearningAgent()  
    
   
#     returns = test_rl_algorithm(rl_agent, car_mdp, initial_state, n_episodes=10000, n_plot=1000)
    
    
    # Example plot. You need to change it according to the assignment requirements.   
    n_runs = 1
    n_episodes = 10000
    k=[]
    steps = [5,10]
    for s in steps:
        returns = np.zeros((n_runs, n_episodes))
        for run in range(n_runs):
            rl_agent = ReinforcementLearningAgent(s)
            returns[run, :] = test_rl_algorithm(rl_agent, car_mdp, initial_state, n_episodes=n_episodes)
        k.append(returns)
        
    plt.figure()
    for  i in range(len(steps)):
        returns= k[i]
    # Plot one curve like this for each parameter setting - the template code for ReinforcementLearningAgent just
    # returns a random action, so this example curve will just be noise. When your method is working, the mean return
    # should increase as the number of episodes increases. Feel free to change the rolling average width
        rolling_average_width = 100
        # Compute the mean (over n_runs) for each episode
        mean_return = np.mean(returns, axis=0)
        # Compute the rolling average (over episodes) to smooth out the curve
        rolling_average_mean_return = np.convolve(mean_return, np.ones(rolling_average_width), 'valid')/rolling_average_width
        
        plt.plot(rolling_average_mean_return,label=str(steps[i]) )  # Plot the smoothed average return for each episode over n_runs
    plt.grid()
    plt.title('Learning Curve')
    plt.xlabel('Episode')
    plt.ylabel('Average Return')
    plt.legend(["5","10"])
    plt.show()
    
#     rolling_average_mean_return2 = np.convolve(mean_return, np.ones(rolling_average_width), 'valid')/rolling_average_width
#     plt.figure()
#     plt.plot(returns[4:10:5,:])
#     plt.legend(["n=5","n=10"])
#     plt.grid()
#     plt.title('Learning Curve fo n=5 ad n=10')
#     plt.xlabel('Episode')
#     plt.ylabel('Average Return')

Editor is loading...