zoukankan      html  css  js  c++  java
  • 流畅python学习笔记:第十七章:并发处理二

    本章讨论python3.2引入的concurrent.futures模块。future是中文名叫期物。期物是一种对象,表示异步执行的操作

    在很多任务中,特别是处理网络I/O。需要使用并发,因为网络有很高的延迟。所以为了不浪费CPU周期去等待,最好在收到网络响应之前做些其他的事。

    首先来看下并发和非并发的两个脚本,来对比下各自的运行效率。在这个程序中,我们通过脚本去网站下载各个国家的国旗。网址是http://flupy.org/data/flags/cn/cn.gif

    这里http://flupy.org/是基本URL,后面接/data/flags/国家名称/国家名称.gif

    首先来看下顺序下载的代码:

    country=('CN IN US ID BR PK NG BD JP MX PH VN ET EG DE IR TR CD FR').split()

    BASE_URL='http://flupy.org/data/flags'

    DEST_URL='downloads/'

    def save_flag(img,filename):

        path=os.path.join(DEST_URL,filename)

        with open(path,'wb') as f:

            f.write(img)

    def get_flag(cc):

        url='{}/{cc}/{cc}.gif'.format(BASE_URL,cc=cc.lower())

        resp=requests.get(url)

        return resp.content

    def show(text):

        print(text)

        sys.stdout.flush()

    def download_many(cc_list):

        for cc in sorted(cc_list):

            img=get_flag(cc)

            show(cc)

            save_flag(img,cc.lower()+'.gif')

        return len(cc_list)

    def main(download_many):

        t1=time.time()

        count=download_many(country)

        elapsed=time.time()-t1

        msg=' {} flags downloaded in {:.2f}s'

        print(msg.format(count,elapsed))

    if __name__=="__main__":

        main(download_many)

    代码中通过requests对图片进行下载并且保存到downloads文件夹下面。并且在main中统计代码运行的时间。运行结果如下:

    /usr/bin/python3.6 /home/zhf/py_prj/function_test/test.py

    BD

    BR

    CD

    CN

    DE

    EG

    ET

    FR

    ID

    IN

    IR

    JP

    MX

    NG

    PH

    PK

    TR

    US

    VN

    19 flags downloaded in 15.29s

    下载了19个图片总共花费15.29秒。接下来我们用concurrent.futures模块来对代码进行改造。在这里添加download_onedownload_many_futures两个函数

    在ThreadPoolExecutor中设置最大运行的线程max_workers为3

    executor.submit中传入单个的回调函数和参数

    future.result()返回的是每个线程运行完后的结果,在这里就是download_one的返回值

    futures.as_completed是一个迭代器,在期物运行结束后产出期物

    def download_one(cc):

        image=get_flag(cc)

        show(cc)

        save_flag(image,cc.lower()+'.gif')

        return cc

    def download_many_futures(cc_list):

        with futures.ThreadPoolExecutor(max_workers=3) as executor:

            to_do=[]

            for cc in sorted(cc_list):

                future=executor.submit(download_one,cc)

                to_do.append(future)

                msg='Scheduled for {}:{}'

                print(msg.format(cc,future))

            results=[]

            for future in futures.as_completed(to_do):

                res=future.result()

                msg='{} result:{!r}'

                print(msg.format(future,res))

                results.append(res)

    return len(results)

    来看下运行的结果:总共耗时6.72秒,比之前的顺序下载节省了一半的时间

    Scheduled for BD:<Future at 0x7f51f9c7e908 state=running>

    Scheduled for BR:<Future at 0x7f51f9c7ef60 state=running>

    Scheduled for CD:<Future at 0x7f51f8a20518 state=running>

    Scheduled for CN:<Future at 0x7f51f8a20ef0 state=pending>

    Scheduled for DE:<Future at 0x7f51f8a20f60 state=pending>

    Scheduled for EG:<Future at 0x7f51f8a2c048 state=pending>

    Scheduled for ET:<Future at 0x7f51f8a2c0f0 state=pending>

    Scheduled for FR:<Future at 0x7f51f8a2c198 state=pending>

    Scheduled for ID:<Future at 0x7f51f8a2c278 state=pending>

    Scheduled for IN:<Future at 0x7f51f8a2c358 state=pending>

    Scheduled for IR:<Future at 0x7f51f8a2c438 state=pending>

    Scheduled for JP:<Future at 0x7f51f8a2c518 state=pending>

    Scheduled for MX:<Future at 0x7f51f8a2c5f8 state=pending>

    Scheduled for NG:<Future at 0x7f51f8a2c6d8 state=pending>

    Scheduled for PH:<Future at 0x7f51f8a2c7b8 state=pending>

    Scheduled for PK:<Future at 0x7f51f8a2c898 state=pending>

    Scheduled for TR:<Future at 0x7f51f8a2c978 state=pending>

    Scheduled for US:<Future at 0x7f51f8a2ca58 state=pending>

    Scheduled for VN:<Future at 0x7f51f8a2cb38 state=pending>

    BD

    <Future at 0x7f51f9c7e908 state=finished returned str> result:'BD'

    BR

    <Future at 0x7f51f9c7ef60 state=finished returned str> result:'BR'

    CD

    <Future at 0x7f51f8a20518 state=finished returned str> result:'CD'

    CN

    <Future at 0x7f51f8a20ef0 state=finished returned str> result:'CN'

    DE

    <Future at 0x7f51f8a20f60 state=finished returned str> result:'DE'

    EG

    <Future at 0x7f51f8a2c048 state=finished returned str> result:'EG'

    FR

    <Future at 0x7f51f8a2c198 state=finished returned str> result:'FR'

    ID

    ET

    <Future at 0x7f51f8a2c0f0 state=finished returned str> result:'ET'

    <Future at 0x7f51f8a2c278 state=finished returned str> result:'ID'

    IN

    <Future at 0x7f51f8a2c358 state=finished returned str> result:'IN'

    JP

    <Future at 0x7f51f8a2c518 state=finished returned str> result:'JP'

    IR

    <Future at 0x7f51f8a2c438 state=finished returned str> result:'IR'

    NG

    <Future at 0x7f51f8a2c6d8 state=finished returned str> result:'NG'

    MX

    <Future at 0x7f51f8a2c5f8 state=finished returned str> result:'MX'

    PK

    <Future at 0x7f51f8a2c898 state=finished returned str> result:'PK'

    PH

    <Future at 0x7f51f8a2c7b8 state=finished returned str> result:'PH'

    TR

    <Future at 0x7f51f8a2c978 state=finished returned str> result:'TR'

    US

    <Future at 0x7f51f8a2ca58 state=finished returned str> result:'US'

    VN

    <Future at 0x7f51f8a2cb38 state=finished returned str> result:'VN'

    19 flags downloaded in 6.72s。如果我们将max_workers设置为更大的值,比如设置为10, 得到的运行时间将会更快。通过运行的结果最快的时候达到1.9秒。

    我们都知道python有全局解释器锁GIL,一次只允许使用一个线程执行python字节码。因此一个Python进程通过不能同时使用多个CPU核。关于GIL的解释可以参考http://cenalulu.github.io/python/gil-in-python/这篇帖子。

    那么如果我们有多个CPU(os.cpu_count可以查看有多少个CPU),我们该怎么利用呢。这里就要用到futures.ProcessPoolExecutorProcessPoolExecutor将工作分配给多个python进程处理。因此如果需要做CPU密集型处理,使用这个模块能够绕开GIL,利用所有可用的CPU核。我们将代码修改为ProcessPoolExecutor来执行看下结果

    19 flags downloaded in 4.92s。这个比设置

    futures.ThreadPoolExecutor(max_workers=10)的时候还要慢一些。这是由于我的电脑只有4CPU核,因此限制只能4个并发下载。但是线程版本使用的是10个线程。在用线程运行的时候,所有阻塞型的I/O函数都会释放GIL,允许其他线程运行。time.sleep()也会释放GIL,因此尽管有GILpython的线程还是能发挥作用。

    在一章中我们将介绍asynchio包处理并发

  • 相关阅读:
    IDEA 实用功能Auto Import:自动优化导包(自动删除、导入包)
    idea 设置主题
    MySql where 后面使用函数导致索引失效问题
    IDEA报错,注解标红,提示Cannot resolve symbol xxx
    分批更新list
    java.lang.ArithmeticException: Rounding necessary
    Java selenium通过JS直接进行赋值给日期框
    postman接口测试之获取响应数据
    Jenkins集成allure测试报告
    Jenkins配置邮件通知
  • 原文地址:https://www.cnblogs.com/zhanghongfeng/p/8570630.html
Copyright © 2011-2022 走看看