一 .进程池(multiprocess.Pool)
1.进程池概念
为什么要有进程池?进程池的概念。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先
,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据
任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将
进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,
池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
Pool([numprocess [,initializer [, initargs]]]):创建进程池 1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
import os
import time
import random
from multiprocessing import Pool
多进程和进程
from multiprocessing import Pool,Proces def run(n): for i in range(10): print(n+1) if __name__ == '__main__':
进程池 p=Pool(5) # 参数表示5个进程 (类似于5个cppu) p.map(run, range(50)) # 50任务 # 等同于上面只是时间上的差异 进程池更快 下面这个要慢带你 多进程 for i in range(50): Process(target=run,args=(i,)).start()
进程池和多进程效率对比
注意:map : 这个map在这里自带了join方法 不需要人为添加
def run(n): for i in range(10): print(n+1)
if __name__ == '__main__':
进程池 str1=time.time() p=Pool(5) # 参数表示5个进程 (类似于5个cppu) p.map(run, range(50)) # 50任务 这个map在这里自带了join方法 不需要人为添加 t1=time.time()-str1 # 0.26728272438049316 进程池执行时间 多进程 str2 = time.time() ret=[] for i in range(50): p1=Process(target=run,args=(i,)) ret.append(p1) p1.start() for i in ret: i.join() t2=time.time()-str2 # 2.7745769023895264 没有使用进程池执行时间 print(t1,t2)
map进程池传参数
# map : 这个map在这里自带了join方法 不需要人为添加 def run(n): for i in range(10): print(n+1) def run1(n): print(n) # ('aa', 100) print(n[1]) # 100 if __name__ == '__main__': p=Pool(5) # 参数表示5个进程 (类似于5个cppu) p.map(run, range(5)) # 50任务 这个map在这里自带了join方法 不需要人为添加 p.map(run1,[("aa",100)])
2.进程同步(apply)
# 同步 import os,time from multiprocessing import Pool def work(n): print("开始 run%s"%n,os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞 # 但不管该任务是否存在阻塞,同步调用都会在原地等着 print(res_l, "空") # # 执行结果: # 开始 run0 14036 # 结束 run0 14036 # 开始 run1 18312 # 结束 run1 18312 # 开始 run2 12744 # 结束 run2 12744 # 开始 run3 14036 # 结束 run3 14036 # 开始 run4 18312 # 结束 run4 18312 # 开始 run5 12744 # 结束 run5 12744 # 开始 run6 14036 # 结束 run6 14036 # 开始 run7 18312 # 结束 run7 18312 # 开始 run8 12744 # 结束 run8 12744 # 开始 run9 14036 # 结束 run9 14036 # [] 空 # 进程已结束,退出代码 0
3.进程异步( apply_async)
# 异步 带问题的 import os import time import random from multiprocessing import Pool def work(n): print("开始 run%s" % n, os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': print("主进程") p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务 # 注意这里是一个真异步 就是主进程不会等子进程结束 而是主进程执行完了不会管子进程 因为没有发感知进程池结束 # 执行结果 # 主进程 # 进程已结束,退出代码 0
# 异步 import os import time import random from multiprocessing import Pool def work(n): print("开始 run%s" % n, os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务 p.close() # 结束进程池接收任务 p.join() # 感知进程池中的任务结束 # 执行结果 # 开始 run0 12284 # 开始 run1 4864 # 开始 run2 18452 # 结束 run0 12284 # 开始 run3 12284 # 结束 run1 4864 # 开始 run4 4864 # 结束 run2 18452 # 开始 run5 18452 # 结束 run3 12284 # 开始 run6 12284 # 结束 run4 4864 # 开始 run7 4864 # 结束 run5 18452 # 开始 run8 18452 # 结束 run6 12284 # 开始 run9 12284 # 结束 run7 4864 # 结束 run8 18452 # 结束 run9 12284 # 进程已结束,退出代码 0
import os import time import random from multiprocessing import Pool def work(n): print("开始 run%s" % n, os.getpid()) time.sleep(1) print("结束 run%s" % n, os.getpid()) if __name__ == '__main__': print("主进程") p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(5): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务 # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果 # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() # print(res_l,"》》》》》》》》》》") for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get # 执行结果 主进程 开始 run0 14684 开始 run1 15180 开始 run2 18996 结束 run0 14684 开始 run3 14684 结束 run1 15180 开始 run4 15180 结束 run2 18996 结束 run3 14684 结束 run4 15180 None None None None None 进程已结束,退出代码 0
from multiprocessing import Pool import time,random,os def work(n): time.sleep(0.5) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中所有进程执行完毕 for aa in res_l: print(aa.get()) # 获取进程池中所有数据 # 0 # 1 # 4 # 9 # 16 # 25 # 36 # 49 # 64 # 81
# 如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。 from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待进程池中所有进程执行完毕 nums=[] for res in res_l: nums.append(res.get()) #拿到所有结果 print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4.进程池版 socket
server
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count()) #开启6个客户端,会发现2个客户端处于等待状态 #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('进程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
client
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
5. 进程池回调函数(主要用于爬虫)
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程: 我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
# 先执行fun1函数 fun1函数的返回值 作为回调函数的参数 然后去执行回调函数
import os import time import random from multiprocessing import Pool # 先执行fun1函数 fun1函数的返回值 作为回调函数的参数 然后去执行回调函数 def fun1(n): print("这是fun1函数",os.getpid()) return n*n def fun2(aa): # 参数只能回调哪个参数 print("这是fun2函数",os.getpid()) print(aa) if __name__ == '__main__': print("主进程",os.getpid()) p=Pool(5) p.apply_async(fun1,args=(10,),callback=fun2) p.close() p.join() # 说明fun2回调函数 回到 主进程中调用 执行结果 主进程 19268 这是fun1函数 12740 这是fun2函数 19268 100 进程已结束,退出代码 0
import os import time import random from multiprocessing import Pool # 先执行fun1函数 fun1函数的返回值 作为回调函数的参数 然后去执行回调函数 def fun1(n): print("这是fun1函数",os.getpid()) return n*n def fun2(aa): # 参数只能回调哪个参数 print("这是fun2函数",os.getpid()) print(aa) if __name__ == '__main__': print("主进程",os.getpid()) p=Pool(5) p.apply_async(fun1,args=(5,),callback=fun2) p.close() p.join() # 说明fun2回调函数 回到 主进程中调用 执行结果 主进程 7164 这是fun1函数 3272 这是fun2函数 7164 25 进程已结束,退出代码 0
爬虫案例
import requests from multiprocessing import Pool aa=requests.get("https://www.baidu.com") print(aa) print(aa.status_code) print(aa.text) print("11111",aa.content)
print(aa.__dict__) 查看里面属性
print("**************************************")
执行结果
<Response [200]>
200
<!DOCTYPE html>
<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css><title>ç¾åº¦ä¸ä¸ï¼ä½ å°±ç¥é</title></head> <body link=#0000cc> <div id=wrapper> <div id=head> <div class="head_wrapper"> <div class="s_form"> <div class="s_form_wrapper"> <div id=lg> <img hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baidu.com/s class="fm"> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidden name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden name=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class="s_ipt" value maxlength=255 autocomplete=off autofocus=autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=ç¾åº¦ä¸ä¸ class="bg s_btn" autofocus></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trnews class="mnav">æ°é»</a> <a href=https://www.hao123.com name=tj_trhao123 class="mnav">hao123</a> <a href=http://map.baidu.com name=tj_trmap class="mnav">å°å¾</a> <a href=http://v.baidu.com name=tj_trvideo class="mnav">è§é¢</a> <a href=http://tieba.baidu.com name=tj_trtieba class="mnav">è´´å§</a> <noscript> <a href=http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u=http%3A%2F%2Fwww.baidu.com%2f%3fbdorz_come%3d1 name=tj_login class="lb">ç»å½</a> </noscript> <script>document.write('<a href="http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u='+ encodeURIComponent(window.location.href+ (window.location.search === "" ? "?" : "&")+ "bdorz_come=1")+ '" name="tj_login" class="lb">ç»å½</a>');
</script> <a href=//www.baidu.com/more/ name=tj_briicon class=bri style="display: block;">æ´å¤äº§å</a> </div> </div> </div> <div id=ftCon> <div id=ftConw> <p id=lh> <a href=http://home.baidu.com>å ³äºç¾åº¦</a> <a href=http://ir.baidu.com>About Baidu</a> </p> <p id=cp>©2017 Baidu <a href=http://www.baidu.com/duty/>使ç¨ç¾åº¦åå¿ è¯»</a> <a href=http://jianyi.baidu.com/ class=cp-feedback>æè§åé¦</a> 京ICPè¯030173å· <img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>
11111 b'<!DOCTYPE html> <!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta'
b' http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css '
b'href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css><title>xe7x99xbexe5xbaxa6
xe4xb8x80xe4xb8x8bxefxbcx8cxe4xbdxa0xe5xb0xb1xe7x9fxa5xe9x81x93</title></head> <body link=#0000cc> '
b'<div id=wrapper> <div id=head> <div class="head_wrapper"> <div class="s_form"> <div class="s_form_wrapper"> <div id=lg> <img'
b'hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baid'
b'u.com/s class="fm"> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidde'
b'n name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden n'
b'ame=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class="s_ipt" value maxlength=255 autocomplete=off auto'
b'focus=autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=xe7x99xbexe5xbaxa6xe4xb8x80xe'
b'4xb8x8b class="bg s_btn" autofocus></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trn'
b'ews class="mnav">xe6x96xb0xe9x97xbb</a> <a href=https://www.hao123.com name=tj_trhao123 class="mnav">hao123</a> <a hre'
b'f=http://map.baidu.com name=tj_trmap class="mnav">xe5x9cxb0xe5x9bxbe</a> <a href=http://v.baidu.com name=tj_trvideo '
xe9xa6x88</a> xe4xbaxacICPxe8xafx81030173xe5x8fxb7 '
b'<img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html> '
def get_ali(url): # print(url) # res=requests.get(url) # print(res) # print(res.status_code) # print(res.text) res=requests.get(url) if res.status_code==200: return res.content.decode("utf-8"),url,res.text def show(args): cont,url,text=args print(cont,text) print(url,len(cont)) if __name__=="__main__": ret=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(5) for url in ret: p.apply_async(get_ali,args=(url,),callback=show) p.close() p.join()
使用多进程请求多个url来减少网络等待浪费的时间
from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s] ' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了 ''' 打印结果: <进程3388> get https://www.baidu.com <进程3389> get https://www.python.org <进程3390> get https://www.openstack.org <进程3388> get https://help.github.com/ <进程3387> parse https://www.baidu.com <进程3389> get http://www.sina.com.cn/ <进程3387> parse https://www.python.org <进程3387> parse https://help.github.com/ <进程3387> parse http://www.sina.com.cn/ <进程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html> ...',...}] '''
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() {'index': '1', 'title': '绝杀慕尼黑', 'actor': '主演:弗拉基米尔·马什科夫,约翰·萨维奇,伊万·科列斯尼科夫', 'time': '上映时间:2019-06-13'} {'index': '2', 'title': '千与千寻', 'actor': '主演:柊瑠美,周冬雨,入野自由', 'time': '上映时间:2019-06-21'} {'index': '3', 'title': '命运之夜——天之杯II :迷失之蝶', 'actor': '主演:杉山纪彰,下屋则子,神谷浩史', 'time': '上映时间:2019-07-12'} {'index': '4', 'title': '玩具总动员4', 'actor': '主演:汤姆·汉克斯,蒂姆·艾伦,安妮·波茨', 'time': '上映时间:2019-06-21'} {'index': '5', 'title': '扫毒2天地对决', 'actor': '主演:刘德华,古天乐,苗侨伟', 'time': '上映时间:2019-07-05'} {'index': '6', 'title': '蜘蛛侠:英雄远征', 'actor': '主演:汤姆·赫兰德,杰克·吉伦哈尔,塞缪尔·杰克逊', 'time': '上映时间:2019-06-28'} {'index': '7', 'title': '爱宠大机密2', 'actor': '主演:帕顿·奥斯瓦尔特,冯绍峰,凯文·哈特', 'time': '上映时间:2019-07-05'} {'index': '8', 'title': '狮子王', 'actor': '主演:唐纳德·格洛弗,塞斯·罗根,詹姆斯·厄尔·琼斯', 'time': '上映时间:2019-07-12'} {'index': '9', 'title': '机动战士高达NT', 'actor': '主演:榎木淳弥,村中知,松浦爱弓', 'time': '上映时间:2019-07-12'} {'index': '10', 'title': '最好的我们', 'actor': '主演:陈飞宇,何蓝逗,惠英红', 'time': '上映时间:2019-06-06'}
6. 进程池返回值
异步获取返回值
def fun (i): # time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) for i in range(6): res=p.apply_async(fun,args=(i,)) # # print(res) print(res.get()) #等待计算结果 阻塞获取结果 等待下次循环 这里相当于join # print(res.get())获取到值但是和同步一样 但是我们这里是异步 所以异步get阻塞
解决方案
import os import time import random from multiprocessing import Pool # 异步获取返回值 表示五个五个返回数据 def fun (i): time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) ret=[] for i in range(22): res=p.apply_async(fun,args=(i,)) # ret.append(res) #等待计算结果 阻塞获取结果 等待下次循环 这里相当于join for s in ret: print( s.get()) # 异步获取返回值 表示五个五个返回数据
异步获取返回值 map
# map 自带join方法 close 方法 map 会先把每一个计算结果 添加到列表中 在返回 列表 def fun (i): time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) res=p.map(fun,range(5)) # print(res) # [0, 1, 4, 9, 16]
同步获取返回值
def fun (i): return i*i if __name__ == '__main__': p=Pool(5) for i in range(5): res=p.apply(fun,args=(i,)) # apply结果就是fun的返回值 print(res) # 0 # 1 # 4 # 9 # 16