import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from keras import layers
import gymnasium as gym
from gymnasium.wrappers.frame_stack import FrameStack
from gymnasium.wrappers import AtariPreprocessing
import numpy as np
import tensorflow as tf
import ale_py
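
# Deep Q-Learning (DQN) on Atari Breakout: an online Q-network picks actions
# epsilon-greedily, transitions go into a replay buffer, and a periodically
# synced target network provides the Q-learning targets.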
# Configuration parameters
seed = 42
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.1
epsilon_max = 1.0
epsilon_interval = (epsilon_max - epsilon_min)
batch_size = 32
max_steps_per_episode = 10000
max_episodes = 0 # Set to 0 for unlimited training
# Number of episodes after which rendering is switched on
render_after_episodes = 500
# Create the environment (without rendering at first)
env = gym.make("BreakoutNoFrameskip-v4", frameskip=1)  # No render_mode yet
env = AtariPreprocessing(env)
env = FrameStack(env, 4)
env.reset(seed=seed)
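
# Breakout has 4 discrete actions: NOOP, FIRE, RIGHT, LEFT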
num_actions = 4
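
# Q-network: the convolutional architecture from the DeepMind DQN paper
# (Mnih et al., 2015), mapping 4 stacked 84x84 frames to one Q-value per action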
def create_q_model():
    # The environment delivers stacked frames as (4, 84, 84); transpose to
    # channels-last (84, 84, 4) before the convolutions
    return keras.Sequential([
        layers.Lambda(
            lambda tensor: keras.ops.transpose(tensor, [0, 2, 3, 1]),
            output_shape=(84, 84, 4),
            input_shape=(4, 84, 84),
        ),
        layers.Conv2D(32, 8, strides=4, activation="relu"),
        layers.Conv2D(64, 4, strides=2, activation="relu"),
        layers.Conv2D(64, 3, strides=1, activation="relu"),
        layers.Flatten(),
        layers.Dense(512, activation="relu"),
        layers.Dense(num_actions, activation="linear"),
    ])
# Create the online and target Q-networks
model = create_q_model()
model_target = create_q_model()
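
# Adam with gradient clipping: clipnorm=1.0 rescales any gradient whose norm exceeds 1.0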
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)
# Experience replay buffer
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
epsilon_random_frames = 50000  # Act fully at random for the first 50k frames
epsilon_greedy_frames = 1000000.0  # Anneal epsilon over 1M frames
max_memory_length = 100000  # Maximum number of transitions kept in the replay buffer
update_after_actions = 4  # Train the online network every 4th frame
update_target_network = 10000  # Sync the target network every 10k frames
loss_function = keras.losses.Huber()
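
# Huber loss behaves like MSE for small TD errors and like MAE for large ones,
# so occasional large targets do not produce exploding gradients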
while True:
    # Recreate the environment with rendering once render_after_episodes episodes have elapsed
    if episode_count == render_after_episodes:
        env.close()  # Close the old environment
        env = gym.make("BreakoutNoFrameskip-v4", frameskip=1, render_mode="human")  # Enable on-screen rendering
        env = AtariPreprocessing(env)
        env = FrameStack(env, 4)
        print(f"Environment restarted - rendering enabled after {episode_count} episodes!")
        print(f"Mean reward before restart: {np.mean(episode_reward_history)}")

    observation, _ = env.reset()
    state = np.array(observation)
    episode_reward = 0
    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # Explore with a random action or exploit the current Q-estimates
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            action = np.random.choice(num_actions)  # Random action
        else:
            state_tensor = keras.ops.convert_to_tensor(state)
            state_tensor = keras.ops.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            action = keras.ops.argmax(action_probs[0]).numpy()
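
        # Linearly anneal epsilon from epsilon_max towards epsilon_min over epsilon_greedy_frames frames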
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        # Step the environment with the chosen action
        state_next, reward, done, _, _ = env.step(action)
        state_next = np.array(state_next)
        episode_reward += reward

        # Store the transition in the replay buffer
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        done_history.append(done)
        rewards_history.append(reward)
        state = state_next

        # Train the online network every update_after_actions frames once the buffer holds a full batch
        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # Sample a random minibatch of transitions from the replay buffer
            indices = np.random.choice(range(len(done_history)), size=batch_size)
            state_sample = np.array([state_history[i] for i in indices])
            state_next_sample = np.array([state_next_history[i] for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = keras.ops.convert_to_tensor([float(done_history[i]) for i in indices])
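
            # Bellman target: reward + gamma * max_a' Q_target(s', a');
            # terminal transitions are pinned to a target of -1 instead of bootstrapping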
            future_rewards = model_target.predict(state_next_sample)
            updated_q_values = rewards_sample + gamma * keras.ops.amax(future_rewards, axis=1)
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # Only the Q-value of the action actually taken contributes to the loss
            masks = keras.ops.one_hot(action_sample, num_actions)
            with tf.GradientTape() as tape:
                q_values = model(state_sample)
                q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
                loss = loss_function(updated_q_values, q_action)

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
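
        # Holding the target network fixed between syncs keeps the Q-targets from
        # chasing the constantly updated online network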
        # Sync the target network with the online network every update_target_network frames
        if frame_count % update_target_network == 0:
            model_target.set_weights(model.get_weights())
            print(f"running reward: {running_reward:.2f}, episode: {episode_count}, frame count: {frame_count}")

        # Cap the replay buffer at max_memory_length by discarding the oldest transition
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break
print(f"Epizod {episode_count}, nagroda: {episode_reward}")
episode_reward_history.append(episode_reward)
if len(episode_reward_history) > 100:
del episode_reward_history[:1]
running_reward = np.mean(episode_reward_history)
episode_count += 1
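
    # The task is considered solved once the average reward over the last 100 episodes exceeds 40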
    if running_reward > 40:
        print(f"Solved at episode {episode_count}!")
        break

    if max_episodes > 0 and episode_count >= max_episodes:
        print(f"Stopped at episode {episode_count}!")
        break