消费者和生产者模型
from multiprocessing import Process,Queue import time import random class Producer(Process): def __init__(self,name,food,q): super().__init__() self.name = name self.food = food self.q = q def run(self): for i in range(1,11): time.sleep(random.randint(1,3)) #1到3秒生产一个数据 f = '{}生产了第{}个{}'.format(self.name,i,self.food) print(f) self.q.put(f) class Consumer(Process): def __init__(self,name,q): super().__init__() self.name = name self.q = q def run(self): while True: food = self.q.get() if food is None: print('{}获取了一个空~结束'.format(self.name)) break #如果进程获取到空值 则跳出结束循环 else: print('{}吃了{}'.format(self.name,food)) time.sleep(random.randint(1, 3)) if __name__ == '__main__': q = Queue(20) p1 = Producer('wdc','包子',q) p2 = Producer('yhf','馒头',q) c1 = Consumer('qqq',q) c2 = Consumer('www',q) p1.start() p2.start() c1.start() c2.start() p1.join() #感知p1和p2的结束 p2.join() q.put(None) #给队列中添加两个空值,供消费者最后获取 q.put(None)
结果:,这种方法虽然能够实现这种功能,但是如果再增加消费者的话,就要再后面继续加q.put(None)。
改进:
from multiprocessing import Process,JoinableQueue import time import random class Producer(Process): def __init__(self,name,food,q): super().__init__() self.name = name self.food = food self.q = q def run(self): for i in range(1,11): time.sleep(random.randint(1,3)) #1到3秒生产一个数据 f = '{}生产了第{}个{}'.format(self.name,i,self.food) print(f) self.q.put(f) self.q.join() #阻塞,直到一个队列中的所有数据全部被处理完毕。在这里的作用就是在这里等待生产的所有的食物被吃完,再继续进行 class Consumer(Process): def __init__(self,name,q): super().__init__() self.name = name self.q = q def run(self): while True: food = self.q.get()print('{}吃了{}'.format(self.name,food)) time.sleep(random.randint(1, 3)) self.q.task_done() #如果是JoinableQueue,一般get()之后都要和task_done()结合使用:累次一个计数器,每取出一个数据,就做一个计数器减1 if __name__ == '__main__': q = JoinableQueue(20) p1 = Producer('wdc','包子',q) p2 = Producer('yhf','馒头',q) c1 = Consumer('qqq',q) c2 = Consumer('www',q) p1.start() p2.start() c1.daemon = True #将c1和c2都设置成守护进程,主进程的代码执行结束,守护进程自动结束。 c2.daemon = True c1.start() c2.start() p1.join() #感知p1和p2的结束 p2.join()
结果:
改进后的执行过程:
在消费者这一端:
每次获取一个数据,处理一个数据,发送一个记号:标志一个数据被处理成功
在生产者这一端:
每次生产一个数据,且每依次的数据都放在队列当中,当生产者生产完毕之后,发送一个join信号,表示已经停止生产数据了且在这里阻塞,等待消费者处理队列中的数据,当数据都被处理完时,join的阻塞结束。
总结: