zoukankan      html  css  js  c++  java
  • JoinableQueue

    其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

    JoinableQueue([maxsize])

    这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    

    参数介绍

    maxsize是队列中允许最大项数,省略则无大小限制。
    

    方法介绍

    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    

    基于JoinableQueue实现生产者消费者模型

    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q,name):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[43m%s 吃 %s33[0m' %(name,res))
            q.task_done() #发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了
    
    def producer(q,name,food):
        for i in range(3):
            time.sleep(random.randint(1,3))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m%s 生产了 %s33[0m' %(name,res))
        q.join() #等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束
    
    if __name__ == '__main__':
        q=JoinableQueue() #使用JoinableQueue()
    
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,'egon1','包子'))
        p2=Process(target=producer,args=(q,'egon2','骨头'))
        p3=Process(target=producer,args=(q,'egon3','泔水'))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,'alex1'))
        c2=Process(target=consumer,args=(q,'alex2'))
        c1.daemon=True
        c2.daemon=True
    
        #开始
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
        p3.join()
        #1、主进程等生产者p1、p2、p3结束
        #2、而p1、p2、p3是在消费者把所有数据都取干净之后才会结束
        #3、所以一旦p1、p2、p3结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程
        print('主')






    下面是示例:

    from multiprocessing import Process, JoinableQueue
    import time
    def producer(q):
    for i in range(10):
    res = '包子%s'%i
    time.sleep(0.5)
    print('生产者生产了%s个'%res)
    q.put(res)
    q.join()
    def consmer(q):
    while True:
    res = q.get()
    if res is None:break
    time.sleep(1)
    print('消费者吃了%s个'%res)
    q.task_done()
    if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q, ))
    c1 = Process(target=consmer, args=(q, ))
    p1.start()
    c1.daemon = True
    c1.start()
    p1.join()
    print('主')

     

  • 相关阅读:
    css中优先级与层叠
    微服务通过feign.RequestInterceptor传递参数
    maven详解
    Java8新特性interface中的static方法和default方法
    设计模式六大原则---转
    MySQL主从复制作用和原理
    Mysql Binlog三种格式详细介绍
    分布式主键生成逻辑总结--转
    java幂等性的解决方案
    spring事务的传播机制新解
  • 原文地址:https://www.cnblogs.com/yuexijun/p/11524335.html
Copyright © 2011-2022 走看看