zoukankan      html  css  js  c++  java
  • JoinableQueue

    其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

    JoinableQueue([maxsize])

    这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    

    参数介绍

    maxsize是队列中允许最大项数,省略则无大小限制。
    

    方法介绍

    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    

    基于JoinableQueue实现生产者消费者模型

    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q,name):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[43m%s 吃 %s33[0m' %(name,res))
            q.task_done() #发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了
    
    def producer(q,name,food):
        for i in range(3):
            time.sleep(random.randint(1,3))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m%s 生产了 %s33[0m' %(name,res))
        q.join() #等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束
    
    if __name__ == '__main__':
        q=JoinableQueue() #使用JoinableQueue()
    
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,'egon1','包子'))
        p2=Process(target=producer,args=(q,'egon2','骨头'))
        p3=Process(target=producer,args=(q,'egon3','泔水'))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,'alex1'))
        c2=Process(target=consumer,args=(q,'alex2'))
        c1.daemon=True
        c2.daemon=True
    
        #开始
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
        p3.join()
        #1、主进程等生产者p1、p2、p3结束
        #2、而p1、p2、p3是在消费者把所有数据都取干净之后才会结束
        #3、所以一旦p1、p2、p3结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程
        print('主')






    下面是示例:

    from multiprocessing import Process, JoinableQueue
    import time
    def producer(q):
    for i in range(10):
    res = '包子%s'%i
    time.sleep(0.5)
    print('生产者生产了%s个'%res)
    q.put(res)
    q.join()
    def consmer(q):
    while True:
    res = q.get()
    if res is None:break
    time.sleep(1)
    print('消费者吃了%s个'%res)
    q.task_done()
    if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q, ))
    c1 = Process(target=consmer, args=(q, ))
    p1.start()
    c1.daemon = True
    c1.start()
    p1.join()
    print('主')

     

  • 相关阅读:
    Oracle 查询语句截取字符串
    删除ORACLE数据库中重复的数据
    [易学C#]C#3.0语言新特性之匿名类型
    [讨论]程序之路在何方?
    [易学C#]C#3.0语言新特性之扩展方法
    [易学C#]C#3.0语言新特性之对象和集合初始化器
    用C#解决Oracle9i和Oracle10g字符集不兼容的问题
    一个C#操作Oracle的通用类
    [易学C#]C#3.0语言新特性之隐式类型
    重拾 DirectX 一:配置Visual Studio 2008+Microsoft DirectX SDK (June 2008) 开发环境
  • 原文地址:https://www.cnblogs.com/yuexijun/p/11524335.html
Copyright © 2011-2022 走看看