zoukankan      html  css  js  c++  java
  • 深度学习实践-强化学习-bird游戏 1.np.stack(表示进行拼接操作) 2.cv2.resize(进行图像的压缩操作) 3.cv2.cvtColor(进行图片颜色的转换) 4.cv2.threshold(进行图片的二值化操作) 5.random.sample(样本的随机抽取)

    1. np.stack((x_t, x_t, x_t, x_t), axis=2)  将图片进行串接的操作,使得图片的维度为[80, 80, 4]

    参数说明: (x_t, x_t, x_t, x_t) 表示需要进行串接的图片, axis = 2 表示在第三个维度上进行串接操作

    2. cv2.resize(x, [80, 80])  # 将图片的维度变化为80 * 80的维度

    参数说明, x为输入的图片,80, 80表示图片变化的维度

    3.cv2.cvtColor(x_t, tf.COLOR_RGB2GREY) 将图片转换为灰度图

    参数说明, x_t表示输入的图片, tf.COLOR_RGB2GREY表示颜色转换的模式

    4.cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)  # 将大于等于1的索引变化为255,等于0的不变,也就是对图片进行二值化操作

    参数说明: x_t表示输入图片,1表示阈值,255表示大于阈值后的数值, cv2.THRESH_BINARY表示进行二值化的模式

    5. random.sample(D, batch_size)  从D样本中随机抽取batch_size个数据

    参数说明:D表示样本,batch_size表示抽取样本的个数

    因为对于每一帧图像而言,一只鸟在图像中的移动方向可能是向上的也可能是向下的

    代码说明:使用的网络是卷积网络,

    输入的是4帧的图片,即80*80*4,

    预测结果为当前位置的上下两个方向的V值,即奖励值

    损失值: 为当前真实的奖励值: cost = V[state] - (r_1 + GAMMA * V[next_state]), 目的是为了当前的预测奖励值 与 实际公式计算的奖励值越接近越好

    V[state] 表示当前位置的预测奖励值

    r_1 表示下一个状态的及时奖励

    V[next_state] 表示下一个位置的预测奖励值

    代码说明: 主要代码包括来部分,第一部分:主要包括构造网络结构,生成[None, 2]的输出结果,第二部分,获得一个batch的值来进行模型的训练

    第一部分:构造网络模型, 用于进行v的预测

    第一步:第一层卷积的W_conv1, 大小为[8, 8, 4, 32], 第一层卷积的b_conv1, 大小为[32] 

    第二步:第二层卷积的W_conv2, 大小为[4, 4, 32, 64], 第二层卷积的b_conv2, 大小为[64]

    第三步:第三层卷积的W_conv3, 大小为[3, 3, 64, 64], 第三层卷积的b_conv3, 大小为[64]

    第四步:使用tf.placeholder('float32', [None, 80, 80, 4])  # 初始化输入的数据s

    第五步:使用tf.nn.relu() 构造第一层卷积层 步长为4,使用max_pool_2x2 进行池化操作

    第六步:使用tf.nn.relu() 构造第二层卷积层 步长为2

    第七步:使用tf.nn.relu(conv2d(x, W_conv3, 1) + b) 步长为1 来构造第三层卷积层

    第八步:使用tf.reshape(h_conv4, [-1, 1600])  # 将卷积后的结果进行拉平操作

    第九步:使用tf.nn.relu(tf.matmul(x,  w_fc1) + b)  # 进行第一次的全连接操作

    第十步:使用tf.matmul(x, w_out) + b # 进行输出层的全连接操作,输出的结果为[None, action], 返货的结果为s,readout, h_fc1, s为输入结果,readout为输出结果,h_fc1为第一层全连接的输出结果

    第二部分:将上述求得的s,readout, h_fc1, sess, 传入,这个先观察1000次,1000以后再进行模型参数的训练,同时这里使用一个epsilon, 当random.random() 小于epsilon,那么下一个动作的位置方向是随机值,否者的话就使用np.argmax(readout), 即readout预测出当前位置v值大的索引值作为方向

    第一步:定义网络的损失值和训练步骤

                  第一步:使用tf.placeholder('float32', [None, 2]) 用来构造a即方向

                  第二步: 使用tf.placeholder('float32', [None])  用来构造实际当前位置的V值

                  第三步:预测结果: action_readout = tf.reduce_sum(tf.multiply(readout, a))  来获得当前预测的结果v奖励值

                  第四步: 构造cost函数,使用tf.reduce_mean(tf.square(action_readout - y))

                  第五步:构造train_op, 使用tf.trian.Adaoptimer(1e-6).minimize(cost)

                  第六步:使用sess.run(tf.global_variable_initial())  进行参数的初始化操作

    第二步:构造第一个输出的参数, 大小为[80, 80, 4]

                  第一步:使用game_state = game.GameState() 来实例化game_state的向量

                  第二步:使用D = deque() 来构造存储的向量,用于存储st 当前位置, a_t下一个动作的方向,  r_t,下一个动作的及时奖励,以及st1,下一个位置

                  第三步: 构造一个初始化方向, do_nothing

                                 第一步: np.zeros([2]) 来构造do_nothing的初始值

                                 第二步:do_nothing[0] = 1 将do_nothing的第一个位置的索引值构造为1 

                  第四步:将do_nothing 输入到game_state.frame_step(do_nothing) 获得do_nothing 下一个位置的x_t当前图片, r_0为及时奖励,  terminal是否停止

                  第五步:使用cv2.cvtColor(cv2.resize(x_t, (80, 80), cv2.COLOR_RGB2GRAY)) 将图片进行维度变换,同时将其转换为灰度图

                  第六步: 使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) # 进行二值化操作

                  第七步:使用np.stack((x_t, x_t, x_t, x_t), axis=2) # 将此时的四张图片做一个串接,作为第一个输入s_t 

    第三步:进入循环,用于进行参数的训练, t = 0 

                   第一步:使用readout.eval(feed_dict={s:s_t}) 来获得当前位置的两个方向的v值

                   第二步:根据随机数和np.argmax() 来获得方向值

                                 第一步:使用np.zeros(2) 来获得a_t的初始值

                                第二步: 使用random.random() 来获得随机值,如果随机值小于epsilon,使用random.ranrange(ACTION) 来获得一个0,1的随机值

                                第三步:a_t[index] = 1, 来构造a_t 

                                第四步: 如果随机值大于epsilon, 使用np.argmax(t_readout) 来获得方向较大的预测值v,来作为索引值

                                 第五步: a_t[index] = 1 来构造a_t  

                    第三步: 使用game_state.frame_step(a_t)  来获得x_t1的图片,r_1的及时奖励, terminal判断是否停止了, 将其与s_t的前三帧图片进行拼接

                                 第一步:使用game_state.frame_step(a_t) 来获得x_t1, r_1, terminal 

                                 第二步:使用cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY) 将彩图转换为灰度图

                                 第三步:使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) 进行二值化操作

                                 第四步:使用np.reshape(x_t, [80, 80, 1]) 将图片转换为[80, 80, 1] 的数据

                                 第五步:使用np.append(x_t, s_t[:, :, 3], axis=2) 将图片进行拼接

                    第四步: 将s_t, a_t, r_1. s_t1 存储在D里面, s_t当前位置,a_t下一个方向,r_1下一个方向的奖励值,s_t1下一个位置

                                 第一步:D.append((s_t, a_t, r_1, s_t1))

                                 第二步:判断len(D) 大于MERORY, 使用D.popleft() 去除最后一张图片,使其保持5000的个数

                    第五步:如果t > OBSERVE, 使用random.sample(D, BATCH_SIZE) 从D中所及获取batch_size章图片进行模型的训练,sess.run(train_op)

                                  第一步:ministbatch = random.sample(D, BATCH_SIZE) 从D中获得batch_size的信息

                                  第二步:s_j_batch = [r[0] for r in mnistbatch] 获得当前位置, a_batch 下一个动作的方向, r_batch下一个动作的及时奖励,s_j1_batch 获得下一个位置的图片

                                  第三步:使用readou_batch = readout.eval(feed_dict={s:s_j1_batch}) 来获得下一个位置的v奖励值,

                                  第四步:构造y_batch,用于存储当前位置的v奖励值

                                               第一步:循环len(ministbatch)

                                               第二步:terminal = ministbatch[i][4] 来获得此时的terminal

                                               第三步:if terminal 表示停止了,那么y_batch.append(r_batch[i])  # 当前的预测奖励值使用r_batch[i]

                                               第四步:如果没有停止,那么y_batch.append(r_batch[i] + GAMMA*readout_batch[i]) # 当前的预测奖励值使用r_batch[j]  + readout_batch[i] 加上的是下一个位置的奖励值

                      第六步: 使用sess.run(train_op, feed_dict={s: s_j_batch, a:a_bacth, y:y_batch}) ,s_j_batch表示当前位置,a_batch表示一个动作的方向,y_batch,表示当前位置的实际奖励值

                      第七步:s_t = s_t1, 将下一个动作的图片赋值给当前图片,t = t + 1 更新迭代次数

                      第八步:如果迭代次数为10000, 使用saver.save(global_step=t) 将模型进行保存

                      第九步:使用epislon -= (initial - final) /  进行episilon的更新操作

                      第十步:打印各个参数信息

     train.py 

    import tensorflow as tf
    import numpy as np
    import sys
    sys.path.append('game/')
    import wrapped_flappy_bird as game
    import random
    from collections import deque
    import cv2
    
    ACTION = 2 # 方向的种类
    BATCH_SIZE = 64 # 每一个batch值
    INITIAL_EPSILON = 0.1 # 刚开始随机的比例
    FINAL_EPSILON = 0.0001 # 结束时随机的比例
    OBSEVER = 1000  # 观察的次数
    EXPLORE = 300000 # 进行探索的次数
    PREPLAY_MEMORY = 50000 # D中保存图片的数量
    FRAME_PER_ACTION = 1 # 每一次迭代的次数
    GAMMA = 0.99  # 下一次奖励的衰减值
    GAME ='bird' # 游戏的名字,用于存储save的名字
    
    
    # 构造w权重参数,输入为shape,使用的是tf.truncated_normal
    def weight_variable(shape):
    
        w = tf.truncated_normal(shape, stddev=0.1)
        return tf.Variable(w)
    # 构造b权重参数,输入为shape, 使用的是tf.constant
    def biase_variable(shape):
    
        b = tf.constant(0.0, shape=shape)
        return tf.Variable(b)
    # 构造卷积层, 使用tf.nn.conv2d
    def conv2d(x, w, stride):
    
        return tf.nn.conv2d(x, w, strides=[1, stride, stride, 1], padding='SAME')
    # 构造池化层, 使用tf.nn.max_pool
    def max_pool_2x2(x):
    
        return tf.nn.max_pool(x, strides=[1, 2, 2, 1], ksize=[1, 2, 2, 1], padding='SAME')
    
    # 第一部分:用于构建网络,输出结果为当前位置两个方向的奖励值
    def creatNetwork():
        # 第一步:构造第一层卷积层W的参数 [32]
        W_conv1 = weight_variable([8, 8, 4, 32])
        # 构造第一层卷积层b的参数 [32]
        b_conv1 = biase_variable([32])
        # 第二步:构造第二层卷积层w的参数, [4, 4, 32, 64]
        W_conv2 = weight_variable([4, 4, 32, 64])
        # 构造第二层卷积层b的参数 [64]
        b_conv2 = biase_variable([64])
    
        # 第三步:构造第三层卷积层w的参数  [3, 3, 64, 64]
        W_conv3 = weight_variable([3, 3, 64, 64])
        # 构造第三层卷积层b的参数 [64]
        b_conv3 = biase_variable([64])
    
        # 构造全连接的w参数[1600, 512]
        W_fc1 = weight_variable([1600, 512])
        # 构造全连接的b参数[512]
        b_fc1 = biase_variable([512])
        # 构造输出层的w参数[512, 2]
        W_out = weight_variable([512, ACTION])
        # 构造输出层的b参数[2]
        b_out = biase_variable([ACTION])
    
        # 第四步:构造输入数据的维度[None, 80, 80, 4]
        s = tf.placeholder('float32', [None, 80, 80, 4])
        # 第五步:第一层卷积, 使用tf.nn.relu() 和 pool层
        h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
        h_pool1 = max_pool_2x2(h_conv1)
    
        # 第六步:第二层卷积, 使用tf.nn.relu() 构造第二层卷积
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
    
        # 第七步:第三层卷积, 使用tf.nn.relu() 构造第三层卷积
        h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3)
    
        # 第八步: 进行维度的拉平操作
        fc_flatten = tf.reshape(h_conv3, [-1, 1600])
    
        # 第九步:进行第一层全连接,
        h_fc1 = tf.nn.relu(tf.matmul(fc_flatten, W_fc1) + b_fc1)
    
        # 第十步:构造输出层
        readout = tf.matmul(h_fc1, W_out) + b_out
        # 返回的是s, readout当前位置上下方向的奖励值, h_fc1表示前一层全连接操作
        return s, readout, h_fc1
    
    # 第二步:用于构建训练的网络
    def trainNetwork(sess, s, readout, h_fc1):
    
        # 第一步:构造损失函数cost和训练操作
        # 构建方向的输入值
        a = tf.placeholder('float32', [None, ACTION])
        # 构建当前位置奖励值
        y = tf.placeholder('float32', [None])
        # 使用位置信息与预测的奖励值进行相乘操作, 获得当前位置的预测奖励值
        ACTION_READOUT = tf.reduce_sum(tf.multiply(a, readout))
        # 使用预测奖励值与当前位置的奖励平方差来获得损失值
        cost = tf.reduce_mean(tf.square(y - ACTION_READOUT))
        # 使用tf.train.AdamOptimizer来构造损失值的降低函数
        train_op = tf.train.AdamOptimizer(1E-6).minimize(cost)
        # 进行参数的初始化操作
        sess.run(tf.global_variables_initializer())
        # 对游戏进行实例化操作
        game_state = game.GameState()
        # 第二步:构造第一个输入的样本,输入的数据的维度为[80, 80, 4]
        # 构造存储信息的列表
        D = deque()
        # 定义一个初始方向
        do_nothing = np.zeros([ACTION])
        # 使得初始方向的位置为do_noting[0] = 1
        do_nothing[0] = 1
        # 根据初始的方向获得第一个位置信息,x_t下一个图的位置, r_0下一个位置的及时奖励,terminal是否停止
        x_t, r_0, terminal = game_state.frame_step(do_nothing)
        # 对生成的图片使用cv2.resize进行维度的变化,将图片转换为灰度图
        x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_RGB2GRAY)
        # 进行二值化操作
        ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
        # 使用np.stack进行图片的拼接操作, 将一帧的图片拼接为[80, 80, 4]
        s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
        # t为第一次迭代
        t = 0
        # 发现的随机值
        epsilon = INITIAL_EPSILON
        # 第三步:进入循环进行操作的训练
        while 'angry bird' != 'happy bird':
            # 将当前位置的图片输入,获得当前位置上下方向的奖励值
            readout_t = readout.eval(feed_dict={s:[s_t]})
            if t % FRAME_PER_ACTION == 0:
                # 将位置进行初始化
                a_t = np.zeros(ACTION)
                # 如果获得的随机值小于阈值,就进行探索操作
                if random.random() < epsilon:
                    # 获得随机的一个方向
                    index = random.randrange(ACTION)
                    # 将方向对应的位置赋值为1
                    a_t[index] = 1
                else:
                    # 获得当前位置两个方向较大的索引值
                    index = np.argmax(readout_t)
                    # 将位置的索引赋值为1
                    a_t[index] = 1
            else:
                a_t = do_nothing
            # 根据方向来获得下一个位置的图片,下一个位置的及时奖励,以及是否停止
            x_t1, r_1, terminal = game_state.frame_step(a_t)
            # 将图片转化为灰度值
            x_t1 = cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY)
            # 将阈值大于1的设置为255,小于1的就是0不变,进行二值化操作
            ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY)
            # 将x_t1进行维度变化转换为(80, 80, 1)
            x_t1 = np.reshape(x_t1, (80, 80, 1))
            # 与测试图片的前3帧图片进行串接操作
            s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2)
            # 将当前位置样本,当前位置,下一个位置的奖励值,下一个位置样本,是否停止加入到D中
            D.append((s_t, a_t, r_1, s_t1, terminal))
            # 如果D的长度大于memory,使用popleft去除最后一个信息
            if len(D) > PREPLAY_MEMORY:
                D.popleft()
            # 如果循环次数t大于观测值
            if t > OBSEVER:
                # 获得一个batch的图片信息
                mnistbatch = random.sample(D, BATCH_SIZE)
                # 获得当前样本的图片信息, [80, 80, 4]
                s_j_batch = [r[0] for r in mnistbatch]
                # 获得当前位置的方向信息,即向上的索引值为[1, 0], 向下的为[0, 1]
                a_batch = [r[1] for r in mnistbatch]
                # 下一个方向的及时奖励值
                r_batch = [r[2] for r in mnistbatch]
                # 获得下一个位置的组合图片信息 [80, 80, 4]
                s_j1_batch = [r[3] for r in mnistbatch]
    
                # 获得下一个方向的v奖励值
                readout_j1_batch = readout.eval(feed_dict={s:s_j1_batch})
                # 用于进行v奖励值的存储
                y_batch = []
                # 循环,计算每一个位置的奖励值v, 即r
                for i in range(len(mnistbatch)):
                    # 获得是够已经停止
                    terminal = mnistbatch[4][i]
                    if terminal:
                        # 如果停止,当前的奖励值为及时奖励
                        y_batch.append(r_batch[i])
    
                    else:
                        # 否者,当前位置的奖励值为及时奖励 + 奖励衰减 * 下一个位置的奖励值
                        y_batch.append(r_batch[i] + GAMMA * readout_j1_batch[i])
    
                # 进行缩小损失值的操作,输入为方向信息,y为当前位置的奖励值, s_j_batch表示当前位置的图片信息
                train_op.eval(feed_dict={
                    a : a_batch,
                    y: y_batch,
                    s: s_j_batch
                })
                # 将下一个阶段的位置信息赋值给当前位置,用于进行迭代
                s_t = s_t1
                # 将循环的次数+1
                t = t + 1
                # 构造saver进行参数的保存
                saver = tf.train.Saver()
    
                if t % 1000 == 0:
                    # 每迭代1000次,就进行参数的保存
                    saver.save(sess, 'saved_network/' + GAME + '-dqn', global_step=t)
            # 获得当前的位置
            state = ''
            if t <= OBSEVER:
                state = 'observe'
            elif t > OBSEVER and t <= OBSEVER + EXPLORE:
                state = 'explore'
    
            else:
                state = 'train'
            # 降低探索的值,以保证探索的比例越来越小    
            if epsilon > FINAL_EPSILON and t > OBSEVER:
                 epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE
            
            # 进行参数的打印    
            print('TIMESTEP', t, '/STATE', state, 
                  '/ EPSILON', epsilon, '/ ACTION',
                  a_t, '/ REWARD', r_1, 
                  '/ Q_MAX %e' % np.max(readout_t))
    
    
    
    
    
    
    
    
    
    
    
    
    
    def playGame():
        # 构造执行函数
        sess = tf.InteractiveSession()
        # 第一部分:卷积模型的构建,输出为s输入值, readout为当前的两个方向的奖励值,h_fc1第一层全连接的结果
        s, readout, h_fc1 = creatNetwork()
        trainNetwork(sess, s, readout, h_fc1)
    
    
    
    def main():
        # 建立游戏
        playGame()
    
    if __name__ == '__main__':
    
        main()

           效果图

                                         

                                    

  • 相关阅读:
    djinn:1 Vulnhub Walkthrough
    面试题:HTTP协议工作原理
    面试题:URI和URL的区别
    面试题:http和https的区别?什么是http无状态协议?什么是本地存储?
    Vue+Element 踩坑记录
    面试题:Vue的生命周期
    面试题:组件封装
    面试题:vuex
    面试题:callback
    面试题---华为机试在线训练:字符串最后一个单词的长度
  • 原文地址:https://www.cnblogs.com/my-love-is-python/p/10685669.html
Copyright © 2011-2022 走看看