zoukankan      html  css  js  c++  java
  • concurrent.futures

    python因为其全局解释器锁GIL而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,IO密集型 vs. 计算密集型。

    IO密集型:读取文件,读取网络套接字频繁。

    计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。

    而concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。

    核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。

     第一章 concurrent.futures性能阐述

    • 最大公约数

    这个函数是一个计算密集型的函数。

    复制代码
    # -*- coding:utf-8 -*-
    # 求最大公约数
    def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
    if a % i == 0 and b % i == 0:
    return i
    

    numbers = [
    (
    1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
    (
    1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
    ]

    复制代码
    • 不使用多线程/多进程
    复制代码
    import time
    

    start = time.time()
    results
    = list(map(gcd, numbers))
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    Took 2.507 seconds.

    复制代码

    消耗时间是:2.507。

    • 多线程ThreadPoolExecutor
    复制代码
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
    

    start = time.time()
    pool
    = ThreadPoolExecutor(max_workers=2)
    results
    = list(pool.map(gcd, numbers))
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    Took 2.840 seconds.

    复制代码

    消耗时间是:2.840。

    上面说过gcd是一个计算密集型函数,因为GIL的原因,多线程是无法提升效率的。同时,线程启动的时候,有一定的开销,与线程池进行通信,也会有开销,所以这个程序使用了多线程反而更慢了。

    • 多进程ProcessPoolExecutor
    复制代码
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
    

    start = time.time()
    pool
    = ProcessPoolExecutor(max_workers=2)
    results
    = list(pool.map(gcd, numbers))
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    Took 1.861 seconds.

    复制代码

    消耗时间:1.861。

    在两个CPU核心的机器上运行多进程程序,比其他两个版本都快。这是因为,ProcessPoolExecutor类会利用multiprocessing模块所提供的底层机制,完成下列操作:

    1)把numbers列表中的每一项输入数据都传给map。

    2)用pickle模块对数据进行序列化,将其变成二进制形式。

    3)通过本地套接字,将序列化之后的数据从煮解释器所在的进程,发送到子解释器所在的进程。

    4)在子进程中,用pickle对二进制数据进行反序列化,将其还原成python对象。

    5)引入包含gcd函数的python模块。

    6)各个子进程并行的对各自的输入数据进行计算。

    7)对运行的结果进行序列化操作,将其转变成字节。

    8)将这些字节通过socket复制到主进程之中。

    9)主进程对这些字节执行反序列化操作,将其还原成python对象。

    10)最后,把每个子进程所求出的计算结果合并到一份列表之中,并返回给调用者。

    multiprocessing开销比较大,原因就在于:主进程和子进程之间通信,必须进行序列化和反序列化的操作。

    第二章 concurrent.futures源码分析

    • Executor

    可以任务Executor是一个抽象类,提供了如下抽象方法submit,map(上面已经使用过),shutdown。值得一提的是Executor实现了__enter__和__exit__使得其对象可以使用with操作符。关于上下文管理和with操作符详细请参看这篇博客http://www.cnblogs.com/kangoroo/p/7627167.html

    ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

    复制代码
    class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    

    def submit(self, fn, *args, **kwargs):
    """Submits a callable to be executed with the given arguments.

    Schedules the callable to be executed as fn(*args, **kwargs) and returns
    a Future instance representing the execution of the callable.

    Returns:
    A Future representing the given call.
    """
    raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
    """Returns a iterator equivalent to map(fn, iter).

    Args:
    fn: A callable that will take as many arguments as there are
    passed iterables.
    timeout: The maximum number of seconds to wait. If None, then there
    is no limit on the wait time.

    Returns:
    An iterator equivalent to: map(func, *iterables) but the calls may
    be evaluated out-of-order.

    Raises:
    TimeoutError: If the entire result iterator could not be generated
    before the given timeout.
    Exception: If fn(*args) raises for any values.
    """
    timeout
    = kwargs.get('timeout')
    if timeout is not None:
    end_time
    = timeout + time.time()

    fs = [self.submit(fn, args) for args in itertools.izip(iterables)]

    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
    try:
    for future in fs:
    if timeout is None:
    yield future.result()
    else:
    yield future.result(end_time - time.time())
    finally:
    for future in fs:
    future.cancel()
    return result_iterator()

    def shutdown(self, wait=True):
    """Clean-up the resources associated with the Executor.

    It is safe to call this method several times. Otherwise, no other
    methods can be called after this one.

    Args:
    wait: If True then shutdown will not return until all running
    futures have finished executing and the resources used by the
    executor have been reclaimed.
    """
    pass

    def enter(self):
    return self

    def exit(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait
    =True)
    return False

    复制代码

    下面我们以线程ProcessPoolExecutor的方式说明其中的各个方法。

    • map
    map(self, fn, *iterables, **kwargs)

    map方法的实例我们上面已经实现过,值得注意的是,返回的results列表是有序的,顺序和*iterables迭代器的顺序一致。

    这里我们使用with操作符,使得当任务执行完成之后,自动执行shutdown函数,而无需编写相关释放代码。

    复制代码
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
    

    start = time.time()
    with ProcessPoolExecutor(max_workers
    =2) as pool:
    results
    = list(pool.map(gcd, numbers))
    print 'results: %s' % results
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    复制代码

    产出结果是:

    results: [1, 5, 1, 5, 2, 3]
    Took 1.617 seconds.
    • submit
    submit(self, fn, *args, **kwargs)

    submit方法用于提交一个可并行的方法,submit方法同时返回一个future实例。

    future对象标识这个线程/进程异步进行,并在未来的某个时间执行完成。future实例表示线程/进程状态的回调。

    复制代码
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
    

    start = time.time()
    futures
    = list()
    with ProcessPoolExecutor(max_workers
    =2) as pool:
    for pair in numbers:
    future
    = pool.submit(gcd, pair)
    futures.append(future)
    print 'results: %s' % [future.result() for future in futures]
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    复制代码

    产出结果是:

    results: [1, 5, 1, 5, 2, 3]
    Took 2.289 seconds.
    • future

    submit函数返回future对象,future提供了跟踪任务执行状态的方法。比如判断任务是否执行中future.running(),判断任务是否执行完成future.done()等等。

    as_completed方法传入futures迭代器和timeout两个参数

    默认timeout=None,阻塞等待任务执行完成,并返回执行完成的future对象迭代器,迭代器是通过yield实现的。 

    timeout>0,等待timeout时间,如果timeout时间到仍有任务未能完成,不再执行并抛出异常TimeoutError

    复制代码
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed
    

    start = time.time()
    with ProcessPoolExecutor(max_workers
    =2) as pool:
    futures
    = [ pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
    print '执行中:%s, 已完成:%s' % (future.running(), future.done())
    print '#### 分界线 ####'
    for future in as_completed(futures, timeout=2):
    print '执行中:%s, 已完成:%s' % (future.running(), future.done())
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    复制代码
    • wait

    wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。

    使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE,默认设置为ALL_COMPLETED。

    复制代码
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
    

    start = time.time()
    with ProcessPoolExecutor(max_workers
    =2) as pool:
    futures
    = [ pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
    print '执行中:%s, 已完成:%s' % (future.running(), future.done())
    print '#### 分界线 ####'
    done, unfinished
    = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    for d in done:
    print '执行中:%s, 已完成:%s' % (d.running(), d.done())
    print d.result()
    end
    = time.time()
    print 'Took %.3f seconds.' % (end - start)

    复制代码

    由于设置了ALL_COMPLETED,所以wait等待所有的task执行完成,可以看到6个任务都执行完成了。

    复制代码
    执行中:True, 已完成:False
    执行中:True, 已完成:False
    执行中:True, 已完成:False
    执行中:True, 已完成:False
    执行中:False, 已完成:False
    执行中:False, 已完成:False
    #### 分界线 ####
    执行中:False, 已完成:True
    执行中:False, 已完成:True
    执行中:False, 已完成:True
    执行中:False, 已完成:True
    执行中:False, 已完成:True
    执行中:False, 已完成:True
    Took 1.518 seconds.
    复制代码

    如果我们将配置改为FIRST_COMPLETED,wait会等待直到第一个任务执行完成,返回当时所有执行成功的任务。这里并没有做并发控制。

    重跑,结构如下,可以看到执行了2个任务。

    复制代码
    执行中:True, 已完成:False
    执行中:True, 已完成:False
    执行中:True, 已完成:False
    执行中:True, 已完成:False
    执行中:False, 已完成:False
    执行中:False, 已完成:False
    #### 分界线 ####
    执行中:False, 已完成:True
    执行中:False, 已完成:True
    Took 1.517 seconds.
    复制代码
     
  • 相关阅读:
    Day18-lvs
    mysql日志
    【杂文】我们都有光明的前途(回忆录)
    【杂文】银色的 NOI2020(退役记)
    【杂文】SCOI2020 游记
    【学习笔记】字符串—广义后缀自动机
    【学习笔记】数论、数学—常见定理、结论、性质汇总
    【杂文】随心一记
    【杂文】CSP2019 蒟蒻AFO(假)记
    【模板整合计划】目录
  • 原文地址:https://www.cnblogs.com/python001-vip/p/12689976.html
Copyright © 2011-2022 走看看