zoukankan      html  css  js  c++  java
  • 强化学习笔记

    Preliminary

    • Robbins-Monro Algorithm

      Robbins-Monro Algorithm is designed to solve the following equation:

      [int c(s, heta) au_{ heta}(s)=0 ]

      where ( au_ heta) is a distribution of (s) parameterized by ( heta).

      We can use following rule to obtain ( heta^*)

      [ heta_{k+1} = heta_k-eta_k c(s_k, c_k) ]

      Q-learning Algorithm uses Robbins-Monro to update Q function, i.e.,

      [Q'(s,a) = r+lambda Q(s_{t+1}, a_{t+1}) - Q(s, a) ]

      The original equation of (Q(s, a)) and (Q(s_{t+1}, a_{t+1})) is

      [Q(s,a)=r + lambda sum_{s_{t+1}} P(s_{t+1}vert s_t, a)max_{a'}Q(s_{t+1}, a') ]

      The sum of (s_{t+1}) can be considered as expectation. If we take (Q(s, a)) and (r) into the expectation, the above equation follows Robbin-Monro Algorithm.

    Policy Gradient

    Now, we parameterize the policy distribution with parameter ( heta) denoted by ( au_ heta) and the objective function is (J( heta)). We want to minimize (J( heta)) (or maximize, depends on the definition) and improve our policy by optimizing ( heta).

    Let reward function (mu^{ au_{ heta}}(s_0)=sumlimits_a au(avert s_0)Q^{ au_ heta}(s_0,a)) be the objective function (Note, we neglect time term (t)) and take gradient with respect to ( heta)

    [egin{align*} abla_{ heta} mu^{ au_ heta}(s_0)&=sum_a( abla au(avert s_0)Q(a,s_0)+ au(avert s_0) abla Q(a, s_0))\ &=sum_a( abla au(avert s_0)Q(a,s_0)+ au(avert s_0) abla sum_{s'.r'}P(s',r'vert a,s_0)(r'+mu ^{ au_ heta}(s')))\ end{align*} ]

    Lemma:

    If (I-P > 0), for the equation ((I-P)x=y), we have

    [egin{align*} x&=(I-P)^{-1}y\ &=sum_{k=0}^{infin} P^{k}y end{align*} ]

    Using this lemma, we have

    [ ablamu^{ au_ heta}(s_0) = sum_{xin mathcal S}sum_{k=0}^infin P(s=x, k, au_ heta)sum_a abla au_{ heta}(avert x)Q^{ au}(x,a) ]

    Let (eta(x)) denotes (sumlimits_{k=0}^{infin}P(s=x,k, au_ heta)), we can rewrite the equation

    [egin{align*} ablamu^{ au_ heta}(s_0) &= sum_{xin mathcal S}eta(x)sum_a abla au_{ heta}(avert x)Q^{ au}(x,a)\ &proptosum_{xin mathcal S}frac{eta(x)}{sum_{x'}eta(x')}sum_a abla au_{ heta}(avert x)Q^{ au}(x,a)\ &=E^{ au_ heta}[ abla au_{ heta}(avert x)Q^{ au}(x,a)]\ &=E^{ au_ heta}[Q^{ au}(S_t,A_t) ablalog( au(A_tvert S_t))] end{align*} ]

    AC

    We can using a network to estimate Q function and the actor network to learn ( heta).

    from collections import deque
    import random
    
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras.layers import Input, Dense
    from tensorflow.keras.optimizers import Adam
    physical_devices = tf.config.experimental.list_physical_devices('GPU')
    
    assert len(physical_devices) > 0, "Not enough GPU hardware devices available"
    
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    
    import gym
    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--gamma', type=float, default=0.99)
    parser.add_argument('--update_interval', type=int, default=5)
    parser.add_argument('--actor_lr', type=float, default=0.0005)
    parser.add_argument('--critic_lr', type=float, default=0.001)
    
    args = parser.parse_args()
    class Actor:
        def __init__(self, state_dim, action_dim):
            self.state_dim = state_dim
            self.action_dim = action_dim
            self.opt = Adam(args.actor_lr)
            self.model = self.create_model()
    
        def create_model(self):
            return tf.keras.Sequential([
                Input((self.state_dim, )),
                Dense(32, activation='relu'),
                Dense(16, activation='relu'),
                Dense(self.action_dim, activation='softmax')
            ])
    
        def compute_loss(self, actions, logits, advantages):
            ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
            actions = tf.cast(actions, tf.int32)
            policy_loss = ce_loss(actions, logits, sample_weight=tf.stop_gradient(advantages))
            return policy_loss
    
        def train(self, states, actions, advantages):
            with tf.GradientTape() as tape:
                logits = self.model(states, training=True)
                loss = self.compute_loss(actions, logits, advantages)
            grads = tape.gradient(loss, self.model.trainable_variables)
            self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
            return loss
    
    class Critic:
        def __init__(self, state_dim, action_dim):
            self.state_dim = state_dim
            self.action_dim = action_dim
            self.opt = Adam(args.critic_lr)
            self.model = self.create_model()
    
        def create_model(self):
            return tf.keras.Sequential([
                Input((self.state_dim,)),
                Dense(32, activation='relu'),
                Dense(16, activation='relu'),
                Dense(16, activation='relu'),
                Dense(1, activation='linear')
            ])
    
        def compute_loss(self, v_pred, td_targets):
            mse = tf.keras.losses.MeanSquaredError()
            return mse(td_targets, v_pred)
    
        def train(self, states, td_targets):
            with tf.GradientTape() as tape:
                v_pred = self.model(states, training=True)
                assert v_pred.shape == td_targets.shape
                loss = self.compute_loss(v_pred, tf.stop_gradient(td_targets))
            grads = tape.gradient(loss, self.model.trainable_variables)
            self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
            return loss
    
    class Agent:
        def __init__(self, env):
            self.env = env
            self.state_dim = env.observation_space.shape[0]
            self.action_dim = env.action_space.n
            self.actor = Actor(self.state_dim, self.action_dim)
            self.critic = Critic(self.state_dim, self.action_dim)
    
        def td_target(self, reward, next_state, done):
            if done:
                return reward
            v_value = self.critic.model.predict(
                np.reshape(next_state, [1, self.state_dim]))
            return np.reshape(reward + args.gamma * v_value[0], [1, 1])
    
        def advantage(self, td_targets, baselines):
            return td_targets - baselines
    
        def list_to_batch(self, list):
            batch = list[0]
            for elem in list[1:]:
                batch = np.append(batch, elem, axis=0)
            return batch
    
        def train(self, max_episodes=1000):
            for ep in range(max_episodes):
                state_batch = []
                action_batch = []
                td_target_batch = []
                advatnage_batch = []
                episode_reward, done = 0, False
    
                state = self.env.reset()
    
                while not done:
                    # self.env.render()
                    probs = self.actor.model.predict(
                        np.reshape(state, [1, self.state_dim]))
                    action = np.random.choice(self.action_dim, p=probs[0])  # choice action according to policy
    
                    next_state, reward, done, _ = self.env.step(action)
    
                    state = np.reshape(state, [1, self.state_dim])
                    action = np.reshape(action, [1, 1])
                    next_state = np.reshape(next_state, [1, self.state_dim])
                    reward = np.reshape(reward, [1, 1])
    
                    td_target = self.td_target(reward * 0.01, next_state, done)
                    advantage = self.advantage(
                        td_target, self.critic.model.predict(state))
    
                    state_batch.append(state)
                    action_batch.append(action)
                    td_target_batch.append(td_target)
                    advatnage_batch.append(advantage)
    
                    if len(state_batch) >= args.update_interval or done:
                        states = self.list_to_batch(state_batch)
                        actions = self.list_to_batch(action_batch)
                        td_targets = self.list_to_batch(td_target_batch)
                        advantages = self.list_to_batch(advatnage_batch)
    
                        actor_loss = self.actor.train(states, actions, advantages)
                        critic_loss = self.critic.train(states, td_targets)
    
                        state_batch = []
                        action_batch = []
                        td_target_batch = []
                        advatnage_batch = []
    
                    episode_reward += reward[0][0]
                    state = next_state[0]
    
                print('EP{} EpisodeReward={}'.format(ep, episode_reward))
    
    def main():
        env_name = 'CartPole-v1'
        env = gym.make(env_name)
        agent = Agent(env)
        agent.train()
    
    
    if __name__ == "__main__":
        main()
    
  • 相关阅读:
    Java的protect修饰限制
    Java静态导入包
    Java代码块
    Java多态类型转换
    Java中只有值传递!!!
    测试
    编译原理实验-LL1语法分析器(自动生成First集、Follow集求法)java实现
    用面向对象的思维设计相关类,从而实现直线与直线、直线与圆、直线与矩形的交点。
    java常见类库学习(1)String、StringBuilder、StringBuffer
    JAVA —Lock锁
  • 原文地址:https://www.cnblogs.com/DemonHunter/p/13463727.html
Copyright © 2011-2022 走看看