import numpy as np
import torch
import random
import copy
import time
import os


class DQNAgent:
    """
    This is the main DQN class. It is self-contained, in the sense that only the
    environment must be initialized and provided outside of this class in order
    to train and use Deep Q-Learning.

    Parameters:
    actions_num          = Number of possible actions; also the size of the output layer.
    state_representation = A tensor representation of the environment state; used to
                           determine the size of the input layer of the neural network.
    layers_shape         = Shape of the hidden layers in the format (d, w):
                           d = units per hidden layer, w = number of hidden layers.
    memory_size          = Number of experience tuples held in the experience memory.
    batch_size           = Size of each minibatch.
    learning_rate        = Learning rate used for backpropagation.
    T_train              = Timesteps between training updates of the policy network.
    T_target             = Timesteps between synchronisations of the target network
                           (only used in vanilla DQN).
    gamma                = Discount factor for the target in the Bellman equation.
    epsilon_start        = Initial epsilon for the epsilon-greedy exploration strategy.
    epsilon_decay        = Amount subtracted from epsilon at every step (the default
                           anneals epsilon from 1 to 0.1 over 1000 steps).
    epsilon_final        = Final value of epsilon.
    env                  = An object containing the game environment.
    load_Name            = A tuple (tNetPath, pNetPath) with the file names of saved
                           model state; the files must be stored in the same folder as qTest.py.
    save_Name            = A tuple (tNetPath, pNetPath) with the file names to save the
                           models under; they will be saved in the same folder as qTest.py.
    """

    def __init__(self,
                 actions_num=4,
                 state_representation=None,
                 layers_shape=(10, 10),
                 activation_function=torch.nn.ReLU(),
                 memory_size=100000,
                 batch_size=32,
                 T_train=4,
                 T_target=100000,
                 gamma=0.99,
                 epsilon_start=1,
                 epsilon_decay=0.9 / 1000,
                 epsilon_final=0.1,  # Based on the Van Hasselt DDQN article
                 reward_System=(-10, 10 * 9, 0, 0),
                 learning_rate=0.00025,
                 env=None,
                 load_Name=None,
                 save_Name=None):
        # Network features
        self.actions_num = actions_num
        self.state_representation = state_representation
        self.layers_shape = layers_shape
        self.activation_function = activation_function

        # Network setup
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.load_NameTarget, self.load_NamePolicy = load_Name if load_Name is not None else (None, None)
        self.save_NameTarget, self.save_NamePolicy = save_Name if save_Name is not None else (None, None)
        self.initializeNeuralNetworks()
        if self.load_NamePolicy is not None or self.load_NameTarget is not None:
            self.loadNeuralNetworks()

        # Training
        self.learning_rate = learning_rate
        self.T_train = T_train
        self.T_target = T_target
        # This is the optimizer mentioned in the papers; it updates the policy network,
        # which is the network the loss is backpropagated through.
        self.optimizerP = torch.optim.RMSprop(params=self.pNet.parameters(),
                                              lr=self.learning_rate,
                                              momentum=0.95)
        self.lossFn = torch.nn.MSELoss(reduction="sum")

        # Experience replay memory
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.EMR = ExperienceMemoryReplay(memory_size=self.memory_size,
                                          batch_size=self.batch_size)  # EMR, it's short for ExperienceMemoryReplay ;)

        # Policy and value evaluation
        self.gamma = gamma
        self.epsilon_start = epsilon_start
        self.epsilon_decay = epsilon_decay
        self.epsilon_final = epsilon_final

        # Reward system
        self.die_Reward = reward_System[0]
        self.win_Reward = reward_System[1]
        self.stay_Reward = reward_System[2]
        self.survive_Reward = reward_System[3]

        # Incremental values
        self.t = 0
        self.epsilon = self.epsilon_start
        self.losses = []

        # Environment
        self.env = env

    def initializeNeuralNetworks(self):
        input_depth = len(self.state_representation.flatten())  # Depth of the input layer
        self.tNet = NeuralNetwork(layers_shape=self.layers_shape,
                                  output_depth=self.actions_num,
                                  input_depth=input_depth,
                                  activation_function=self.activation_function)  # Target network
        self.pNet = copy.deepcopy(self.tNet)  # Policy network
        self.tNet.to(self.device)
        self.pNet.to(self.device)

    def loadNeuralNetworks(self):
        if self.load_NameTarget is not None:
            try:  # In case we wish to automate, but we haven't trained under that specific model name yet
                self.tNet = torch.load(self.load_NameTarget, map_location=self.device)
            except Exception:
                print(f"Model named {self.load_NameTarget} could not be loaded; will proceed with randomized weights")
        if self.load_NamePolicy is not None:
            try:
                self.pNet = torch.load(self.load_NamePolicy, map_location=self.device)
            except Exception:
                print(f"Model named {self.load_NamePolicy} could not be loaded; will proceed with randomized weights")

    def getState(self):
        """For use with the FrozenLake environment.

        Arguments:
        env.desc = an NxN np.array describing the board
        env.s    = a single integer representing the index at which the agent is

        Output: a one-dimensional tensor holding the ASCII code of each letter in the
        grid, followed by the agent's position. This gives a numerical representation
        of the board that we can use to train the DQN.
        """
        rawState = self.env.desc.flatten()
        tensorState = [c[0] for c in rawState]  # c is a single byte such as b'F'; c[0] is its ASCII code
        tensorState.append(self.env.s)
        return torch.tensor(tensorState, dtype=torch.float32, device=self.device)

    def evaluateReward(self, stepTuple):
        """Rewards:
        - Dying:    -10 * NxN
        - Winning:   10 * NxN
        - Surviving:  per reward_System

        We scale the winning and dying bonus with the tile count, as this ensures that
        the optimal policy is still to win the game on big boards.
        """
        if stepTuple[2]:
            if stepTuple[1] == 0:  # We die
                return self.die_Reward
            else:  # We win
                return self.win_Reward
        if self.S_t[-1] == stepTuple[0]:  # We stayed on the same tile
            return self.stay_Reward
        else:
            return self.survive_Reward

    # Chooses between a greedy or exploratory action
    def greedVsExp(self):
        if self.epsilon > random.random():  # Choose exploratory
            return random.randint(0, self.actions_num - 1)
        else:  # Else choose greedy
            with torch.no_grad():
                return torch.argmax(self.pNet(self.S_t)).item()

    def step(self):
        """This is the primary method for the agent to interact with its environment
        and collect experience."""
        # 1. Decay epsilon
        if self.epsilon > self.epsilon_final:
            self.epsilon = self.epsilon - self.epsilon_decay
        # 2. Choose between exploitation and exploration (greedy or exploratory action)
        self.S_t = self.getState()
        a = self.greedVsExp()
        # 3. Step in the environment - we get a tuple in the format: (Position, R_t+1, Done?, Prob(S_t+1|a,S_t))
        stepTuple = self.env.step(a)
        reward = self.evaluateReward(stepTuple)  # Increase the density of the rewards with a custom reward evaluator
        # 4. Save the transition in the EMR
        exp = [self.S_t, a, reward, self.getState(), stepTuple[2], stepTuple[3]["prob"]]
        self.EMR.storeExp(exp)
        # 5. Increment the timestep
        self.t = self.t + 1
        # 6. If the game is done, reset
        if stepTuple[2]:
            self.env.reset()

    def synchronize(self):
        self.tNet = copy.deepcopy(self.pNet)
        if self.save_NameTarget is not None:
            torch.save(self.tNet, self.save_NameTarget)

    def optimizeDQN(self):
        # 1. Get a sample batch and reset the optimizer
        batch = self.EMR.sample()  # Returns a list of tuples in the format (S_t, a, R_t+1, S_t+1, Done?, Probability)
        self.optimizerP.zero_grad()
        # 2. Calculate the targets and quality predictions
        target_List = []
        prediction_List = []
        for exp in batch:
            # The target is computed with the target network and must not carry gradients
            with torch.no_grad():
                if exp[4]:  # Terminal transition: no bootstrap term
                    target = torch.tensor(float(exp[2]), device=self.device)
                else:
                    target = exp[2] + self.gamma * torch.max(self.tNet(exp[3]))
            target_List.append(target)
            prediction_List.append(self.pNet(exp[0])[exp[1]])
        # torch.stack keeps the predictions attached to the computation graph,
        # so the loss can backpropagate into the policy network
        target_Tensor = torch.stack(target_List)
        prediction_Tensor = torch.stack(prediction_List)
        # 3. Calculate the loss and backpropagate
        loss = self.lossFn(prediction_Tensor, target_Tensor)
        loss.backward()
        self.optimizerP.step()
        self.losses.append(loss.item())
        # 4. Save the model
        if self.save_NamePolicy is not None:
            torch.save(self.pNet, self.save_NamePolicy)

    def optimizeDDQN(self):
        # 1. Get a sample batch and reset the optimizer
        batch = self.EMR.sample()
        self.optimizerP.zero_grad()
        target_List = []
        policy_List = []
        # 2. Calculate policy predictions and targets
        for exp in batch:  # exp = (S_t, a, R_t+1, S_t+1, Done?, Prob)
            with torch.no_grad():
                if exp[4]:  # Terminal transition: no bootstrap term
                    target = torch.tensor(float(exp[2]), device=self.device)
                else:
                    # Double DQN: the policy network selects the action,
                    # the target network evaluates it
                    max_a = torch.argmax(self.pNet(exp[3])).item()
                    target = exp[2] + self.gamma * self.tNet(exp[3])[max_a]
            target_List.append(target)
            policy_List.append(self.pNet(exp[0])[exp[1]])
        target_Tensor = torch.stack(target_List)
        policy_Tensor = torch.stack(policy_List)
        # 3. Calculate the loss and backpropagate
        lossP = self.lossFn(policy_Tensor, target_Tensor)
        lossP.backward()
        self.optimizerP.step()
        self.losses.append(lossP.item())
        # 4. Save the model
        if self.save_NamePolicy is not None:
            torch.save(self.pNet, self.save_NamePolicy)


class ExperienceMemoryReplay:

    def __init__(self, memory_size, batch_size):
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.memory = []  # Although the transitions are tuples, the memory is a list, as tuples are immutable

    def storeExp(self, transition):
        """Stores experience tuples (transitions) in the EMR. If the memory is full,
        it deletes the oldest tuple and appends the latest one to the end.

        Arguments:
        transition: tuple consisting of (S_t, a, R_t+1, S_t+1, Done?, P(S_t+1|a,S_t))
        """
        if len(self.memory) >= self.memory_size:
            del self.memory[0]
        self.memory.append(transition)

    def sample(self):
        return random.sample(self.memory, self.batch_size)


class NeuralNetwork(torch.nn.Module):

    def __init__(self, layers_shape, output_depth, input_depth, activation_function):
        super().__init__()
        # Model parameters
        self.depth = layers_shape[0]   # Units per hidden layer
        self.width = layers_shape[1]   # Number of hidden layers
        self.output_depth = output_depth
        self.input_depth = input_depth + 1  # For the grid + our position
        self.activation_function = activation_function
        self.makeModel()

    def makeModel(self):
        networkStack = [torch.nn.Linear(self.input_depth, self.depth)]  # Input layer
        # Build each hidden layer with its own Linear module so the layers do not share weights
        for _ in range(self.width):
            networkStack += [self.activation_function, torch.nn.Linear(self.depth, self.depth)]
        # There is no non-linearity between the last hidden layer and the output
        networkStack.append(torch.nn.Linear(self.depth, self.output_depth))  # Output layer
        self.stack = torch.nn.Sequential(*networkStack)

    def forward(self, x):
        return self.stack(x)
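

# ---------------------------------------------------------------------------
# Minimal usage sketch: one possible training loop wiring the classes above
# together. It assumes the classic FrozenLake environment from gym < 0.26,
# whose step() returns the 4-tuple (obs, reward, done, info) with info["prob"],
# and whose unwrapped env exposes .desc and .s as used by getState(). The file
# names and the step budget below are placeholder values, not taken from the
# original code.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import gym  # assumed dependency; gym < 0.26 step API

    env = gym.make("FrozenLake-v1", is_slippery=True).unwrapped  # .unwrapped exposes env.desc and env.s
    env.reset()

    agent = DQNAgent(actions_num=env.action_space.n,
                     state_representation=env.desc,
                     env=env,
                     save_Name=("tNet.pt", "pNet.pt"))  # placeholder file names

    total_steps = 50000  # placeholder training budget
    for _ in range(total_steps):
        agent.step()
        # Train the policy network every T_train steps, once the replay memory
        # holds at least one full minibatch
        if agent.t % agent.T_train == 0 and len(agent.EMR.memory) >= agent.batch_size:
            agent.optimizeDQN()  # or agent.optimizeDDQN() for Double DQN
        # Synchronise the target network every T_target steps (vanilla DQN)
        if agent.t % agent.T_target == 0:
            agent.synchronize()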