zoukankan      html  css  js  c++  java
  • python并发——线程池与进程池(转)

    Python中进行并发编程一般使用threading和multiprocessing模块,不过大部分的并发编程任务都是派生一系列线程,从队列中收集资源,然后用队列收集结果。在这些任务中,往往需要生成线程池,concurrent.futures模块对threading和multiprocessing模块进行了进一步的包装,可以很方便地实现池的功能。

    Executor与Future

    concurrent.futures供了ThreadPoolExecutor和ProcessPoolExecutor两个类,都继承自Executor,分别被用来创建线程池和进程池,接受max_workers参数,代表创建的线程数或者进程数。ProcessPoolExecutor的max_workers参数可以为空,程序会自动创建基于电脑cpu数目的进程数。

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import requests
     
    def load_url(url):
        return requests.get(url)
     
    url = 'http://httpbin.org'
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(load_url, url)

    Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future能够使用done()方法判断该任务是否结束,done()方法是不阻塞的,使用result()方法可以获取任务的返回值,这个方法是阻塞的。

    Future类似于js中的Promise,可以添加回调函数:

    回调函数fn在future取消或者完成后运行,参数是future本身。

    submit()方法只能进行单个任务,用并发多个任务,需要使用map与as_completed。

    map

    URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
     
    def load_url(url):
        return requests.get(url)
     
    with ThreadPoolExecutor(max_workers=3) as executor:
        for url, data in zip(URLS, executor.map(load_url, URLS)):
            print('%r page status_code %s' % (url, data.status_code))

    map方法接收两个参数,第一个为要执行的函数,第二个为一个序列,会对序列中的每个元素都执行这个函数,返回值为执行结果组成的生成器。

       由上面可以看出返回结果与序列结果的顺序是一致的

    as_completed

      as_completed()方法返回一个Future组成的生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,直到所有的任务结束。

    def load_url(url):
        return url, requests.get(url).status_code
     
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for future in as_completed(tasks):
            print future.result()

    可以看出,结果与序列顺序不一致,先完成的任务会先通知主线程。

    wait

       wait方法可以让主线程阻塞,直到满足设定的要求。有三种条件ALL_COMPLETED, FIRST_COMPLETED,FIRST_EXCEPTION。

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
    from concurrent.futures import as_completed
    import requests
     
    URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
     
    def load_url(url):
        requests.get(url)
        print url
     
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        wait(tasks, return_when=ALL_COMPLETED)
        print 'all_cone'

    ProcessPoolExecutor

    使用ProcessPoolExecutor与ThreadPoolExecutor方法基本一致,注意文档中有一句:

    The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

    需要__main__模块。

    def main():
        with ProcessPoolExecutor() as executor:
            tasks = [executor.submit(load_url, url) for url in URLS]
            for f in as_completed(tasks):
                ret = f.done()
                if ret:
                    print f.result().status_code
     
    if __name__ == '__main__':
        main()
  • 相关阅读:
    Verilog非阻塞赋值的仿真/综合问题 (Nonblocking Assignments in Verilog Synthesis)上
    异步FIFO结构及FPGA设计 跨时钟域设计
    FPGA管脚分配需要考虑的因素
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 上篇)
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 下篇)
    中国通信简史 (下)
    谈谈德国大学的电子专业
    中国通信简史 (上)
    Verilog学习笔记
    Verilog非阻塞赋值的仿真/综合问题(Nonblocking Assignments in Verilog Synthesis) 下
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/12668629.html
Copyright © 2011-2022 走看看