  • Reinforcement Learning: Deep Q Learning (DQN) Code Walkthrough

    Deep Q Learning

    We use gym's CartPole as the environment and DQN to solve a problem with a discrete action space. The code below targets TensorFlow 1.x and the classic gym API (env.reset() returns the observation, env.step() returns four values).

    1. Import the required packages and define the hyperparameters

    import tensorflow as tf
    import numpy as np
    import gym
    import time
    import random
    from collections import deque
    
    #####################  hyper parameters  ####################
    
    # Hyper Parameters for DQN
    GAMMA = 0.9 # discount factor for target Q
    INITIAL_EPSILON = 0.5 # starting value of epsilon
    FINAL_EPSILON = 0.01 # final value of epsilon
    REPLAY_SIZE = 10000 # experience replay buffer size
    BATCH_SIZE = 32 # size of minibatch

    2. The DQN constructor

    1. Initialize the experience replay buffer;

    2. Set the dimensions of the problem's state space and action space;

    3. Set epsilon for the ε-greedy policy;

    4. Create the Q network used to estimate Q values, and create the training method;

    5. Initialize the TensorFlow session.

    def __init__(self, env):
        # init experience replay
        self.replay_buffer = deque()
        # init some parameters
        self.time_step = 0
        self.epsilon = INITIAL_EPSILON
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
    
        self.create_Q_network()
        self.create_training_method()
    
        # Init session
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())

    3. Create the neural network

    Create a fully connected network with three layers: the input layer, one hidden layer with 20 neurons, and an output layer that produces one Q value per action.

    def create_Q_network(self):
        # network weights
        W1 = self.weight_variable([self.state_dim,20])
        b1 = self.bias_variable([20])
        W2 = self.weight_variable([20,self.action_dim])
        b2 = self.bias_variable([self.action_dim])
        # input layer
        self.state_input = tf.placeholder("float",[None,self.state_dim])
        # hidden layers
        h_layer = tf.nn.relu(tf.matmul(self.state_input,W1) + b1)
        # Q Value layer
        self.Q_value = tf.matmul(h_layer,W2) + b2
    
    def weight_variable(self,shape):
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)
    
    def bias_variable(self,shape):
        initial = tf.constant(0.01, shape = shape)
        return tf.Variable(initial)

    Define the cost function and the optimization method: make the difference between the "actual" Q value y and the Q value estimated by the current network as small as possible, i.e. bring the current network as close as possible to the true Q values.

    def create_training_method(self):
        self.action_input = tf.placeholder("float",[None,self.action_dim]) # one hot presentation
        self.y_input = tf.placeholder("float",[None])
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value,self.action_input),reduction_indices = 1)
        self.cost = tf.reduce_mean(tf.square(self.y_input - Q_action))
        self.optimizer = tf.train.AdamOptimizer(0.0001).minimize(self.cost)
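
    In equation form, the objective implemented above is the mean squared error between the target value y and the network's estimate for the action that was actually taken (the one-hot action_input picks that entry out of Q_value):

    $$
    L(\theta) = \mathbb{E}\left[\big(y - Q(s, a; \theta)\big)^2\right]
    $$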

    Sample a random minibatch of BATCH_SIZE transitions from the buffer and compute y, the target Q value for each (s, a) in the batch under the current network:

    if done: y_batch.append(reward_batch[i])

    else :  y_batch.append(reward_batch[i] + GAMMA * np.max(Q_value_batch[i]))
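
    Written out, this is the standard Q-learning backup, computed with the same network that is being trained:

    $$
    y_i =
    \begin{cases}
    r_i, & \text{if } s_{i+1} \text{ is terminal} \\
    r_i + \gamma \max_{a'} Q(s_{i+1}, a'; \theta), & \text{otherwise}
    \end{cases}
    $$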

    def train_Q_network(self):
        self.time_step += 1
        # Step 1: obtain random minibatch from replay memory
        minibatch = random.sample(self.replay_buffer,BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]
    
        # Step 2: calculate y
        y_batch = []
        Q_value_batch = self.Q_value.eval(feed_dict={self.state_input:next_state_batch})
        for i in range(0,BATCH_SIZE):
          done = minibatch[i][4]
          if done:
            y_batch.append(reward_batch[i])
          else :
            y_batch.append(reward_batch[i] + GAMMA * np.max(Q_value_batch[i]))
    
        self.optimizer.run(feed_dict={
          self.y_input:y_batch,
          self.action_input:action_batch,
          self.state_input:state_batch
          })

    4. The agent's interface for perceiving the environment

    Each time the agent takes an action, it receives feedback from the environment and stores (s, a, r, s_, done) in the experience replay buffer. Training starts once the buffer holds more than BATCH_SIZE transitions.

    def perceive(self,state,action,reward,next_state,done):
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.replay_buffer.append((state,one_hot_action,reward,next_state,done))
        if len(self.replay_buffer) > REPLAY_SIZE:
          self.replay_buffer.popleft()
    
        if len(self.replay_buffer) > BATCH_SIZE:
          self.train_Q_network()
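
    For CartPole each stored transition is a 5-tuple. A quick illustration with made-up (hypothetical) values, where the state is the 4-dimensional observation and the action is one-hot encoded over the 2 discrete actions:

    import numpy as np

    state          = np.array([0.02, -0.01, 0.03, 0.04])  # cart position, cart velocity, pole angle, pole angular velocity
    one_hot_action = np.array([0., 1.])                    # action 1 as a one-hot vector
    next_state     = np.array([0.02, 0.18, 0.03, -0.25])   # observation returned by env.step
    transition = (state, one_hot_action, 1.0, next_state, False)  # (s, a, r, s_, done), as appended in perceive()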

    5. Decision making (action selection)

    There are two ways to pick an action: greedy and ε-greedy. The ε-greedy policy is used during training for exploration, and the greedy one during evaluation.

    def egreedy_action(self,state):
        Q_value = self.Q_value.eval(feed_dict = {
          self.state_input:[state]
          })[0]
        # decay epsilon linearly, but never let it drop below FINAL_EPSILON
        self.epsilon = max(self.epsilon - (INITIAL_EPSILON - FINAL_EPSILON) / 10000, FINAL_EPSILON)
        if random.random() <= self.epsilon:
            return random.randint(0,self.action_dim - 1)
        else:
            return np.argmax(Q_value)

    def action(self,state):
        return np.argmax(self.Q_value.eval(feed_dict = {
          self.state_input:[state]
          })[0])
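
    A side note on the exploration schedule: with the constants above, epsilon decays linearly from INITIAL_EPSILON to FINAL_EPSILON over roughly 10,000 action selections. A minimal standalone sketch of that arithmetic (not part of the agent):

    INITIAL_EPSILON, FINAL_EPSILON = 0.5, 0.01
    decrement = (INITIAL_EPSILON - FINAL_EPSILON) / 10000     # per-call decay used in egreedy_action
    steps_to_floor = (INITIAL_EPSILON - FINAL_EPSILON) / decrement
    print(steps_to_floor)                                     # 10000.0 calls before epsilon reaches its floor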

    The complete agent code:

    DQN.py
    import tensorflow as tf
    import numpy as np
    import gym
    import time
    import random
    from collections import deque
    
    #####################  hyper parameters  ####################
    
    # Hyper Parameters for DQN
    GAMMA = 0.9 # discount factor for target Q
    INITIAL_EPSILON = 0.5 # starting value of epsilon
    FINAL_EPSILON = 0.01 # final value of epsilon
    REPLAY_SIZE = 10000 # experience replay buffer size
    BATCH_SIZE = 32 # size of minibatch
    
    ###############################  DQN  ####################################
    
    class DQN():
      # DQN Agent
      def __init__(self, env):
        # init experience replay
        self.replay_buffer = deque()
        # init some parameters
        self.time_step = 0
        self.epsilon = INITIAL_EPSILON
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
    
        self.create_Q_network()
        self.create_training_method()
    
        # Init session
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())
    
      def create_Q_network(self):
        # network weights
        W1 = self.weight_variable([self.state_dim,20])
        b1 = self.bias_variable([20])
        W2 = self.weight_variable([20,self.action_dim])
        b2 = self.bias_variable([self.action_dim])
        # input layer
        self.state_input = tf.placeholder("float",[None,self.state_dim])
        # hidden layers
        h_layer = tf.nn.relu(tf.matmul(self.state_input,W1) + b1)
        # Q Value layer
        self.Q_value = tf.matmul(h_layer,W2) + b2
    
      def create_training_method(self):
        self.action_input = tf.placeholder("float",[None,self.action_dim]) # one hot presentation
        self.y_input = tf.placeholder("float",[None])
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value,self.action_input),reduction_indices = 1)
        self.cost = tf.reduce_mean(tf.square(self.y_input - Q_action))
        self.optimizer = tf.train.AdamOptimizer(0.0001).minimize(self.cost)
    
      def perceive(self,state,action,reward,next_state,done):
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.replay_buffer.append((state,one_hot_action,reward,next_state,done))
        if len(self.replay_buffer) > REPLAY_SIZE:
          self.replay_buffer.popleft()
    
        if len(self.replay_buffer) > BATCH_SIZE:
          self.train_Q_network()
    
      def train_Q_network(self):
        self.time_step += 1
        # Step 1: obtain random minibatch from replay memory
        minibatch = random.sample(self.replay_buffer,BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]
    
        # Step 2: calculate y
        y_batch = []
        Q_value_batch = self.Q_value.eval(feed_dict={self.state_input:next_state_batch})
        for i in range(0,BATCH_SIZE):
          done = minibatch[i][4]
          if done:
            y_batch.append(reward_batch[i])
          else :
            y_batch.append(reward_batch[i] + GAMMA * np.max(Q_value_batch[i]))
    
        self.optimizer.run(feed_dict={
          self.y_input:y_batch,
          self.action_input:action_batch,
          self.state_input:state_batch
          })
    
      def egreedy_action(self,state):
        Q_value = self.Q_value.eval(feed_dict = {
          self.state_input:[state]
          })[0]
        # decay epsilon linearly, but never let it drop below FINAL_EPSILON
        self.epsilon = max(self.epsilon - (INITIAL_EPSILON - FINAL_EPSILON) / 10000, FINAL_EPSILON)
        if random.random() <= self.epsilon:
            return random.randint(0,self.action_dim - 1)
        else:
            return np.argmax(Q_value)
    
      def action(self,state):
        return np.argmax(self.Q_value.eval(feed_dict = {
          self.state_input:[state]
          })[0])
    
      def weight_variable(self,shape):
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)
    
      def bias_variable(self,shape):
        initial = tf.constant(0.01, shape = shape)
        return tf.Variable(initial)

    Training the agent:

    train.py
    from DQN import DQN
    import gym
    import numpy as np
    import time
    
    ENV_NAME = 'CartPole-v1'
    EPISODE = 3000 # Episode limitation
    STEP = 300 # Step limitation in an episode
    TEST = 10 # number of evaluation episodes run every 100 training episodes
    
    def main():
      # initialize OpenAI Gym env and dqn agent
      env = gym.make(ENV_NAME)
      agent = DQN(env)
    
      for episode in range(EPISODE):
        # initialize task
        state = env.reset()
        # Train
        ep_reward = 0
        for step in range(STEP):
          action = agent.egreedy_action(state) # e-greedy action for train
          next_state,reward,done,_ = env.step(action)
          # reward shaping: -10 when the episode terminates (pole falls or cart leaves the track), +1 per surviving step
          reward = -10 if done else 1
          ep_reward += reward
          agent.perceive(state,action,reward,next_state,done)
          state = next_state
          if done:
            #print('episode complete, reward: ', ep_reward)
            break
        # Test every 100 episodes
        if episode % 100 == 0:
          total_reward = 0
          for i in range(TEST):
            state = env.reset()
            for j in range(STEP):
              #env.render()
              action = agent.action(state) # direct action for test
              state,reward,done,_ = env.step(action)
              total_reward += reward
              if done:
                break
          ave_reward = total_reward/TEST
          print ('episode: ',episode,'Evaluation Average Reward:',ave_reward)
    
    if __name__ == '__main__':
      main()
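
    If you want to watch the trained policy once training finishes, a minimal sketch (a hypothetical addition at the end of main(), reusing the env and agent variables and the same old-style gym render/step API as above):

      # hypothetical: render one greedy episode after training
      state = env.reset()
      for _ in range(STEP):
        env.render()                                            # opens a viewer window (old gym API)
        state, reward, done, _ = env.step(agent.action(state))  # greedy policy
        if done:
          break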

    References:

    https://www.cnblogs.com/pinard/p/9714655.html

    https://github.com/ljpzzz/machinelearning/blob/master/reinforcement-learning/dqn.py
