zoukankan      html  css  js  c++  java
  • python并发与futures模块

    非并发程序(用于对比)

    从网上下载20个国家的国旗图像:

    import os
    import time
    import sys
    
    import requests  # 导入requests库
    
    POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                'MX PH VN ET EG DE IR TR CD FR').split()  #列出国家代码
    
    BASE_URL = 'http://flupy.org/data/flags'  #下载地址
    
    DEST_DIR = 'downloads/'  #存储路径
    
    
    def save_flag(img, filename):  #保存字节序列
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)
    
    
    def get_flag(cc):  #指定国家代码,构建url,下载图像
        url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
        resp = requests.get(url)
        return resp.content
    
    
    def show(text): 
        print(text, end=' ')
        sys.stdout.flush()  #刷新缓冲
    
    
    def download_many(cc_list):  #输出下载顺序,返回数量
        for cc in sorted(cc_list): 
            image = get_flag(cc)
            show(cc)
            save_flag(image, cc.lower() + '.gif')
    
        return len(cc_list)
    
    
    def main():  #记录耗时
        t0 = time.time()
        count = download_many(POP20_CC)
        elapsed = time.time() - t0
        msg = '
    {} flags downloaded in {:.2f}s'
        print(msg.format(count, elapsed))
    
    
    if __name__ == '__main__':
        main()

    结果:

    BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
    20 flags downloaded in 32.57s

    Future类

    标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future。这两个类作用相同:他们的实例都表示可能已经完成或者尚未完成的延迟计算。

    一般来说,使用Executor.submit()将Future类实例化,参数为一个可调用对象,然后会为传入的可调用对象排期,并返回一个future

    future表示终将发生的事情,而确定某件事会发生的唯一方式是执行时间已经排定。因此,只有把某件事交给concurrent.futures.Excutor子类处理,才会创建Future实例。

    客户端代码不应该改变future的状态,因为无法控制计算何时结束。

    Future类方法

    cancel():尝试去取消调用。如果调用当前正在执行,不能被取消。这个方法将返回False,否则调用将会被取消,方法将返回True

    cancelled():如果调用被成功取消返回True

    running():如果当前正在被执行不能被取消返回True

    done():如果调用被成功取消或者完成running返回True

    result(Timeout = None):拿到调用返回的结果。如果没有执行完毕就会去等待         //asyncio模块的result方法不支持设定时间

    exception(timeout=None):捕获程序执行过程中的异常

    add_done_callback(fn):将fn绑定到future对象上。当future对象被取消或完成运行时,fn函数将会被调用

    Executor类

    Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。

    Executor.submit(fn, *args, **kwargs):函数fn(*args **kwargs)返回一个Future对象代表调用的执行。

    Executor.map(func, *iterables, timeout=None, chunksize=1) :类似于map,返回一个迭代器,迭代器的__next__方法调用各个future的result方法,得到各个future的结果。

    shutdown(wait=True):给executor发信号,使其释放资源,当futures完成执行时。已经shutdown再调用submit()或map()会抛出RuntimeError。使用with语句,就可以避免必须调用本函数。

    ThreadPoolExecutor类

    ThreadPoolExecutor类是Executor的子类,它实现的接口能在不同的线程中执行可调用对象,在内部维护一个工作线程池。

    多线程并发下载国旗程序:

    import os
    import time
    import sys
    from concurrent import futures  #导入模块
    
    import requests
    
    POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                'MX PH VN ET EG DE IR TR CD FR').split()
    
    BASE_URL = 'http://flupy.org/data/flags'
    
    DEST_DIR = 'downloads/'
    
    MAX_WORKERS = 20  #设定最大线程个数
    
    def save_flag(img, filename):
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)
    
    
    def get_flag(cc):
        url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
        resp = requests.get(url)
        return resp.content
    
    
    def show(text):
        print(text, end=' ')
        sys.stdout.flush()
    
    
    def download_one(cc):  #下载一个图像的函数
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
        return cc
    
    
    def download_many(cc_list):
        workers = min(MAX_WORKERS, len(cc_list))  #设置线程数
        with futures.ThreadPoolExecutor(workers) as executor:  #上下文管理器,.__exit__方法会调用.shutdown(wait=True),它会在所有线程执行完毕前阻塞线程
            res = executor.map(download_one, sorted(cc_list))  #类似map,见说明
    
        return len(list(res))  
    
    
    def main():  #获取时间
        t0 = time.time()
        count = download_many(POP20_CC)
        elapsed = time.time() - t0
        msg = '
    {} flags downloaded in {:.2f}s'
        print(msg.format(count, elapsed))
    
    
    if __name__ == '__main__':
        main()

    结果:

    IN VN FR ID RU DE EG NG BD MX ET US CD CN BR PH IR JP TR PK 
    20 flags downloaded in 3.80s

    使用多线程使下载时间从32s减少到3.8s。

    ProcessPoolExecutor类

    ProcessPoolExecutor类是Executor的子类,它实现的接口能在不同的进程中执行可调用对象,在内部维护一个工作进程池。

    ThreadPoolExecutor.__init__方法需要指定max_workers参数来指定线程数量。在ProcessPoolExecutor类中,这个参数是可选的,大多数情况下默认使用os.cpu_count()函数返回的cpu数量。

    将download_many变为如下:

    def download_many(cc_list):
        with futures.ProcessPoolExecutor() as executor:  
            res = executor.map(download_one, sorted(cc_list))  
    
        return len(list(res)) 

    结果:

    BD BR CD CN DE FR EG ID IN IR ET JP NG MX PH RU PK TR VN US 
    20 flags downloaded in 8.84s

    我的计算机为4核心,它下载速度大致提升为单进程的4倍。(进程间切换需要时间)

    as_completed函数

    concurrent.futures.as_completed(fs, timeout=None)函数是futures模块独立的函数,它:返回一个迭代器,包含fs给出的future对象。任何future对象在该函数调用之前产生,如果调用__next__函数之后指定秒数之前结果不可用则引发超时异常。

    将executor.map方法替换为executor.submit方法+futures.as_completed函数:

    submit(创建并排定future)+as_completed(获取future的结果)= map                    //map弊端,如果一个调用生成结果比较耗时,则代码会阻塞;而两个函数结合即出现某个结果立即获取。

    def download_many(cc_list):
        cc_list = cc_list[:5]  
        with futures.ThreadPoolExecutor(max_workers=3) as executor:  #硬编码3线程
            to_do = []
            for cc in sorted(cc_list):  
                future = executor.submit(download_one, cc)  #使用submit排定可调用对象的执行时间
                to_do.append(future)  #存储各个future,以便后面传递给as_completed
                msg = 'Scheduled for {}: {}'
                print(msg.format(cc, future))  #显示国家代码和对应的future
    
            results = []
            for future in futures.as_completed(to_do):  #future运行结束后产出future
                res = future.result()  #获取future结果
                msg = '{} result: {!r}'
                print(msg.format(future, res)) #显示结果
                results.append(res)
    
        return len(results)

    wait函数

    concurrent.futures.wait(fstimeout=Nonereturn_when=ALL_COMPLETED)       //返回两个具名元组集合,第一个集合叫done表示已完成或取消的future,一个叫not_done与之相反。第三个可选参数是指定函数返回时间,FIRST_COMPLETED表示任何future完成或取消;FIRST_EXCEPTION抛出异常时;ALL_COMPLETED为默认,所有future完成或取消。

    异常类

    CancelledError ,TimeoutError , BrokenExecutor  ,BrokenThreadPool ,BrokenProcessPool

    以上来自《流畅的python》

  • 相关阅读:
    一文看懂:史上最通俗的视频编码技术详解
    浅谈常见的七种加密算法及实现
    自毁程序
    windows下, 宽字符和窄字符的打印输出
    修改Django自带auth模块的表名
    苹果IOS微信网页cookie缓存清理
    django_rest_framework ModelViewSet不支持PUT方法,PUT修改后不生效
    nginx的access日志打印十六进制x16x03x01x02x00x01
    解决/usr/bin/ld: cannot find -lmariadb报错
    使用antd的Upload组件和axios上传文件
  • 原文地址:https://www.cnblogs.com/lht-record/p/10356986.html
Copyright © 2011-2022 走看看