zoukankan      html  css  js  c++  java
  • concurrent.futures进行并发编程

    Python中进行并发编程一般使用threading和multiprocessing模块,不过大部分的并发编程任务都是派生一系列线程,从队列中收集资源,然后用队列收集结果。在这些任务中,往往需要生成线程池,concurrent.futures模块对threading和multiprocessing模块进行了进一步的包装,可以很方便地实现池的功能。

    下载

    python3中concurrent.futures是标准库,在python2中还需要自己安装futures:

    pip install futures
    

    Executor与Future

    concurrent.futures供了ThreadPoolExecutor和ProcessPoolExecutor两个类,都继承自Executor,分别被用来创建线程池和进程池,接受max_workers参数,代表创建的线程数或者进程数。ProcessPoolExecutor的max_workers参数可以为空,程序会自动创建基于电脑cpu数目的进程数。

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import requests
    
    def load_url(url):
        return requests.get(url)
    
    url = 'http://httpbin.org'
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(load_url, url)
    

    Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future能够使用done()方法判断该任务是否结束,done()方法是不阻塞的,使用result()方法可以获取任务的返回值,这个方法是阻塞的。

    print future.done()
    print future.result().status_code
    

    Future类似于js中的Promise,可以添加回调函数:

    future.add_done_callback(fn)
    

    回调函数fn在future取消或者完成后运行,参数是future本身。

    submit()方法只能进行单个任务,用并发多个任务,需要使用map与as_completed。

    map

    URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
    
    def load_url(url):
        return requests.get(url)
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        for url, data in zip(URLS, executor.map(load_url, URLS)):
            print('%r page status_code %s' % (url, data.status_code))
    

     结果:

    'http://httpbin.org' page status_code 200
    'http://example.com/' page status_code 200
    'https://api.github.com/' page status_code 200
    

     map方法接收两个参数,第一个为要执行的函数,第二个为一个序列,会对序列中的每个元素都执行这个函数,返回值为执行结果组成的生成器。

       由上面可以看出返回结果与序列结果的顺序是一致的

    as_completed

      as_completed()方法返回一个Future组成的生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,直到所有的任务结束。

    def load_url(url):
        return url, requests.get(url).status_code
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for future in as_completed(tasks):
            print future.result()
    

     结果:

    ('http://example.com/', 200)
    ('http://httpbin.org', 200)
    ('https://api.github.com/', 200)
    

     可以看出,结果与序列顺序不一致,先完成的任务会先通知主线程。

    wait

       wait方法可以让主线程阻塞,直到满足设定的要求。有三种条件ALL_COMPLETED, FIRST_COMPLETED,FIRST_EXCEPTION。

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
    from concurrent.futures import as_completed
    import requests
    
    URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
    
    def load_url(url):
        requests.get(url)
        print url
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        wait(tasks, return_when=ALL_COMPLETED)
        print 'all_cone'
    

     返回:

    http://example.com/
    http://httpbin.org
    https://api.github.com/
    all_cone
    

     可以看出阻塞到任务全部完成。

    ProcessPoolExecutor

    使用ProcessPoolExecutor与ThreadPoolExecutor方法基本一致,注意文档中有一句:

    The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

    需要__main__模块。

    def main():
        with ProcessPoolExecutor() as executor:
            tasks = [executor.submit(load_url, url) for url in URLS]
            for f in as_completed(tasks):
                ret = f.done()
                if ret:
                    print f.result().status_code
    
    if __name__ == '__main__':
        main()
    

      

  • 相关阅读:
    Zookeeper 基础知识【1】
    Spark 基础复习【1】
    ZooKeeper 入门 一致性
    Hive 视图 索引
    Yarn调度 历史与基础
    mysql 优化【1】
    TCP IP知识梳理
    Java 基础 锁
    Spark 累加器使用
    RATE-MAX----beta答辩博客
  • 原文地址:https://www.cnblogs.com/linxiyue/p/10224004.html
Copyright © 2011-2022 走看看