Preliminary
Robbins-Monro Algorithm
The Robbins-Monro algorithm is designed to find \(\theta^*\) that solves the following equation:
\[\int c(s, \theta)\, \tau_{\theta}(s)\, \mathrm{d}s = 0,\] where \(\tau_{\theta}\) is a distribution over \(s\) parameterized by \(\theta\).
We can use the following iterative rule to obtain \(\theta^*\):
\[\theta_{k+1} = \theta_k - \eta_k\, c(s_k, \theta_k),\] where \(s_k \sim \tau_{\theta_k}\) and \(\eta_k\) is a step size. The Q-learning algorithm uses Robbins-Monro to update the Q function, i.e.,
\[Q'(s, a) = Q(s, a) + \eta_k\big[r + \lambda \max_{a_{t+1}} Q(s_{t+1}, a_{t+1}) - Q(s, a)\big],\] where the bracketed TD error plays the role of the stochastic correction term. The underlying (Bellman) equation relating \(Q(s, a)\) and \(Q(s_{t+1}, a_{t+1})\) is
\[Q(s, a) = r + \lambda \sum_{s_{t+1}} P(s_{t+1} \mid s_t, a) \max_{a'} Q(s_{t+1}, a').\] The sum over \(s_{t+1}\) can be viewed as an expectation. If we move \(Q(s, a)\) and \(r\) inside that expectation, the equation becomes exactly the root-finding problem \(\mathbb{E}[c] = 0\) that the Robbins-Monro algorithm solves, with the TD error as a noisy sample of \(c\).
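To make the connection concrete, here is a minimal tabular Q-learning sketch written as a Robbins-Monro iteration driven by the TD error; the two-state MDP, its transition probabilities and rewards, and the step-size schedule are assumptions chosen purely for illustration.

import numpy as np

# Tabular Q-learning as a Robbins-Monro iteration: Q <- Q + eta * (TD error).
n_states, n_actions, lam = 2, 2, 0.9          # lam plays the role of the discount factor lambda
P = np.array([[[0.8, 0.2], [0.1, 0.9]],       # P[s, a, s'] transition probabilities (assumed toy values)
              [[0.5, 0.5], [0.9, 0.1]]])
R = np.array([[1.0, 0.0], [0.0, 2.0]])        # R[s, a] rewards (assumed toy values)

Q = np.zeros((n_states, n_actions))
rng = np.random.default_rng(0)
s = 0
for k in range(1, 20001):
    a = rng.integers(n_actions)               # explore uniformly at random for simplicity
    s_next = rng.choice(n_states, p=P[s, a])
    eta = 1.0 / k ** 0.6                      # Robbins-Monro step sizes: sum eta = inf, sum eta^2 < inf
    td_error = R[s, a] + lam * Q[s_next].max() - Q[s, a]
    Q[s, a] += eta * td_error                 # stochastic approximation update
    s = s_next
print(Q)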
Policy Gradient
Now we parameterize the policy distribution with parameter \(\theta\), denoted by \(\tau_{\theta}\), and let the objective function be \(J(\theta)\). We want to maximize \(J(\theta)\) (or minimize it, depending on the sign convention) and improve our policy by optimizing \(\theta\).
Let \(\mu^{\tau_{\theta}}(s_0) = \sum\limits_a \tau_{\theta}(a \mid s_0)\, Q^{\tau_{\theta}}(s_0, a)\), the expected return from the start state \(s_0\), be the objective function \(J(\theta)\) (note that we drop the time index \(t\)) and take the gradient with respect to \(\theta\).
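Applying the product rule and then the Bellman equation for \(Q^{\tau_\theta}\), as in the standard proof of the policy-gradient theorem, gives
\[\begin{align*} \nabla_\theta \mu^{\tau_\theta}(s) &= \sum_a \Big[\nabla_\theta \tau_\theta(a \mid s)\, Q^{\tau_\theta}(s, a) + \tau_\theta(a \mid s)\, \nabla_\theta Q^{\tau_\theta}(s, a)\Big] \\ &= \sum_a \nabla_\theta \tau_\theta(a \mid s)\, Q^{\tau_\theta}(s, a) + \sum_{s'} P^{\tau_\theta}(s' \mid s)\, \nabla_\theta \mu^{\tau_\theta}(s'), \end{align*}\]
where \(P^{\tau_\theta}(s' \mid s) = \sum_a \tau_\theta(a \mid s)\, P(s' \mid s, a)\). In vector form this is \((I - P^{\tau_\theta})\, \nabla_\theta \mu^{\tau_\theta} = y\) with \(y(s) = \sum_a \nabla_\theta \tau_\theta(a \mid s)\, Q^{\tau_\theta}(s, a)\), which is where the following lemma comes in.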
Lemma:
If the spectral radius of \(P\) is less than one (so that the Neumann series converges), then for the equation \((I - P)x = y\) we have
\[\begin{align*} x &= (I - P)^{-1} y \\ &= \sum_{k=0}^{\infty} P^{k} y. \end{align*}\]
Using this lemma with \(x = \nabla_\theta \mu^{\tau_\theta}\), \(y(s) = \sum_a \nabla_\theta \tau_\theta(a \mid s)\, Q^{\tau_\theta}(s, a)\) and \(P = P^{\tau_\theta}\), we have
\[\nabla_\theta \mu^{\tau_\theta}(s_0) = \sum_x \sum_{k=0}^{\infty} P(s = x, k, \tau_\theta) \sum_a \nabla_\theta \tau_\theta(a \mid x)\, Q^{\tau_\theta}(x, a),\]
where \(P(s = x, k, \tau_\theta)\) is the probability of being in state \(x\) after \(k\) steps when starting from \(s_0\) and following \(\tau_\theta\).
Let \(\eta(x)\) denote \(\sum\limits_{k=0}^{\infty} P(s = x, k, \tau_\theta)\); we can rewrite the equation as
\[\nabla_\theta J(\theta) = \sum_x \eta(x) \sum_a \nabla_\theta \tau_\theta(a \mid x)\, Q^{\tau_\theta}(x, a).\]
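Normalizing \(\eta\) into a state-visitation distribution and using \(\nabla_\theta \tau_\theta = \tau_\theta\, \nabla_\theta \log \tau_\theta\) yields the sampled (score-function) form, which is what the actor update below implements with \(Q^{\tau_\theta}\) replaced by an advantage estimate:
\[\nabla_\theta J(\theta) \;\propto\; \mathbb{E}_{s, a \sim \tau_\theta}\big[\nabla_\theta \log \tau_\theta(a \mid s)\, Q^{\tau_\theta}(s, a)\big].\]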
AC
We can use a critic network to estimate the state value \(V(s)\), which supplies the TD target \(r + \lambda V(s_{t+1})\) and the advantage \(A = r + \lambda V(s_{t+1}) - V(s)\), and an actor network to learn \(\theta\). The actor loss below is the cross-entropy \(-\log \tau_\theta(a \mid s)\) weighted by the (gradient-stopped) advantage, i.e., a sample of the policy gradient derived above.
import argparse

import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam

# Let TensorFlow grow GPU memory as needed instead of pre-allocating all of it.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
assert len(physical_devices) > 0, "Not enough GPU hardware devices available"
tf.config.experimental.set_memory_growth(physical_devices[0], True)

parser = argparse.ArgumentParser()
parser.add_argument('--gamma', type=float, default=0.99)
parser.add_argument('--update_interval', type=int, default=5)
parser.add_argument('--actor_lr', type=float, default=0.0005)
parser.add_argument('--critic_lr', type=float, default=0.001)
args = parser.parse_args()
class Actor:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.opt = Adam(args.actor_lr)
        self.model = self.create_model()

    def create_model(self):
        # Policy network: outputs action probabilities via the final softmax layer.
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim, activation='softmax')
        ])

    def compute_loss(self, actions, probs, advantages):
        # Policy-gradient loss: -log pi(a|s) weighted by the gradient-stopped advantage.
        # The model already applies a softmax, so its outputs are probabilities,
        # not logits; hence from_logits=False.
        ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
        actions = tf.cast(actions, tf.int32)
        policy_loss = ce_loss(actions, probs, sample_weight=tf.stop_gradient(advantages))
        return policy_loss

    def train(self, states, actions, advantages):
        with tf.GradientTape() as tape:
            probs = self.model(states, training=True)
            loss = self.compute_loss(actions, probs, advantages)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss
class Critic:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.opt = Adam(args.critic_lr)
        self.model = self.create_model()

    def create_model(self):
        # Value network: outputs a scalar estimate of V(s).
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(16, activation='relu'),
            Dense(1, activation='linear')
        ])

    def compute_loss(self, v_pred, td_targets):
        mse = tf.keras.losses.MeanSquaredError()
        return mse(td_targets, v_pred)

    def train(self, states, td_targets):
        with tf.GradientTape() as tape:
            v_pred = self.model(states, training=True)
            assert v_pred.shape == td_targets.shape
            loss = self.compute_loss(v_pred, tf.stop_gradient(td_targets))
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss
class Agent:
    def __init__(self, env):
        self.env = env
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.actor = Actor(self.state_dim, self.action_dim)
        self.critic = Critic(self.state_dim, self.action_dim)

    def td_target(self, reward, next_state, done):
        # Bootstrapped TD target: r + gamma * V(s'), or just r at episode end.
        if done:
            return reward
        v_value = self.critic.model.predict(
            np.reshape(next_state, [1, self.state_dim]))
        return np.reshape(reward + args.gamma * v_value[0], [1, 1])

    def advantage(self, td_targets, baselines):
        # Advantage estimate: TD target minus the critic's baseline V(s).
        return td_targets - baselines

    def list_to_batch(self, items):
        # Stack a list of [1, dim] arrays into a single [N, dim] batch.
        batch = items[0]
        for elem in items[1:]:
            batch = np.append(batch, elem, axis=0)
        return batch

    def train(self, max_episodes=1000):
        for ep in range(max_episodes):
            state_batch = []
            action_batch = []
            td_target_batch = []
            advantage_batch = []
            episode_reward, done = 0, False

            state = self.env.reset()
            while not done:
                # self.env.render()
                probs = self.actor.model.predict(
                    np.reshape(state, [1, self.state_dim]))
                action = np.random.choice(self.action_dim, p=probs[0])  # sample an action from the policy

                next_state, reward, done, _ = self.env.step(action)

                state = np.reshape(state, [1, self.state_dim])
                action = np.reshape(action, [1, 1])
                next_state = np.reshape(next_state, [1, self.state_dim])
                reward = np.reshape(reward, [1, 1])

                # Scale the reward before forming the TD target.
                td_target = self.td_target(reward * 0.01, next_state, done)
                advantage = self.advantage(
                    td_target, self.critic.model.predict(state))

                state_batch.append(state)
                action_batch.append(action)
                td_target_batch.append(td_target)
                advantage_batch.append(advantage)

                # Update every `update_interval` steps and at episode end.
                if len(state_batch) >= args.update_interval or done:
                    states = self.list_to_batch(state_batch)
                    actions = self.list_to_batch(action_batch)
                    td_targets = self.list_to_batch(td_target_batch)
                    advantages = self.list_to_batch(advantage_batch)

                    actor_loss = self.actor.train(states, actions, advantages)
                    critic_loss = self.critic.train(states, td_targets)

                    state_batch = []
                    action_batch = []
                    td_target_batch = []
                    advantage_batch = []

                episode_reward += reward[0][0]
                state = next_state[0]

            print('EP{} EpisodeReward={}'.format(ep, episode_reward))
def main():
    env_name = 'CartPole-v1'
    env = gym.make(env_name)
    agent = Agent(env)
    agent.train()


if __name__ == "__main__":
    main()
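Note that the training loop above assumes the pre-0.26 gym API, in which env.reset() returns only the observation and env.step() returns four values; with gymnasium or newer gym releases those two calls would need small adjustments.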