zoukankan      html  css  js  c++  java
  • python_并发编程——消费者和生产者模型

    消费者和生产者模型

    from multiprocessing import Process,Queue
    import time
    import random
    
    class Producer(Process):
        def __init__(self,name,food,q):
            super().__init__()
            self.name = name
            self.food = food
            self.q = q
        def run(self):
            for i in range(1,11):
                time.sleep(random.randint(1,3))     #1到3秒生产一个数据
                f = '{}生产了第{}个{}'.format(self.name,i,self.food)
                print(f)
                self.q.put(f)
    
    class Consumer(Process):
        def __init__(self,name,q):
            super().__init__()
            self.name = name
            self.q = q
    
        def run(self):
            while True:
                food = self.q.get()
                if food is None:
                    print('{}获取了一个空~结束'.format(self.name))
                    break   #如果进程获取到空值 则跳出结束循环
                else:
                    print('{}吃了{}'.format(self.name,food))
                    time.sleep(random.randint(1, 3))
    
    if __name__ == '__main__':
        q = Queue(20)
        p1 = Producer('wdc','包子',q)
        p2 = Producer('yhf','馒头',q)
        c1 = Consumer('qqq',q)
        c2 = Consumer('www',q)
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()   #感知p1和p2的结束
        p2.join()
        q.put(None)     #给队列中添加两个空值,供消费者最后获取
        q.put(None)

    结果:,这种方法虽然能够实现这种功能,但是如果再增加消费者的话,就要再后面继续加q.put(None)。

     改进:

    from multiprocessing import Process,JoinableQueue
    import time
    import random
    
    class Producer(Process):
        def __init__(self,name,food,q):
            super().__init__()
            self.name = name
            self.food = food
            self.q = q
        def run(self):
            for i in range(1,11):
                time.sleep(random.randint(1,3))     #1到3秒生产一个数据
                f = '{}生产了第{}个{}'.format(self.name,i,self.food)
                print(f)
                self.q.put(f)
            self.q.join()   #阻塞,直到一个队列中的所有数据全部被处理完毕。在这里的作用就是在这里等待生产的所有的食物被吃完,再继续进行
    
    class Consumer(Process):
        def __init__(self,name,q):
            super().__init__()
            self.name = name
            self.q = q
    
        def run(self):
            while True:
                food = self.q.get()print('{}吃了{}'.format(self.name,food))
                time.sleep(random.randint(1, 3))
                self.q.task_done()      #如果是JoinableQueue,一般get()之后都要和task_done()结合使用:累次一个计数器,每取出一个数据,就做一个计数器减1
    
    if __name__ == '__main__':
        q = JoinableQueue(20)
        p1 = Producer('wdc','包子',q)
        p2 = Producer('yhf','馒头',q)
        c1 = Consumer('qqq',q)
        c2 = Consumer('www',q)
        p1.start()
        p2.start()
        c1.daemon = True    #将c1和c2都设置成守护进程,主进程的代码执行结束,守护进程自动结束。
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()   #感知p1和p2的结束
        p2.join()

    结果:

    改进后的执行过程:

      在消费者这一端:

       每次获取一个数据,处理一个数据,发送一个记号:标志一个数据被处理成功

      在生产者这一端:

       每次生产一个数据,且每依次的数据都放在队列当中,当生产者生产完毕之后,发送一个join信号,表示已经停止生产数据了且在这里阻塞,等待消费者处理队列中的数据,当数据都被处理完时,join的阻塞结束。

    总结:

  • 相关阅读:
    centos7 下安装MongoDB
    centos7 学习笔记
    MongoDB相关资料收集
    centos 下安装.net core
    sql server 2008 r2 中的oracle发布使用笔记
    sql server 与oracle数据互导的一种思路--sql server链接服务器
    Visual Studio 2015正式版/产品密钥 Win10正式版官方原版ISO镜像下载大全&安装激活教程
    Modbus库开发笔记:Modbus ASCII Slave开发
    PID控制器开发笔记之十一:专家PID控制器的实现
    μCUnit,微控制器的单元测试框架
  • 原文地址:https://www.cnblogs.com/wangdianchao/p/12078169.html
Copyright © 2011-2022 走看看