进程队列介绍
1 基本语法及过程
先进先出,后进后出,q = Queue()
过程
(1)把数据放到q队列中 put
(2)把书局从队列中拿出来 get
from multiprocessing import Process,Queue q = Queue() q.put(111) res = q.get() print (res)
执行
[root@node10 python]# python3 test.py 111
(3)当队列里面的值都拿出来了,已经没有数据的时候,在获取会阻塞
from multiprocessing import Process,Queue q = Queue() q.put(111) res = q.get() print (res) res = q.get()
执行
[root@node10 python]# python3 test.py 111 Traceback (most recent call last): #这里阻塞住,使用ctrl+c退出 File "test.py", line 6, in <module> res = q.get() File "/usr/lib64/python3.6/multiprocessing/queues.py", line 94, in get res = self._recv_bytes() File "/usr/lib64/python3.6/multiprocessing/connection.py", line 216, in recv_bytes buf = self._recv_bytes(maxlength) File "/usr/lib64/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes buf = self._recv(4) File "/usr/lib64/python3.6/multiprocessing/connection.py", line 379, in _recv chunk = read(handle, remaining) KeyboardInterrupt
(4)get_nowait 无论有没有都拿,如果拿不到,直接报错
from multiprocessing import Process,Queue q = Queue() q.put(111) res = q.get() print (res) res = q.get_nowait()
执行
(5)异常处理,抑制错误
语法:
try: code1 code2 except: code1 code2
把可能出现异常的代码扔到try代码块中
如果发生异常,直接执行except中的代码块,抑制错误
q = Queue() q.put(111) res = q.get() print (res) try: res = q.get_nowait() except: pass
执行
[root@node10 python]# python3 test.py 111
2 可以使用queue 指定队列长度
当指定队列的长度,例如对垒长度为3,则最多放3个,超过最大长度再放,会直接阻塞
from multiprocessing import Process,Queue q = Queue(3) q.put(1) q.put(2) q.put(3) # q.put(4) 阻塞 如果队列已经满了,在放值,直接阻塞 q.put_nowait(4) #如果队列已经满了,再放值,直接报错
执行
异常处理
from multiprocessing import Process,Queue q = Queue(3) q.put(1) q.put(2) q.put(3) # q.put(4) 阻塞 如果队列已经满了,在放值,直接阻塞 #q.put_nowait(4) #如果队列已经满了,再放值,直接报错 try: q.put_nowait(4) except: pass
不会报错,但是没有输出
1.2 full empty介绍
如果队列满了,返回真,反之亦然
from multiprocessing import Process,Queue q = Queue(3) q.put(1) q.put(2) q.put(3) # q.put(4) 阻塞 如果队列已经满了,在放值,直接阻塞 #q.put_nowait(4) #如果队列已经满了,再放值,直接报错 try: q.put_nowait(4) except: pass res = q.full() print(res)
执行
[root@node10 python]# python3 test.py vi test.py True
empty如果队列空了,返回真,反之亦然
from multiprocessing import Process,Queue q = Queue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) print(q.empty())
执行
[root@node10 python]# python3 test.py vi test.py 1 2 3 True
加如队列不为空
from multiprocessing import Process,Queue q = Queue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) #print(q.get()) print(q.empty())
执行
[root@node10 python]# python3 test.py vi test.py 1 2 False
1.3 进程之间的通讯
from multiprocessing import Process,Queue def func(q): #主进程添加值,子进程可以通过队列拿到 res = q.get() print ("I'm the subprocess",res) q.put("from subprocess") q = Queue() p = Process(target=func,args=(q,)) p.start() #主进程添加数据 q.put("I'm the main process")
执行
[root@node10 python]# python3 test.py vi test.py I'm the subprocess I'm the main process
产生问题
def func(q): #主进程添加值,子进程可以通过队列拿到 res = q.get() print ("I'm the subprocess",res) q.put("from subprocess") q = Queue() p = Process(target=func,args=(q,)) p.start() #主进程添加数据 q.put("I'm the main process") res = q.get() print(res)
执行
[root@node10 python]# python3 test.py vi test.py I'm the main process #这里发生阻塞
原因,是因为再主进程执行完成输入消息和消费消息之后,就结束了,子进程还没有执行完成,这时候子进程直接被干掉
使用join
子进程添加的值,主进程通过队列拿到
from multiprocessing import Process,Queue def func(q): #主进程添加值,子进程可以通过队列拿到 res = q.get() print ("I'm the subprocess",res) q.put("from subprocess") q = Queue() p = Process(target=func,args=(q,)) p.start() #主进程添加数据 q.put("I'm the main process") p.join()
#子进程添加的值,主进程通过队列拿到 res = q.get() print(res)
执行
[root@node10 python]# python3 test.py vi test.py I'm the subprocess I'm the main process from subprocess
二 生产者和消费者模型
2.1 介绍
爬虫例子
1号进程负责爬取网页中所有内容,可以看成一个生产者
2号进程负责匹配提取网页中的关键字,可以看成一个消费者
有时可能生产者必消费者块,反之亦然,所以为了减少生产者和消费者速度上的差异化,加了一个中间的缓冲队列
生产者和消费者模型从程序上看就是一个存放数据和拿取数据的过程,最为理想的生产者消费者模型,两者之间的速度相对平均.
2.2 建立模型
模拟汽车生产和消费
from multiprocessing import Process,Queue import random,time #消费者模型 def consumer(q,name): while True: car = q.get() print("%s buy a %s"%(name,car)) #生产者模型 def producer(q,name,car): for i in range(3): print("%s produce %s,%s"%(name,car,i)) q.put(car+str(i)) q = Queue() #消费者 c1 = Process(target=consumer,args=(q,"Marketting")) c1.start() #生产者 p1 = Process(target=producer,args=(q,"Factory","BYD")) p1.start()
执行
[root@node10 python]# vi test.py [root@node10 python]# python3 test.py vi test.py Factory produce BYD,0 Factory produce BYD,1 Factory produce BYD,2 Marketting buy a BYD0 Marketting buy a BYD1 Marketting buy a BYD2
生产太快,加一个延迟
import random,time #消费者模型 def consumer(q,name): while True: car = q.get() time.sleep(random.uniform(0.1,1)) print("%s buy a %s"%(name,car)) #生产者模型 def producer(q,name,car): for i in range(3): time.sleep(random.uniform(0.1,1)) print("%s produce %s,%s"%(name,car,i)) q.put(car+str(i)) q = Queue() #消费者 c1 = Process(target=consumer,args=(q,"Marketting")) c1.start() #生产者 p1 = Process(target=producer,args=(q,"Factory","BYD")) p1.start()
执行
[root@node10 python]# python3 test.py vi test.py Factory produce BYD,0 Factory produce BYD,1 Marketting buy a BYD0 Marketting buy a BYD1 Factory produce BYD,2 Marketting buy a BYD2 #这里有阻塞
解决最终的阻塞问题
from multiprocessing import Process,Queue import random,time #消费者模型 def consumer(q,name): #死循环,即一直等待生产出来,然后消费,最终在这里会阻塞 while True: car = q.get() #在这里判断,当car时空值,退出 if car is None: break #加入延迟阻塞 time.sleep(random.uniform(0.1,1)) print("%s buy a %s"%(name,car)) #生产者模型 def producer(q,name,car): for i in range(3): time.sleep(random.uniform(0.1,1)) print("%s produce %s,%s"%(name,car,i)) q.put(car+str(i)) q = Queue() #消费者 c1 = Process(target=consumer,args=(q,"Marketting")) c1.start() #生产者 p1 = Process(target=producer,args=(q,"Factory","BYD")) p1.start() #由于下面插入空值,而且在消费者做了判断,收到空值就会退出,这里执行完,主进程就会退出,导致消费者子进程无法完成消费,使用join p1.join() #在这里加一个控制,当执行完生产,在这里插入一个空值 q.put(None)
执行
[root@node10 python]# python3 test.py vi test.py Factory produce BYD,0 Marketting buy a BYD0 Factory produce BYD,1 Marketting buy a BYD1 Factory produce BYD,2 Marketting buy a BYD2
则一个简单模型建立