本节我们会使用RNN来进行回归训练(Regression),会继续使用自己创建的sin曲线预测一条cos曲线。
首先我们需要先确定RNN的各种参数:
import tensorflow as tf import numpy as np import matplotlib.pyplot as plt BATCH_START = 0 # 建立 batch data 时候的 index TIME_STEPS = 20 # backpropagation through time 的 time_steps BATCH_SIZE = 50 INPUT_SIZE = 1 # sin 数据输入 size OUTPUT_SIZE = 1 # cos 数据输出 size CELL_SIZE = 10 # RNN 的 hidden unit size LR = 0.006 # learning rate
定义一个数据生成的get_batch function:
def get_batch(): global BATCH_START, TIME_STEPS # xs shape (50batch, 20steps) xs = np.arange(BATCH_START, BATCH_START+TIME_STEPS*BATCH_SIZE).reshape((BATCH_SIZE, TIME_STEPS)) / (10*np.pi) seq = np.sin(xs) res = np.cos(xs) BATCH_START += TIME_STEPS # returned seq, res and xs: shape (batch, step, input) return [seq[:, :, np.newaxis], res[:, :, np.newaxis], xs]
定义LSTMRNN的主体结构
使用一个class来定义这次的LSTMRNN会更加的方便,第一步定义class中的__int__传入各种参数:
class LSTMRNN(object): def __init__(self, n_steps, input_size, output_size, cell_size, batch_size): self.n_steps = n_steps self.input_size = input_size self.output_size = output_size self.cell_size = cell_size self.batch_size = batch_size with tf.name_scope('inputs'): self.xs = tf.placeholder(tf.float32, [None, n_steps, input_size], name='xs') self.ys = tf.placeholder(tf.float32, [None, n_steps, output_size], name='ys') with tf.variable_scope('in_hidden'): self.add_input_layer() with tf.variable_scope('LSTM_cell'): self.add_cell() with tf.variable_scope('out_hidden'): self.add_output_layer() with tf.name_scope('cost'): self.compute_cost() with tf.name_scope('train'): self.train_op = tf.train.AdamOptimizer(LR).minimize(self.cost)
设置add_input_layer功能 添加input_layer:
def add_input_layer(self,): l_in_x = tf.reshape(self.xs, [-1, self.input_size], name='2_2D') # (batch*n_step, in_size) # Ws (in_size, cell_size) Ws_in = self._weight_variable([self.input_size, self.cell_size]) # bs (cell_size, ) bs_in = self._bias_variable([self.cell_size,]) # l_in_y = (batch * n_steps, cell_size) with tf.name_scope('Wx_plus_b'): l_in_y = tf.matmul(l_in_x, Ws_in) + bs_in # reshape l_in_y ==> (batch, n_steps, cell_size) self.l_in_y = tf.reshape(l_in_y, [-1, self.n_steps, self.cell_size], name='2_3D')
设置add_cell功能,添加cell,注意这里的self.cell_int_state,因为我们在train的时候,这个地方要做特别的声明。
def add_cell(self): lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.cell_size, forget_bias=1.0, state_is_tuple=True) with tf.name_scope('initial_state'): self.cell_init_state = lstm_cell.zero_state(self.batch_size, dtype=tf.float32) self.cell_outputs, self.cell_final_state = tf.nn.dynamic_rnn( lstm_cell, self.l_in_y, initial_state=self.cell_init_state, time_major=False)
设置add_output_layer的功能,添加output_layer:
def add_output_layer(self): # shape = (batch * steps, cell_size) l_out_x = tf.reshape(self.cell_outputs, [-1, self.cell_size], name='2_2D') Ws_out = self._weight_variable([self.cell_size, self.output_size]) bs_out = self._bias_variable([self.output_size, ]) # shape = (batch * steps, output_size) with tf.name_scope('Wx_plus_b'): self.pred = tf.matmul(l_out_x, Ws_out) + bs_out
添加RNN中剩下的部分
def compute_cost(self): losses = tf.contrib.legacy_seq2seq.sequence_loss_by_example( [tf.reshape(self.pred, [-1], name='reshape_pred')], [tf.reshape(self.ys, [-1], name='reshape_target')], [tf.ones([self.batch_size * self.n_steps], dtype=tf.float32)], average_across_timesteps=True, softmax_loss_function=self.ms_error, name='losses' ) with tf.name_scope('average_cost'): self.cost = tf.div( tf.reduce_sum(losses, name='losses_sum'), self.batch_size, name='average_cost') tf.summary.scalar('cost', self.cost) def ms_error(self, y_target, y_pre): return tf.square(tf.sub(y_target, y_pre)) def _weight_variable(self, shape, name='weights'): initializer = tf.random_normal_initializer(mean=0., stddev=1.,) return tf.get_variable(shape=shape, initializer=initializer, name=name) def _bias_variable(self, shape, name='biases'): initializer = tf.constant_initializer(0.1) return tf.get_variable(name=name, shape=shape, initializer=initializer)
这里说明一下TensorFlow LSTMatate_is_tuple参数问题
state_is_tuple 官方建议设置为True。此时,输入和输出的states为c(cell状态)和h(输出)的二元组
输入、输出、cell的维度相同,都是 batch_size * num_units。
训练LSTMRNN
if __name__ == '__main__': # 搭建 LSTMRNN 模型 model = LSTMRNN(TIME_STEPS, INPUT_SIZE, OUTPUT_SIZE, CELL_SIZE, BATCH_SIZE) sess = tf.Session() # sess.run(tf.initialize_all_variables()) # tf 马上就要废弃这种写法 # 替换成下面的写法: sess.run(tf.global_variables_initializer()) # 训练 200 次 for i in range(200): seq, res, xs = get_batch() # 提取 batch data if i == 0: # 初始化 data feed_dict = { model.xs: seq, model.ys: res, } else: feed_dict = { model.xs: seq, model.ys: res, model.cell_init_state: state # 保持 state 的连续性 } # 训练 _, cost, state, pred = sess.run( [model.train_op, model.cost, model.cell_final_state, model.pred], feed_dict=feed_dict) # 打印 cost 结果 if i % 20 == 0: print('cost: ', round(cost, 4))
完整代码如下所示:
import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data import numpy as np import matplotlib.pyplot as plt tf.set_random_seed(1) np.random.seed(1) # Hyper Parameters BATCH_SIZE = 64 TIME_STEP = 28 # rnn time step / image height INPUT_SIZE = 28 # rnn input size / image width LR = 0.01 # learning rate # data mnist = input_data.read_data_sets('./mnist', one_hot=True) # they has been normalized to range (0,1) test_x = mnist.test.images[:2000] test_y = mnist.test.labels[:2000] # plot one example print(mnist.train.images.shape) # (55000, 28 * 28) print(mnist.train.labels.shape) # (55000, 10) plt.imshow(mnist.train.images[0].reshape((28, 28)), cmap='gray') plt.title('%i' % np.argmax(mnist.train.labels[0])) plt.show() # tensorflow placeholders tf_x = tf.placeholder(tf.float32, [None, TIME_STEP * INPUT_SIZE]) # shape(batch, 784) image = tf.reshape(tf_x, [-1, TIME_STEP, INPUT_SIZE]) # (batch, height, width, channel) tf_y = tf.placeholder(tf.int32, [None, 10]) # input y # RNN rnn_cell = tf.nn.rnn_cell.LSTMCell(num_units=64) outputs, (h_c, h_n) = tf.nn.dynamic_rnn( rnn_cell, # cell you have chosen image, # input initial_state=None, # the initial hidden state dtype=tf.float32, # must given if set initial_state = None time_major=False, # False: (batch, time step, input); True: (time step, batch, input) ) output = tf.layers.dense(outputs[:, -1, :], 10) # output based on the last output step loss = tf.losses.softmax_cross_entropy(onehot_labels=tf_y, logits=output) # compute cost train_op = tf.train.AdamOptimizer(LR).minimize(loss) accuracy = tf.metrics.accuracy( # return (acc, update_op), and create 2 local variables labels=tf.argmax(tf_y, axis=1), predictions=tf.argmax(output, axis=1),)[1] sess = tf.Session() init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()) # the local var is for accuracy_op sess.run(init_op) # initialize var in graph for step in range(1200): # training b_x, b_y = mnist.train.next_batch(BATCH_SIZE) _, loss_ = sess.run([train_op, loss], {tf_x: b_x, tf_y: b_y}) if step % 50 == 0: # testing accuracy_ = sess.run(accuracy, {tf_x: test_x, tf_y: test_y}) print('train loss: %.4f' % loss_, '| test accuracy: %.2f' % accuracy_) # print 10 predictions from test data test_output = sess.run(output, {tf_x: test_x[:10]}) pred_y = np.argmax(test_output, 1) print(pred_y, 'prediction number') print(np.argmax(test_y[:10], 1), 'real number')