进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)
进程间通信
IPC(Inter-Process Communication)
队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
Queue模块([maxsize]) 创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现。
q.get([block[,timeout]])
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False
,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为
可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。
q.put(item[,block [,timeout]])
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发
Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
不可靠的方法
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引
发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
进程间的通信
import time
from multiprocessing import Process,Queue
def fun(q):
print(q.get())
q.put(2)
if __name__ == '__main__':
q=Queue()
p = Process(target=fun,args=[q,]).start()
q.put(1)
time.sleep(2)#不睡的话会卡在这里,睡之后先执行子进程
print(q.get())
from multiprocessing import Queue
q = Queue(3)
q.put(1)
q.put(1)如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列
q.put(1)如果队列中的数据一直不被取走,程序就会永远停在这里。
try: 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
q.put_nowait(1)因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
except:
print('123')# 会打印123
get_nowait()方法与q.put_nowait(1)一样
消费者模型
import time
import random
from multiprocessing import Process,Queue
生产者消费者模型
解决数据供需不平衡的情况
队列是进程安全的 内置了锁来保证队列中的每一个数据都不会被多个进程重复取
def consumer(q,name):
while True:
food = q.get()
if food == 'done':break
time.sleep(random.random())
print('%s吃了%s'%(name,food))
def producer(q,name,food):
for i in range(10):
time.sleep(random.random())
print('%s生产了%s%s'%(name,food,i))
q.put('%s%s'%(food,i))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=[q,'Egon','泔水'])
p2 = Process(target=producer,args=[q,'Yuan','骨头鱼刺'])
p1.start()
p2.start()
Process(target=consumer,args=[q,'alex']).start()
Process(target=consumer,args=[q,'wusir']).start()
p1.join()
p2.join()
q.put('done')
q.put('done')
进阶版 JoinableQueue模块
from multiprocessing import Process,JoinableQueue
def consumer(q,name,):
while True:
food = q.get() 消费食物
print('%s吃了%s'%(name,food))
q.task_done() consumer每完成一个任务就会给q发送一个taskdone
def producer(q,name,food):
for i in range(10):
print('%s生产了%s%s'%(name,food,i))
q.put('%s%s'%(food,i)) 生产食物
q.join() 等到所有的数据都被taskdone才结束
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=[q,'egon','泔水'])
p2 = Process(target=producer,args=[q,'alex','鱼刺'])
p1.start()
p2.start()
c1 = Process(target=consumer,args=[q,'taibai',])
c2 = Process(target=consumer,args=[q,'wusir',])
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
q.task_done()
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
producer
put
生产完全部的数据就没有其他工作了
在生产数据方 : 允许执行q.join
join会发起一个阻塞,直到所有当前队列中的数据都被消费
consumer
get 获取到数据
处理数据
q.task_done() 告诉q,刚刚从q获取的数据已经处理完了
consumer每完成一个任务就会给q发送一个taskdone
producer在所有的数据都生产完之后会执行q.join()
producer会等待consumer消费完数据才结束
主进程中对producer进程进行join
主进程中的代码会等待producer执行完才结束
producer结束就意味着主进程代码的结束
consumer作为守护进程结束
consumer中queue中的所有数据被消费
producer join结束
主进程的代码结束
consumer结束
主进程结束
管道 Pipe模块
![](https://images2018.cnblogs.com/blog/1349570/201805/1349570-20180514174516589-114480639.png)
from multiprocessing import Pipe
A,B = Pipe()
A.send('1234')
print(B.recv())
A.send('1234')
print(B.recv())
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。
这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能
在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成
EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
from multiprocessing import Process, Pipe
def f(parent_conn,child_conn):
parent_conn.close() #不写close将不会引发EOFError
while True:
try:
print(child_conn.recv())
except EOFError:
child_conn.close()
break
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(parent_conn,child_conn,))
p.start()
child_conn.close()
parent_conn.send('hello')
parent_conn.send('hello')
parent_conn.send('hello')
parent_conn.close()
p.join()
数据共享 Manager模块
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
with lock: 上下文管理 :必须有一个开始动作 和 一个结束动作的时候
dic['count'] = dic['count'] - 1
if __name__ == '__main__':
m = Manager()
lock = Lock()
dic = m.dict({'count':100})
p_lst = []
for i in range(100):
p = Process(target=func,args=[dic,lock])
p_lst.append(p)
p.start()
for p in p_lst:p.join()
print(dic)
同一台机器上 : Queue
在不同台机器上 :消息中间件
进程池和multiprocess.Pool模块
定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果
Pool([numprocess [,initializer [, initargs]]]):创建进程池
参数介绍:
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
主要方法:
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同
线程调用p.apply()函数或者使用p.apply_async()'''
3
4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递
给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
6
7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
8
9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
import time
import random
from multiprocessing import Pool
def func(i):
print('func%s' % i)
time.sleep(random.randint(1,3))
return i**2 # 平方
if __name__ == '__main__':
p = Pool(5)
ret_l = []
for i in range(15):
# p.apply(func=func,args=(i,)) # 同步调用
ret = p.apply_async(func=func,args=(i,))# 异步调用 返回函数的值
ret_l.append(ret)
for ret in ret_l : print(ret.get())
# 主进程和所有的子进程异步了
其他方法:
1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程
操作中引发了异常,它将在调用此方法时再次被引发。
3 obj.ready():如果调用完成,返回True
4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
5 obj.wait([timeout]):等待结果变为可用。
6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
回调函数(爬虫)
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数
去处理该结果,该函数即回调函数我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执
行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
import os
from urllib.request import urlopen
from multiprocessing import Pool
def get_url(url):
print('-->',url,os.getpid())
ret = urlopen(url)
# content = ret.read()
return url
def call(url):
# 分析
print(url,os.getpid())
if __name__ == '__main__':
print(os.getpid())
l = [
'http://www.baidu.com', # 5
'http://www.sina.com',
'http://www.sohu.com',
'http://www.sogou.com',
'http://www.qq.com',
'http://www.bilibili.com', #0.1
]
p = Pool(5) # count(cpu)+1
ret_l = []
for url in l:
ret = p.apply_async(func = get_url,args=[url,],callback=call)
ret_l.append(ret)
for ret in ret_l : ret.get()
# 回调函数
# 在进程池中,起了一个任务,这个任务对应的函数在执行完毕之后
# 的返回值会自动作为参数返回给回调函数
# 回调函数就根据返回值再进行相应的处理
# 回调函数 是在主进程执行的