Untitled

 avatar
unknown
plain_text
2 years ago
5.9 kB
2
Indexable
import os 
import tensorflow as tf
from keras.optimizers import Adam 
import tensorflow_probability as tfp
from networks import ActorCriticNetwork
import numpy as np

eps = np.finfo(np.float32).eps.item()

def get_expected_return(
            rewards: tf.Tensor, 
            gamma: float, 
            standardize: bool = True) -> tf.Tensor:
        """Compute expected returns per timestep."""

        n = tf.shape(rewards)[0]
        i=n
        if i>200:
            i=i-200
            rewards=rewards[i:]
            n=n-i
        returns = tf.TensorArray(dtype=tf.float32, size=n)
        
        # Start from the end of `rewards` and accumulate reward sums
        # into the `returns` array
        rewards = tf.cast(rewards[::-1], dtype=tf.float32)
        

        discounted_sum = tf.constant(0.0)
        discounted_sum_shape = discounted_sum.shape
        for i in tf.range(n):
            reward = rewards[i]
            discounted_sum = reward + gamma * discounted_sum
            discounted_sum.set_shape(discounted_sum_shape)
            returns = returns.write(i, discounted_sum)
        returns = returns.stack()[::-1]

        if standardize:
            returns = ((returns - tf.math.reduce_mean(returns)) / 
                    (tf.math.reduce_std(returns) + eps))

        return returns


huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
    action_probs: tf.Tensor,  
    values: tf.Tensor,  
    returns: tf.Tensor) -> tf.Tensor:
  """Computes the combined Actor-Critic loss."""

  advantage = returns - values

  action_log_probs = tf.math.log(action_probs)
  actor_loss = -tf.math.reduce_mean(action_log_probs * advantage)
  tf.print(actor_loss)
  #tf.print(actor_loss)
  #critic_loss=tf.math.reduce_mean(tf.math.pow(advantage,2))
  #tf.print(critic_loss)
  critic_loss = huber_loss(values, returns)

  return actor_loss + critic_loss


class Agent:
    def __init__(self, alpha=0.0003, gamma=0.99, n_actions=2):
        self.gamma = gamma 
        self.n_actions = n_actions 
        self.action = None
        self.action_space = [i for i in range(self.n_actions)]

        self.actor_critic = ActorCriticNetwork(n_actions=n_actions)

        self.actor_critic.compile(optimizer=Adam(learning_rate=alpha))


    def choose_action(self, observation):
        state = tf.convert_to_tensor([observation])
        _, probs = self.actor_critic(state)
        tf.print(probs)
        action_probabilities = tfp.distributions.Categorical(probs=probs)
        action = action_probabilities.sample()
        tf.print(action)
        self.action = action
        action=action.numpy()
        if(action[0]==5):
            print("Errore")
        return action
    
    
    def save_models(self, i):
        print('... saving models ...')
        checkpoint_file = os.path.join(self.actor_critic.checkpoint_file, str(i))
        self.actor_critic.save_weights(checkpoint_file)


    def load_models(self, i):
        print('... loading models ...')
        checkpoint_file = os.path.join(self.actor_critic.checkpoint_file, str(i))
        self.actor_critic.load_weights(checkpoint_file)

    
    
    def learn2(self, state, reward, state_, done):
        state = tf.convert_to_tensor([state], dtype=tf.float32)
        state_ = tf.convert_to_tensor([state_], dtype=tf.float32)
        reward = tf.convert_to_tensor(reward, dtype=tf.float32)
        
        with tf.GradientTape() as tape:
            

            state_value, probs = self.actor_critic(state)
            state_value_, action_probs_ = self.actor_critic(state_)

            state_value = tf.squeeze(state_value)
            #state_value_ = tf.squeeze(state_value_)

            #inizio mio
            returns = get_expected_return(reward, gamma = 0.99)
            tf.print(returns)
            tf.print(action_probs_)
            tf.print(state_value_)

            action_probs, values, returns = [tf.expand_dims(x, 1) for x in [action_probs_, state_value_, returns]] 

            total_loss = compute_loss(action_probs, values, returns)
            #fine mio

            # action_probs = tfp.distributions.Categorical(probs=probs)
            # log_prob = action_probs.log_prob(self.action)

            # delta = reward + self.gamma * state_value_ * (1-int(done)) - state_value 
            # actor_loss = -log_prob * delta
            # critic_loss = delta**2

            # total_loss = actor_loss + critic_loss 
        
        gradient = tape.gradient(total_loss, self.actor_critic.trainable_variables)
        self.actor_critic.optimizer.apply_gradients(zip(gradient, self.actor_critic.trainable_variables))


    def learn(self, state, reward, state_, done):
        state = tf.convert_to_tensor([state], dtype=tf.float32)
        state_ = tf.convert_to_tensor([state_], dtype=tf.float32)
        reward = tf.convert_to_tensor(reward, dtype=tf.float32)
        
        with tf.GradientTape() as tape:
            state_value, probs = self.actor_critic(state)
            state_value_, _ = self.actor_critic(state_)

            state_value = tf.squeeze(state_value)
            state_value_ = tf.squeeze(state_value_)

            action_probs = tfp.distributions.Categorical(probs=probs)
            log_prob = action_probs.log_prob(self.action)

            delta = reward + self.gamma * state_value_ * (1-int(done)) - state_value 
            actor_loss = -log_prob * delta
            critic_loss = delta**2

            total_loss = actor_loss + critic_loss 
        
        gradient = tape.gradient(total_loss, self.actor_critic.trainable_variables)
        self.actor_critic.optimizer.apply_gradients(zip(gradient, self.actor_critic.trainable_variables))