This would be much easier to follow with diagrams. I have been too busy lately and will add them later.
-
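```python
import tensorflow as tf  # TensorFlow 1.x queue-based input API

# First, there is a list of filenames to read.
# string_input_producer puts that list into a filename queue.
# (When the dataset is too large, it is often split across several files,
# e.g. several TFRecord files, and all of them go into this list.)
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

# Each file format has its own reader; TextLineReader yields one line per record.
reader = tf.TextLineReader()

# reader.read() extracts the next record from a file whose name is in the queue.
# When the current file has no records left, the reader dequeues the next
# filename (see tf.ReaderBase). read() returns the record's key and value.
key, value = reader.read(filename_queue)

# Decode the record. The decoding depends on the record format inside the file;
# here each line is CSV. record_defaults sets each column's type (int32 here)
# and the value used when a field is empty.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
# Assemble the decoded columns into the needed shape: 4 features plus a label.
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
    # Start populating the filename queue.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(1200):
        # Retrieve a single instance:
        example, label = sess.run([features, col5])

    coord.request_stop()
    coord.join(threads)
```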
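To make the data flow concrete, here is a plain-Python analogue of the pipeline above. It uses only the standard library, not TensorFlow, and is a sketch under assumptions. The in-memory `FILES` dict stands in for file0.csv/file1.csv, and `read_records` / `decode_csv_line` are hypothetical helpers, not TensorFlow APIs. The sketch mimics two behaviors described in the comments above. First, the reader takes the next filename from the filename queue only when the current file is exhausted. Second, `record_defaults` both fixes each column's type and fills in empty fields. One difference from the real API: `tf.train.string_input_producer` shuffles filenames by default and cycles through them indefinitely unless `num_epochs` is set, which is why the 1200-step loop above never runs dry. This sketch reads each file once, in order.

```python
import io
import queue

# Hypothetical in-memory stand-ins for file0.csv and file1.csv.
FILES = {
    "file0.csv": "1,2,3,4,0\n5,,7,8,1\n",
    "file1.csv": "9,10,11,12,0\n",
}

def read_records(filename_queue):
    """Yield (key, value) pairs, dequeuing the next filename only when
    the current file has no records left (like a line reader)."""
    while True:
        try:
            name = filename_queue.get_nowait()
        except queue.Empty:
            return  # no more filenames: input exhausted
        for lineno, line in enumerate(io.StringIO(FILES[name]), start=1):
            yield f"{name}:{lineno}", line.rstrip("\n")

def decode_csv_line(value, record_defaults):
    """Split one CSV record; an empty field takes its column default,
    and the default's type decides how a non-empty field is parsed."""
    fields = value.split(",")
    if len(fields) != len(record_defaults):
        raise ValueError(f"expected {len(record_defaults)} fields, got {len(fields)}")
    cols = []
    for field, (default,) in zip(fields, record_defaults):
        cols.append(type(default)(field) if field != "" else default)
    return cols

# Filename queue: read each file once, in order (no shuffling, no epochs).
filename_queue = queue.Queue()
for name in ["file0.csv", "file1.csv"]:
    filename_queue.put(name)

record_defaults = [[1], [1], [1], [1], [1]]
examples = []
for key, value in read_records(filename_queue):
    col1, col2, col3, col4, col5 = decode_csv_line(value, record_defaults)
    features = [col1, col2, col3, col4]  # analogue of tf.stack
    examples.append((features, col5))

print(examples)
# [([1, 2, 3, 4], 0), ([5, 1, 7, 8], 1), ([9, 10, 11, 12], 0)]
```

Note how the empty second field of `5,,7,8,1` becomes `1`, the default given for that column.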
Reference: https://www.tensorflow.org/programmers_guide/reading_data
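Any discussion of queues has to cover two classes that help with multithreaded, asynchronous input: tf.train.Coordinator and tf.train.QueueRunner.
- tf.train.Coordinator: coordinates a group of threads so that they stop together.
- tf.train.QueueRunner: holds a list of enqueue ops and creates threads for them, with each op running on its own thread.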
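Coordinator
Coordinator methods: should_stop (returns whether a stop has been requested), request_stop (asks all threads to stop), and join (waits for the given threads to terminate).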
-
```python
import threading
import tensorflow as tf

# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
    # should_stop() returns True or False: whether this thread should finish
    while not coord.should_stop():
        do_something()          # placeholder for the thread's actual work
        if some_condition():    # placeholder for your own stop condition
            # When the condition occurs, one thread calls request_stop();
            # the other threads then exit because should_stop() returns True.
            coord.request_stop()

# Main thread: create a coordinator.
coord = tf.train.Coordinator()
# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in range(10)]
# Start the threads and wait for all of them to stop.
for t in threads:
    t.start()
coord.join(threads)
```
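To see what the three Coordinator methods do, here is a small, runnable stand-in built on `threading.Event`. It is a hypothetical simplification for illustration (`MiniCoordinator` and `my_loop` are made-up names), not TensorFlow's implementation. The real `join` also supports a grace period for threads that do not stop in time. The sketch does copy one real behavior: if an exception is passed to `request_stop`, `join` re-raises it in the main thread. Each worker increments a shared counter until the limit is reached. The worker that hits the limit calls `request_stop`, and all the others then exit through `should_stop`.

```python
import threading

class MiniCoordinator:
    """Hypothetical, simplified analogue of tf.train.Coordinator."""

    def __init__(self):
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        self._exc = None

    def should_stop(self):
        # True once any thread has requested a stop.
        return self._stop_event.is_set()

    def request_stop(self, ex=None):
        with self._lock:
            if ex is not None and self._exc is None:
                self._exc = ex  # remember the first reported error
            self._stop_event.set()

    def join(self, threads):
        # Wait for every thread, then surface a reported error, if any.
        for t in threads:
            t.join()
        if self._exc is not None:
            raise self._exc


def my_loop(coord, counter, limit, lock):
    # Keep incrementing the shared counter until some thread asks to stop.
    while not coord.should_stop():
        with lock:
            if counter[0] >= limit:
                coord.request_stop()  # one thread stops everyone
                break
            counter[0] += 1


coord = MiniCoordinator()
counter = [0]
lock = threading.Lock()
threads = [threading.Thread(target=my_loop, args=(coord, counter, 1000, lock))
           for _ in range(10)]
for t in threads:
    t.start()
coord.join(threads)
print(counter[0], coord.should_stop())  # 1000 True
```

Because the check and the increment happen under the same lock, the counter stops at exactly 1000, however the 10 threads interleave.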
QueueRunner
-
```python
import tensorflow as tf

example = ...  # ops to create one example
# Create a queue, and an op that enqueues examples one at a time in the queue.
# Unlike the filename queue, this is an example queue: for instance, the
# examples read and decoded by the pipeline above can be enqueued here.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)  # define the enqueue op
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...  # use 'inputs' to build the training part of the graph
# Create a queue runner that will run 4 threads in parallel to enqueue examples.
# A QueueRunner runs a queue's enqueue ops on multiple threads. Its second
# argument is the list of enqueue ops, with one thread per op, so
# [enqueue_op] * 4 gives 4 threads running the same op.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
# The QueueRunner creates the threads for the queue and hands control
# of their termination to the coordinator.
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in range(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)
```
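The producer/consumer structure of a QueueRunner can be sketched in plain Python. Under assumptions: `MiniQueueRunner`, `make_example`, and `dequeue_many` are hypothetical names, a `threading.Event` stands in for the Coordinator, and a FIFO `queue.Queue` with a capacity replaces `tf.RandomShuffleQueue`, so no shuffling happens here. As in the code above, the runner starts one thread per enqueue function (four copies of the same one). The main loop dequeues fixed-size batches, then sets the stop flag and joins the producers. The `put` timeout matters. Without it, a producer blocked on a full queue would never notice the stop request.

```python
import queue
import threading

class MiniQueueRunner:
    """Hypothetical analogue of tf.train.QueueRunner: one thread per enqueue fn."""

    def __init__(self, q, enqueue_fns):
        self.q = q
        self.enqueue_fns = enqueue_fns

    def create_threads(self, stop_event, start=True):
        threads = []
        for fn in self.enqueue_fns:
            t = threading.Thread(target=self._run, args=(fn, stop_event), daemon=True)
            threads.append(t)
            if start:
                t.start()
        return threads

    def _run(self, enqueue_fn, stop_event):
        # Enqueue until a stop is requested; the put() timeout lets a thread
        # blocked on a full queue re-check the stop flag instead of hanging.
        while not stop_event.is_set():
            item = enqueue_fn()
            while not stop_event.is_set():
                try:
                    self.q.put(item, timeout=0.05)
                    break
                except queue.Full:
                    continue


def dequeue_many(q, n):
    # Block until n items are available, like queue.dequeue_many(batch_size).
    return [q.get() for _ in range(n)]


counter_lock = threading.Lock()
next_id = [0]

def make_example():
    # Hypothetical "ops to create one example": a unique integer id.
    with counter_lock:
        next_id[0] += 1
        return next_id[0]


example_queue = queue.Queue(maxsize=32)                   # bounded capacity
qr = MiniQueueRunner(example_queue, [make_example] * 4)   # 4 enqueue threads
stop = threading.Event()                                  # stands in for the Coordinator
enqueue_threads = qr.create_threads(stop, start=True)

batch_size = 8
batches = []
for step in range(5):
    if stop.is_set():
        break
    batches.append(dequeue_many(example_queue, batch_size))  # "train" on a batch

stop.set()                  # analogue of coord.request_stop()
for t in enqueue_threads:   # analogue of coord.join(enqueue_threads)
    t.join()
print([len(b) for b in batches])  # [8, 8, 8, 8, 8]
```

The ids inside a batch may arrive out of order, because four threads race to enqueue. That is the same nondeterminism a multi-threaded QueueRunner introduces.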
To be continued...