zoukankan      html  css  js  c++  java
  • 理解Python并发编程-PoolExecutor篇

    摘要: 之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。

    之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。有没有什么好的方法把这些步骤抽象一下呢,让我们不关注这些细节,轻装上阵呢?

    答案是:有的。

    从Python3.2开始一个叫做concurrent.futures被纳入了标准库,而在Python2它属于第三方的futures库,需要手动安装:


    pip install futures
    
    ```                                             
    
    
    这个模块中有2个类:ThreadPoolExecutor和ProcessPoolExecutor,也就是对threading和multiprocessing的进行了高级别的抽象,
    
    暴露出统一的接口,帮助开发者非常方便的实现异步调用:
    
    
    ```python
    
    import time
    
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    
    NUMBERS = range(25, 38)
    
    
    
    def fib(n):
    
        if n<= 2:
    
            return 1
    
        return fib(n-1) + fib(n-2)
    
    
    
    start = time.time()
    
    
    with ProcessPoolExecutor(max_workers=3) as executor:
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
    
            print 'fib({}) = {}'.format(num, result)
    
    
    print 'COST: {}'.format(time.time() - start)


    感受下是不是很轻便呢?看一下花费的时间:


     python fib_executor.py
    
    fib(25) = 75025
    
    fib(26) = 121393
    
    fib(27) = 196418
    
    fib(28) = 317811
    
    fib(29) = 514229
    
    fib(30) = 832040
    
    fib(31) = 1346269
    
    fib(32) = 2178309
    
    fib(33) = 3524578
    
    fib(34) = 5702887
    
    fib(35) = 9227465
    
    fib(36) = 14930352
    
    fib(37) = 24157817
    
    COST: 10.8920350075


    除了用map,另外一个常用的方法是submit。如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit:


    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    
    NUMBERS = range(30, 35)
    
    
    
    def fib(n):
    
        if n == 34:
    
            raise Exception("Don't do this")
    
        if n<= 2:
    
            return 1
    
        return fib(n-1) + fib(n-2)
    
    
    
    with ThreadPoolExecutor(max_workers=3) as executor:
    
        future_to_num = {executor.submit(fib, num): num for num in NUMBERS}
    
        for future in as_completed(future_to_num):
    
            num = future_to_num[future]
    
            try:
    
                result = future.result()
    
            except Exception as e:
    
                print 'raise an exception: {}'.format(e)
    
            else:
    
                print 'fib({}) = {}'.format(num, result)
    
    
    
    with ThreadPoolExecutor(max_workers=3) as executor:
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
    
            print 'fib({}) = {}'.format(num, result)


    执一下:


    python fib_executor_with_raise.py
    
    fib(30) = 832040
    
    fib(31) = 1346269
    
    raise an exception: Don't do this
    
    fib(32) = 2178309
    
    fib(33) = 3524578
    
    Traceback (most recent call last):
    
      File "fib_executor_with_raise.py", line 28, in <module>
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map
    
        yield future.result()
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result
    
        return self.__get_result()
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result
    
        reraise(self._exception, self._traceback)
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise
    
        exec('raise exc_type, exc_value, traceback', {}, locals_)
    
      File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run
    
        result = self.fn(*self.args, **self.kwargs)
    
      File "fib_executor_with_raise.py", line 9, in fib
    
        raise Exception("Don't do this")
    
    Exception: Don't do this


    可以看到,第一次捕捉到了异常,但是第二次执行的时候错误直接抛出来了。

    上面说到的map,有些同学马上会说,这不是进程(线程)池的效果吗?看起来确实是的:


    import time
    
    from multiprocessing.pool import Pool
    
    
    NUMBERS = range(25, 38)
    
    
    
    def fib(n):
    
        if n<= 2:
    
            return 1
    
        return fib(n-1) + fib(n-2)
    
    
    
    start = time.time()
    
    
    pool = Pool(3)
    
    results = pool.map(fib, NUMBERS)
    
    for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)):
    
        print 'fib({}) = {}'.format(num, result)
    
    
    print 'COST: {}'.format(time.time() - start)


    好像代码量更小哟。好吧,看一下花费的时间:

    
    
     

    python fib_pool.py
    
    fib(25) = 75025
    
    fib(26) = 121393
    
    fib(27) = 196418
    
    fib(28) = 317811
    
    fib(29) = 514229
    
    fib(30) = 832040
    
    fib(31) = 1346269
    
    fib(32) = 2178309
    
    fib(33) = 3524578
    
    fib(34) = 5702887
    
    fib(35) = 9227465
    
    fib(36) = 14930352
    
    fib(37) = 24157817
    
    COST: 17.1342718601
    
    
    
    
     

    WhatTF竟然花费了1.7倍的时间。为什么?

    BTW,有兴趣的同学可以对比下ThreadPool和ThreadPoolExecutor,由于GIL的缘故,对比的差距一定会更多。

    原理

    我们就拿ProcessPoolExecutor介绍下它的原理,引用官方代码注释中的流程图:


    |======================= In-process =====================|== Out-of-process ==|
    
    
    +----------+     +----------+       +--------+     +-----------+    +---------+
    
    |          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
    
    |          |     +----------+       |        |     +-----------+    |         |
    
    |          |     | ...      |       |        |     | ...       |    |         |
    
    |          |     | 6        |       |        |     | 5, call() |    |         |
    
    |          |     | 7        |       |        |     | ...       |    |         |
    
    | Process  |     | ...      |       | Local  |     +-----------+    | Process |
    
    |  Pool    |     +----------+       | Worker |                      |  #1..n  |
    
    | Executor |                        | Thread |                      |         |
    
    |          |     +----------- +     |        |     +-----------+    |         |
    
    |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
    
    |          |     +------------+     |        |     +-----------+    |         |
    
    |          |     | 6: call()  |     |        |     | ...       |    |         |
    
    |          |     |    future  |     |        |     | 4, result |    |         |
    
    |          |     | ...        |     |        |     | 3, except |    |         |
    
    +----------+     +------------+     +--------+     +-----------+    +---------+


    我们结合源码和上面的数据流分析一下:

    1. executor.map会创建多个_WorkItem对象,每个对象都传入了新创建的一个Future对象。
    2. 把每个_WorkItem对象然后放进一个叫做「Work Items」的dict中,键是不同的「Work Ids」。
    3. 创建一个管理「Work Ids」队列的线程「Local worker thread」,它能做2件事:
      1. 从「Work Ids」队列中获取Work Id, 通过「Work Items」找到对应的_WorkItem。如果这个Item被取消了,就从「Work Items」里面把它删掉,否则重新打包成一个_CallItem放入「Call Q」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「Result Q」队列中。
      2. 从「Result Q」队列中获取_ResultItems,然后从「Work Items」更新对应的Future对象并删掉入口。

    看起来就是一个「生产者/消费者」模型罢了,错了。我们要注意,整个过程并不是多个进程与任务+结果-2个队列直接通信的,而是通过一个中间的「Local worker thread」,它就是让效率提升的重要原因之一!!!

    设想,当某一段程序提交了一个请求,期望得到一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其他处理。而在Future模式下,调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其他事物。

    Future

    Future是常见的一种并发设计模式,在多个其他语言中都可以见到这种解决方案。

    一个Future对象代表了一些尚未就绪(完成)的结果,在「将来」的某个时间就绪了之后就可以获取到这个结果。比如上面的例子,我们期望并发的执行一些参数不同的fib函数,获取全部的结果。传统模式就是在等待queue.get返回结果,这个是同步模式,而在Future模式下,调用方式改为异步,而原先等待返回的时间段,由于「Local worker thread」的存在,这个时候可以完成其他工作

    在tornado中也有对应的实现。2013年的时候,我曾经写过一篇博客使用tornado让你的请求异步非阻塞,最后也提到了用concurrent.futures实现异步非阻塞的完成耗时任务。

    用云栖社区APP,舒服~

  • 相关阅读:
    Linux下新磁盘挂载
    Centos6.5下Haproxy负载均衡搭建
    Linux下redis搭建与配置
    iptables-nat实现反向代理功能
    iptables防火墙规则的添加、删除、保存
    [Python]-tools
    [Python]-Game
    .[Linux]-部署Nagios监控
    [HTML]-web前端
    [Linux]-部署Zabbix监控
  • 原文地址:https://www.cnblogs.com/jzy996492849/p/7054079.html
Copyright © 2011-2022 走看看