zoukankan      html  css  js  c++  java
  • python之深度学习-模拟异步操作(队列)

    队列管理器的创建:

    tf.train.QueueRunner(queue,enqueue_ops = None):

    参数解释:

    queue:一个队列

    enqueue_ops :添加线程的队列操作的列表,[]*2就是指定两个线程

    create_threads(sess,coord=None,start = False);创建线程来运行给定会话的入队操作,返回一个线程的实例

    参数的解释:

    start:布尔值,如果是True就启动线程,如果是False的话,调用者要是想启动线程就必须调用start()

    coord:线程协调器,后面的线程管理器需要用到

    #模拟异步操作:子线程存入数据,主线程从子线程的队列中读取数据
    #1.定义一个队列:大小是1000
    Q = tf.FIFOQueue(1000,tf.float32)
    #2.定义子线程需要怎么处理数据,这里我们把数据每次都+1
    var = tf.Variable(0.0) #初始值是0.0
    data = tf.assign_add(var,1.0) #每次都把var这个变量的值+1,为什么不直接var+1呢,因为这是两个不同的op,
    en_q = Q.enqueue(data) #进队列
    #3.定义一个队列的任务管理器,可以指定多个子线程,接下来我们指定两个
    qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
    #不要忘记之前定义过的变量要初始化
    init_op = tf.global_variables_initializer()
    with tf.Session() as sess:
        #初始化变量
        sess.run(init_op)
        #正式开启子线程;这一步必须在会话中完成,在这里设置参数start=True就表示在此时已经开启子线程
        threads = qr.create_threads(sess,start=True)
        #主线程不断地读取数据
        for i in range(300):
            print(sess.run(Q.dequeue()))

    如果你这时候去运行他会报错:

    CancelledError: Enqueue operation was cancelled

    这是因为主线程运行300次后就关闭了,这时候会话也关闭了,但是子线程需要运行1000次后才会关闭,所以这时候在会话关闭的情况下去运行子线程,(会话是进行资源管理的)子线程就得不到运行的资源了,这时候程序就会报错,提示会话已经关闭了,你被强制停止了

    那该怎么办呢?

    这里我们会用到一个叫线程协调器的函数:

    tf.train.Coordinator():实现一个简单的机制来协调一组线程的终止

    参数可以有如下:

    request_stop() ===>请求线程停止:这是针对子线程运行的内容很重要的情况下调用的参数

    should_stop() ===>强制停止

    join(threadName) ===>回收线程

    #模拟异步操作:子线程存入数据,主线程从子线程的队列中读取数据
    #1.定义一个队列:大小是1000
    Q = tf.FIFOQueue(1000,dtypes=tf.float32)
    #2.定义子线程需要怎么处理数据,这里我们把数据每次都+1
    var = tf.Variable(0.0) #初始值是0.0
    data = tf.assign_add(var,1.0) #每次都把var这个变量的值+1,为什么不直接var+1呢,因为这是两个不同的op,
    en_q = Q.enqueue(data) #进队列
    #3.定义一个队列的任务管理器,可以指定多个子线程,接下来我们指定两个
    qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
    #不要忘记之前定义过的变量要初始化
    init_op = tf.global_variables_initializer()
    with tf.Session() as sess:
        #初始化变量
        sess.run(init_op)
        #开启线程管理器
        coord = tf.train.Coordinator()
        #正式开启子线程;这一步必须在会话中完成,在这里设置参数start=True就表示在此时已经开启子线程,指定了一个线程管理器coord
        threads = qr.create_threads(sess,coord=coord,start=True)
        #主线程不断地读取数据
        for i in range(300):
            print(sess.run(Q.dequeue()))
        #开始回收子线程
        coord.request_stop()
        coord.join(threads) #回收的线程是threads
  • 相关阅读:
    一个网络狂人的财富轨迹
    婚姻的精髓
    软件史上最伟大的十大程序员
    由瓜子理论引出的人力资源管理启示
    感情裂缝的"维修工" 在生活抛锚的地方起航
    寻找更新过的数据
    asp.net mvc中TempData和ViewData的区别
    SQL Server Backup
    VS字符串时间转换用法
    SQL Server 根据动态条件insert,update语句
  • 原文地址:https://www.cnblogs.com/boost/p/13516816.html
Copyright © 2011-2022 走看看