tf.train.batch( tensors, batch_size, num_threads=1, capacity=32, enqueue_many=False, shapes=None, dynamic_pad=False, allow_smaller_final_batch=False, shared_name=None, name=None )
tf.train.slice_input_producer( tensor_list, num_epochs=None, shuffle=True, seed=None, capacity=32, shared_name=None, name=None )
核心步骤:
- 调用 tf.train.slice_input_producer,从 本地文件里抽取tensor,准备放入Filename Queue(文件名队列)中;
- 调用 tf.train.batch,从文件名队列中提取tensor,使用单个或多个线程,准备放入文件队列;
- 调用 tf.train.Coordinator() 来创建一个线程协调器,用来管理之后在Session中启动的所有线程;
- 调用tf.train.start_queue_runners, 启动入队线程,由多个或单个线程,按照设定规则,把文件读入Filename Queue中。函数返回线程ID的列表,一般情况下,系统有多少个核,就会启动多少个入队线程(入队具体使用多少个线程在tf.train.batch中定义);
- 文件从 Filename Queue中读入内存队列的操作不用手动执行,由tf自动完成;
- 调用sess.run 来启动数据出列和执行计算;
- 使用 coord.should_stop()来查询是否应该终止所有线程,当文件队列(queue)中的所有文件都已经读取出列的时候,会抛出一个 OutofRangeError 的异常,这时候就应该停止Sesson中的所有线程了;
- 使用coord.request_stop()来发出终止所有线程的命令,使用coord.join(threads)把线程加入主线程,等待threads结束。
Queue和Coordinator操作事例:
import tensorflow as tf import numpy as np # 样本个数 sample_num=5 # 设置迭代次数 epoch_num = 2 # 设置一个批次中包含样本个数 batch_size = 3 # 计算每一轮epoch中含有的batch个数 batch_total = int(sample_num/batch_size)+1 # 生成4个数据和标签 def generate_data(sample_num=sample_num): labels = np.asarray(range(0, sample_num)) images = np.random.random([sample_num, 224, 224, 3]) print('image size {},label size :{}'.format(images.shape, labels.shape)) return images,labels def get_batch_data(batch_size=batch_size): images, label = generate_data() # 数据类型转换为tf.float32 images = tf.cast(images, tf.float32) label = tf.cast(label, tf.int32) #从tensor列表中按顺序或随机抽取一个tensor准备放入文件名称队列 input_queue = tf.train.slice_input_producer([images, label], num_epochs=epoch_num, shuffle=False) #从文件名称队列中读取文件准备放入文件队列 image_batch, label_batch = tf.train.batch(input_queue, batch_size=batch_size, num_threads=2, capacity=64, allow_smaller_final_batch=False) return image_batch, label_batch image_batch, label_batch = get_batch_data(batch_size=batch_size) with tf.Session() as sess: # 先执行初始化工作 sess.run(tf.global_variables_initializer()) sess.run(tf.local_variables_initializer()) # 开启一个协调器 coord = tf.train.Coordinator() # 使用start_queue_runners 启动队列填充 threads = tf.train.start_queue_runners(sess, coord) try: while not coord.should_stop(): print ('************') # 获取每一个batch中batch_size个样本和标签 image_batch_v, label_batch_v = sess.run([image_batch, label_batch]) print(image_batch_v.shape, label_batch_v) except tf.errors.OutOfRangeError: #如果读取到文件队列末尾会抛出此异常 print("done! now lets kill all the threads……") finally: # 协调器coord发出所有线程终止信号 coord.request_stop() print('all threads are asked to stop!') coord.join(threads) #把开启的线程加入主线程,等待threads结束 print('all threads are stopped!')
输出:
************ ((3, 224, 224, 3), array([0, 1, 2], dtype=int32)) ************ ((3, 224, 224, 3), array([3, 4, 0], dtype=int32)) ************ ((3, 224, 224, 3), array([1, 2, 3], dtype=int32)) ************ done! now lets kill all the threads…… all threads are asked to stop! all threads are stopped!
以上程序在 tf.train.slice_input_producer 函数中设置了 num_epochs 的数量, 所以在文件队列末尾有结束标志,读到这个结束标志的时候抛出 OutofRangeError 异常,就可以结束各个线程了。
如果不设置 num_epochs 的数量,则文件队列是无限循环的,没有结束标志,程序会一直执行下去。