一:进程间的通信(IPC):先进先出 管道:队列=管道+锁
from multiprocessing import Queue q=Queue(4) q.put(['first',],block=True,timeout=3) q.put({'x':2},block=True,timeout=3) q.put(3,block=True,timeout=3) q.put(4,block=True,timeout=3) print(q.get(block=True,timeout=3)) print(q.get(block=True,timeout=3)) print(q.get(block=True,timeout=3)) print(q.get(block=True,timeout=3))
from multiprocessing import Queue #Queued队列模块 q=Queue() print(q.get()) print(q.get()) print(q.get()) print(q.get()) q.put(['first'],block=True,timeout=3) q.put({'x':2},block=True,timeout=3) q.put(3,block=True,timeout=3) print(q.get(block=True,timeout=3)) print(q.get(block=True,timeout=3)) print(q.get(block=True,timeout=3)) print(q.get(block=True,timeout=3)) print(q.get_nowait()) #q.get(block=false) print(q.get_nowait()) #q.get(block=false) print(q.get_nowait()) #q.get(block=false) print(q.get_nowait()) #q.get(block=false)
二:生产者消费者模型
1. 什么是生产者消费者模型
生产者:代指生产数据的任务
消费者:代指处理数据的任务
该模型的工作方式:生产生产数据传递消费者处理
实现方式: 生产者---->队列<------消费者
2. 为什么要用
当程序中出现明细的两类任务,一类负责生产数据,一类负责处理数据,
就可以引入生产者消费者模型来实现生产者与消费者的解耦合,平衡生产能力与消费能力,从提升效率
3. 如何用
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import time, random from multiprocessing import Process, Queue def produer(name, food, q): for i in range(3): # 生产的数量 res = '%s%s' % (food, i) # 造数据 time.sleep(random.randint(1, 3)) # 模拟生产数据的时间 q.put(res) print('厨师【%s】做了<%s>' % (name, res)) def consumer(name, q): while True: res = q.get() if res is None: break # 生产者生产完后break退出 time.sleep(random.randint(1, 3)) # 模拟处理数据的时间 print('吃货【%s】吃了<%s>' % (name, res)) if __name__ == '__main__': q = Queue() # 生产者们 p1 = Process(target=produer, args=('小混世魔王', '包子', q)) p2 = Process(target=produer, args=('中混世魔王', '馒头', q)) p3 = Process(target=produer, args=('大混世魔王', '鸡蛋', q)) # 消费者们 c1 = Process(target=consumer, args=('扒皮', q)) c2 = Process(target=consumer, args=('钢牙', q)) p1.start() p2.start() p3.start() c1.start() c2.start() print('主')
存在问题
此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。
===>解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import time, random from multiprocessing import Process, Queue def produer(name, food, q): for i in range(3): # 生产的数量 res = '%s%s' % (food, i) # 造数据 time.sleep(random.randint(1, 3)) # 模拟生产数据的时间 q.put(res) print('厨师【%s】做了<%s>' % (name, res)) def consumer(name, q): while True: res = q.get() if res is None: break # 生产者生产完后break退出 time.sleep(random.randint(1, 3)) # 模拟处理数据的时间 print('吃货【%s】吃了<%s>' % (name, res)) if __name__ == '__main__': q = Queue() # 生产者们 p1 = Process(target=produer, args=('小混世魔王', '包子', q)) p2 = Process(target=produer, args=('中混世魔王', '馒头', q)) p3 = Process(target=produer, args=('大混世魔王', '鸡蛋', q)) # 消费者们 c1 = Process(target=consumer, args=('扒皮', q)) c2 = Process(target=consumer, args=('钢牙', q)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 保证所有生产者生产完东西 q.put(None) # 所有生产者都生产完,在队列的最后加上None信号 q.put(None) # 给第二个消费者的信号 print('主')
===>队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import time, random from multiprocessing import Process, JoinableQueue def produer(name, food, q): for i in range(3): # 生产的数量 res = '%s%s' % (food, i) # 造数据 time.sleep(random.randint(1, 3)) # 模拟生产数据的时间 q.put(res) print('厨师【%s】做了<%s>' % (name, res)) def consumer(name, q): while True: res = q.get() time.sleep(random.randint(1, 3)) # 模拟处理数据的时间 print('吃货【%s】吃了<%s>' % (name, res)) q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了 if __name__ == '__main__': q = JoinableQueue() # 生产者们 p1 = Process(target=produer, args=('小混世魔王', '包子', q)) p2 = Process(target=produer, args=('中混世魔王', '馒头', q)) p3 = Process(target=produer, args=('大混世魔王', '鸡蛋', q)) # 消费者们 c1 = Process(target=consumer, args=('扒皮', q)) c2 = Process(target=consumer, args=('钢牙', q)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 队列取空 print('主')