zoukankan      html  css  js  c++  java
  • TensorFlow queue多线程读取数据

    一、tensorflow读取机制图解

    我们必须要把数据先读入后才能进行计算,假设读入用时0.1s,计算用时0.9s,那么就意味着每过1s,GPU都会有0.1s无事可做,这就大大降低了运算的效率。

    解决这个问题方法就是将读入数据和计算分别放在两个线程中,将数据读入内存的一个队列,如下图所示:

    读取线程源源不断地将文件系统中的图片读入到一个内存的队列中,而负责计算的是另一个线程,计算需要数据时,直接从内存队列中取就可以了。这样就可以解决GPU因为IO而空闲的问题!

    在tensorflow中,为了方便管理,在内存队列前又添加了一层所谓的“文件名队列”。tensorflow使用文件名队列+内存队列双队列的形式读入文件,可以很好地管理epoch。下面我们用图片的形式来说明这个机制的运行方式。

    如果再尝试读入,系统由于检测到了“结束”,就会自动抛出一个异常(OutOfRange)。外部捕捉到这个异常后就可以结束程序了。这就是tensorflow中读取数据的基本机制。

    二、tensorflow读取数据机制的对应函数

    对于文件名队列,我们使用tf.train.string_input_producer函数。这个函数需要传入一个文件名list,系统会自动将它转为一个文件名队列。tf.train.string_input_producer还有两个重要的参数,一个是num_epochs,表示epoch数。另外一个就是shuffle是指在一个epoch内文件的顺序是否被打乱。

    在tensorflow中,内存队列不需要我们自己建立,我们只需要使用reader对象从文件名队列中读取数据就可以了。

    在我们使用tf.train.string_input_producer创建文件名队列后,整个系统其实还是处于“停滞状态”的,也就是说,我们文件名并没有真正被加入到队列中,此时如果我们开始计算,因为内存队列中什么也没有,计算单元就会一直等待,导致整个系统被阻塞。使用tf.train.start_queue_runners之后,才会启动填充队列的线程,这时系统就不再“停滞”。此后计算单元就可以拿到数据并进行计算,整个程序也就跑起来了。

    reader每次读取一张图片并保存。

     1 import tensorflow as tf 
     2 
     3 # 新建一个Session
     4 with tf.Session() as sess:
     5     # 我们要读三幅图片A.jpg, B.jpg, C.jpg
     6     filename = ['A.jpg', 'B.jpg', 'C.jpg']
     7     # string_input_producer会产生一个文件名队列
     8     filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)
     9     # reader从文件名队列中读数据。对应的方法是reader.read
    10     reader = tf.WholeFileReader()
    11     key, value = reader.read(filename_queue)
    12     # tf.train.string_input_producer定义了一个epoch变量,要对它进行初始化
    13     tf.local_variables_initializer().run()
    14     # 使用start_queue_runners之后,才会开始填充队列
    15     threads = tf.train.start_queue_runners(sess=sess)
    16     i = 0
    17     while True:
    18         i += 1
    19         # 获取图片数据并保存
    20         image_data = sess.run(value)
    21         with open('read/test_%d.jpg' % i, 'wb') as f:
    22             f.write(image_data)

    三个概念:

    • Queue是TF队列和缓存机制的实现
    • QueueRunner是TF中对操作Queue的线程的封装
    • Coordinator是TF中用来协调线程运行的工具

    Queue:

    • tf.FIFOQueue 按入列顺序出列的队列
    • tf.RandomShuffleQueue 随机顺序出列的队列
    • tf.PaddingFIFOQueue 以固定长度批量出列的队列
    • tf.PriorityQueue 带优先级出列的队列

    创建函数的参数:

    tf.FIFOQueue(capacity, dtypes, shapes=None, names=None ...)
    Queue主要包含入列(enqueue)出列(dequeue)两个操作。enqueue操作返回计算图中的一个Operation节点,dequeue操作返回一个Tensor值。Tensor在创建时同样只是一个定义(或称为“声明”),需要放在Session中运行才能获得真正的数值。
     1 import tensorflow as tf
     2 tf.InteractiveSession()
     3 
     4 q = tf.FIFOQueue(2, "float")
     5 init = q.enqueue_many(([0,0],))
     6 
     7 x = q.dequeue()
     8 y = x+1
     9 q_inc = q.enqueue([y])
    10 
    11 init.run()
    12 q_inc.run()
    13 q_inc.run()
    14 q_inc.run()
    15 x.eval()  # 返回1
    16 x.eval()  # 返回2
    17 x.eval()  # 卡住

    QueueRunner
    Tensorflow的计算主要在使用CPU/GPU和内存,而数据读取涉及磁盘操作,速度远低于前者操作。因此通常会使用多个线程读取数据,然后
    使用一个线程消费数据,QueueRunner就是来管理这些读写队列的线程。
     1 import tensorflow as tf  
     2 import sys  
     3 q = tf.FIFOQueue(10, "float")  
     4 counter = tf.Variable(0.0)  #计数器
     5 # 给计数器加一
     6 increment_op = tf.assign_add(counter, 1.0)
     7 # 将计数器加入队列
     8 enqueue_op = q.enqueue(counter)
     9 
    10 # 创建QueueRunner
    11 # 用多个线程向队列添加数据
    12 # 这里实际创建了4个线程,两个增加计数,两个执行入队
    13 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
    14 
    15 # 主线程
    16 sess = tf.InteractiveSession()
    17 tf.global_variables_initializer().run()
    18 # 启动入队线程
    19 qr.create_threads(sess, start=True)
    20 for i in range(20):
    21     print (sess.run(q.dequeue()))
    
    

    增加计数的进程会不停的后台运行,执行入队的进程会先执行10次(因为队列长度只有10),然后主线程开始消费数据,当一部分数据消费被后,入队的进程又会开始执行。最终主线程消费完20个数据后停止,但其他线程继续运行,程序不会结束。

    Coordinator:

    用来保存线程组运行状态的协调器对象

     1 import tensorflow as tf
     2 import threading, time
     3 
     4 # 子线程函数
     5 def loop(coord, id):
     6     t = 0
     7     while not coord.should_stop():
     8         print(id)
     9         time.sleep(1)
    10         t += 1
    11         # 只有1号线程调用request_stop方法
    12         if (t >= 2 and id == 1):
    13             coord.request_stop()
    14 
    15 # 主线程
    16 coord = tf.train.Coordinator()
    17 # 使用Python API创建10个线程
    18 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]
    19 
    20 # 启动所有线程,并等待线程结束
    21 for t in threads: t.start()
    22 coord.join(threads)

    所有的子线程执行完两个周期后都会停止,主线程会等待所有子线程都停止后结束,从而使整个程序结束。由此可见,只要有任何一个线程调用了Coordinator的request_stop方法,所有的线程都可以通过should_stop方法感知并停止当前线程。

    ALL:

    第一种,显式的创建QueueRunner,然后调用它的create_threads方法启动线程。例如下面这段代码:

     1 import tensorflow as tf
     2 
     3 # 1000个4维输入向量,每个数取值为1-10之间的随机数
     4 data = 10 * np.random.randn(1000, 4) + 1
     5 # 1000个随机的目标值,值为0或1
     6 target = np.random.randint(0, 2, size=1000)
     7 
     8 # 创建Queue,队列中每一项包含一个输入数据和相应的目标值
     9 queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])
    10 
    11 # 批量入列数据(这是一个Operation)
    12 enqueue_op = queue.enqueue_many([data, target])
    13 # 出列数据(这是一个Tensor定义)
    14 data_sample, label_sample = queue.dequeue()
    15 
    16 # 创建包含4个线程的QueueRunner
    17 qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
    18 
    19 with tf.Session() as sess:
    20     # 创建Coordinator
    21     coord = tf.train.Coordinator()
    22     # 启动QueueRunner管理的线程
    23     enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    24     # 主线程,消费100个数据
    25     for step in range(100):
    26         if coord.should_stop():
    27             break
    28         data_batch, label_batch = sess.run([data_sample, label_sample])
    29     # 主线程计算完成,停止所有采集数据的进程
    30     coord.request_stop()
    31     coord.join(enqueue_threads)

    第二种,使用全局的start_queue_runners方法启动线程。

    在这个例子中,tf.train.string_input_produecer将一个隐含的QueueRunner添加到全局图中,类似的操作还有tf.train.shuffle_batch等)。由于没有显式地返回QueueRunner来用create_threads启动线程,这里用tf.train.start_queue_runners方法直接启动tf.GraphKeys.QUEUE_RUNNERS集合中的所有队列线程。

     1 import tensorflow as tf
     2 
     3 # 同时打开多个文件,显示创建Queue,同时隐含了QueueRunner的创建
     4 filename_queue = tf.train.string_input_producer(["data1.csv","data2.csv"])
     5 reader = tf.TextLineReader(skip_header_lines=1)
     6 # Tensorflow的Reader对象可以直接接受一个Queue作为输入
     7 key, value = reader.read(filename_queue)
     8 
     9 with tf.Session() as sess:
    10     coord = tf.train.Coordinator()
    11     # 启动计算图中所有的队列线程
    12     threads = tf.train.start_queue_runners(coord=coord)
    13     # 主线程,消费100个数据
    14     for _ in range(100):
    15         features, labels = sess.run([data_batch, label_batch])
    16     # 主线程计算完成,停止所有采集数据的进程
    17     coord.request_stop()
    18     coord.join(threads)

    这两种方式在效果上是等效的

     1 import pandas as pd
     2 import numpy as np
     3 import tensorflow as tf
     4 
     5 
     6 def generate_data():
     7     num = 25
     8     label = np.asarray(range(0, num))
     9     images = np.random.random([num, 5])
    10     print('label size :{}, image size {}'.format(label.shape, images.shape))
    11     return images,label
    12 
    13 def get_batch_data():
    14     images, label = generate_data()
    15     input_queue = tf.train.slice_input_producer([images, label], shuffle=False,num_epochs=2)
    16     image_batch, label_batch = tf.train.batch(input_queue, batch_size=5, num_threads=1, capacity=64,allow_smaller_final_batch=False)
    17     return image_batch,label_batch
    18 
    19 
    20 images,label = get_batch_data()
    21 sess = tf.Session()
    22 sess.run(tf.global_variables_initializer())
    23 sess.run(tf.local_variables_initializer())#这一行必须加,因为slice_input_producer的原因
    24 coord = tf.train.Coordinator()
    25 threads = tf.train.start_queue_runners(sess,coord)
    26 try:
    27     while not coord.should_stop():
    28         i,l = sess.run([images,label])
    29         print(i)
    30         print(l)
    31 except tf.errors.OutOfRangeError:
    32     print('Done training')
    33 finally:
    34     coord.request_stop()
    35 coord.join(threads)
    36 sess.close()

    使用队列机制不需要 feed_dict,不再浪费内存,并提高GPU的利用率,节省训练时间

    文件准备

    1
    2
    3
    4
    5
    6
    7
    $ echo -e "Alpha1,A1 Alpha2,A2 Alpha3,A3" > A.csv
    $ echo -e "Bee1,B1 Bee2,B2 Bee3,B3" > B.csv
    $ echo -e "Sea1,C1 Sea2,C2 Sea3,C3" > C.csv
    $ cat A.csv
    Alpha1,A1
    Alpha2,A2
    Alpha3,A3

    单个Reader,单个样本

     1 import tensorflow as tf
     2 # 生成一个先入先出队列和一个QueueRunner
     3 filenames = ['A.csv', 'B.csv', 'C.csv']
     4 filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
     5 # 定义Reader
     6 reader = tf.TextLineReader()
     7 key, value = reader.read(filename_queue)
     8 # 定义Decoder
     9 example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
    10 # 运行Graph
    11 with tf.Session() as sess:
    12     coord = tf.train.Coordinator()  #创建一个协调器,管理线程
    13     threads = tf.train.start_queue_runners(coord=coord)  #启动QueueRunner, 此时文件名队列已经进队。
    14     for i in range(10):
    15         print example.eval()   #取样本的时候,一个Reader先从文件名队列中取出文件名,读出数据,Decoder解析后进入样本队列。
    16     coord.request_stop()
    17     coord.join(threads)
    18 # outpt
    19 Alpha1
    20 Alpha2
    21 Alpha3
    22 Bee1
    23 Bee2
    24 Bee3
    25 Sea1
    26 Sea2
    27 Sea3
    28 Alpha1

    单个Reader,多个样本

     1 import tensorflow as tf
     2 filenames = ['A.csv', 'B.csv', 'C.csv']
     3 filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
     4 reader = tf.TextLineReader()
     5 key, value = reader.read(filename_queue)
     6 example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
     7 # 使用tf.train.batch()会多加了一个样本队列和一个QueueRunner。Decoder解后数据会进入这个队列,再批量出队。
     8 # 虽然这里只有一个Reader,但可以设置多线程,相应增加线程数会提高读取速度,但并不是线程越多越好。
     9 example_batch, label_batch = tf.train.batch(
    10       [example, label], batch_size=5)
    11 with tf.Session() as sess:
    12     coord = tf.train.Coordinator()
    13     threads = tf.train.start_queue_runners(coord=coord)
    14     for i in range(10):
    15         print example_batch.eval()
    16     coord.request_stop()
    17     coord.join(threads)
    18 # output
    19 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
    20 # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']
    21 # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']
    22 # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']
    23 # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']
    24 # ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3']
    25 # ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2']
    26 # ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1']
    27 # ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3']
    28 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']

    多Reader,多个样本

     1 import tensorflow as tf
     2 filenames = ['A.csv', 'B.csv', 'C.csv']
     3 filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
     4 reader = tf.TextLineReader()
     5 key, value = reader.read(filename_queue)
     6 record_defaults = [['null'], ['null']]
     7 example_list = [tf.decode_csv(value, record_defaults=record_defaults)
     8                   for _ in range(2)]  # Reader设置为2
     9 # 使用tf.train.batch_join(),可以使用多个reader,并行读取数据。每个Reader使用一个线程。
    10 example_batch, label_batch = tf.train.batch_join(
    11       example_list, batch_size=5)
    12 with tf.Session() as sess:
    13     coord = tf.train.Coordinator()
    14     threads = tf.train.start_queue_runners(coord=coord)
    15     for i in range(10):
    16         print example_batch.eval()
    17     coord.request_stop()
    18     coord.join(threads)
    19     
    20 # output
    21 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
    22 # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']
    23 # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']
    24 # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']
    25 # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']
    26 # ['Sea2' 'Sea3' 'Alpha1' 'Alpha2' 'Alpha3']
    27 # ['Bee1' 'Bee2' 'Bee3' 'Sea1' 'Sea2']
    28 # ['Sea3' 'Alpha1' 'Alpha2' 'Alpha3' 'Bee1']
    29 # ['Bee2' 'Bee3' 'Sea1' 'Sea2' 'Sea3']
    30 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']


    tf.train.batch
    tf.train.shuffle_batch函数是单个Reader读取,但是可以多线程。tf.train.batch_jointf.train.shuffle_batch_join可设置多Reader读取,每个Reader使用一个线程。至于两种方法的效率,单Reader时,2个线程就达到了速度的极限。多Reader时,2个Reader就达到了极限。所以并不是线程越多越快,甚至更多的线程反而会使效率下降。

    迭代控制

     1 filenames = ['A.csv', 'B.csv', 'C.csv']
     2 filename_queue = tf.train.string_input_producer(filenames, shuffle=False, num_epochs=3)  # num_epoch: 设置迭代数
     3 reader = tf.TextLineReader()
     4 key, value = reader.read(filename_queue)
     5 record_defaults = [['null'], ['null']]
     6 example_list = [tf.decode_csv(value, record_defaults=record_defaults)
     7                   for _ in range(2)]
     8 example_batch, label_batch = tf.train.batch_join(
     9       example_list, batch_size=5)
    10 init_local_op = tf.initialize_local_variables()
    11 with tf.Session() as sess:
    12     sess.run(init_local_op)   # 初始化本地变量 
    13     coord = tf.train.Coordinator()
    14     threads = tf.train.start_queue_runners(coord=coord)
    15     try:
    16         while not coord.should_stop():
    17             print example_batch.eval()
    18     except tf.errors.OutOfRangeError:
    19         print('Epochs Complete!')
    20     finally:
    21         coord.request_stop()
    22     coord.join(threads)
    23     coord.request_stop()
    24     coord.join(threads)
    25 # output
    26 # ['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2']
    27 # ['Bee3' 'Sea1' 'Sea2' 'Sea3' 'Alpha1']
    28 # ['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3']
    29 # ['Sea1' 'Sea2' 'Sea3' 'Alpha1' 'Alpha2']
    30 # ['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1']
    31 # Epochs Complete!


    参考自:在迭代控制中,记得添加tf.initialize_local_variables(),官网教程没有说明,但是如果不初始化,运行就会报错。

    https://zhuanlan.zhihu.com/p/27238630

    http://www.jianshu.com/p/d063804fb272

    http://honggang.io/2016/08/19/tensorflow-data-reading/

    http://www.jianshu.com/p/f07f28448313

  • 相关阅读:
    JavaScript中的数据类型转换
    JavaScript中的变量
    set_uid set_gid stick_bit 软硬链接
    chmod、chown、umask、lsattr/chattr
    环境变量、cp、mv、cat 等命令
    相对和绝对路径 mkdir cd rm 等命令
    linux 系统 目录,以部分及相关命令
    单用户模式 和救援模式 、以及相互登陆(免密)
    putty 、xshell的使用 和 putty 、xshell、 shell 间免密登陆
    vmware NAT 网络出现问题的解决方法
  • 原文地址:https://www.cnblogs.com/demian/p/8005407.html
Copyright © 2011-2022 走看看