zoukankan      html  css  js  c++  java
  • 理解Python并发编程-PoolExecutor篇

    原文链接

    之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。有没有什么好的方法把这些步骤抽象一下呢,让我们不关注这些细节,轻装上阵呢?

    答案是:有的。

    从Python3.2开始一个叫做concurrent.futures被纳入了标准库,而在Python2它属于第三方的futures库,需要手动安装:

    pip install futures
    
    ```                                             
    
    
    这个模块中有2个类:ThreadPoolExecutor和ProcessPoolExecutor,也就是对threading和multiprocessing的进行了高级别的抽象,
    
    暴露出统一的接口,帮助开发者非常方便的实现异步调用:
    
    
    ```python
    
    import time
    
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    
    NUMBERS = range(25, 38)
    
    
    
    def fib(n):
    
        if n<= 2:
    
            return 1
    
        return fib(n-1) + fib(n-2)
    
    
    
    start = time.time()
    
    
    with ProcessPoolExecutor(max_workers=3) as executor:
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
    
            print 'fib({}) = {}'.format(num, result)
    
    
    print 'COST: {}'.format(time.time() - start)



    感受下是不是很轻便呢?看一下花费的时间:

     python fib_executor.py
    
    fib(25) = 75025
    
    fib(26) = 121393
    
    fib(27) = 196418
    
    fib(28) = 317811
    
    fib(29) = 514229
    
    fib(30) = 832040
    
    fib(31) = 1346269
    
    fib(32) = 2178309
    
    fib(33) = 3524578
    
    fib(34) = 5702887
    
    fib(35) = 9227465
    
    fib(36) = 14930352
    
    fib(37) = 24157817
    
    COST: 10.8920350075



    除了用map,另外一个常用的方法是submit。如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    
    NUMBERS = range(30, 35)
    
    
    
    def fib(n):
    
        if n == 34:
    
            raise Exception("Don't do this")
    
        if n<= 2:
    
            return 1
    
        return fib(n-1) + fib(n-2)
    
    
    
    with ThreadPoolExecutor(max_workers=3) as executor:
    
        future_to_num = {executor.submit(fib, num): num for num in NUMBERS}
    
        for future in as_completed(future_to_num):
    
            num = future_to_num[future]
    
            try:
    
                result = future.result()
    
            except Exception as e:
    
                print 'raise an exception: {}'.format(e)
    
            else:
    
                print 'fib({}) = {}'.format(num, result)
    
    
    
    with ThreadPoolExecutor(max_workers=3) as executor:
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
    
            print 'fib({}) = {}'.format(num, result)



    执一下:

    python fib_executor_with_raise.py
    
    fib(30) = 832040
    
    fib(31) = 1346269
    
    raise an exception: Don't do this
    
    fib(32) = 2178309
    
    fib(33) = 3524578
    
    Traceback (most recent call last):
    
      File "fib_executor_with_raise.py", line 28, in <module>
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map
    
        yield future.result()
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result
    
        return self.__get_result()
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result
    
        reraise(self._exception, self._traceback)
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise
    
        exec('raise exc_type, exc_value, traceback', {}, locals_)
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run
    
        result = self.fn(*self.args, **self.kwargs)
    
      File "fib_executor_with_raise.py", line 9, in fib
    
        raise Exception("Don't do this")
    
    Exception: Don't do this



    可以看到,第一次捕捉到了异常,但是第二次执行的时候错误直接抛出来了。

    上面说到的map,有些同学马上会说,这不是进程(线程)池的效果吗?看起来确实是的:

    import time
    
    from multiprocessing.pool import Pool
    
    
    NUMBERS = range(25, 38)
    
    
    
    def fib(n):
    
        if n<= 2:
    
            return 1
    
        return fib(n-1) + fib(n-2)
    
    
    
    start = time.time()
    
    
    pool = Pool(3)
    
    results = pool.map(fib, NUMBERS)
    
    for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)):
    
        print 'fib({}) = {}'.format(num, result)
    
    
    print 'COST: {}'.format(time.time() - start)



    好像代码量更小哟。好吧,看一下花费的时间:

    
    
     

    python fib_pool.py
    
    fib(25) = 75025
    
    fib(26) = 121393
    
    fib(27) = 196418
    
    fib(28) = 317811
    
    fib(29) = 514229
    
    fib(30) = 832040
    
    fib(31) = 1346269
    
    fib(32) = 2178309
    
    fib(33) = 3524578
    
    fib(34) = 5702887
    
    fib(35) = 9227465
    
    fib(36) = 14930352
    
    fib(37) = 24157817
    
    COST: 17.1342718601
    
    
    
    
     

    WhatTF竟然花费了1.7倍的时间。为什么?

    BTW,有兴趣的同学可以对比下ThreadPool和ThreadPoolExecutor,由于GIL的缘故,对比的差距一定会更多。

    原理

    我们就拿ProcessPoolExecutor介绍下它的原理,引用官方代码注释中的流程图:

    |======================= In-process =====================|== Out-of-process ==|
    
    
    +----------+     +----------+       +--------+     +-----------+    +---------+
    
    |          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
    
    |          |     +----------+       |        |     +-----------+    |         |
    
    |          |     | ...      |       |        |     | ...       |    |         |
    
    |          |     | 6        |       |        |     | 5, call() |    |         |
    
    |          |     | 7        |       |        |     | ...       |    |         |
    
    | Process  |     | ...      |       | Local  |     +-----------+    | Process |
    
    |  Pool    |     +----------+       | Worker |                      |  #1..n  |
    
    | Executor |                        | Thread |                      |         |
    
    |          |     +----------- +     |        |     +-----------+    |         |
    
    |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
    
    |          |     +------------+     |        |     +-----------+    |         |
    
    |          |     | 6: call()  |     |        |     | ...       |    |         |
    
    |          |     |    future  |     |        |     | 4, result |    |         |
    
    |          |     | ...        |     |        |     | 3, except |    |         |
    
    +----------+     +------------+     +--------+     +-----------+    +---------+



    我们结合源码和上面的数据流分析一下:

    1. executor.map会创建多个_WorkItem对象,每个对象都传入了新创建的一个Future对象。
    2. 把每个_WorkItem对象然后放进一个叫做「Work Items」的dict中,键是不同的「Work Ids」。
    3. 创建一个管理「Work Ids」队列的线程「Local worker thread」,它能做2件事:
      1. 从「Work Ids」队列中获取Work Id, 通过「Work Items」找到对应的_WorkItem。如果这个Item被取消了,就从「Work Items」里面把它删掉,否则重新打包成一个_CallItem放入「Call Q」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「Result Q」队列中。
      2. 从「Result Q」队列中获取_ResultItems,然后从「Work Items」更新对应的Future对象并删掉入口。

    看起来就是一个「生产者/消费者」模型罢了,错了。我们要注意,整个过程并不是多个进程与任务+结果-2个队列直接通信的,而是通过一个中间的「Local worker thread」,它就是让效率提升的重要原因之一!!!

    设想,当某一段程序提交了一个请求,期望得到一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其他处理。而在Future模式下,调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其他事物。

    Future

    Future是常见的一种并发设计模式,在多个其他语言中都可以见到这种解决方案。

    一个Future对象代表了一些尚未就绪(完成)的结果,在「将来」的某个时间就绪了之后就可以获取到这个结果。比如上面的例子,我们期望并发的执行一些参数不同的fib函数,获取全部的结果。传统模式就是在等待queue.get返回结果,这个是同步模式,而在Future模式下,调用方式改为异步,而原先等待返回的时间段,由于「Local worker thread」的存在,这个时候可以完成其他工作

    在tornado中也有对应的实现。2013年的时候,我曾经写过一篇博客使用tornado让你的请求异步非阻塞,最后也提到了用concurrent.futures实现异步非阻塞的完成耗时任务。

    原文链接

  • 相关阅读:
    OSX安装nginx和rtmp模块(rtmp直播服务器搭建)
    用runtime来重写Coder和deCode方法 归档解档的时候使用
    Homebrew安装卸载
    Cannot create a new pixel buffer adaptor with an asset writer input that has already started writing'
    OSX下面用ffmpeg抓取桌面以及摄像头推流进行直播
    让nginx支持HLS
    iOS 字典转json字符串
    iOS 七牛多张图片上传
    iOS9UICollectionView自定义布局modifying attributes returned by UICollectionViewFlowLayout without copying them
    Xcode6 iOS7模拟器和Xcode7 iOS8模拟器离线下载
  • 原文地址:https://www.cnblogs.com/iyulang/p/7053975.html
Copyright © 2011-2022 走看看