'''
1,什么是生产者消费者模型
生产者:比喻的是程序中负责产生数据的任务
消费者:比喻的是程序中负责处理数据的任务
生产者->共享的介质(队列)<-消费者
2,为何用
实现了生产者与消费者的解耦合,生产者可以不停地生产,消费者也可以不停地消费
从而平衡了消费者地生产能力与消费能力,提升了程序整体运行地效率
什么时候用?
当我们地程序中存在明显的两类任务,一类负责产生数据,另一类负责处理数据
此时我们就应该考虑使用生产者消费者模型来提升程序地效率
'''
from multiprocessing import Queue ,Process
import time,os, random
def producer(q):
for i in range(10):
res='包子%s'%i
time.sleep(random.randint(1,3))
q.put(res)
q.put(None)
def consumer(q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('%s 吃了%s'%(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
p1=Process(target=producer,args=(q,))
c1=Process(target=consumer,args=(q,))
p1.start()
c1.start()
print('主')
----------------------割割更健康-----------------
from multiprocessing import Queue ,Process
import time,os,random
def producer(name,food,q):
for i in range(3):
res='%s%s'%(food,i)
time.sleep(random.randint(1,3))
#往队列里面丢
q.put(res)
print(' 33[45m%s 生产了 %s 33[0m' %(name,res))
def consumer(name,q):
while True:
#往队列里取走
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(' 33[46m%s 吃了 %s 33[0m' %(name,res))
q.task_done()
if __name__ == '__main__':
q=Queue()
p1=Process(target=producer,args=('egon','包子',q,))
p2=Process(target=producer,args=('杨军','水',q,))
p3=Process(target=producer,args=('侯老师','线',q,))
# 消费者
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('wupeiqidsb',q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
#再p1/p2/p3都结束后,才应该往队列里放结束信号,有几个消费者就应该放几个None
q.put(None) #额,给消费者发结束信号
q.put(None)
print('主')
# ---------------------------------终极解决方案---------------------
from multiprocessing import JoinableQueue,Process
import time,os,random
def producer(name,food,q):
for i in range(3):
res='%s %s '%(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print('%s 生产了%s '%(name,res))
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('%s 吃了%s'%(name,res))
#向q.join()发送一次信号,证明一个数据已经被取走了
q.task_done()
if __name__ == '__main__':
q=JoinableQueue()
p1=Process(target=producer,args=('egon','包子',q,))
p2=Process(target=producer,args=('严峻','泔水',q,))
p3=Process(target=producer,args=('侯老师','先',q,))
c1=Process(target=consumer,args=('alex',q,))
c2=Process(target=consumer,args=('wpx',q,))
c1.daemon=True
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.join()#等待队列被取赶紧
#q.join()
#结束意味着主进程代码运行完毕--->>(生产者运行完毕)+队列中地数据也被取赶紧了->消费者没有存在的意义
print('主')
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>ICP通讯>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
进程之间通信必须找到一种介质,该介质必须满足
1、是所有进程共享的
2、必须是内存空间
附加:帮我们自动处理好锁的问题
a、 from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
b、 IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
c、 IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
a、 用队列Queue
1)共享的空间
2)是内存空间
3)自动帮我们处理好锁定问题
from multiprocessing import Queue
q=Queue(3) #设置队列中maxsize个数为三
q.put('first')
q.put({'second':None})
q.put('三')
# q.put(4) #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
print(q.get())
print(q.get())
print(q.get())
强调:
1、队列用来存成进程之间沟通的消息,数据量不应该过大
2、maxsize的值超过的内存限制就变得毫无意义
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>割割很健康>>>>>>>>>>>>>>>
互斥锁vs join:
大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)
from multiprocessing import Process,Lock
import time,random
mutex=Lock()
def task1(lock):
lock.acquire()
print('task1:名字是egon')
time.sleep(random.randint(1,3))
print('task1:性别是male')
time.sleep(random.randint(1,3))
print('task1:年龄是18')
lock.release()
def task2(lock):
lock.acquire()
print('task2:名字是alex')
time.sleep(random.randint(1,3))
print('task2:性别是male')
time.sleep(random.randint(1,3))
print('task2:年龄是78')
lock.release()
def task3(lock):
lock.acquire()
print('task3:名字是lxx')
time.sleep(random.randint(1,3))
print('task3:性别是female')
time.sleep(random.randint(1,3))
print('task3:年龄是30')
lock.release()
if __name__ == '__main__':
p1=Process(target=task1,args=(mutex,))
p2=Process(target=task2,args=(mutex,))
p3=Process(target=task3,args=(mutex,))
p1.start()
p2.start()
p3.start()
一、抢票系统
import json
import time
import random
import os
from multiprocessing import Process,Lock
mutex=Lock()
def search():
time.sleep(random.randint(1,3))
with open('db.json','r',encoding='utf-8') as f:
dic=json.load(f)
print('%s 剩余票数:%s' %(os.getpid(),dic['count']))
def get():
with open('db.json','r',encoding='utf-8') as f:
dic=json.load(f)
if dic['count'] > 0:
dic['count']-=1
time.sleep(random.randint(1,3))
with open('db.json','w',encoding='utf-8') as f:
json.dump(dic,f)
print('%s 购票成功' %os.getpid())
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == '__main__':
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()
一、守护进程
from multiprocessing import Process
import time
def task(name):
print('%s is running' % name)
time.sleep(3)
if __name__ == '__main__':
obj = Process(target=task, args=('egon',))
obj.daemon=True #将obj变成守护进程,主进程执行完毕后子进程跟着结束
obj.start() # 发送信号给操作系统
print('主')