本章讨论python3.2引入的concurrent.futures模块。future是中文名叫期物。期物是一种对象,表示异步执行的操作
在很多任务中,特别是处理网络I/O。需要使用并发,因为网络有很高的延迟。所以为了不浪费CPU周期去等待,最好在收到网络响应之前做些其他的事。
首先来看下并发和非并发的两个脚本,来对比下各自的运行效率。在这个程序中,我们通过脚本去网站下载各个国家的国旗。网址是http://flupy.org/data/flags/cn/cn.gif
这里http://flupy.org/是基本URL,后面接/data/flags/国家名称/国家名称.gif
首先来看下顺序下载的代码:
country=('CN IN US ID BR PK NG BD JP MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL='http://flupy.org/data/flags'
DEST_URL='downloads/'
def save_flag(img,filename):
path=os.path.join(DEST_URL,filename)
with open(path,'wb') as f:
f.write(img)
def get_flag(cc):
url='{}/{cc}/{cc}.gif'.format(BASE_URL,cc=cc.lower())
resp=requests.get(url)
return resp.content
def show(text):
print(text)
sys.stdout.flush()
def download_many(cc_list):
for cc in sorted(cc_list):
img=get_flag(cc)
show(cc)
save_flag(img,cc.lower()+'.gif')
return len(cc_list)
def main(download_many):
t1=time.time()
count=download_many(country)
elapsed=time.time()-t1
msg=' {} flags downloaded in {:.2f}s'
print(msg.format(count,elapsed))
if __name__=="__main__":
main(download_many)
代码中通过requests对图片进行下载并且保存到downloads文件夹下面。并且在main中统计代码运行的时间。运行结果如下:
/usr/bin/python3.6 /home/zhf/py_prj/function_test/test.py
BD
BR
CD
CN
DE
EG
ET
FR
ID
IN
IR
JP
MX
NG
PH
PK
TR
US
VN
19 flags downloaded in 15.29s
下载了19个图片总共花费15.29秒。接下来我们用concurrent.futures模块来对代码进行改造。在这里添加download_one和download_many_futures两个函数
在ThreadPoolExecutor中设置最大运行的线程max_workers为3个
executor.submit中传入单个的回调函数和参数
future.result()返回的是每个线程运行完后的结果,在这里就是download_one的返回值
futures.as_completed是一个迭代器,在期物运行结束后产出期物
def download_one(cc):
image=get_flag(cc)
show(cc)
save_flag(image,cc.lower()+'.gif')
return cc
def download_many_futures(cc_list):
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do=[]
for cc in sorted(cc_list):
future=executor.submit(download_one,cc)
to_do.append(future)
msg='Scheduled for {}:{}'
print(msg.format(cc,future))
results=[]
for future in futures.as_completed(to_do):
res=future.result()
msg='{} result:{!r}'
print(msg.format(future,res))
results.append(res)
return len(results)
来看下运行的结果:总共耗时6.72秒,比之前的顺序下载节省了一半的时间
Scheduled for BD:<Future at 0x7f51f9c7e908 state=running>
Scheduled for BR:<Future at 0x7f51f9c7ef60 state=running>
Scheduled for CD:<Future at 0x7f51f8a20518 state=running>
Scheduled for CN:<Future at 0x7f51f8a20ef0 state=pending>
Scheduled for DE:<Future at 0x7f51f8a20f60 state=pending>
Scheduled for EG:<Future at 0x7f51f8a2c048 state=pending>
Scheduled for ET:<Future at 0x7f51f8a2c0f0 state=pending>
Scheduled for FR:<Future at 0x7f51f8a2c198 state=pending>
Scheduled for ID:<Future at 0x7f51f8a2c278 state=pending>
Scheduled for IN:<Future at 0x7f51f8a2c358 state=pending>
Scheduled for IR:<Future at 0x7f51f8a2c438 state=pending>
Scheduled for JP:<Future at 0x7f51f8a2c518 state=pending>
Scheduled for MX:<Future at 0x7f51f8a2c5f8 state=pending>
Scheduled for NG:<Future at 0x7f51f8a2c6d8 state=pending>
Scheduled for PH:<Future at 0x7f51f8a2c7b8 state=pending>
Scheduled for PK:<Future at 0x7f51f8a2c898 state=pending>
Scheduled for TR:<Future at 0x7f51f8a2c978 state=pending>
Scheduled for US:<Future at 0x7f51f8a2ca58 state=pending>
Scheduled for VN:<Future at 0x7f51f8a2cb38 state=pending>
BD
<Future at 0x7f51f9c7e908 state=finished returned str> result:'BD'
BR
<Future at 0x7f51f9c7ef60 state=finished returned str> result:'BR'
CD
<Future at 0x7f51f8a20518 state=finished returned str> result:'CD'
CN
<Future at 0x7f51f8a20ef0 state=finished returned str> result:'CN'
DE
<Future at 0x7f51f8a20f60 state=finished returned str> result:'DE'
EG
<Future at 0x7f51f8a2c048 state=finished returned str> result:'EG'
FR
<Future at 0x7f51f8a2c198 state=finished returned str> result:'FR'
ID
ET
<Future at 0x7f51f8a2c0f0 state=finished returned str> result:'ET'
<Future at 0x7f51f8a2c278 state=finished returned str> result:'ID'
IN
<Future at 0x7f51f8a2c358 state=finished returned str> result:'IN'
JP
<Future at 0x7f51f8a2c518 state=finished returned str> result:'JP'
IR
<Future at 0x7f51f8a2c438 state=finished returned str> result:'IR'
NG
<Future at 0x7f51f8a2c6d8 state=finished returned str> result:'NG'
MX
<Future at 0x7f51f8a2c5f8 state=finished returned str> result:'MX'
PK
<Future at 0x7f51f8a2c898 state=finished returned str> result:'PK'
PH
<Future at 0x7f51f8a2c7b8 state=finished returned str> result:'PH'
TR
<Future at 0x7f51f8a2c978 state=finished returned str> result:'TR'
US
<Future at 0x7f51f8a2ca58 state=finished returned str> result:'US'
VN
<Future at 0x7f51f8a2cb38 state=finished returned str> result:'VN'
19 flags downloaded in 6.72s。如果我们将max_workers设置为更大的值,比如设置为10, 得到的运行时间将会更快。通过运行的结果最快的时候达到1.9秒。
我们都知道python有全局解释器锁GIL,一次只允许使用一个线程执行python字节码。因此一个Python进程通过不能同时使用多个CPU核。关于GIL的解释可以参考http://cenalulu.github.io/python/gil-in-python/这篇帖子。
那么如果我们有多个CPU核(os.cpu_count可以查看有多少个CPU核),我们该怎么利用呢。这里就要用到futures.ProcessPoolExecutor,ProcessPoolExecutor将工作分配给多个python进程处理。因此如果需要做CPU密集型处理,使用这个模块能够绕开GIL,利用所有可用的CPU核。我们将代码修改为ProcessPoolExecutor来执行看下结果
19 flags downloaded in 4.92s。这个比设置
futures.ThreadPoolExecutor(max_workers=10)的时候还要慢一些。这是由于我的电脑只有4个CPU核,因此限制只能4个并发下载。但是线程版本使用的是10个线程。在用线程运行的时候,所有阻塞型的I/O函数都会释放GIL,允许其他线程运行。time.sleep()也会释放GIL,因此尽管有GIL,python的线程还是能发挥作用。
在一章中我们将介绍asynchio包处理并发