#!/usr/bin/env python3
import random

import numpy as np

from agent import Fish
from communicator import Communicator
from shared import SettingLoader


class FishesModelling:
    def init_fishes(self, n):
        fishes = {}
        for i in range(n):
            fishes["fish" + str(i)] = Fish()
        self.fishes = fishes


class PlayerController(SettingLoader, Communicator):
    def __init__(self):
        SettingLoader.__init__(self)
        Communicator.__init__(self)
        self.space_subdivisions = 10
        self.actions = None
        self.action_list = None
        self.states = None
        self.init_state = None
        self.ind2state = None
        self.state2ind = None
        self.alpha = 0
        self.gamma = 0
        self.episode_max = 300

    def init_states(self):
        ind2state = {}
        state2ind = {}
        count = 0
        for row in range(self.space_subdivisions):
            for col in range(self.space_subdivisions):
                ind2state[(col, row)] = count
                state2ind[count] = [col, row]
                count += 1
        self.ind2state = ind2state
        self.state2ind = state2ind

    def init_actions(self):
        self.actions = {
            "left": (-1, 0),
            "right": (1, 0),
            "down": (0, -1),
            "up": (0, 1)
        }
        self.action_list = list(self.actions.keys())

    def allowed_movements(self):
        self.allowed_moves = {}
        for s in self.ind2state.keys():
            self.allowed_moves[self.ind2state[s]] = []
            if s[0] < self.space_subdivisions - 1:
                self.allowed_moves[self.ind2state[s]] += [1]
            if s[0] > 0:
                self.allowed_moves[self.ind2state[s]] += [0]
            if s[1] < self.space_subdivisions - 1:
                self.allowed_moves[self.ind2state[s]] += [3]
            if s[1] > 0:
                self.allowed_moves[self.ind2state[s]] += [2]

    def player_loop(self):
        pass


class PlayerControllerHuman(PlayerController):
    def player_loop(self):
        """
        Function that generates the loop of the game. In each iteration
        the human plays through the keyboard and sends this to the game
        through the sender. Then it receives an update of the game through
        the receiver, with which it computes the next movement.
        :return:
        """
        while True:
            # send message to game that you are ready
            msg = self.receiver()
            if msg["game_over"]:
                return


def epsilon_greedy(Q,
                   state,
                   all_actions,
                   current_total_steps=0,
                   epsilon_initial=1,
                   epsilon_final=0.2,
                   anneal_timesteps=10000,
                   eps_type="constant"):

    if eps_type == 'constant':
        epsilon = epsilon_final
        # ADD YOUR CODE SNIPPET BETWEEN EX 4.1
        # Implement the epsilon-greedy algorithm for a constant epsilon value
        # Use epsilon and all input arguments of epsilon_greedy you see fit
        # It is recommended you use the np.random module
        action = None
        # ADD YOUR CODE SNIPPET BETWEEN EX 4.1

    elif eps_type == 'linear':
        # ADD YOUR CODE SNIPPET BETWEEN EX 4.2
        # Implement the epsilon-greedy algorithm for a linear epsilon value
        # Use epsilon and all input arguments of epsilon_greedy you see fit
        # Use the ScheduleLinear class
        # It is recommended you use the np.random module
        action = None
        # ADD YOUR CODE SNIPPET BETWEEN EX 4.2

    else:
        raise ValueError("Epsilon greedy type unknown")

    return action


class PlayerControllerRL(PlayerController, FishesModelling):
    def __init__(self):
        super().__init__()

    def player_loop(self):
        # send message to game that you are ready
        self.init_actions()
        self.init_states()
        self.alpha = self.settings.alpha
        self.gamma = self.settings.gamma
        self.epsilon_initial = self.settings.epsilon_initial
        self.epsilon_final = self.settings.epsilon_final
        self.annealing_timesteps = self.settings.annealing_timesteps
        self.threshold = self.settings.threshold
        self.episode_max = self.settings.episode_max

        q = self.q_learning()

        # compute policy
        policy = self.get_policy(q)

        # send policy
        msg = {"policy": policy, "exploration": False}
        self.sender(msg)

        msg = self.receiver()
        print("Q-learning returning")
        return

    def q_learning(self):
        ns = len(self.state2ind.keys())
        na = len(self.actions.keys())
        discount = self.gamma
        lr = self.alpha
        # initialization
        self.allowed_movements()

        # ADD YOUR CODE SNIPPET BETWEEN EX. 2.1
        # Initialize a numpy array with ns state rows and na action columns
        # with float values from 0.0 to 1.0.
        # Q = None
        Q = np.random.uniform(0, 1, (ns, na))
        # ADD YOUR CODE SNIPPET BETWEEN EX. 2.1

        # Mark actions that are not allowed in a state as NaN
        for s in range(ns):
            list_pos = self.allowed_moves[s]
            for i in range(4):
                if i not in list_pos:
                    Q[s, i] = np.nan

        Q_old = Q.copy()

        diff = float('inf')
        end_episode = False

        init_pos_tuple = self.settings.init_pos_diver
        init_pos = self.ind2state[(init_pos_tuple[0], init_pos_tuple[1])]
        episode = 0

        R_total = 0
        current_total_steps = 0
        steps = 0

        # ADD YOUR CODE SNIPPET BETWEEN EX. 2.3
        # Change the while loop to incorporate a threshold limit, to stop
        # training when the mean difference in the Q table is lower than
        # the threshold
        while episode <= self.episode_max:
            # ADD YOUR CODE SNIPPET BETWEEN EX. 2.3

            s_current = init_pos
            R_total = 0
            steps = 0
            while not end_episode:
                # selection of action
                list_pos = self.allowed_moves[s_current]

                # ADD YOUR CODE SNIPPET BETWEEN EX 2.1 and 2.2
                # Choose an action from all possible actions
                action = np.nanargmax(Q[s_current, :])
                # ADD YOUR CODE SNIPPET BETWEEN EX 2.1 and 2.2

                # ADD YOUR CODE SNIPPET BETWEEN EX 5
                # Use the epsilon greedy algorithm to retrieve an action
                # ADD YOUR CODE SNIPPET BETWEEN EX 5

                # compute reward
                action_str = self.action_list[action]
                msg = {"action": action_str, "exploration": True}
                self.sender(msg)

                # wait response from game
                msg = self.receiver()
                R = msg["reward"]
                R_total += R
                s_next_tuple = msg["state"]
                end_episode = msg["end_episode"]
                s_next = self.ind2state[s_next_tuple]

                # ADD YOUR CODE SNIPPET BETWEEN EX. 2.2
                # Implement the Bellman Update equation to update Q
                Q[s_current][action] += lr * (
                    R + discount * np.nanmax(Q[s_next]) - Q[s_current][action])
                # ADD YOUR CODE SNIPPET BETWEEN EX. 2.2

                s_current = s_next
                current_total_steps += 1
                steps += 1

            # ADD YOUR CODE SNIPPET BETWEEN EX. 2.3
            # Compute the absolute value of the mean difference between Q and Q_old
            diff = np.absolute(np.nanmean(Q - Q_old))
            # ADD YOUR CODE SNIPPET BETWEEN EX. 2.3

            Q_old[:] = Q
            print(
                "Episode: {}, Steps {}, Diff: {:6e}, Total Reward: {}, Total Steps {}"
                .format(episode, steps, diff, R_total, current_total_steps))
            episode += 1
            end_episode = False

        return Q

    def get_policy(self, Q):
        max_actions = np.nanargmax(Q, axis=1)
        policy = {}
        list_actions = list(self.actions.keys())
        for n in self.state2ind.keys():
            state_tuple = self.state2ind[n]
            policy[(state_tuple[0],
                    state_tuple[1])] = list_actions[max_actions[n]]
        return policy


class PlayerControllerRandom(PlayerController):
    def __init__(self):
        super().__init__()

    def player_loop(self):
        self.init_actions()
        self.init_states()
        self.allowed_movements()
        self.episode_max = self.settings.episode_max

        n = self.random_agent()

        # compute policy
        policy = self.get_policy(n)

        # send policy
        msg = {"policy": policy, "exploration": False}
        self.sender(msg)
        msg = self.receiver()
        print("Random Agent returning")
        return

    def random_agent(self):
        ns = len(self.state2ind.keys())
        na = len(self.actions.keys())
        init_pos_tuple = self.settings.init_pos_diver
        init_pos = self.ind2state[(init_pos_tuple[0], init_pos_tuple[1])]

        episode = 0
        R_total = 0
        steps = 0
        current_total_steps = 0
        end_episode = False

        # ADD YOUR CODE SNIPPET BETWEEN EX. 1.2
        # Initialize a numpy array with ns state rows and na action columns with zeros
        # ADD YOUR CODE SNIPPET BETWEEN EX. 1.2

        while episode <= self.episode_max:
            s_current = init_pos
            R_total = 0
            steps = 0
            while not end_episode:
                # all possible actions
                possible_actions = self.allowed_moves[s_current]

                # ADD YOUR CODE SNIPPET BETWEEN EX. 1.2
                # Choose an action from all possible actions and add to the
                # counter of actions per state
                action = None
                # ADD YOUR CODE SNIPPET BETWEEN EX. 1.2

                action_str = self.action_list[action]
                msg = {"action": action_str, "exploration": True}
                self.sender(msg)

                # wait response from game
                msg = self.receiver()
                R = msg["reward"]
                s_next_tuple = msg["state"]
                end_episode = msg["end_episode"]
                s_next = self.ind2state[s_next_tuple]
                s_current = s_next
                R_total += R
                current_total_steps += 1
                steps += 1

            print("Episode: {}, Steps {}, Total Reward: {}, Total Steps {}".
                  format(episode, steps, R_total, current_total_steps))
            episode += 1
            end_episode = False

        return n

    def get_policy(self, Q):
        nan_max_actions_proxy = [None for _ in range(len(Q))]
        for i in range(len(Q)):
            try:
                nan_max_actions_proxy[i] = np.nanargmax(Q[i])
            except ValueError:
                nan_max_actions_proxy[i] = np.random.choice([0, 1, 2, 3])

        nan_max_actions_proxy = np.array(nan_max_actions_proxy)
        assert nan_max_actions_proxy.all() == nan_max_actions_proxy.all()

        policy = {}
        list_actions = list(self.actions.keys())
        for n in self.state2ind.keys():
            state_tuple = self.state2ind[n]
            policy[(state_tuple[0],
                    state_tuple[1])] = list_actions[nan_max_actions_proxy[n]]
        return policy


class ScheduleLinear(object):
    def __init__(self, schedule_timesteps, final_p, initial_p=1.0):
        self.schedule_timesteps = schedule_timesteps
        self.final_p = final_p
        self.initial_p = initial_p

    def value(self, t):
        # ADD YOUR CODE SNIPPET BETWEEN EX 4.2
        # Return the annealed linear value
        return self.initial_p
        # ADD YOUR CODE SNIPPET BETWEEN EX 4.2
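# ---------------------------------------------------------------------------
# Sketch for EX 1.2 (not part of the original skeleton): one possible way to
# fill the placeholders in PlayerControllerRandom.random_agent. It assumes
# the counter "n" is the ns x na numpy array that random_agent returns and
# that "possible_actions" holds the allowed action indices for the current
# state, as set up above. The helper name random_action_with_counter is
# hypothetical; treat this as an illustrative example, not the reference
# solution.
def random_action_with_counter(n, s_current, possible_actions):
    # Pick one of the allowed moves uniformly at random
    action = np.random.choice(possible_actions)
    # Count the choice so get_policy can later pick the most frequent action
    n[s_current, action] += 1
    return action

# Inside random_agent the counter could be created before the episode loop,
#     n = np.zeros((ns, na))
# and the step loop could then call
#     action = random_action_with_counter(n, s_current, possible_actions)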
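# ---------------------------------------------------------------------------
# Sketch for EX 4.1 / EX 4.2 (not part of the original skeleton): one way the
# body of epsilon_greedy could be completed. With probability epsilon it
# explores a random allowed action, otherwise it exploits the greedy action
# from the Q table; "all_actions" is assumed to be the list of allowed action
# indices for "state". The name epsilon_greedy_sketch is hypothetical, used
# only to avoid shadowing the skeleton's own epsilon_greedy.
def epsilon_greedy_sketch(Q, state, all_actions, current_total_steps=0,
                          epsilon_initial=1, epsilon_final=0.2,
                          anneal_timesteps=10000, eps_type="constant"):
    if eps_type == "constant":
        epsilon = epsilon_final
    elif eps_type == "linear":
        # Anneal epsilon from epsilon_initial towards epsilon_final
        epsilon = ScheduleLinear(anneal_timesteps, epsilon_final,
                                 epsilon_initial).value(current_total_steps)
    else:
        raise ValueError("Epsilon greedy type unknown")

    if np.random.uniform(0, 1) < epsilon:
        # Explore: uniform random choice among the allowed actions
        return np.random.choice(all_actions)
    # Exploit: greedy action; np.nanargmax ignores the NaNs of illegal moves
    return np.nanargmax(Q[state, :])

# The matching ScheduleLinear.value body (EX 4.2) could interpolate linearly
# between initial_p and final_p over schedule_timesteps steps, e.g.
#     fraction = min(float(t) / self.schedule_timesteps, 1.0)
#     return self.initial_p + fraction * (self.final_p - self.initial_p)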
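# ---------------------------------------------------------------------------
# Sketch for EX 2.3 / EX 5 (not part of the original skeleton): inside
# q_learning, an epsilon-greedy selection could replace the purely greedy
# choice, e.g.
#     action = epsilon_greedy(Q, s_current, list_pos,
#                             current_total_steps=current_total_steps,
#                             epsilon_initial=self.epsilon_initial,
#                             epsilon_final=self.epsilon_final,
#                             anneal_timesteps=self.annealing_timesteps,
#                             eps_type="linear")
# and the outer training loop could additionally stop once the Q table has
# converged, since "diff" is already updated at the end of every episode:
#     while episode <= self.episode_max and diff > self.threshold: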