生产者消费者模型
模型就是解决某个问题的固定方法或套路
1.1某种问题
生产者:泛指产生数据的一方
消费者:泛指处理数据的一方
由于生产者和消费者的对处理速度不一样,数据传输过程耦合度过高,造成CPU工作效率低的情况
1.2解决思路
- 先将双方解开耦合,让不同进程负责不同的任务
- 提供一个共享的容器,来平衡双方的能力,引入进程间通信的队列模型(Queue)
#模拟爬虫案例
from multiprocessing import Process,Queue
import requests
import re,os,time,random
#生产者任务
def product(urls,q):
i = 0
for url in urls:
response = requests.get(url)
data = response.text
time.sleep(random.random()) #模拟网路延时
q.put(data)
i += 1
#消费者任务
def customer(q):
while True:
data = q.get()
time.sleep(random.random())
res = re.findall('src=//(.*?) width',data)
if __name__ == '__main__':
urls_list = ["http://www.baidu.com"]
#创建队列进程
q = Queue()
p = Process(target=product,args=(urls_list,q))
p.start()
c = Process(target=customer,args=(q,))
c.start()
#虽然实现了并发的目的,由于进程间数据无法交互,所以无法控制进程的关闭
引入JoinableQueue继承了Queue的方法,增加了join和task_done这个组合
#如何关闭进程?
#1.确定生产者任务完成
#确定消费者任务完成
#情况一:生产者1与消费者1
from multiprocessing import Process,JoinableQueue
#生产者任务
def product(q):
for i in range(5):
q.put(i)
#消费者任务
def customer(q):
while True:
i = q.get()
print(i)
if __name__ == '__main__':
#创建双方共享的容器
q = JoinableQueue()
#生产者进程
p = Process(target=product,args=(q,))
p.start()
#消费者进程
c = Process(target=customer,args=(q,))
c.start()
c.daemon = True #守护生产进程,随生产进程的结束而结束
p.join() #生产任务结束
#情况二:多个生产
from multiprocessing import Process,JoinableQueue
#生产者任务
def product(q):
for i in range(5):
q.put(i)
#消费者任务
def customer(q):
while True:
i = q.get()
print(i)
q.task_done() #标记消费任务完成
if __name__ == '__main__':
#创建双方共享的容器
q = JoinableQueue()
#生产者进程
p1 = Process(target=product,args=(q,))
p2 = Process(target=product,args=(q,))
p1.start()
p2.start()
#消费者进程
c = Process(target=customer,args=(q,))
c.start()
p1.join()
p2.join() #生产任务结束
q.join() #队列任务完成
#结束所有任务
c.terminate() #直接终止消费进程