1. np.stack((x_t, x_t, x_t, x_t), axis=2) 将图片进行串接的操作,使得图片的维度为[80, 80, 4]
参数说明: (x_t, x_t, x_t, x_t) 表示需要进行串接的图片, axis = 2 表示在第三个维度上进行串接操作
2. cv2.resize(x, [80, 80]) # 将图片的维度变化为80 * 80的维度
参数说明, x为输入的图片,80, 80表示图片变化的维度
3.cv2.cvtColor(x_t, tf.COLOR_RGB2GREY) 将图片转换为灰度图
参数说明, x_t表示输入的图片, tf.COLOR_RGB2GREY表示颜色转换的模式
4.cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) # 将大于等于1的索引变化为255,等于0的不变,也就是对图片进行二值化操作
参数说明: x_t表示输入图片,1表示阈值,255表示大于阈值后的数值, cv2.THRESH_BINARY表示进行二值化的模式
5. random.sample(D, batch_size) 从D样本中随机抽取batch_size个数据
参数说明:D表示样本,batch_size表示抽取样本的个数
因为对于每一帧图像而言,一只鸟在图像中的移动方向可能是向上的也可能是向下的
代码说明:使用的网络是卷积网络,
输入的是4帧的图片,即80*80*4,
预测结果为当前位置的上下两个方向的V值,即奖励值
损失值: 为当前真实的奖励值: cost = V[state] - (r_1 + GAMMA * V[next_state]), 目的是为了当前的预测奖励值 与 实际公式计算的奖励值越接近越好
V[state] 表示当前位置的预测奖励值
r_1 表示下一个状态的及时奖励
V[next_state] 表示下一个位置的预测奖励值
代码说明: 主要代码包括来部分,第一部分:主要包括构造网络结构,生成[None, 2]的输出结果,第二部分,获得一个batch的值来进行模型的训练
第一部分:构造网络模型, 用于进行v的预测
第一步:第一层卷积的W_conv1, 大小为[8, 8, 4, 32], 第一层卷积的b_conv1, 大小为[32]
第二步:第二层卷积的W_conv2, 大小为[4, 4, 32, 64], 第二层卷积的b_conv2, 大小为[64]
第三步:第三层卷积的W_conv3, 大小为[3, 3, 64, 64], 第三层卷积的b_conv3, 大小为[64]
第四步:使用tf.placeholder('float32', [None, 80, 80, 4]) # 初始化输入的数据s
第五步:使用tf.nn.relu() 构造第一层卷积层 步长为4,使用max_pool_2x2 进行池化操作
第六步:使用tf.nn.relu() 构造第二层卷积层 步长为2
第七步:使用tf.nn.relu(conv2d(x, W_conv3, 1) + b) 步长为1 来构造第三层卷积层
第八步:使用tf.reshape(h_conv4, [-1, 1600]) # 将卷积后的结果进行拉平操作
第九步:使用tf.nn.relu(tf.matmul(x, w_fc1) + b) # 进行第一次的全连接操作
第十步:使用tf.matmul(x, w_out) + b # 进行输出层的全连接操作,输出的结果为[None, action], 返货的结果为s,readout, h_fc1, s为输入结果,readout为输出结果,h_fc1为第一层全连接的输出结果
第二部分:将上述求得的s,readout, h_fc1, sess, 传入,这个先观察1000次,1000以后再进行模型参数的训练,同时这里使用一个epsilon, 当random.random() 小于epsilon,那么下一个动作的位置方向是随机值,否者的话就使用np.argmax(readout), 即readout预测出当前位置v值大的索引值作为方向
第一步:定义网络的损失值和训练步骤
第一步:使用tf.placeholder('float32', [None, 2]) 用来构造a即方向
第二步: 使用tf.placeholder('float32', [None]) 用来构造实际当前位置的V值
第三步:预测结果: action_readout = tf.reduce_sum(tf.multiply(readout, a)) 来获得当前预测的结果v奖励值
第四步: 构造cost函数,使用tf.reduce_mean(tf.square(action_readout - y))
第五步:构造train_op, 使用tf.trian.Adaoptimer(1e-6).minimize(cost)
第六步:使用sess.run(tf.global_variable_initial()) 进行参数的初始化操作
第二步:构造第一个输出的参数, 大小为[80, 80, 4]
第一步:使用game_state = game.GameState() 来实例化game_state的向量
第二步:使用D = deque() 来构造存储的向量,用于存储st 当前位置, a_t下一个动作的方向, r_t,下一个动作的及时奖励,以及st1,下一个位置
第三步: 构造一个初始化方向, do_nothing
第一步: np.zeros([2]) 来构造do_nothing的初始值
第二步:do_nothing[0] = 1 将do_nothing的第一个位置的索引值构造为1
第四步:将do_nothing 输入到game_state.frame_step(do_nothing) 获得do_nothing 下一个位置的x_t当前图片, r_0为及时奖励, terminal是否停止
第五步:使用cv2.cvtColor(cv2.resize(x_t, (80, 80), cv2.COLOR_RGB2GRAY)) 将图片进行维度变换,同时将其转换为灰度图
第六步: 使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) # 进行二值化操作
第七步:使用np.stack((x_t, x_t, x_t, x_t), axis=2) # 将此时的四张图片做一个串接,作为第一个输入s_t
第三步:进入循环,用于进行参数的训练, t = 0
第一步:使用readout.eval(feed_dict={s:s_t}) 来获得当前位置的两个方向的v值
第二步:根据随机数和np.argmax() 来获得方向值
第一步:使用np.zeros(2) 来获得a_t的初始值
第二步: 使用random.random() 来获得随机值,如果随机值小于epsilon,使用random.ranrange(ACTION) 来获得一个0,1的随机值
第三步:a_t[index] = 1, 来构造a_t
第四步: 如果随机值大于epsilon, 使用np.argmax(t_readout) 来获得方向较大的预测值v,来作为索引值
第五步: a_t[index] = 1 来构造a_t
第三步: 使用game_state.frame_step(a_t) 来获得x_t1的图片,r_1的及时奖励, terminal判断是否停止了, 将其与s_t的前三帧图片进行拼接
第一步:使用game_state.frame_step(a_t) 来获得x_t1, r_1, terminal
第二步:使用cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY) 将彩图转换为灰度图
第三步:使用cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) 进行二值化操作
第四步:使用np.reshape(x_t, [80, 80, 1]) 将图片转换为[80, 80, 1] 的数据
第五步:使用np.append(x_t, s_t[:, :, 3], axis=2) 将图片进行拼接
第四步: 将s_t, a_t, r_1. s_t1 存储在D里面, s_t当前位置,a_t下一个方向,r_1下一个方向的奖励值,s_t1下一个位置
第一步:D.append((s_t, a_t, r_1, s_t1))
第二步:判断len(D) 大于MERORY, 使用D.popleft() 去除最后一张图片,使其保持5000的个数
第五步:如果t > OBSERVE, 使用random.sample(D, BATCH_SIZE) 从D中所及获取batch_size章图片进行模型的训练,sess.run(train_op)
第一步:ministbatch = random.sample(D, BATCH_SIZE) 从D中获得batch_size的信息
第二步:s_j_batch = [r[0] for r in mnistbatch] 获得当前位置, a_batch 下一个动作的方向, r_batch下一个动作的及时奖励,s_j1_batch 获得下一个位置的图片
第三步:使用readou_batch = readout.eval(feed_dict={s:s_j1_batch}) 来获得下一个位置的v奖励值,
第四步:构造y_batch,用于存储当前位置的v奖励值
第一步:循环len(ministbatch)
第二步:terminal = ministbatch[i][4] 来获得此时的terminal
第三步:if terminal 表示停止了,那么y_batch.append(r_batch[i]) # 当前的预测奖励值使用r_batch[i]
第四步:如果没有停止,那么y_batch.append(r_batch[i] + GAMMA*readout_batch[i]) # 当前的预测奖励值使用r_batch[j] + readout_batch[i] 加上的是下一个位置的奖励值
第六步: 使用sess.run(train_op, feed_dict={s: s_j_batch, a:a_bacth, y:y_batch}) ,s_j_batch表示当前位置,a_batch表示一个动作的方向,y_batch,表示当前位置的实际奖励值
第七步:s_t = s_t1, 将下一个动作的图片赋值给当前图片,t = t + 1 更新迭代次数
第八步:如果迭代次数为10000, 使用saver.save(global_step=t) 将模型进行保存
第九步:使用epislon -= (initial - final) / 进行episilon的更新操作
第十步:打印各个参数信息
train.py
import tensorflow as tf import numpy as np import sys sys.path.append('game/') import wrapped_flappy_bird as game import random from collections import deque import cv2 ACTION = 2 # 方向的种类 BATCH_SIZE = 64 # 每一个batch值 INITIAL_EPSILON = 0.1 # 刚开始随机的比例 FINAL_EPSILON = 0.0001 # 结束时随机的比例 OBSEVER = 1000 # 观察的次数 EXPLORE = 300000 # 进行探索的次数 PREPLAY_MEMORY = 50000 # D中保存图片的数量 FRAME_PER_ACTION = 1 # 每一次迭代的次数 GAMMA = 0.99 # 下一次奖励的衰减值 GAME ='bird' # 游戏的名字,用于存储save的名字 # 构造w权重参数,输入为shape,使用的是tf.truncated_normal def weight_variable(shape): w = tf.truncated_normal(shape, stddev=0.1) return tf.Variable(w) # 构造b权重参数,输入为shape, 使用的是tf.constant def biase_variable(shape): b = tf.constant(0.0, shape=shape) return tf.Variable(b) # 构造卷积层, 使用tf.nn.conv2d def conv2d(x, w, stride): return tf.nn.conv2d(x, w, strides=[1, stride, stride, 1], padding='SAME') # 构造池化层, 使用tf.nn.max_pool def max_pool_2x2(x): return tf.nn.max_pool(x, strides=[1, 2, 2, 1], ksize=[1, 2, 2, 1], padding='SAME') # 第一部分:用于构建网络,输出结果为当前位置两个方向的奖励值 def creatNetwork(): # 第一步:构造第一层卷积层W的参数 [32] W_conv1 = weight_variable([8, 8, 4, 32]) # 构造第一层卷积层b的参数 [32] b_conv1 = biase_variable([32]) # 第二步:构造第二层卷积层w的参数, [4, 4, 32, 64] W_conv2 = weight_variable([4, 4, 32, 64]) # 构造第二层卷积层b的参数 [64] b_conv2 = biase_variable([64]) # 第三步:构造第三层卷积层w的参数 [3, 3, 64, 64] W_conv3 = weight_variable([3, 3, 64, 64]) # 构造第三层卷积层b的参数 [64] b_conv3 = biase_variable([64]) # 构造全连接的w参数[1600, 512] W_fc1 = weight_variable([1600, 512]) # 构造全连接的b参数[512] b_fc1 = biase_variable([512]) # 构造输出层的w参数[512, 2] W_out = weight_variable([512, ACTION]) # 构造输出层的b参数[2] b_out = biase_variable([ACTION]) # 第四步:构造输入数据的维度[None, 80, 80, 4] s = tf.placeholder('float32', [None, 80, 80, 4]) # 第五步:第一层卷积, 使用tf.nn.relu() 和 pool层 h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1) h_pool1 = max_pool_2x2(h_conv1) # 第六步:第二层卷积, 使用tf.nn.relu() 构造第二层卷积 h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2) # 第七步:第三层卷积, 使用tf.nn.relu() 构造第三层卷积 h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3) # 第八步: 进行维度的拉平操作 fc_flatten = tf.reshape(h_conv3, [-1, 1600]) # 第九步:进行第一层全连接, h_fc1 = tf.nn.relu(tf.matmul(fc_flatten, W_fc1) + b_fc1) # 第十步:构造输出层 readout = tf.matmul(h_fc1, W_out) + b_out # 返回的是s, readout当前位置上下方向的奖励值, h_fc1表示前一层全连接操作 return s, readout, h_fc1 # 第二步:用于构建训练的网络 def trainNetwork(sess, s, readout, h_fc1): # 第一步:构造损失函数cost和训练操作 # 构建方向的输入值 a = tf.placeholder('float32', [None, ACTION]) # 构建当前位置奖励值 y = tf.placeholder('float32', [None]) # 使用位置信息与预测的奖励值进行相乘操作, 获得当前位置的预测奖励值 ACTION_READOUT = tf.reduce_sum(tf.multiply(a, readout)) # 使用预测奖励值与当前位置的奖励平方差来获得损失值 cost = tf.reduce_mean(tf.square(y - ACTION_READOUT)) # 使用tf.train.AdamOptimizer来构造损失值的降低函数 train_op = tf.train.AdamOptimizer(1E-6).minimize(cost) # 进行参数的初始化操作 sess.run(tf.global_variables_initializer()) # 对游戏进行实例化操作 game_state = game.GameState() # 第二步:构造第一个输入的样本,输入的数据的维度为[80, 80, 4] # 构造存储信息的列表 D = deque() # 定义一个初始方向 do_nothing = np.zeros([ACTION]) # 使得初始方向的位置为do_noting[0] = 1 do_nothing[0] = 1 # 根据初始的方向获得第一个位置信息,x_t下一个图的位置, r_0下一个位置的及时奖励,terminal是否停止 x_t, r_0, terminal = game_state.frame_step(do_nothing) # 对生成的图片使用cv2.resize进行维度的变化,将图片转换为灰度图 x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_RGB2GRAY) # 进行二值化操作 ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) # 使用np.stack进行图片的拼接操作, 将一帧的图片拼接为[80, 80, 4] s_t = np.stack((x_t, x_t, x_t, x_t), axis=2) # t为第一次迭代 t = 0 # 发现的随机值 epsilon = INITIAL_EPSILON # 第三步:进入循环进行操作的训练 while 'angry bird' != 'happy bird': # 将当前位置的图片输入,获得当前位置上下方向的奖励值 readout_t = readout.eval(feed_dict={s:[s_t]}) if t % FRAME_PER_ACTION == 0: # 将位置进行初始化 a_t = np.zeros(ACTION) # 如果获得的随机值小于阈值,就进行探索操作 if random.random() < epsilon: # 获得随机的一个方向 index = random.randrange(ACTION) # 将方向对应的位置赋值为1 a_t[index] = 1 else: # 获得当前位置两个方向较大的索引值 index = np.argmax(readout_t) # 将位置的索引赋值为1 a_t[index] = 1 else: a_t = do_nothing # 根据方向来获得下一个位置的图片,下一个位置的及时奖励,以及是否停止 x_t1, r_1, terminal = game_state.frame_step(a_t) # 将图片转化为灰度值 x_t1 = cv2.cvtColor(cv2.resize(x_t1, (80, 80)), cv2.COLOR_RGB2GRAY) # 将阈值大于1的设置为255,小于1的就是0不变,进行二值化操作 ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY) # 将x_t1进行维度变化转换为(80, 80, 1) x_t1 = np.reshape(x_t1, (80, 80, 1)) # 与测试图片的前3帧图片进行串接操作 s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2) # 将当前位置样本,当前位置,下一个位置的奖励值,下一个位置样本,是否停止加入到D中 D.append((s_t, a_t, r_1, s_t1, terminal)) # 如果D的长度大于memory,使用popleft去除最后一个信息 if len(D) > PREPLAY_MEMORY: D.popleft() # 如果循环次数t大于观测值 if t > OBSEVER: # 获得一个batch的图片信息 mnistbatch = random.sample(D, BATCH_SIZE) # 获得当前样本的图片信息, [80, 80, 4] s_j_batch = [r[0] for r in mnistbatch] # 获得当前位置的方向信息,即向上的索引值为[1, 0], 向下的为[0, 1] a_batch = [r[1] for r in mnistbatch] # 下一个方向的及时奖励值 r_batch = [r[2] for r in mnistbatch] # 获得下一个位置的组合图片信息 [80, 80, 4] s_j1_batch = [r[3] for r in mnistbatch] # 获得下一个方向的v奖励值 readout_j1_batch = readout.eval(feed_dict={s:s_j1_batch}) # 用于进行v奖励值的存储 y_batch = [] # 循环,计算每一个位置的奖励值v, 即r for i in range(len(mnistbatch)): # 获得是够已经停止 terminal = mnistbatch[4][i] if terminal: # 如果停止,当前的奖励值为及时奖励 y_batch.append(r_batch[i]) else: # 否者,当前位置的奖励值为及时奖励 + 奖励衰减 * 下一个位置的奖励值 y_batch.append(r_batch[i] + GAMMA * readout_j1_batch[i]) # 进行缩小损失值的操作,输入为方向信息,y为当前位置的奖励值, s_j_batch表示当前位置的图片信息 train_op.eval(feed_dict={ a : a_batch, y: y_batch, s: s_j_batch }) # 将下一个阶段的位置信息赋值给当前位置,用于进行迭代 s_t = s_t1 # 将循环的次数+1 t = t + 1 # 构造saver进行参数的保存 saver = tf.train.Saver() if t % 1000 == 0: # 每迭代1000次,就进行参数的保存 saver.save(sess, 'saved_network/' + GAME + '-dqn', global_step=t) # 获得当前的位置 state = '' if t <= OBSEVER: state = 'observe' elif t > OBSEVER and t <= OBSEVER + EXPLORE: state = 'explore' else: state = 'train' # 降低探索的值,以保证探索的比例越来越小 if epsilon > FINAL_EPSILON and t > OBSEVER: epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE # 进行参数的打印 print('TIMESTEP', t, '/STATE', state, '/ EPSILON', epsilon, '/ ACTION', a_t, '/ REWARD', r_1, '/ Q_MAX %e' % np.max(readout_t)) def playGame(): # 构造执行函数 sess = tf.InteractiveSession() # 第一部分:卷积模型的构建,输出为s输入值, readout为当前的两个方向的奖励值,h_fc1第一层全连接的结果 s, readout, h_fc1 = creatNetwork() trainNetwork(sess, s, readout, h_fc1) def main(): # 建立游戏 playGame() if __name__ == '__main__': main()
效果图