zoukankan      html  css  js  c++  java
  • JoinableQueue---创建可连接的共享进程队列

    创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。

    通知进程是使用共享的信号和条件变量来实现的。

    from multiprocessing import Process,JoinableQueue,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break #收到结束信号则结束
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
            # q.task_done()
    
    def producer(name,q):
        for i in range(2):
            time.sleep(random.randint(1,3))
            res='%s%s' %(name,i)
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
    
        #开始
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
        p2.join()
        p3.join()
        q.put(None) #有几个消费者就应该发送几次结束信号None
        q.put(None) #发送结束信号
        print('')
    View Code

    根据之前的例子:多个人消费必须最后发送几个put(None),这样的话就太low了

    在此,我们可以使用JoinableQueue来实现

    from multiprocessing import Process,JoinableQueue,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break #收到结束信号则结束
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
            q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了
    
    def producer(name,q):
        for i in range(2):
            time.sleep(random.randint(1,3))
            res='%s%s' %(name,i)
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
            # q.join() #q.join除了可以放在主进程里面,也可以放在这里
    
    if __name__ == '__main__':
        q=JoinableQueue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
    
        c1.daemon=True #设置成守护进程
        c2.daemon=True
    
        #开始
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
        p2.join()
        p3.join()
        q.join() #生产者和消费者的进程都完成了
        print('')
    #为什么设置消费者为守护进程,因为执行到最后,生产者进程执行完了,主进程也完成了,但是由于消费者
    #进程是死循环,并没有结束,设置成守护进程后,主进程结束了,c1和c2也就可以结束了
  • 相关阅读:
    设计模式—适配器模式
    设计模式—策略模式 状态模式
    设计模式——装饰模式和代理模式
    C++常考算法
    ModelState.AddModelError使用
    Json
    ref与out
    三层与mvc
    新的方法(Set<T>)实现mvc的crud
    【程序45】
  • 原文地址:https://www.cnblogs.com/mmyy-blog/p/9428973.html
Copyright © 2011-2022 走看看