zoukankan      html  css  js  c++  java
  • Python--10、生产者消费者模型

    生产者消费者模型(★)

    平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
    程序中有两类角色:生产数据、消费数据
    实现方式:生产->队列->消费。

    通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
    通过队列的方式实现了程序解耦(生产者不需要管消费者的消费能力,不断的生产)
    #消息队列作为中间件,接受生产者发来的消息,并且供消费者拿走进行消费。
    实现了异步

    示例:

    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
    def producer(q):
        for i in range(10):
            time.sleep(random.randint(1,3))
            res='卤肉%s' %i
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即制造东西
        p1=Process(target=producer,args=(q,))
    
        #消费者们:即使用的
        c1=Process(target=consumer,args=(q,))
    
        #开始
        p1.start()
        c1.start()
        print('主')
    #这个示例会因最后生产者生产完了,但消费者取完后还在取一直阻塞在get()这里。
    
    

    生产者可以在自己生产完后停止往队列中传递,但是消费者需要不断的消费,在消费完生产的数据后无法停止,会一直处于等待状态。

    #若是在生产结束后发送none,会因存在多个生产、消费者时发多个none信号。

    为了避免这种状态,需要使用JoinableQueue方法来实现结束的信号。

    JoinableQueue 队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

    消费者在每次消费完后执行q.task_done() 往消息队列中发送信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常。
    q.join() 调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。可以判断生产者生产的个数与消费者消费的个数,并且关闭,q.join()需要加到生产者生产完(结束)后,判断完成后会继续执行主进程代码,在执行完后还会被消费者的消费卡主,故把消费者设置成守护进程。 示例:

    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
            q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
    
    def producer(name,q):
        for i in range(10):
            time.sleep(random.randint(1,3))
            res='%s%s' %(name,i)
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
        q.join()
    
    
    if __name__ == '__main__':
        q=JoinableQueue()
        #生产者们:即制造东西
        p1=Process(target=producer,args=('肉',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('馒头',q))
    
        #消费者们:即使用的
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
        c1.daemon=True
        c2.daemon=True
    
        #开始
        p_l=[p1,p2,p3,c1,c2]
        for p in p_l:
            p.start()
    
        p1.join()
        p2.join()
        p3.join()
        print('主') 
        
        #主进程等--->p1,p2,p3等---->c1,c2
        #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
        #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
  • 相关阅读:
    综合练习-词频统计
    组合数据类型综合练习
    Python综合练习
    PostOrder_Traversal 二叉树的非递归后序遍历
    PAT甲级-1152-Google Recruitment(20 分)
    PAT甲级-1007-Maximum Subsequence Sum (25 分)
    macOS MOjave运行pygame不显示图像-解决方案
    PAT甲级-1004-Counting Leaves(30 分)
    Python进阶-Numpy科学计算库(简单入门)
    回文日期
  • 原文地址:https://www.cnblogs.com/jinyudong/p/7894756.html
Copyright © 2011-2022 走看看