import os
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
import tensorflow_probability as tfp
from networks import ActorCriticNetwork
import numpy as np
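# Smallest representable float32 increment, used to avoid division by zero
# when standardizing the returns.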
eps = np.finfo(np.float32).eps.item()
def get_expected_return(
rewards: tf.Tensor,
gamma: float,
standardize: bool = True) -> tf.Tensor:
"""Compute expected returns per timestep."""
n = tf.shape(rewards)[0]
    # Limit the computation to the most recent 200 rewards.
    if n > 200:
        rewards = rewards[-200:]
        n = tf.shape(rewards)[0]
returns = tf.TensorArray(dtype=tf.float32, size=n)
# Start from the end of `rewards` and accumulate reward sums
# into the `returns` array
rewards = tf.cast(rewards[::-1], dtype=tf.float32)
discounted_sum = tf.constant(0.0)
discounted_sum_shape = discounted_sum.shape
for i in tf.range(n):
reward = rewards[i]
discounted_sum = reward + gamma * discounted_sum
discounted_sum.set_shape(discounted_sum_shape)
returns = returns.write(i, discounted_sum)
returns = returns.stack()[::-1]
if standardize:
returns = ((returns - tf.math.reduce_mean(returns)) /
(tf.math.reduce_std(returns) + eps))
return returns
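# Huber loss for the critic: quadratic for small errors and linear for large
# ones, so value updates are less sensitive to outlier returns than with MSE.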
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)
def compute_loss(
action_probs: tf.Tensor,
values: tf.Tensor,
returns: tf.Tensor) -> tf.Tensor:
"""Computes the combined Actor-Critic loss."""
advantage = returns - values
action_log_probs = tf.math.log(action_probs)
actor_loss = -tf.math.reduce_mean(action_log_probs * advantage)
    # tf.print(actor_loss)  # debug: inspect the actor loss per update
    # An alternative critic loss is the mean-squared advantage:
    # critic_loss = tf.math.reduce_mean(tf.math.pow(advantage, 2))
critic_loss = huber_loss(values, returns)
return actor_loss + critic_loss
class Agent:
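    """Advantage actor-critic agent built around a shared ActorCriticNetwork."""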
def __init__(self, alpha=0.0003, gamma=0.99, n_actions=2):
self.gamma = gamma
self.n_actions = n_actions
self.action = None
        self.action_space = list(range(self.n_actions))
self.actor_critic = ActorCriticNetwork(n_actions=n_actions)
self.actor_critic.compile(optimizer=Adam(learning_rate=alpha))
def choose_action(self, observation):
state = tf.convert_to_tensor([observation])
        _, probs = self.actor_critic(state)
        # Sample an action from the categorical policy given by the actor head.
        action_probabilities = tfp.distributions.Categorical(probs=probs)
        action = action_probabilities.sample()
        # tf.print(probs, action)  # debug: policy probabilities and sampled action
        self.action = action
        action = action.numpy()
        if action[0] == 5:  # sanity check for an unexpected action index
            print("Error: sampled an out-of-range action")
        return action
def save_models(self, i):
print('... saving models ...')
checkpoint_file = os.path.join(self.actor_critic.checkpoint_file, str(i))
self.actor_critic.save_weights(checkpoint_file)
def load_models(self, i):
print('... loading models ...')
checkpoint_file = os.path.join(self.actor_critic.checkpoint_file, str(i))
self.actor_critic.load_weights(checkpoint_file)
def learn2(self, state, reward, state_, done):
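        """Experimental variant of learn() that builds discounted returns with
        get_expected_return and the combined loss with compute_loss."""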
state = tf.convert_to_tensor([state], dtype=tf.float32)
state_ = tf.convert_to_tensor([state_], dtype=tf.float32)
reward = tf.convert_to_tensor(reward, dtype=tf.float32)
with tf.GradientTape() as tape:
state_value, probs = self.actor_critic(state)
state_value_, action_probs_ = self.actor_critic(state_)
state_value = tf.squeeze(state_value)
            # state_value_ = tf.squeeze(state_value_)
            # begin my changes
            returns = get_expected_return(reward, gamma=self.gamma)
            # tf.print(returns, action_probs_, state_value_)  # debug
            action_probs, values, returns = [
                tf.expand_dims(x, 1)
                for x in [action_probs_, state_value_, returns]
            ]
            total_loss = compute_loss(action_probs, values, returns)
            # end my changes
# action_probs = tfp.distributions.Categorical(probs=probs)
# log_prob = action_probs.log_prob(self.action)
# delta = reward + self.gamma * state_value_ * (1-int(done)) - state_value
# actor_loss = -log_prob * delta
# critic_loss = delta**2
# total_loss = actor_loss + critic_loss
gradient = tape.gradient(total_loss, self.actor_critic.trainable_variables)
self.actor_critic.optimizer.apply_gradients(zip(gradient, self.actor_critic.trainable_variables))
def learn(self, state, reward, state_, done):
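        """One-step (TD(0)) actor-critic update from a single transition."""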
state = tf.convert_to_tensor([state], dtype=tf.float32)
state_ = tf.convert_to_tensor([state_], dtype=tf.float32)
reward = tf.convert_to_tensor(reward, dtype=tf.float32)
with tf.GradientTape() as tape:
state_value, probs = self.actor_critic(state)
state_value_, _ = self.actor_critic(state_)
state_value = tf.squeeze(state_value)
state_value_ = tf.squeeze(state_value_)
action_probs = tfp.distributions.Categorical(probs=probs)
log_prob = action_probs.log_prob(self.action)
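            # One-step TD error, used as both the advantage for the actor and
            # the (squared) error target for the critic.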
delta = reward + self.gamma * state_value_ * (1-int(done)) - state_value
actor_loss = -log_prob * delta
critic_loss = delta**2
total_loss = actor_loss + critic_loss
gradient = tape.gradient(total_loss, self.actor_critic.trainable_variables)
self.actor_critic.optimizer.apply_gradients(zip(gradient, self.actor_critic.trainable_variables))
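
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original training code): a minimal episode
# loop driving Agent.learn. It assumes the Gymnasium package and the
# "CartPole-v1" environment as illustrative choices only; the real training
# script lives outside this module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import gymnasium as gym

    env = gym.make("CartPole-v1")
    agent = Agent(alpha=0.0003, gamma=0.99, n_actions=env.action_space.n)
    observation, _ = env.reset()
    done = False
    while not done:
        # choose_action returns a length-1 numpy array of action indices.
        action = agent.choose_action(observation)[0]
        observation_, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        agent.learn(observation, reward, observation_, done)
        observation = observation_
    env.close()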