import numpy as np
import torch
import random
import copy
import time
import os

class DQNAgent:

    """
    This is the main DQN class.

    It is self-contained, in the sense that only the environment must be initialized and provided outside of this class,
    in order to train and utilize Deep-Q-Learning.

    Parameters:

    actions_num = Amount of possible actions, also the size of the output layer.
    state_representation = A tensor representation of the environment state, will be used to initialize the size of the input layer of the NN
    layers_shape = Shape of the hidden layers, in the following format:
    (d,w), d = depth, w = width.
    memory_size = Number of experience tuples held in the experience memory
    batch_size = Size of each minibatch
    learning_rate = Learning rate used for backpropagation
    T_train = Timesteps for training the policy network
    T_target = Timesteps for synchronising the target network, only used in vanilla DQN
    gamma = The discount factor for calculating the target in the bellman equation
    epsilon_start = For use in the epsilon-greedy exploration/exploitation strategy
    epsilon_decay = Decay rate for epsilon per 1000 steps
    epsilon_final = Final value of epsilon
    env = An object containing the game environment
    load_Name = A tuple containing the name of the models' state dictionary in the format (tNetPath,pNetPath) - it must be stored in the same folder as the qTest.py file
    save_Name = A string containing the name of the models' state dictionary in the format (tNetPath,pNetPath) - it will be save in the same folder as q.Test.py


    """
    def __init__(self,
                 actions_num = 4,
                 state_representation = None,
                 layers_shape = (10,10),
                 activation_function = torch.nn.ReLU(),
                 memory_size = 100000,
                 batch_size = 32,
                 T_train = 4,
                 T_target = 100000,
                 gamma = 0.99,
                 epsilon_start = 1,
                 epsilon_decay = (0.9)/(1000),
                 epsilon_final = 0.1, #Based on the Van Hasselt DDQN article
                 reward_System = (-10,10*9,0,0), #(die_Reward, win_Reward, stay_Reward, survive_Reward)
                 learning_rate = 0.00025,
                 env = None,
                 load_Name = None,
                 save_Name = None):

        #Network features
        self.actions_num = actions_num
        self.state_representation = state_representation
        self.layers_shape = layers_shape
        self.activation_function = activation_function

        #Network Setup
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.load_NameTarget,self.load_NamePolicy = load_Name if load_Name is not None else (None,None)
        self.save_NameTarget,self.save_NamePolicy = save_Name if save_Name is not None else (None,None)
        self.initializeNeuralNetworks()

        if self.load_NamePolicy is not None or self.load_NameTarget is not None:
            self.loadNeuralNetworks()



        # Training
        self.learning_rate = learning_rate
        self.T_train = T_train
        self.T_target = T_target
        self.optimizerP = torch.optim.RMSprop(lr=self.learning_rate,params=self.pNet.parameters(),momentum=0.95) #This is the optimizer mentioned in the papers; it updates the policy network, which synchronize() later copies into the target network
        self.lossFn = torch.nn.MSELoss(reduction="sum")

        #Experience Replay Memory
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.EMR = ExperienceMemoryReplay(memory_size= self.memory_size, batch_size= self.batch_size) #EMR, it's short for ExperienceMemoryReplay ;)

        #Policy and Value Evaluation
        self.gamma = gamma
        self.epsilon_start = epsilon_start
        self.epsilon_decay = epsilon_decay
        self.epsilon_final = epsilon_final

        #Reward system
        self.die_Reward = reward_System[0]
        self.win_Reward = reward_System[1]
        self.stay_Reward = reward_System[2]
        self.survive_Reward = reward_System[3]

        #Incremental values
        self.t = 0
        self.epsilon = self.epsilon_start
        self.losses = []

        #Environment
        self.env = env


    def initializeNeuralNetworks(self):
        input_depth = len(self.state_representation.flatten()) #Depth of the input layer

        self.tNet = NeuralNetwork(layers_shape=self.layers_shape,output_depth=self.actions_num,input_depth=input_depth,activation_function=self.activation_function) #Target network
        self.pNet = copy.deepcopy(self.tNet) #Policy network

        self.tNet.stack.to(self.device)
        self.pNet.stack.to(self.device)

    def loadNeuralNetworks(self):
        if self.load_NameTarget is not None:
            try: #In case we wish to automate, but we haven't trained under that specific model name yet
                self.tNet = torch.load(self.load_NameTarget, map_location=self.device)
            except Exception:
                print(f"Model named {self.load_NameTarget} could not be loaded; will proceed with randomized weights")
        if self.load_NamePolicy is not None:
            try:
                self.pNet = torch.load(self.load_NamePolicy, map_location=self.device)
            except Exception:
                print(f"Model named {self.load_NamePolicy} could not be loaded; will proceed with randomized weights")

    """For use with the frozenLake environment

    Arguments:
    env.desc = a NxN np.array
    env.s = A single integer representing the index at which the agent is

    Output: A one dimensional tensor representing the ASCII-char codes for each letter in the grid.
    This way we get a numerical representation of the board, which we can use to train the DQN.
    """
    def getState(self):
        rawState = self.env.desc.flatten()
        tensorState = [x[0] for x in rawState] #x[0] extracts the integer ASCII code from each byte character

        tensorState.append(self.env.s)
        tensorState = torch.tensor(tensorState, dtype=torch.float32, device=self.device) #Keep the state on the same device as the networks
        return tensorState

    """
    Rewards:
    - Dying: -10 * NxN * gamma
    - Winning: 10* NxN * gamma
    - Surviving: 1

    We add the winning and dying bonus based on the tile size, as this will ensure, that the optimal policy will be to win the game in case we have big boards.
    
    """
    def evaluateReward(self,stepTuple):

        if stepTuple[2]: #The episode is done
            if stepTuple[1] == 0: #We die (FrozenLake only hands out a reward of 1 for reaching the goal)
                return self.die_Reward
            else: #We win
                return self.win_Reward
        if self.S_t[-1] == stepTuple[0]: #Our position did not change
            return self.stay_Reward
        else:
            return self.survive_Reward




    #Chooses between a greedy or exploratory action
    def greedVsExp(self):

        if self.epsilon>random.random(): #Choose exploratory
            return random.randint(0,self.actions_num-1)
        else: #Else choose greedy
            with torch.no_grad(): #No gradients are needed for action selection
                a = torch.argmax(self.pNet(self.S_t)).item()
            return a
    """This is the primary method for the agent to interact with its environment and collect experience.
    
    """
    def step(self):
        #1. Decay Epsilon
        if self.epsilon>self.epsilon_final:
            self.epsilon = self.epsilon - self.epsilon_decay

        #2. Choose between exploitation or exploration (Greedy or exploratory action)
        self.S_t = self.getState()
        a = self.greedVsExp()

        #3. Step in environment - we get a tuple in the format: (Position, R_t+1,Done?, Prob(S_t+1|a,S_t))
        stepTuple = self.env.step(a)
        reward = self.evaluateReward(stepTuple) #Increase the density of the rewards with a custom reward evaluator


        #4. Save transition in the EMR
        exp = [self.S_t,a,reward,self.getState(),stepTuple[2],stepTuple[3]["prob"]]
        self.EMR.storeExp(exp)

        #5. Increment timestep
        self.t = self.t + 1

        #6. If the game is done reset
        if stepTuple[2]:
            self.env.reset()

    def synchronize(self):
        self.tNet = copy.deepcopy(self.pNet)
        if self.save_NameTarget is not None:
            torch.save(self.tNet,self.save_NameTarget)
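
    #For reference, the targets built in the two optimization methods below follow the usual
    #Q-learning update (this is only a summary of the code that follows):
    #   DQN:  y = R_t+1 + gamma * max_a Q_target(S_t+1, a)
    #   DDQN: y = R_t+1 + gamma * Q_target(S_t+1, argmax_a Q_policy(S_t+1, a))
    #The prediction regressed towards y is Q_policy(S_t, a_t).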

    def optimizeDQN(self):
        #1. Get sample batch and reset optimizer
        batch = self.EMR.sample() #Returns a list of tuples in the format (S_t,a,R_t+1,S_t+1,Done?,Probability)
        self.optimizerP.zero_grad()

        #2. Calculate the targets and quality predictions
        target_Tensor = []
        prediction_Tensor = []

        for exp in batch:
            maxTerm = self.gamma * torch.max(self.tNet.forward(exp[3])) #The target network provides max_a Q(S_t+1,a)
            target_Tensor = target_Tensor + [exp[2] + maxTerm]
            prediction_Tensor = prediction_Tensor + [self.pNet.forward(exp[0])[exp[1]]]

        target_Tensor = torch.stack(target_Tensor).detach() #The targets are treated as constants, so they are cut off from the autograd graph
        prediction_Tensor = torch.stack(prediction_Tensor) #Stacking (rather than rebuilding a new tensor) keeps the predictions attached to the policy network's graph


        #3. Calculate loss and backpropagate
        loss = self.lossFn(prediction_Tensor, target_Tensor)
        loss.backward()
        self.optimizerP.step()
        self.losses = self.losses + [loss.item()]

        #4. Save the model
        if self.save_NamePolicy is not None:
            torch.save(self.pNet,self.save_NamePolicy)





    def optimizeDDQN(self):
        #1. Get sample batch and reset optimizer
        batch = self.EMR.sample()
        self.optimizerP.zero_grad()

        target_Tensor = []
        policy_Tensor = []

        #2. Calculate policy and target
        for exp in batch: #exp = (S_t,a,R_t+1,S_t+1,Done?,Prob)
            max_a = torch.argmax(self.pNet.forward(exp[3])).item() #The policy network selects the best next action...
            target_Tensor = target_Tensor + [exp[2] + self.gamma * self.tNet.forward(exp[3])[max_a]] #...and the target network evaluates it (the Double-DQN decoupling)
            policy_Tensor = policy_Tensor + [self.pNet.forward(exp[0])[exp[1]]]

        target_Tensor = torch.stack(target_Tensor).detach() #Targets are treated as constants, so they are cut off from the autograd graph
        policy_Tensor = torch.stack(policy_Tensor) #Stacking keeps the predictions attached to the policy network's graph

        #3. Calculate the loss and backpropagate
        lossP = self.lossFn(policy_Tensor, target_Tensor)

        lossP.backward()

        self.optimizerP.step()

        self.losses = self.losses + [lossP.item()]

        #4. Save the model

        if self.save_NamePolicy is not None:
            torch.save(self.pNet,self.save_NamePolicy)

class ExperienceMemoryReplay:

    def __init__(self,
                 memory_size,
                 batch_size
                 ):

        self.memory_size = memory_size
        self.batch_size = batch_size

        self.memory = [] #Although the transitions are tuples, the memory is a list, as tuples are immutable

    """
    Stores experience tuples (transitions) in the EMR.
    
    If the EMR memory is full, it deletes the first tuple and appends the latest one to the end. 
    
    Arguments:
    
    transition: Tuple consisting of (S_t,a,R_t+1,S_t+1,Done?,P(S_t+1|a,S_t)
    """
    def storeExp(self,transition):

        if len(self.memory) >= self.memory_size: #Drop the oldest transition once the memory is full
            del self.memory[0]

        self.memory.append(transition)

    def sample(self):
        return random.sample(self.memory,self.batch_size)
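
    # A quick usage sketch (hedged: the variable names and sizes are illustrative placeholders):
    #
    #   emr = ExperienceMemoryReplay(memory_size=1000, batch_size=32)
    #   emr.storeExp([S_t, a, reward, S_t1, done, prob])
    #   if len(emr.memory) >= emr.batch_size:  #sample() needs at least batch_size stored transitions
    #       batch = emr.sample()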

class NeuralNetwork(torch.nn.Module):
    def __init__(self,
                 layers_shape,
                 output_depth,
                 input_depth,
                 activation_function
                 ):
        super().__init__()


        #Model parameters
        self.depth = layers_shape[0] #Number of hidden layers
        self.width = layers_shape[1] #Neurons per hidden layer
        self.output_depth = output_depth
        self.input_depth = input_depth + 1 #For the grid + our position

        self.activation_function = activation_function
        self.makeModel()


    def makeModel(self):
        networkStack = [torch.nn.Linear(self.input_depth,self.width)] #Input layer
        for _ in range(self.depth): #Each hidden layer gets its own torch.nn.Linear instance, otherwise the layers would share weights
            networkStack = networkStack + [self.activation_function,torch.nn.Linear(self.width,self.width)]
        networkStack = networkStack + [torch.nn.Linear(self.width,self.output_depth)] #Output layer; there is no non-linearity between the last hidden layer and the output
        self.stack = torch.nn.Sequential(*networkStack)

    def forward(self,input):
        output = self.stack(input)
        return output
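

#A minimal end-to-end training sketch showing how the classes above are intended to fit together.
#It is hedged: it assumes the legacy gym FrozenLake-v1 environment, whose step() returns the
#4-tuple (observation, reward, done, info) that DQNAgent.step() expects, and the file names,
#step budget and training schedule below are illustrative choices rather than values from this file.
if __name__ == "__main__":
    import gym

    env = gym.make("FrozenLake-v1").unwrapped #unwrapped exposes env.desc and env.s directly
    env.reset()

    agent = DQNAgent(actions_num=env.action_space.n,
                     state_representation=torch.zeros(env.desc.size), #the flattened grid; the position input is added internally
                     T_target=1000, #synchronise the target network more often than the default for this short run
                     env=env,
                     load_Name=(None, None),
                     save_Name=("tNet.pt", "pNet.pt"))

    for _ in range(50000):
        agent.step()
        if len(agent.EMR.memory) >= agent.batch_size and agent.t % agent.T_train == 0:
            agent.optimizeDDQN() #or agent.optimizeDQN() for vanilla DQN
        if agent.t % agent.T_target == 0:
            agent.synchronize()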