"""
Agents with different policies
"""
import os
import numpy as np
from abc import ABC, abstractmethod
import matplotlib.pyplot as plt
np.random.seed(0)
class IAgent(ABC):
'''
Agent Interface
'''
@abstractmethod
def update_Q(self, **kwargs):
'''
Update the Q
Q[a] : Estimate of reward for using Arm/Action 'a' (or VM in our case)
'''
pass
@abstractmethod
def get_action(self, **kwargs):
'''
Implementation of MultiArmBandit Action Selection Policy
'''
pass
class Agent_Epsilon(IAgent):
'''
Agent implementing Epsilon Policy for exploration-exploitation
'''
def __init__(self, nActions, eps):
self.nActions = nActions
self.eps = eps
self.n = np.zeros(nActions, dtype=np.int64) # action counts n(a)
self.Q = np.zeros(nActions, dtype=float) # value Q(a)
def update_Q(self, **kwargs):
# Update Q action-value given (action, reward)
action = kwargs["action"]
reward = kwargs["reward"]
self.n[action] += 1
self.Q[action] += (1.0/self.n[action]) * (reward - self.Q[action])
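    # Note: the update above is the incremental sample-average form
    #   Q_{k+1}(a) = Q_k(a) + (1/k) * (r - Q_k(a)),
    # which equals the running mean of all rewards observed for arm 'a'
    # without having to store the individual samples.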
def get_action(self, **kwargs):
# Epsilon-greedy policy
if np.random.random() < self.eps: # explore
return np.random.randint(self.nActions)
else: # exploit
return np.random.choice(np.flatnonzero(self.Q == self.Q.max()))
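    # Note: np.random.choice over np.flatnonzero(Q == Q.max()) breaks ties among
    # equally-valued arms uniformly at random; a plain np.argmax would always
    # favour the lowest-index arm while the estimates are still tied.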
class Agent_UCB(IAgent):
'''
Agent implementing Upper Confidence bound (UCB) Policy
'''
def __init__(self, nActions, eps):
self.nActions = nActions
self.eps = eps
self.n = np.ones(nActions, dtype=np.int64) # action counts n(a)
self.Q = np.zeros(nActions, dtype=float) # value Q(a)
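    # UCB1-style selection rule used by get_action below:
    #   a_t = argmax_a [ Q(a) + c * sqrt(ln(t + 1) / n(a)) ]
    # where c is the exploration coefficient (stored here in self.eps) and n(a)
    # starts at 1 to avoid division by zero in the confidence bonus.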
    def update_Q(self, action, reward):
        # Update Q action-value estimate given (action, reward)
        self.n[action] += 1
        self.Q[action] += (1.0 / self.n[action]) * (reward - self.Q[action])
    def get_action(self, **kwargs):
        # Upper-Confidence-Bound action selection
        t = kwargs["timestep"]
        ucb_values = self.Q + self.eps * np.sqrt(np.log(t + 1) / self.n)
        # break ties among equally-scored arms uniformly at random
        return np.random.choice(np.flatnonzero(ucb_values == ucb_values.max()))
class Environment:
def __init__(self, probs):
        self.probs = probs # mean reward of each arm (VM reliability mean)
def _clamp(self,num, min_value, max_value):
return max(min(num, max_value), min_value) # make sure num is within min_value and max_value
def step(self, action):
        # Pull arm and get a stochastic reward, sampled from a Gaussian distribution (mean = probs[action], std dev = 0.3) and clamped to [0, 1]
return self._clamp(np.random.normal(self.probs[action], 0.3, 1)[0], 0, 1)
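# Minimal usage sketch of the pieces above (illustrative values only, not part of
# the simulation below):
#   env = Environment([0.9, 0.5, 0.1])          # three VMs with the given mean rewards
#   agent = Agent_Epsilon(nActions=3, eps=0.1)  # epsilon-greedy agent
#   a = agent.get_action()                      # pick a VM
#   r = env.step(a)                             # observe its stochastic reward
#   agent.update_Q(action=a, reward=r)          # refine the estimate for that VM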
# Start multi-armed bandit simulation
def experiment_with_epsilon_policy(probs, N_episodes, eps):
env = Environment(probs) # initialize arm probabilities
agent = Agent_Epsilon(len(env.probs), eps) # initialize agent
actions, rewards = [], []
for episode in range(N_episodes):
action = agent.get_action() # sample policy
reward = env.step(action) # take step + get reward
agent.update_Q(action=action, reward=reward) # update Q
actions.append(action)
        rewards.append(reward)
return np.array(actions), np.array(rewards)
# Start multi-armed bandit simulation
def experiment_with_UCB_policy(probs, N_episodes):
env = Environment(probs) # initialize arm probabilities
    agent = Agent_UCB(len(env.probs), eps=2.0) # initialize agent; eps acts as the UCB exploration coefficient
actions, rewards = [], []
for episode in range(N_episodes):
action = agent.get_action(timestep=episode) # sample policy
reward = env.step(action) # take step + get reward
agent.update_Q(action=action, reward=reward) # update Q
actions.append(action)
        rewards.append(reward)
return np.array(actions), np.array(rewards)
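# Both experiment functions run the same agent-environment loop and differ only
# in the action-selection policy; the main block below repeats each experiment
# N_experiments times and averages the recorded rewards per time step, i.e.
#   R_avg[t] = (1 / N_experiments) * sum_i r_i[t]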
if __name__ == "__main__":
# Settings
    # VM reliability is modeled as a Gaussian distribution
    probs = [0.28, 0.95, 0.90, 0.85, 0.80, 0.75, 0.70, 0.65, 0.55, 0.50] # VM reliability means; std dev is assumed to be 0.3
N_experiments = 500 # number of experiments to perform
N_steps = 5000 # number of steps (episodes)
# Epsilon_Greedy Policy
eps = 0.01 # probability of random exploration (fraction)
save_fig = True # save file in same directory
output_dir = os.path.join(os.getcwd(), "simulation_output")
# Run multi-armed bandit experiments
print("Running multi-armed bandits with nActions = {}, eps = {}".format(len(probs), eps))
    R1 = np.zeros((N_steps,))            # summed reward history (UCB)
    A1 = np.zeros((N_steps, len(probs))) # summed action counts (UCB)
    R2 = np.zeros((N_steps,))            # summed reward history (epsilon-greedy)
    A2 = np.zeros((N_steps, len(probs))) # summed action counts (epsilon-greedy)
for i in range(N_experiments):
actions1, rewards1 = experiment_with_UCB_policy(probs, N_steps) # perform experiment
        actions2, rewards2 = experiment_with_epsilon_policy(probs, N_steps, eps) # perform experiment
        if (i + 1) % max(1, N_experiments // 100) == 0:
            print("[Experiment {}/{}] ".format(i + 1, N_experiments) +
                  "n_time_steps = {}, ".format(N_steps) +
                  "reward_avg = {:.3f}, ".format(np.sum(rewards1) / len(rewards1)) +
                  "Max_reward = {:.3f}".format(np.max(rewards1)))
R1 += rewards1
R2 += rewards2
for j, a in enumerate(actions1):
A1[j][a] += 1
for j, a in enumerate(actions2):
A2[j][a] += 1
# Plot reward results
R_avg1 = R1 / float(N_experiments)
R_avg2 = R2 / float(N_experiments)
    plt.plot(R_avg1, "-", label='UCB policy')
    plt.plot(R_avg2, "--", label='Epsilon-greedy policy')
plt.xlabel("Time Step")
plt.ylabel("Average Reward")
plt.legend(loc='upper left', shadow=True)
plt.grid()
ax = plt.gca()
plt.xlim([1, N_steps])
if save_fig:
if not os.path.exists(output_dir): os.mkdir(output_dir)
plt.savefig(os.path.join(output_dir, "rewards.png"), bbox_inches="tight")
    else:
        plt.show()
    plt.close()  # start a fresh figure for the VM-preference plot below
# Plot action results
    for i in range(len(probs)):
        A_pct1 = A1[:, i] / N_experiments
        A_pct2 = A2[:, i] / N_experiments
        steps = np.arange(1, len(A_pct1) + 1)
        plt.plot(steps, A_pct1, "-",
                 linewidth=4,
                 label="VM {} UCB (mean: {})".format(i + 1, probs[i]))
        plt.plot(steps, A_pct2, "o",
                 linewidth=4,
                 label="VM {} eps-greedy (mean: {})".format(i + 1, probs[i]))
plt.xlabel("Time Step")
plt.ylabel("VM Preference Probability")
leg = plt.legend(loc='upper left', shadow=True)
plt.xlim([1, N_steps])
plt.ylim([0, 1])
for legobj in leg.legend_handles:
legobj.set_linewidth(4.0)
if save_fig:
if not os.path.exists(output_dir): os.mkdir(output_dir)
plt.savefig(os.path.join(output_dir, "actions.png"), bbox_inches="tight")
else:
plt.show()
    plt.close()