import torch
import random
import copy
class DQNAgent:
"""
This is the main DQN class.
It is self-contained, in the sense that only the environment must be initialized and provided outside of this class,
in order to train and utilize Deep-Q-Learning.
Parameters:
actions_num = Amount of possible actions, also the size of the output layer.
state_representation = A tensor representation of the environment state, will be used to initialize the size of the input layer of the NN
layers_shape = Shape of the hidden layers, in the following format:
(d,w), d = depth, w = width.
memory_size = Number of experience tuples held in the experience memory
batch_size = Size of each minibatch
learning_rate = Learning rate used for backpropagation
T_train = Timesteps for training the policy network
T_target = Timesteps for synchronising the target network, only used in vanilla DQN
gamma = The discount factor for calculating the target in the bellman equation
epsilon_start = For use in the epsilon-greedy exploration/exploitation strategy
epsilon_decay = Decay rate for epsilon per 1000 steps
epsilon_final = Final value of epsilon
env = An object containing the game environment
load_Name = A tuple containing the name of the models' state dictionary in the format (tNetPath,pNetPath) - it must be stored in the same folder as the qTest.py file
save_Name = A string containing the name of the models' state dictionary in the format (tNetPath,pNetPath) - it will be save in the same folder as q.Test.py
"""
def __init__(self,
actions_num = 4,
state_representation = None,
layers_shape = (10,10),
activation_function = torch.nn.ReLU(),
memory_size = 100000,
batch_size = 32,
T_train = 4,
T_target = 100000,
gamma = 0.99,
epsilon_start = 1,
epsilon_decay = (0.9)/(1000),
epsilon_final = 0.1, #Based on the Van Hasselt DDQN article
reward_System = (-10,10*9,0,0),
learning_rate = 0.00025,
env = None,
load_Name = None,
save_Name = None):
#Network features
        self.actions_num = actions_num
self.state_representation = state_representation
self.layers_shape = layers_shape
self.activation_function = activation_function
#Network Setup
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.load_NameTarget, self.load_NamePolicy = load_Name if load_Name is not None else (None, None)
        self.save_NameTarget, self.save_NamePolicy = save_Name if save_Name is not None else (None, None)
        self.initializeNeuralNetworks()
        if self.load_NamePolicy is not None or self.load_NameTarget is not None:
            self.loadNeuralNetworks()
# Training
self.learning_rate = learning_rate
self.T_train = T_train
self.T_target = T_target
        self.optimizerP = torch.optim.RMSprop(lr=self.learning_rate, params=self.pNet.parameters(), momentum=0.95) #RMSprop is the optimizer mentioned in the papers; it must update the policy network's parameters
self.lossFn = torch.nn.MSELoss(reduction="sum")
#Experience Replay Memory
self.memory_size = memory_size
self.batch_size = batch_size
self.EMR = ExperienceMemoryReplay(memory_size= self.memory_size, batch_size= self.batch_size) #EMR, it's short for ExperienceMemoryReplay ;)
#Policy and Value Evaluation
self.gamma = gamma
self.epsilon_start = epsilon_start
self.epsilon_decay = epsilon_decay
self.epsilon_final = epsilon_final
#Reward system
self.die_Reward = reward_System[0]
self.win_Reward = reward_System[1]
self.stay_Reward = reward_System[2]
self.survive_Reward = reward_System[3]
#Incremental values
self.t = 0
self.epsilon = self.epsilon_start
self.losses = []
#Environment
self.env = env
def initializeNeuralNetworks(self):
input_depth = len(self.state_representation.flatten()) #Depth of the input layer
self.tNet = NeuralNetwork(layers_shape=self.layers_shape,output_depth=self.actions_num,input_depth=input_depth,activation_function=self.activation_function) #Target network
self.pNet = copy.deepcopy(self.tNet) #Policy network
self.tNet.stack.to(self.device)
self.pNet.stack.to(self.device)
    def loadNeuralNetworks(self):
        if self.load_NameTarget is not None:
            try: #In case we wish to automate, but we haven't trained under that specific model name yet
                self.tNet = torch.load(self.load_NameTarget, map_location=self.device)
            except Exception:
                print(f"Model named {self.load_NameTarget} could not be loaded; will proceed with randomized weights")
        if self.load_NamePolicy is not None:
            try:
                self.pNet = torch.load(self.load_NamePolicy, map_location=self.device)
            except Exception:
                print(f"Model named {self.load_NamePolicy} could not be loaded; will proceed with randomized weights")
"""For use with the frozenLake environment
Arguments:
env.desc = a NxN np.array
env.s = A single integer representing the index at which the agent is
Output: A one dimensional tensor representing the ASCII-char codes for each letter in the grid.
This way we get a numerical representation of the board, which we can use to train the DQN.
"""
    def getState(self):
        rawState = self.env.desc.flatten()
        tensorState = [x[0] for x in rawState] #ASCII code of each tile
        tensorState.append(self.env.s) #The agent's current position
        tensorState = torch.tensor(tensorState, dtype=torch.float32, device=self.device) #Keep the state on the same device as the networks
        return tensorState
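    #For illustration (hypothetical 2x2 board, not part of the original code): a desc of
    #[[b'S', b'F'], [b'F', b'G']] with the agent at index 3 would yield
    #tensor([83., 70., 70., 71., 3.]), i.e. the ASCII codes of the tiles followed by the position.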
"""
Rewards:
- Dying: -10 * NxN * gamma
- Winning: 10* NxN * gamma
- Surviving: 1
We add the winning and dying bonus based on the tile size, as this will ensure, that the optimal policy will be to win the game in case we have big boards.
"""
    def evaluateReward(self, stepTuple):
        if stepTuple[2]: #The episode is over
            if stepTuple[1] == 0: #We die
                return self.die_Reward
            else: #We win
                return self.win_Reward
        if self.S_t[-1] == stepTuple[0]: #We stayed on the same tile
            return self.stay_Reward
        else: #We survive and move to a new tile
            return self.survive_Reward
#Chooses between a greedy or exploratory action
def greedVsExp(self):
if self.epsilon>random.random(): #Choose exploratory
return random.randint(0,self.actions_num-1)
else: #Else choose greedy
a = torch.argmax(self.pNet(self.S_t)).item()
return a
"""This is the primary method for the agent to interact with its environment and collect experience.
"""
def step(self):
#1. Decay Epsilon
        if self.epsilon > self.epsilon_final:
            self.epsilon = max(self.epsilon - self.epsilon_decay, self.epsilon_final) #Never decay below the final value
#2. Choose between exploitation or exploration (Greedy or exploratory action)
self.S_t = self.getState()
a = self.greedVsExp()
#3. Step in environment - we get a tuple in the format: (Position, R_t+1,Done?, Prob(S_t+1|a,S_t))
stepTuple = self.env.step(a)
reward = self.evaluateReward(stepTuple) #Increase the density of the rewards with a custom reward evaluator
#4. Save transition in the EMR
exp = [self.S_t,a,reward,self.getState(),stepTuple[2],stepTuple[3]["prob"]]
self.EMR.storeExp(exp)
#5. Increment timestep
self.t = self.t + 1
#6. If the game is done reset
if stepTuple[2]:
self.env.reset()
def synchronize(self):
self.tNet = copy.deepcopy(self.pNet)
        if self.save_NameTarget is not None:
            torch.save(self.tNet, self.save_NameTarget)
def optimizeDQN(self):
        #1. Get sample batch and reset optimizer
        batch = self.EMR.sample() #Returns a list of tuples in the format (S_t,a,R_t+1,S_t+1,Done?,Probability)
        self.optimizerP.zero_grad()
        #2. Calculate the targets and quality predictions
        target_Tensor = []
        prediction_Tensor = []
        for exp in batch:
            #Bootstrap from the target network; terminal transitions contribute only their reward
            maxTerm = self.gamma * torch.max(self.tNet(exp[3])) * (1 - int(exp[4]))
            target_Tensor.append(exp[2] + maxTerm)
            prediction_Tensor.append(self.pNet(exp[0])[exp[1]])
        target_Tensor = torch.stack(target_Tensor).detach() #No gradient flows through the targets
        prediction_Tensor = torch.stack(prediction_Tensor) #Keeps the graph, so gradients reach the policy network
        #3. Calculate loss and backpropagate
        loss = self.lossFn(prediction_Tensor, target_Tensor)
        loss.backward()
        self.optimizerP.step()
        self.losses.append(loss.item())
        #4. Save the model
        if self.save_NamePolicy is not None:
            torch.save(self.pNet, self.save_NamePolicy)
def optimizeDDQN(self):
        #1. Get sample batch and reset optimizer
        batch = self.EMR.sample()
        self.optimizerP.zero_grad()
        target_Tensor = []
        policy_Tensor = []
        #2. Calculate policy and target
        for exp in batch: #exp = (S_t,a,R_t+1,S_t+1,Done?,Prob)
            max_a = torch.argmax(self.pNet(exp[3])).item() #The policy network selects the action...
            bootstrapTerm = self.gamma * self.tNet(exp[3])[max_a] * (1 - int(exp[4])) #...and the target network evaluates it; terminal transitions contribute only their reward
            target_Tensor.append(exp[2] + bootstrapTerm)
            policy_Tensor.append(self.pNet(exp[0])[exp[1]])
        target_Tensor = torch.stack(target_Tensor).detach() #No gradient flows through the targets
        policy_Tensor = torch.stack(policy_Tensor) #Keeps the graph, so gradients reach the policy network
        #3. Calculate the loss and backpropagate
        lossP = self.lossFn(policy_Tensor, target_Tensor)
        lossP.backward()
        self.optimizerP.step()
        self.losses.append(lossP.item())
        #4. Save the model
        if self.save_NamePolicy is not None:
            torch.save(self.pNet, self.save_NamePolicy)
class ExperienceMemoryReplay:
def __init__(self,
memory_size,
batch_size
):
self.memory_size = memory_size
self.batch_size = batch_size
self.memory = [] #Although the transitions are tuples, the memory is a list, as tuples are immutable
"""
Stores experience tuples (transitions) in the EMR.
If the EMR memory is full, it deletes the first tuple and appends the latest one to the end.
Arguments:
    transition: Tuple consisting of (S_t,a,R_t+1,S_t+1,Done?,P(S_t+1|a,S_t))
"""
    def storeExp(self, transition):
        if len(self.memory) >= self.memory_size:
            del self.memory[0]
        self.memory.append(transition)
    def sample(self):
        return random.sample(self.memory, self.batch_size) #Assumes at least batch_size transitions have been stored
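#A minimal sketch (not from the original code) of how the replay memory behaves, assuming
#memory_size=2, batch_size=1 and hypothetical transitions t0, t1, t2:
#   EMR = ExperienceMemoryReplay(memory_size=2, batch_size=1)
#   EMR.storeExp(t0); EMR.storeExp(t1)
#   EMR.storeExp(t2)        #The buffer is full, so the oldest transition t0 is discarded
#   batch = EMR.sample()    #Returns a list with one uniformly sampled transition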
class NeuralNetwork(torch.nn.Module):
def __init__(self,
layers_shape,
output_depth,
input_depth,
activation_function
):
super().__init__()
#Model parameters
self.depth = layers_shape[0]
self.width = layers_shape[1]
self.output_depth = output_depth
self.input_depth = input_depth + 1 #For the grid + our position
self.activation_function = activation_function
self.makeModel()
    def makeModel(self):
        networkStack = [torch.nn.Linear(self.input_depth, self.depth)] #Input layer
        for _ in range(self.width): #Build a fresh Linear module for every hidden layer, so that no weights are shared between layers
            networkStack += [self.activation_function, torch.nn.Linear(self.depth, self.depth)]
        networkStack += [torch.nn.Linear(self.depth, self.output_depth)] #Output layer - there is no non-linearity between the last hidden layer and the output
        self.stack = torch.nn.Sequential(*networkStack)
    def forward(self, x):
        return self.stack(x)
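#A minimal usage sketch, not part of the original file. It assumes the classic OpenAI Gym
#FrozenLake environment with the old 4-tuple step API (observation, reward, done, info) and an
#info dict containing "prob", which is what step() and getState() above rely on. The step budget,
#T_target value and save names are hypothetical and only for illustration.
if __name__ == "__main__":
    import gym
    env = gym.make("FrozenLake-v1").unwrapped #.unwrapped exposes env.desc and env.s, which getState() uses
    env.reset()
    agent = DQNAgent(actions_num=env.action_space.n,
                     state_representation=env.desc, #Only len(...flatten()) is needed to size the input layer
                     env=env,
                     T_target=1000,
                     save_Name=("tNet.pt", "pNet.pt"))
    for _ in range(20000):
        agent.step()
        if len(agent.EMR.memory) >= agent.batch_size:
            if agent.t % agent.T_train == 0:
                agent.optimizeDDQN() #Or agent.optimizeDQN() for vanilla DQN
            if agent.t % agent.T_target == 0:
                agent.synchronize()
    if agent.losses:
        print(f"Performed {len(agent.losses)} optimisation steps, last loss: {agent.losses[-1]:.4f}")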