zoukankan      html  css  js  c++  java
  • Deep Q-Network 学习笔记(五)—— 改进③:Prioritized Replay 算法

    也就是优先采样,这里的推导部分完全没看懂 Orz,这里也只是记录实现代码。

    也就是看了以下两篇文章对应做了实现。

    莫烦老师的教程:

    https://morvanzhou.github.io/tutorials/machine-learning/reinforcement-learning/4-6-prioritized-replay/

    还有这位大神的:

    https://jaromiru.com/2016/11/07/lets-make-a-dqn-double-learning-and-prioritized-experience-replay/

    https://github.com/jaara/AI-blog

    这里同样是在上一篇的基础上完善。

    完整实现

    tree 优先采样:

    import numpy as np
    import random
    
    
    class SumTree:
        write = 0
    
        def __init__(self, capacity):
            # 容量。
            self.capacity = capacity
            self.tree = np.zeros(2 * capacity - 1)
            self.data = np.zeros(capacity, dtype=object)
    
        def _propagate(self, idx, change):
            # 整数除法。
            parent = (idx - 1) // 2
    
            self.tree[parent] += change
    
            if parent != 0:
                self._propagate(parent, change)
    
        def _retrieve(self, idx, s):
            """
            示例:
              样本数(sample num):4 个,则 Tree 的 size 是:4 X 2 - 1 = 7。
              其中,size 的后 sample num 个是单个样本的优先级,其它的是父级,即:父级的优先级=SUM(子级的优先级)。
              如果 data = ["test1", "test2", "test3", "test4"],它们单个的优先级对应的就是下图中的“3, 4, 5, 6”。
            Tree structure and array storage:
    
            Tree index:
                 18         -> storing priority sum
                / 
              7    11
             /    / 
            3   4 5   6    -> storing priority for transitions
    
            Array type for storing:
            [18,7,11,3,4,5,6]
            """
            left = 2 * idx + 1
            right = left + 1
    
            if left >= len(self.tree):
                return idx
    
            if s <= self.tree[left]:
                return self._retrieve(left, s)
            else:
                return self._retrieve(right, s - self.tree[left])
    
        def total(self):
            return self.tree[0]
    
        def add(self, p, data):
            idx = self.write + self.capacity - 1
    
            self.data[self.write] = data
            self.update(idx, p)
    
            self.write += 1
            if self.write >= self.capacity:
                self.write = 0
    
        def update(self, idx, p):
            change = p - self.tree[idx]
    
            self.tree[idx] = p
            self._propagate(idx, change)
    
        def get(self, s):
            idx = self._retrieve(0, s)
            data_index = idx - self.capacity + 1
    
            return idx, self.tree[idx], self.data[data_index]
    
    
    class Memory:
        e = 0.01
        a = 0.6
    
        def __init__(self, capacity):
            self.tree = SumTree(capacity)
    
        def _get_priority(self, error):
            return (error + self.e) ** self.a
    
        def add(self, error, sample):
            p = self._get_priority(error)
            self.tree.add(p, sample)
    
        def sample(self, n):
            batch = []
    
            # 计算样本抽取的区间。
            segment = self.tree.total() / n
    
            for i in range(n):
                # 第 i 个样本抽取区间的开始序号。
                a = segment * i
                # 第 i 个样本抽取区间的结束序号。
                b = segment * (i + 1)
    
                # 在(a, b)的区间内随机选数。
                s = random.uniform(a, b)
                # 根据 s 来抽取,并将数据(数据序号,优先级,数据)返回。
                (idx, p, data) = self.tree.get(s)
                # 将样本增加到集合里。
                batch.append((idx, data))
    
            return batch
    
        def update(self, idx, error):
            p = self._get_priority(error)
            self.tree.update(idx, p)
    
    if __name__ == "__main__":
        memory = Memory(4)
        memory.add(1, "test1")
        print(memory.tree.data)
        print(memory.tree.tree)
        memory.add(1, "test2")
        print(memory.tree.data)
        print(memory.tree.tree)
        memory.add(2, "test3")
        memory.add(3, "test4")
        samples, sample_index, i_s_weight = memory.sample(2)
        print("samples:", samples)
        print("sample_index:", sample_index)
        print("Importance-Sampling Weight", i_s_weight)

    神经网络:

    import tensorflow as tf
    import numpy as np
    
    
    class DeepQNetwork:
        # q_eval 网络状态输入参数。
        q_eval_input = None
    
        # # q_eval 网络动作输入参数。
        # q_action_input = None
    
        # q_eval 网络中 q_target 的输入参数。
        q_eval_target = None
    
        # q_eval 网络输出结果。
        q_eval_output = None
    
        # q_eval 网络输出的结果中的最优得分。
        q_predict = None
    
        # q_eval 网络输出的结果中当前选择的动作得分。
        reward_action = None
    
        # q_eval 网络损失函数。
        loss = None
    
        # q_eval 网络训练。
        train_op = None
    
        # q_target 网络状态输入参数。
        q_target_input = None
    
        # q_target 网络输出结果。
        q_target_output = None
    
        # 更换 target_net 的步数。
        replace_target_stepper = 0
    
        def __init__(self, input_num, output_num, learning_rate=0.001, replace_target_stepper=300, session=None):
            self.learning_rate = learning_rate
            self.INPUT_NUM = input_num
            self.OUTPUT_NUM = output_num
            self.replace_target_stepper = replace_target_stepper
    
            self.create()
    
            if session is None:
                self.session = tf.InteractiveSession()
                self.session.run(tf.initialize_all_variables())
            else:
                self.session = session
    
        def create(self):
            neuro_layer_1 = 3
            w_init = tf.random_normal_initializer(0, 0.3)
            b_init = tf.constant_initializer(0.1)
    
            # -------------- 创建 eval 神经网络, 及时提升参数 -------------- #
            self.q_eval_input = tf.placeholder(shape=[None, self.INPUT_NUM], dtype=tf.float32, name="q_eval_input")
            # self.q_action_input = tf.placeholder(shape=[None, self.OUTPUT_NUM], dtype=tf.float32)
            self.q_eval_target = tf.placeholder(shape=[None, self.OUTPUT_NUM], dtype=tf.float32, name="q_target")
    
            with tf.variable_scope("eval_net"):
                q_name = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
    
                with tf.variable_scope('l1'):
                    w1 = tf.get_variable('w1', [self.INPUT_NUM, neuro_layer_1], initializer=w_init, collections=q_name)
                    b1 = tf.get_variable('b1', [1, neuro_layer_1], initializer=b_init, collections=q_name)
                    l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1)
    
                with tf.variable_scope('l2'):
                    w2 = tf.get_variable('w2', [neuro_layer_1, self.OUTPUT_NUM], initializer=w_init, collections=q_name)
                    b2 = tf.get_variable('b2', [1, self.OUTPUT_NUM], initializer=b_init, collections=q_name)
                    self.q_eval_output = tf.matmul(l1, w2) + b2
                    self.q_predict = tf.argmax(self.q_eval_output, 1)
    
            with tf.variable_scope('loss'):
                # # 取出当前动作的得分。
                # self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval_output, self.q_action_input),
                #                                    reduction_indices=1)
                # self.loss = tf.reduce_mean(tf.square((self.q_eval_target - self.reward_action)))
                self.loss = tf.reduce_mean(tf.squared_difference(self.q_eval_target, self.q_eval_output))
    
            with tf.variable_scope('train'):
                self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)
    
            # -------------- 创建 target 神经网络, 及时提升参数 -------------- #
            self.q_target_input = tf.placeholder(shape=[None, self.INPUT_NUM], dtype=tf.float32, name="q_target_input")
    
            with tf.variable_scope("target_net"):
                t_name = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
    
                with tf.variable_scope('l1'):
                    w1 = tf.get_variable('w1', [self.INPUT_NUM, neuro_layer_1], initializer=w_init, collections=t_name)
                    b1 = tf.get_variable('b1', [1, neuro_layer_1], initializer=b_init, collections=t_name)
                    l1 = tf.nn.relu(tf.matmul(self.q_target_input, w1) + b1)
    
                with tf.variable_scope('l2'):
                    w2 = tf.get_variable('w2', [neuro_layer_1, self.OUTPUT_NUM], initializer=w_init, collections=t_name)
                    b2 = tf.get_variable('b2', [1, self.OUTPUT_NUM], initializer=b_init, collections=t_name)
                    self.q_target_output = tf.matmul(l1, w2) + b2
    
        def replace_target_params(self):
            """
            使用 Tensorflow 中的 assign 功能替换 target_net 所有参数。
            :return:
            """
            # 提取 target_net 的参数。
            t_params = tf.get_collection('target_net_params')
            # 提取 eval_net 的参数。
            e_params = tf.get_collection('eval_net_params')
            # 更新 target_net 参数。
            self.session.run([tf.assign(t, e) for t, e in zip(t_params, e_params)])
    
        def get_q(self, input_data):
            return self.session.run(self.q_eval_output, {self.q_eval_input: input_data})
    
        def get_next_q(self, input_data):
            return self.session.run(self.q_target_output, {self.q_target_input: input_data})
    
        def get_predict(self, input_data):
            return np.max(self.get_q(input_data))
    
        def get_action(self, input_data):
            return np.argmax(self.get_q(input_data))
    
        def train(self, input_data, y_):
            _, cost = self.session.run([self.train_op, self.loss],
                                       feed_dict={self.q_eval_input: input_data,
                                                  self.q_eval_target: y_})
            return cost

    主逻辑:

    import numpy as np
    from collections import deque
    import random
    from q_network import DeepQNetwork
    from tree import Memory
    
    
    class Agent:
    
        r = np.array([[-1, -1, -1, -1, 0, -1],
                      [-1, -1, -1, 0, -1, 100.0],
                      [-1, -1, -1, 0, -1, -1],
                      [-1, 0, 0, -1, 0, -1],
                      [0, -1, -1, 1, -1, 100],
                      [-1, 0, -1, -1, 0, 100],
                      ])
    
        # 神经网络。
        network = None
    
        def __init__(self, train_num=2000, prioritized=True):
            # 执行步数。
            self.step_index = 0
    
            # 状态数。
            self.STATE_NUM = 6
    
            # 动作数。
            self.ACTION_NUM = 6
    
            # 记忆上限。
            self.memory_size = 5000
    
            # 当前记忆数。
            self.memory_counter = 0
    
            self.prioritized = prioritized
    
            if prioritized:
                self.replay_memory_store = Memory(self.memory_size)
            else:
                # 保存观察到的执行过的行动的存储器,即:曾经经历过的记忆。
                self.replay_memory_store = deque()
    
            # 训练之前观察多少步。
            self.OBSERVE = 5000
    
            self.TRAIN_NUM = train_num
    
            # 训练步数统计。
            self.learn_step_counter = 0
    
            # 选取的小批量训练样本数。
            self.BATCH = 20
    
            # γ经验折损率。
            self.gamma = 0.9
    
            # -------------------- 探索策略 -------------------- #
            # epsilon 的最小值,当 epsilon 小于该值时,将不在随机选择行为。
            self.FINAL_EPSILON = 0.0001
    
            # epsilon 的初始值,epsilon 逐渐减小。
            self.INITIAL_EPSILON = 0.1
    
            # epsilon 衰减的总步数。
            self.EXPLORE = 3000000.
    
            # 探索模式计数。
            self.epsilon = 0
            # -------------------- 探索策略 -------------------- #
    
            # 生成神经网络。
            self.network = DeepQNetwork(input_num=self.STATE_NUM,
                                        output_num=self.ACTION_NUM,
                                        learning_rate=0.001,
                                        replace_target_stepper=1000,
                                        session=None)
    
            # 生成一个状态矩阵(6 X 6),每一行代表一个状态。
            self.state_list = np.identity(self.STATE_NUM)
    
            # 生成一个动作矩阵。
            self.action_list = np.identity(self.ACTION_NUM)
    
            # 输出图表。
            self.q_list = []
            self.running_q = 0
    
        def select_action(self, current_state_index):
            """
            根据策略选择动作。
            :param current_state_index:
            :return:
            """
            # 获得当前状态。
            current_state = self.state_list[current_state_index:current_state_index + 1]
    
            # 根据当前状态获得在 Q 网络中最有价值的动作,并返回动作序号。
            current_action_index = self.network.get_action(current_state)
    
            # 输出图表。
            actions_value = self.network.get_predict(current_state)
            self.running_q = self.running_q * 0.99 + 0.01 * np.max(actions_value)
            self.q_list.append(self.running_q)
    
            if np.random.uniform() < self.epsilon:
                current_action_index = np.random.randint(0, self.ACTION_NUM)
    
            # 开始训练后,在 epsilon 小于一定的值之前,将逐步减小 epsilon。
            if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
                self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE
    
            return current_action_index
    
        def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
                """
                保存记忆。
                :param current_state_index: 当前状态 index。
                :param current_action_index: 动作 index。
                :param current_reward: 奖励。
                :param next_state_index: 下一个状态 index。
                :param done: 是否结束。
                :return:
                """
                current_state = self.state_list[current_state_index:current_state_index + 1]
                current_action = self.action_list[current_action_index:current_action_index + 1]
                next_state = self.state_list[next_state_index:next_state_index + 1]
    
                # 保存数据(当前状态, 当前执行的动作, 当前动作的得分,下一个状态,是否结束)。
                memory_data = (current_state, current_action, current_reward, next_state, done)
    
                if self.prioritized:
                    x, y, errors = self._get_targets([(0, memory_data)])
                    self.replay_memory_store.add(errors[0], memory_data)
                else:
                    # 记忆动作。
                    self.replay_memory_store.append(memory_data)
    
                    # 如果超过记忆的容量,则将最久远的记忆移除。
                    if len(self.replay_memory_store) > self.memory_size:
                        self.replay_memory_store.popleft()
    
                self.memory_counter += 1
    
        def run_game(self, state_index, action_index):
            """
            执行动作。
            :param state_index: 当前状态。
            :param action_index: 执行的动作。
            :return:
            """
            reward = self.r[state_index][action_index]
    
            next_state = action_index
    
            done = False
    
            if action_index == 5:
                done = True
    
            return next_state, reward, done
    
        def experience_replay(self):
            """
            记忆回放。
            :return:
            """
            # 检查是否替换 target_net 参数
            if self.learn_step_counter % self.network.replace_target_stepper == 0:
                self.network.replace_target_params()
    
            # 随机选择一小批记忆样本。
            batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter
    
            if self.prioritized:
                minibatch = self.replay_memory_store.sample(batch)
            else:
                minibatch = random.sample(self.replay_memory_store, batch)
    
            x, y, errors = self._get_targets(minibatch)
    
            if self.prioritized:
                # update errors
                for i in range(len(minibatch)):
                    idx = minibatch[i][0]
                    self.replay_memory_store.update(idx, errors[i])
    
                self.network.train(x, y)
            else:
                self.network.train(x, y)
    
            self.learn_step_counter += 1
    
        def _get_targets(self, batch):
            if self.prioritized:
                # (当前状态,当前动作,当前得分,下一个状态) = (s, a, r, s_)。
                current_states = np.vstack([o[1][0] for o in batch])
                current_actions = np.vstack([o[1][1] for o in batch])
                current_rewards = np.vstack([o[1][2] for o in batch])
                next_states = np.vstack([o[1][3] for o in batch])
            else:
                current_states = np.vstack([o[0] for o in batch])
                current_actions = np.vstack([o[1] for o in batch])
                current_rewards = np.vstack([o[2] for o in batch])
                next_states = np.vstack([o[3] for o in batch])
    
            # 当前状态在 Q 网络的得分。
            p = self.network.get_q(current_states)
            # 下一状态在 Q 网络的得分。
            p_ = self.network.get_q(next_states)
            # 下一状态在 Target 网络的得分。
            p_target = self.network.get_next_q(next_states)
    
            x = np.zeros((len(batch), self.STATE_NUM))
            y = np.zeros((len(batch), self.ACTION_NUM))
            errors = np.zeros(len(batch))
    
            for i in range(len(batch)):
                s = current_states[i]
                a = current_actions[i]
                r = current_rewards[i][0]
                s_ = next_states[i]
    
                a_index = np.argmax(a)
    
                # 获得第 i 个样本当前状态的所有动作的 Q 值表。
                target_q = p[i]
                # 获得第 i 个样本当前动作的 Q 得分。
                current_q = target_q[a_index]
    
                max_q = p_target[i][np.argmax(p_[i])]
    
                if r <= -1 or s_ is None:
                    target_q[a_index] = r
                else:
                    target_q[a_index] = r + self.gamma * max_q
    
                x[i] = s
                y[i] = target_q
                errors[i] = abs(current_q - target_q[a_index])
    
            return x, y, errors
    
        def train(self):
            """
            训练。
            :return:
            """
            # 初始化当前状态。
            current_state = np.random.randint(0, self.ACTION_NUM - 1)
            self.epsilon = self.INITIAL_EPSILON
    
            while True:
                # 选择动作。
                action = self.select_action(current_state)
    
                # 执行动作,得到:下一个状态,执行动作的得分,是否结束。
                next_state, reward, done = self.run_game(current_state, action)
    
                # 保存记忆。
                self.save_store(current_state, action, reward, next_state, done)
    
                # 先观察一段时间累积足够的记忆在进行训练。
                if self.step_index > self.OBSERVE:
                    self.experience_replay()
    
                if self.step_index - self.OBSERVE > self.TRAIN_NUM:
                    break
    
                if done:
                    current_state = np.random.randint(0, self.ACTION_NUM - 1)
                else:
                    current_state = next_state
    
                self.step_index += 1
    
        def pay(self):
            """
            运行并测试。
            :return:
            """
            self.train()
    
            # 显示 R 矩阵。
            print(self.r)
    
            for index in range(5):
    
                start_room = index
    
                print("#############################", "Agent 在", start_room, "开始行动", "#############################")
    
                current_state = start_room
    
                step = 0
    
                target_state = 5
    
                while current_state != target_state:
                    next_state = self.network.get_action(self.state_list[current_state:current_state + 1])
    
                    print("Agent 由", current_state, "号房间移动到了", next_state, "号房间")
    
                    current_state = next_state
    
                    step += 1
    
                print("Agent 在", start_room, "号房间开始移动了", step, "步到达了目标房间 5")
    
                print("#############################", "Agent 在", 5, "结束行动", "#############################")
    
        def show_plt(self):
            import matplotlib.pyplot as plt
            plt.plot(np.array(self.q_list), c='r', label='natural')
            # plt.plot(np.array(q_double), c='b', label='double')
            plt.legend(loc='best')
            plt.ylabel('Q eval')
            plt.xlabel('training steps')
            plt.grid()
            plt.show()
    
    if __name__ == "__main__":
        agent = Agent(train_num=20000, prioritized=True)
        agent.pay()
        agent.show_plt()
  • 相关阅读:
    mysql-基础和基本指令
    网络笔试面试
    Windows访问Linux下的共享目录的配置方法
    帧动画 连续播放多张图片动画 以及ui动画 SoundPool
    ScrollView listView gridView 之间的冲突问题
    handler------post传送方式
    handler通信机制
    内部存储 openFileInputStream openFileOutputStream
    popupMenu-----弹出菜单
    为系统菜单添加图标--------暴力反射
  • 原文地址:https://www.cnblogs.com/cjnmy36723/p/7063197.html
Copyright © 2011-2022 走看看