Einordnung
Vor kurzem wurde in einem Post die Herangehensweise und Implementierung von Deep Q-Networks erklärt. DQN ist allerdings lediglich einer der grundlegenden Algorithmen im Reinforcement Learning, Forschung auf dem Gebiet zeigt eine Reihe möglicher Verbesserungen. Eine davon ist das Deep Recurrent Q-Network, welches Recurrent Layers verwendet, mehr Informationen gleichzeitig zu verarbeiten und die Interaktionen verschiedener Informationen besser verarbeiten zu können.
Grundlagen
Deep Q-Networks haben eine Reihe von praktischen Beschränkungen. Im Folgenden soll die wichtigste Beschränkung vorgestellt werden, welche durch Deep Recurrent Q-Networks gelöst werden konnte:
Man nehme an, ein Algorithmus soll das Spiel Pong lernen. Bei Pong handelt es sich um ein kompetitives Zwei-Spieler Arcade-Spiel, bei welchem zwei Scheiben verwendet werden, um mit einem Ball zu interagieren und diesen durch das Spielfeld zu bewegen. Ziel jedes Spielers ist es, den Ball so zu bewegen, dass der andere Spieler den Ball nicht abfangen kann und den Spielrand beim Gegner verlässt; in diesem Fall bekommt der Spieler einen Punkt. Eine mögliche Modellierung hierbei ist, dem Agenten immer die aktuelle Observation (also den Zustand des Bildschirms als RGB-Array) zu übergeben. Weiterhin nehmen wir an, wir befinden uns in einem fixen Zeitpunkt und sehen den Ball wie in dem Bild unten.
Nun stellt sich eine Frage: Wie wird sich der Ball abhängig vom aktuellen Zustand weiterbewegen? Die Antwort ist schnell klar - es gibt keine deterministische Antwort auf diese Frage. Um diese
Frage zu beantworten, benötigt ein Beobachter mindestens zwei Zeitschritte (und Informationen über die Weite des Zeitschrittes) um eine Flugtrajektorie und Geschwindigkeit zu berechnen. Also
war die Idee, ein Deep Q-Network als functional
-Model zu definieren. Ein Functional Model ist hierbei ein Modell, welches mehrere In-
und Outputs akzeptiert, also nicht zwangsläufig einen klassischen Graphen abbildet, wie dies beispielsweise bei dem sequential
-Model
der Fall ist.
Anwender welche mit künstlichen neuronalen Netzen bereits gearbeitet haben, sind eventuell mit dem Konzepte der Recurrent Models
vertraut. Diese bieten sich hier an, da diese effizienter sind
eine solche Aufgabe zu lösen als es klassische Feedforward Models
sind.
Recurrent Modells erhalten beim Input (sofern definiert) Informationen aus mehreren Zeitschritten vor dem aktuellen Zeitschritt. Intern in den Recurrent Layers wird hierbei die Interaktion zwischen den einzelnen Zeitschritten mit modelliert, um besserer Ergebnisse bei Problemen wie Sequence Forecasting oder Time Forecasting zu erreichen.
Als häufig genutztes Beispiel für Recurrent Layer kann man das Long Short-Term Memory-Layer sehen, welches in Keras implementiert wurde und bereits (1997 von Sepp Hochreiter vorgestellt wurde)[https://www.researchgate.net/publication/13853244_Long_Short-term_Memory]. Für neue Veröffentlichungen, welche sich mit LSTM auseinandersetzen, siehe beispielsweise bei 1, 2.
Der Ansatz oben wurde in 3 sogar noch ein wenig komplexer implementiert. Hier wurde Pong implementiert, allerdings mit der Besonderheit, dass ab und an Frames leer (also komplett ohne Informationen) an den Agenten übergeben wurden. Ein Algorithmus wie das Deep Q-Network, welches auf simple State-to-State-Transitions ausgelegt ist, kann hierbei keine sinnvolle Entscheidung treffen. Ein Deep Recurrent Q-Network, welches beispielsweise fünf Zeitschritte als Input akzeptiert, kann hierbei weiterhin sinnvoll arbeiten, da es alle notwendigen Informationen besitzt und potenziell die Position des Balls interpolieren kann. Die Autoren zeigen auch, dass sich das Recurrent Modell gegenüber dem Feed-Forward-Modell durchsetzt.
Implementierung
In dieser Version wird eine Implementierung eines Deep Recurrent Neural Networks, welches LSTM-Layer beinhaltet.
Alternative Layer für diese Art von Aufgabe sind beispielsweise die Gated Recurrent Unit (GRU) oder das Simple Recurrent Neural Network-Layer (SimpleRNN).
Die Imports sind identisch zur Implementierung eines DQN’s:
import datetime
import math
import gym
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, InputLayer, LSTM
from tensorflow.python.keras.optimizer_v2.adam import Adam
from tensorflow.python.keras.losses import mean_squared_error
from tensorflow.python.keras.metrics import Mean
from tensorflow.python.keras.models import load_model, save_model
from collections import deque
Eine Implementierung eines ausgelagerten Experience Replays ist straight-forward:
class ExperienceMemory:
def __init__(self, memory_length: int=10000, batch_size: int=64, nonunique_experience: bool=True):
self.memory_length = memory_length
self.batch_size = batch_size
self.nonunique_experience = nonunique_experience
# ** Initialize replay memory D to capacity N **
self.cstate_memory = deque([], maxlen=self.memory_length)
self.action_memory = deque([], maxlen=self.memory_length)
self.reward_memory = deque([], maxlen=self.memory_length)
self.done_memory = deque([], maxlen=self.memory_length)
self.pstate_memory = deque([], maxlen=self.memory_length)
def record(self, cstate, action, reward, done, pstate):
# Convert states into processable objects
cstate = tf.expand_dims(tf.convert_to_tensor(cstate), axis=0)
pstate = tf.expand_dims(tf.convert_to_tensor(pstate), axis=0)
# Save data
self.cstate_memory.append(cstate)
self.action_memory.append(action)
self.reward_memory.append(reward)
self.done_memory.append(done)
self.pstate_memory.append(pstate)
def return_experience(self):
# Retrieve experience
batch_indices = np.random.choice(len(self.cstate_memory), size=self.batch_size, replace=self.nonunique_experience)
batch_cstate = np.array(list(self.cstate_memory))[batch_indices, :]
batch_action = np.take(list(self.action_memory), batch_indices)
batch_reward = np.take(list(self.reward_memory), batch_indices)
batch_done = np.take(list(self.done_memory), batch_indices)
batch_pstate = np.array(list(self.pstate_memory))[batch_indices, :]
# Convert experience into respective tensorflow tensors
cstates = tf.squeeze(tf.convert_to_tensor(batch_cstate))
actions = tf.convert_to_tensor(batch_action)
rewards = tf.convert_to_tensor(batch_reward, dtype=tf.float32)
dones = tf.convert_to_tensor(batch_done, dtype=tf.int64)
pstates = tf.squeeze(tf.convert_to_tensor(batch_pstate))
return (cstates, actions, rewards, dones, pstates)
def flush_memory(self):
self.cstate_memory.clear()
self.action_memory.clear()
self.reward_memory.clear()
self.done_memory.clear()
self.pstate_memory.clear()
Hinweis: Die Experience in diesem Memory wird bei der Ausgabe in Tensorflow-Tensoren umgewandelt, um das Training später als
@tf.function
ausführen zu können. Bei dieser Art von Funktionalität handelt es sich um eine Umwandlung der Operation in einen Graphen, damit beinhaltete Operationen schneller ausgeführt werden können.
Die genauere Funktionsweise von
@tf.functions
kann hier nachgelesen werden.
Der eigentliche Agent kann auf folgende Art implementiert werden:
class DRQNAgent:
def __init__(self, observation_size, action_shape, num_rounds):
self.observation_size = observation_size
self.action_shape = action_shape
self.num_rounds = num_rounds
# Network parameters
self.alpha = 0.005
self.gamma = 0.95
self.epsilon = 0.8
self.min_epsilon = 0.05
self.epsilon_decay = 0.005
self.batch_size = 64
self.nonunique_experience = True
self.memory_length = 10000
# Internal memory for last states
self.states = np.zeros((1, self.num_rounds, self.observation_size))
# ** Initialize replay memory D to capacity N **
self.experience_memory = ExperienceMemory(memory_length=self.memory_length, batch_size=self.batch_size, nonunique_experience=self.nonunique_experience)
# ** Initialize action-value function Q with random weights **
self.model = Sequential(name="DRQN")
self.model.add(InputLayer(input_shape=(self.num_rounds, self.observation_size), name="Input_Layer"))
self.r_layer = LSTM(units=24, activation='tanh', name='LSTM_Layer', input_shape=(self.observation_size, self.num_rounds))
self.model.add(self.r_layer)
self.model.add(Dense(units=self.action_shape, activation='linear', name='Output_Layer'))
self.optimizer = Adam(learning_rate=self.alpha)
self.loss = mean_squared_error
self.model.compile(loss=self.loss, optimizer=self.optimizer)
self.model.summary()
# Define metrics for tensorboard
self.score_log_path = 'logs/' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
self.score_writer = tf.summary.create_file_writer(self.score_log_path)
def compute_action(self, state, evaluation: bool=False):
if (np.random.uniform(0, 1) < self.epsilon and evaluation == False):
return np.random.choice(range(self.action_shape)) # ** With probability epsilon select a random action a_t **
else:
state = np.reshape(state, [1, self.num_rounds, self.observation_size])
Q_values = self.model(state)[0]
return np.argmax(Q_values) # ** Otherwise select a_t = argmax(Q) **
def update_internal_state(self, pstate):
self.states = np.roll(self.states, -1, axis=1)
self.states[:, -1] = pstate
def clear_internal_state(self):
self.states = np.zeros((1, self.num_rounds, self.observation_size))
# Updated Epsilon-Greedy-Update scheme
def update_epsilon_parameter(self):
self.epsilon = max(self.epsilon * math.exp(-self.epsilon_decay), self.min_epsilon)
def store_step(self, cstate, action, reward, done, pstate):
self.experience_memory.record(cstate, action, reward, done, pstate)
def train(self): # **For each update step**
# ** Sample random minibatch of transitions from D**
b_cstate, b_action, b_reward, b_done, b_pstate = self.experience_memory.return_experience()
# ** Set y_j **
prediction_model_p = self.model(b_pstate)
q_value_network = np.zeros((self.batch_size,))
for i in range(self.batch_size):
q_value_network[i] = float(1 - b_done[i]) * self.gamma * np.max(prediction_model_p[i])
target_q_p = np.add(b_reward, q_value_network)
# **Perform gradient descent step on Q**
self.model.train_on_batch(b_cstate, target_q_p) # Performs better
# self.model.fit(b_cstate, target_q_p, verbose=0, epochs=5)
Hier sind einige Änderungen im Gegensatz zum DQNAgent, welche bisher noch nicht angesprochen wurden:
-
Die Form des Inputs hat sich verändert. Hierbei werden nicht (wie leicht anzunehmen) die Inputs in den Dimensionen
[self.num_rounds, self.observation_size]
übergeben, sondern in der Form[1, self.num_rounds, self.observation_size]
. Dies hat mit den Eingängen an den Layern zu tun. -
Es ist notwendig, eine Art von internem Speicher verwendet werden, um Informationen aus vorherigen Frames zwischenzuspeichern. In dem internen Speicher wird nach jedem Frame gespeichert; hierbei wird der älteste Frame entfernt.
-
Der interne Speicher wird am Anfang zu initialisiert; die Idee hierhinter ist, dem Algorithmus im ersten Schritt keine Informationen mitzugeben. Möglicherweise gibt es hierzu bessere Alternativen, dies erscheint aber bisher die vielversprechendste Variante.
def training_loop(env, agent: DRQNAgent, max_frames_episode: int):
current_obs = env.reset()
episode_reward = 0
agent.clear_internal_state()
agent.update_internal_state(current_obs)
for j in range(max_frames_episode):
action = agent.compute_action(agent.states)
next_obs, reward, done, _ = env.step(action)
next_obs = np.array(next_obs)
previous_states = agent.states.copy()
agent.update_internal_state(next_obs)
agent.store_step(previous_states, action, reward, done, agent.states.copy())
current_obs = next_obs
episode_reward += reward
if done:
agent.update_epsilon_parameter()
break
return episode_reward, agent
def evaluation_loop(env, agent: DRQNAgent, max_frames_episode: int):
current_obs = env.reset()
evaluation_reward = 0
agent.clear_internal_state()
agent.update_internal_state(current_obs)
for j in range(max_frames_episode):
# ** Execute action a_t in emulator and observe reward r_t and image x_{t+1}
action = agent.compute_action(agent.states, evaluation=True)
next_obs, reward, done, _ = env.step(action)
next_obs = np.array(next_obs)
# **Storing all information about the last episode in the memory buffer**
previous_states = agent.states.copy()
agent.update_internal_state(next_obs)
agent.store_step(previous_states, action, reward, done, agent.states.copy())
# Reseting environment information so that next episode can handle the previous information
current_obs = next_obs
evaluation_reward += reward
if done:
break
return evaluation_reward, agent
Bei der training_loop
und evaluation_loop
ist hierbei zu beachten, dass nicht einfach nur die aktuellen Informationen in den Experience Memory gespeichert werden, sondern der
komplette interne Speicher des Agenten. Dies ist notwendig, um einen sinnvollen Input für das Deep Recurrent Q-Network während der Trainingsphase vorliegen zu haben.
if __name__ == "__main__":
env = gym.make("CartPole-v1")
action_shape = env.action_space.n
observation_shape = env.observation_space.shape[0]
target_timesteps = 10
n_episodes, max_frames_episode, avg_length, evaluation_interval = 1000, 500, 50, 10
episodic_reward, avg_reward, evaluation_rewards = [], [], []
# Seeding
tf.random.set_seed(69)
np.random.seed(69)
env.seed(69)
agent = DRQNAgent(observation_shape, action_shape, target_timesteps)
for i in range(n_episodes):
# **Observing state of the environment**
episode_reward, agent = training_loop(env=env, agent=agent, max_frames_episode=max_frames_episode)
print("Training Score in Episode {}: {}".format(i, episode_reward))
if (i % evaluation_interval == 0):
# Evaluation loop
eval_reward, agent = evaluation_loop(env=env, agent=agent, max_frames_episode=max_frames_episode)
print("Evaluation Score in Episode {}: {}".format(i, eval_reward))
with agent.score_writer.as_default():
tf.summary.scalar('Episodic Score', episode_reward, step=i)
if (i % evaluation_interval == 0):
tf.summary.scalar('Evaluation Score', eval_reward, step=i)
episodic_reward.append(episode_reward)
avg_reward.append(avg_n(episodic_reward, n=avg_length))
agent.train()
In diesem Teil ergeben sich keine funktionellen Änderungen mehr. Als kleine formelle Änderung muss hier noch die Anzahl der Zeitschritte übergeben werden.
Die Performance kann entsprechend an den Diagrammen unten abgelesen werden. Das erste Diagramm entspricht der Zeitschrittweite , das zweite Diagramm und das letzte Diagramm einer Zeitschrittweite von Zeitschritten.
Abschließend hier noch ein Vergleich der verschiedenen Zeitschrittweiten:
Hinweis: Trainingsperformance kann sich von Gerät zu Gerät und entsprechenden Seeds teilweise stark unterscheiden. Allgemeine Reproduzierbarkeit von solchen Ergebnissen ist im Allgemeinen nicht garantierbar.
Weitere Informationen
Recurrent Layers können mit der Option stateful=True
initialisiert werden. Hierbei erhält der entsprechende Layer einen eigenen internen Speicher, welcher nicht implementiert werden
muss. Weiterhin ist das Netzwerk damit in der Lage, grundsätzlich Inputs beliebiger Länge zu verarbeiten; hierzu muss lediglich immer der aktuellste Zeitschritt in das Modell als Input
gegeben werden.
Hieraus resultieren allerdings auch eine Reihe von Problemen und Beschränkungen, welche beachtet werden müssen. Dadurch, dass das Netzwerk nicht weiß, wann eine Episode endet, müssen nach jeder Episode alle internen Speicher manuell zurückgesetzt werden. Weiterhin müssen in einem Experience Memory zu jedem Schritt auch die dazugehörigen internen Werte mitgespeichert werden; alternativ würde ein Frame “aus dem Kontext” gerissen werden. Allerdings sollen sich auch die internen Werte über das Training verändern - man füttert das Netzwerk also in Teilen mit veralteten Informationen.
Beide Arten der Umsetzung haben ihre Art von Vor- und Nachteilen, die Version, welche in diesem Post beschrieben wird, funktioniert allerdings verhältnismäßig gut.
Änderungen
[23.01.2023] Einführung interaktiver Plots und Beschreibung der Nicht-Reproduzierbarkeit von Ergebnissen