一. 管道(进程间通信)
#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
#其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
from multiprocessing import Process, Pipe
def f(conn):
conn.send("Hello 妹妹") #子进程发送了消息
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() #建立管道,拿到管道的两端,双工通信方式,两端都可以收发消息
p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程
p.start() #开启子进程
print(parent_conn.recv()) #主进程接受了消息
p.join()
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起(就是阻塞)。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道的相同一端就会能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
二. 数据共享: Manager模块
进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题,应该尽量避免使用本节所讲的共享数据的方式,以后我们会尝试使用数据库来解决进程之间的数据共享问题。
Manager默认加锁
from multiprocessing import Process,Manager,Lock
def func(dic, loc):
with loc:
dic['num'] -= 1
if __name__ == '__main__':
m = Manager()
loc = Lock()
dic = m.dict({'num': 100})
p_list = []
for i in range(100):
p = Process(target=func, args=(dic, loc))
p_list.append(p)
p.start()
[pp.join() for pp in p_list]
print('>>>', dic['num'])
三. 进程池
Pool([numprocess [,initializer [, initargs]]]):创建进程池
numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
initializer:是每个工作进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组
主要方法:
p.apply(func [ args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs), 然后返回结果。
'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs), 然后返回结果。
'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
p.close():锁定不是关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
进程池的同步方法:
import time from multiprocessing import Process,Pool def func(i): num = 0 for j in range(5): num += i time.sleep(1) return num if __name__ == '__main__': pool = Pool(4) for i in range(10): res = pool.apply(func,args=(i,)) # print(res)
进程池的异步方法:
import time from multiprocessing import Process,Pool def func(i): num = 0 for j in range(5): num += i time.sleep(1) # print('>>>>>',num) return num if __name__ == '__main__': pool = Pool(4) red_list = [] for i in range(10): res = pool.apply_async(func,args=(i,)) # # print(res) red_list.append(res) # print(res.get()) # pool.close() #不是关闭进程池,只是锁定 # pool.join() # 等待进程池中所有的任务执行完,但是无法确认是否所有的任务真的全部执行完了,前面要加close方法 for resss in red_list: print(resss.get())
四. 回调函数
import os from multiprocessing import Pool def func1(n): print('func1>>',os.getpid()) print('func1') return n*n def func2(nn): print('func2>>',os.getpid()) print('func2') print(nn) # import time # time.sleep(0.5) if __name__ == '__main__': print('主进程:',os.getpid()) p = Pool(5) #args里面的10给了func1,func1的返回值作为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值 # for i in range(10,20): #如果是多个进程来执行任务,那么当所有子进程将结果给了回调函数之后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。我们上面func2里面注销的时间模块打开看看 # p.apply_async(func1,args=(i,),callback=func2) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() #结果 # 主进程: 11852 #发现回调函数是在主进程中完成的,其实如果是在子进程中完成的,那我们直接将代码写在子进程的任务函数func1里面就行了,对不对,这也是为什么称为回调函数的原因。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100
import requests,json,os from multiprocessing import Pool def get_page(url): print('<进程%s> get %s' % (os.getpid(), url)) respone =requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.txt} def parse_page(res): print('<进程%s> parse %s' % (os.getpid(), res['url'])) parse_res = 'url:<%s> size:[%s] ' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p = Pool(3) res_l = [] for url in urls: res = p.apply_async(get_page, args=(url, ), callback=parse_page) res_l.append(res) p.close() p.join()
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()
进程池和信号量的区别:
进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己,而信号量是一堆进程等待着去执行一段逻辑代码。
信号量不能控制创建多少个进程,但是可以控制同时多少个进程能够执行,但是进程池能控制你可以创建多少个进程。
举例:就像那些开大车拉煤的,信号量是什么呢,就好比我只有五个车道,你每次只能过5辆车,但是不影响你创建100辆车,但是进程池相当于什么呢?相当于你只有5辆车,每次5个车拉东西,拉完你再把车放回来,给别的人拉煤用。
其他语言里面有更高级的进程池,在设置的时候,可以将进程池中的进程动态的创建出来,当需求增大的时候,就会自动在进程池中添加进程,需求小的时候,自动减少进程,并且可以设置进程数量的上线,最多为多,python里面没有。