zoukankan      html  css  js  c++  java
  • Tensorflow多线程输入数据处理框架(一)——队列与多线程

    参考书

    《TensorFlow:实战Google深度学习框架》(第2版)

    对于队列,修改队列状态的操作主要有Enqueue、EnqueueMany和Dequeue。以下程序展示了如何使用这些函数来操作一个队列。

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    # coding=utf-8 
    
    """
    @author: Li Tian
    @contact: 694317828@qq.com
    @software: pycharm
    @file: queue_operate.py
    @time: 2019/1/31 21:32
    @desc: 操作一个队列
    """
    
    import tensorflow as tf
    
    # 创建一个先进先出的队列,指定队列中最多可以保存两个元素,并指定类型为整数
    q = tf.FIFOQueue(2, "int32")
    # 使用enqueue_many函数来初始化队列中的元素。和变量初始化类似,在使用队列之前需要明确的调用这个初始化过程。
    init = q.enqueue_many(([0, 10],))
    # 使用Dequeue函数将队列中的第一个元素出队列。这个元素的值将被存在变量x中
    x = q.dequeue()
    # 将得到的值+1
    y = x + 1
    # 将+1后的值再重新加入队列。
    q_inc = q.enqueue([y])
    
    with tf.Session() as sess:
        # 运行初始化队列的操作
        init.run()
        for _ in range(5):
            # 运行q_inc将执行数据出队列、出队的元素+1、重新加入队列的整个过程。
            v, _ = sess.run([x, q_inc])
            # 打印出队元素的取值
            print(v)

    运行结果:


    tf.Coordinator主要用于协同多个线程一起停止,以下程序展示了如何使用tf.Coordinator。

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    # coding=utf-8 
    
    """
    @author: Li Tian
    @contact: 694317828@qq.com
    @software: pycharm
    @file: coordinator_test1.py
    @time: 2019/2/2 21:35
    @desc: tf.Coordinator主要用于协同多个线程一起停止,以下程序展示了如何使用tf.Coordinator
    """
    
    import tensorflow as tf
    import numpy as np
    import threading
    import time
    
    
    # 线程中运行的程序,这个程序每隔1秒判断是否需要停止并打印自己的ID。
    def MyLoop(coord, worker_id):
        # 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止
        while not coord.should_stop():
            # 随机停止所有的线程。
            if np.random.rand() < 0.1:
                print("Stoping from id: %d
    " % worker_id)
                # 调用coord.request_stop()函数来通知其他线程停止。
                coord.request_stop()
            else:
                # 打印当前线程的Id。
                print("Working on id: %d
    " % worker_id)
            # 暂停1秒
            time.sleep(1)
    
    
    # 声明一个tf.train.Coordinator类来协同多个线程。
    coord = tf.train.Coordinator()
    # 声明创建5个线程。
    threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)]
    # 启动所有的线程
    for t in threads:
        t.start()
    # 等待所有线程退出
    coord.join(threads)

     运行结果:


     如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作。

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    # coding=utf-8 
    
    """
    @author: Li Tian
    @contact: 694317828@qq.com
    @software: pycharm
    @file: queuerunner_test1.py
    @time: 2019/2/3 12:31
    @desc: 如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作。
    """
    
    import tensorflow as tf
    
    # 声明一个先进先出的队列,队列中最多100个元素,类型为实数
    queue = tf.FIFOQueue(100, "float")
    # 定义队列的入队操作
    enqueue_op = queue.enqueue([tf.random_normal([1])])
    
    # 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作。
    # tf.train.QueueRunner的第一个参数给出了被操作的队列,[enqueue_op] * 5
    # 表示了需要启动5个线程,每个线程中运行的是enqueue_op操作
    qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)
    
    # 将定义过的QueueRunner加入Tensorflow计算图上指定的集合。
    # tf.train.add_queue_runner函数没有指定集合
    # 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。下面的函数就是将刚刚定义的
    # qr加入默认的tf.GraphKeys.QUEUE_RUNNER集合。
    tf.train.add_queue_runner(qr)
    # 定义出队操作
    out_tensor = queue.dequeue()
    
    with tf.Session() as sess:
        # 使用tf.train.Coordinator来协同启动的线程。
        coord = tf.train.Coordinator()
        # 使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners
        # 来启动所有线程。否则因为没有线程运行入队操作,当调用出队操作的时候,程序会一直
        # 等待入队操作被运行。tf.train.start_queue_runners函数会默认启动
        # tf.GraphKeys.QUEUE_RUNNERS集合中所有的QueueRunner。因为这个函数值支持启动
        # 指定集合中的QueueRunner,所以一般来说tf.train.add_queue_runner函数和
        # tf.trian.start_queue_runners函数会指定同一个集合。
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        # 获取队列中的取值。
        for _ in range(3):
            print(sess.run(out_tensor)[0])
    
        # 使用tf.train.Coordinator来停止所有的线程
        coord.request_stop()
        coord.join(threads)

      运行结果:

  • 相关阅读:
    数组和对象常用方法汇总
    基于vue的悬浮碰撞窗口(用于打广告的)组件
    时间的基本处理
    防抖动和节流阀
    A. 配置xftp和xshell来远程管理Linux服务器
    课堂练习-找水王
    评价软件
    构建之法阅读笔记02
    学习进度条博客11
    用户场景
  • 原文地址:https://www.cnblogs.com/lyjun/p/10350282.html
Copyright © 2011-2022 走看看