目 录
一、进程间通信--引出队列
队列
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
相关方法:
.put() 向队列中添加值
.get() 向队列取值
.full() 判断队里是否已满
.empty() 判断队列是否已经为空
.get_nowait() 若队列中没有值可取时,直接报错不等待!
from multiprocessing import Process,Queue q = Queue(5) # 参数5表示该队列的容量只能装5个元素 q.put(1) # 点put()方法向队列中添加元素 q.put(2) q.put(3) # print(q.full()) # False 点full() 方法判断队列是否已经装满 q.put(4) q.put(5) # print(q.full()) # True # q.put(6) # 当超过队列容量的时候程序会一直等待直到队列中有元素被取走 # print(q.full()) # 阻塞!!! 不会执行该行代码,因为q.put(6)还在停滞状态 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) # 点empty() 方法判断队列是否为空 False print(q.get()) print(q.get()) print(q.empty()) # True # print(q.get()) # 阻塞态!! 当队列取值超出范围没有值可取时,会出现阻塞 直到队列中有值可取 # print(q.empty()) # 该行代码不会被执行,因为程序已经阻塞在上一行 print(q.get_nowait()) # 若队列中没有值可取的话,不等待直接报错 ''' 报错结果: raise Empty queue.Empty '''
注意:
点full() 方法 点empty() 方法 点get_nowait() 方法都不适用与多进程情况!
因为多进程情况下,如果某一时刻队列为空,但是判断的同时有一个进程向队列中添加值,情况就不好判断
""" full get_nowait empty 都不适用于多进程的情况 原因:多进程的时候‘空’和‘满’的状况不好判断,也许队列刚被取空就有其他进程向队列中添加值。。。 """
二、进程间通信IPC机制
进程间通信
IPC(Inter-Process Communication)
队列
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
子进程放数据,主进程获取数据
def producer(q): q.put('hello baby!') # def consumer(q): # q.get() if __name__ == '__main__': q = Queue(5) p = Process(target=producer,args=(q,)) p.start() print(q.get()) # 该步骤不需要time.sleep()模拟延迟,因为get() 会知道子进程向队列中放入值才能取,否则进入阻塞
两个子进程相互放、取数据
from multiprocessing import Process,Queue def producer(q): q.put('hello baby!') def consumer(q): print(q.get()) # 取数据 if __name__ == '__main__': q = Queue(5) p = Process(target=producer,args=(q,)) c = Process(target=consumer,args=(q,)) p.start() c.start() print('主') ''' 执行结果: 主 hello baby! '''
三、生产者与消费者模型
""" 生产者:生产/制造数据的 消费者:消费/处理数据的 例子:做包子的,买包子的 1.做包子远比买包子的多 2.做包子的远比包子的少 供需不平衡的问题 """
引出问题:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from multiprocessing import Process,JoinableQueue,Queue import time import os import random def producer(name,food,q): for i in range(10): data = '%s 生产了%s%s'%(name,food,i) time.sleep(random.random()) # random.random()随机产生0~1之间的小数 q.put(data) def consumer(name,q): while True: data = q.get() # if data == None: break print('%s 吃了%s'%(name,data)) time.sleep(random.random()) if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=('egon','馒头',q)) p2 = Process(target=producer,args=('jason','生蚝',q)) c1 = Process(target=consumer,args=('阳',q)) c2 = Process(target=consumer,args=('朱',q)) p1.start() p2.start() c1.start() c2.start() # p1.join() # p2.join() # q.put(None) # q.put(None)
问题:消费者进程出现阻塞,无值可取!
解决思想:
首先要确定生产者已经生产完毕,然后确定消费者已经取值完毕!
方法1:
在生产者执行完毕后,主进程添加队列 None值,当消费者在队列中取值 == None时,退出
应用方法:join()
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from multiprocessing import Process,JoinableQueue,Queue import time import os import random def producer(name,food,q): for i in range(10): data = '%s 生产了%s%s'%(name,food,i) time.sleep(random.random()) # random.random()随机产生0~1之间的小数 q.put(data) def consumer(name,q): while True: data = q.get() # if data == None: break print('%s 吃了%s'%(name,data)) time.sleep(random.random()) if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=('egon','馒头',q)) p2 = Process(target=producer,args=('jason','生蚝',q)) c1 = Process(target=consumer,args=('阳',q)) c2 = Process(target=consumer,args=('朱',q)) p1.start() p2.start() c1.start() c2.start() # p1.join() # p2.join() # q.put(None) # q.put(None) ''' 出现问题:消费者进程阻塞,无值可取 解决方法: 首先要确定生产者已经生产完毕,然后确定消费者已经取值完毕! 在生产者执行完毕后,主进程添加队列 None值,当消费者在队列中取值 == None时,退出 应用方法:join() '''
方法2:
JoinableQueue 能够被等待的队列
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from multiprocessing import Process,JoinableQueue import time import os import random def producer(name,food,q): for i in range(10): data = '%s 生产了%s%s'%(name,food,i) time.sleep(random.random()) # random.random()随机产生0~1之间的小数 q.put(data) def consumer(name,q): while True: data = q.get() # if data == None: break print('%s 吃了%s'%(name,data)) time.sleep(random.random()) # q.task_done() 告诉队列你已经从队列中取出了一个数据 并且处理完毕了 q.task_done() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=('egon','馒头',q)) p2 = Process(target=producer,args=('jason','生蚝',q)) c1 = Process(target=consumer,args=('阳',q)) c2 = Process(target=consumer,args=('朱',q)) p1.start() p2.start() c1.daemon = True # 给两个消费者添加守护进程,等待主进程执行完 q.join() 之后自动结束 c2.daemon = True c1.start() c2.start() p1.join() p2.join() # q.join() 等到队列中数据全部取出 q.join()