zoukankan      html  css  js  c++  java
  • concurrent.futures模块简单介绍(线程池,进程池)

    一、基类Executor

    Executor类是ThreadPoolExecutor 和ProcessPoolExecutor 的基类。它为我们提供了如下方法:

    submit(fn, *args, **kwargs):提交任务。以 fn(*args **kwargs) 方式执行并返回 Future 对像。

    fn:函数地址。

    *args:位置参数。

    **kwargs:关键字参数。

    map(func, *iterables, timeout=None, chunksize=1):

    func:函数地址。

    iterables:一个可迭代对象,以迭代的方式将参数传递给函数。

    timeout:这个参数没弄明白,如果是None等待所有进程结束。

    chunksize:使用 ProcessPoolExecutor 时,这个方法会将 iterables 分割任务块,并作为独立的任务提交到执行池中。这些块的数量可以由 chunksize 指定设置。 对很长的迭代器来说,设置chunksize 值比默认值 1 能显著地提高性能。 chunksize 对 ThreadPoolExecutor 没有效果。

    shutdown(wait=True):如果为True会等待线程池或进程池执行完成后释放正在使用的资源。如果 wait 为 False,将立即返回,所有待执行的期程完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的期程完成执行后才退出。

    二、线程池对象

    ThreadPoolExecutor 是 Executor 的子类,下面介绍ThreadPoolExecutor 的参数。

    class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):

    max_workers:线程池的数量。

    thread_name_prefix:线程名前缀。默认线程名ThreadPoolExecutor-线程数。

    initializer:一个函数或方法,在启用线程前会调用这个函数(给线程池添加额外任务)

    initargs :以元祖的方式给initializer中的函数传递参数。

    这里需要说明的是除了max_workers这个参数外其它三个参数基本很少用。max_workers很好理解就是线程池的数量。

    下面来说initializer和initargs 这两个奇怪的家伙。

    示例一:

    from concurrent.futures import ThreadPoolExecutor
    def work():
        print('工作线程')
    def test(num):
        print('test:',num)
    executor = ThreadPoolExecutor(max_workers=2,initializer=test(7))  # 开启2个线程  initializer指定参数test(7)
    executor.submit(work)  
    executor.submit(work)
    
    # 打印内容如下
    test: 7
    工作线程
    工作线程

    示例二:

    from concurrent.futures import ThreadPoolExecutor
    def work():
        print('工作线程')
    def test(num):
        print('test:',num)
    executor = ThreadPoolExecutor(max_workers=2,initializer=test,initargs=(7,)) # 这里我们使用initargs=(7,)的方式给test传递参数。
    executor.submit(work)
    executor.submit(work)
    
    # 打印内容如下
    test: 7
    工作线程
    工作线程
    test: 7

    通过示例一和示例二我们可以发现initializer=test(7)时,test函数只被调用了1次,当initializer=test,initargs=(7,)时,test被调用了2次。具体原因没有去分析。感觉没什么用。以后有时间看看源码在补上。

    三、进程池对象

    ProcessPoolExecutor 也是 Executor 的子类,下面是ProcessPoolExecutor 参数介绍:

    class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

    max_workers:工作进程数。如果 max_workers 为 None 或未给出,它将默认为机器的处理器个数。 如果 max_workers 小于等于 0,则将引发 ValueError。 在 Windows 上,max_workers 必须小于等于 61,否则将引发 ValueError。 如果 max_workers 为 None,则所选择的默认最多为 61,即使存在更多处理器。

    mp_context :可以是一个多进程上下文或是 None。 它将被用来启动工作进程。 如果 mp_context 为 None 或未给出,将使用默认的多进程上下文。

    initializer:一个函数或方法,在启用线程前会调用这个函数。

    initargs :以元祖的方式给initializer中的函数传递参数。

    关于说initializer和initargs 与ThreadPoolExecutor 类似这里不多说了。


    四、创建线程池

    from concurrent.futures import ThreadPoolExecutor
    import time
    def work(num):
        time.sleep(1)
        print('工作线程:',num)
    if __name__ == '__main__':
        executor = ThreadPoolExecutor(max_workers=5)  # 创建线程池,数量为5
        for i in range(5):
            executor.submit(work, i)
        print('主线程')
    
    # 打印内容如下
    主线程
    工作线程:   0
    工作线程:   1
    工作线程:   2
    工作线程:   3
    工作线程:   4
    # 使用shutdown等待所有线程结束后在打印主线程 from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工作线程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 创建线程池,数量为5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待线程池结束 print('主线程') # 打印内容如下 工作线程: 0 工作线程: 1 工作线程: 2 工作线程: 3 工作线程: 4 主线程

    如果想要在线程执行的过程中添加额外的功能,可以使用initializer参数,如下:

    from concurrent.futures import ThreadPoolExecutor
    
    def work(num):
        print('工作线程:',num)
    def test(num):
        print('额外任务:',num)
    if __name__ == '__main__':
        executor = ThreadPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加额外任务
        for i in range(5):
            executor.submit(work, i)
        executor.shutdown(wait=True)
        print('主线程')
    
    # 打印内容如下
    额外任务: 7
    工作线程: 0
    额外任务: 7
    工作线程: 1
    额外任务: 7
    工作线程: 2 
    额外任务: 7
    工作线程: 3 
    额外任务: 7
    工作线程: 4 
    主线程

    五、进程池

    进程池与线程池用法基本一致,只是名字和实现不一样而已。

    from concurrent.futures import ProcessPoolExecutor
    import time
    def work(num):
        time.sleep(1)
        print('工作进程:',num)
    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_workers=5)  # 创建进程池,数量为5
        for i in range(5):
            executor.submit(work, i)
        print('主线程')
    
    # 打印内容如下
    主线程
    工作进程: 0
    工作进程: 1
    工作进程: 2
    工作进程: 3
    工作进程: 4
    
    # 使用shutdown等待所有线程结束后在打印主线程
    from concurrent.futures import ProcessPoolExecutor
    import time
    def work(num):
        time.sleep(1)
        print('工作进程:',num)
    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_workers=5)  # 创建进程池,数量为5
        for i in range(5):
            executor.submit(work, i)
        executor.shutdown(wait=True)  # 等待进程池结束
        print('主线程')
    # 打印内容如下
    工作进程: 0
    工作进程: 1
    工作进程: 2
    工作进程: 3
    工作进程: 4
    主线程

    如果想要在线程执行的过程中添加额外的功能,可以使用initializer参数,如下:

    from concurrent.futures import ProcessPoolExecutor
    
    def work(num):
        print('工作进程:',num)
    def test(num):
        print('额外任务:',num)
    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加额外任务
        for i in range(5):
            executor.submit(work, i)
        executor.shutdown(wait=True)
        print('主线程')
    
    # 打印内容如下
    额外任务: 7
    工作进程: 0
    工作进程: 1
    工作进程: 2
    工作进程: 3
    工作进程: 4
    额外任务: 7
    额外任务: 7
    额外任务: 7
    额外任务: 7
    主线程

    六、Future Objects

    future类封装了可调用文件的异步执行。future的实例由executor.submit()时被创建的,除了测试之外不应该直接实例化future对象,所以为了获取future对象我们可以f=executor.submit()即可。

    class concurrent.futures.Future类中的方法:

    cancel():尝试取消执行线程池中的函数调用。如果调用当前正在执行或已完成运行,并且无法取消,则方法将返回false,否则调用将被取消,方法将返回true。

    cancelled():如果线程池中的函数执行成功返回True,调用失败返回false。

    running():如果线程池中的调用当前正在执行且无法取消,则返回true。

    done():如果呼叫成功取消或完成运行,则返回true。否则返回false

    result(timeout=None):返回线程函数的返回值。如果线程函数未执行完成,则此方法将最多等待timeout秒,如果线程函数未在超时秒内完成,则将引发concurrent.futures.TimeoutError。超时可以是int或float。如果未指定超时 timeout=None,则会阻塞,一直等待函数执行完成。如果在线程函数完成之前使用future对象取消了执行,则将引发CancelederRor。如果调用raised,此方法将引发相同的异常。

    exception(timeout=None):返回线程函数引发的异常。如果线程函数尚未完成,则此方法将最多等待timeout秒。如果线程函数未在超时秒内完成,则将引发concurrent.futures.TimeoutError。超时可以是int或float。如果未指定超时或无超时timeout=None,则会一直等待。如果在线程函数完成之前使用future对象取消了执行,则将引发CancelederRor如果线程函数完成但未引发,则返回None。

    add_done_callback(fn):将可调用fn附加到future对象。当future对象被取消或结束运行时,将调用fn,其中future对象是惟一的参数。添加的可调用对象是按照添加顺序调用的,并且总是在属于添加它们的进程的线程中调用。如果Callable引发异常子类,它将被记录并忽略。如果可调用引发BaseException子类,则行为未定义。


    七、Module Functions

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):将fs绑定一个future实例,如果future执行完成或取消执行fs函数。

    fs:fs是一个函数绑定在future实例(可能由不同的执行器实例创建)。返回2个命名元组的集合。第一组名为“done”,包含等待完成,完成前(完成或future对象取消)。第二组名为“not_done”,包含未完成的future(未完成或正在运行的future)。

    timeout:如果为None一直等待,否则会等待timeout秒。

    return_when :必须是如下范围。

    Constant

    Description

    FIRST_COMPLETED

    当任何future 完成或取消或者线程函数执行完成时。

    FIRST_EXCEPTION

    当future通过引发异常而结束时,线程函数将返回。如果没有future引发异常,那么它相当于所有已完成的。

    ALL_COMPLETED

    当所有future完成或取消时,函数将返回。

    concurrent.futures.as_completed(fs, timeout=None):返回一个future迭代器。

    fs:可迭代对象的future。

    timeout:超时时间,如果为None会一直阻塞直到执行完成。否则将等待timeout秒。

    from concurrent.futures._base import as_completed
    from concurrent.futures import ThreadPoolExecutor
    
    def work(num):
        return num ** 2
    if __name__ == '__main__':
        executor = ThreadPoolExecutor(max_workers=5)
        future_list = []  # 存放future对象
        for i in range(5):
            future_list.append(executor.submit(work, i))
        for future in as_completed(future_list):   # 这是一个无聊的用法
            res = future.result()
            print(f'结果:{res}')  # 打印工作线程返回的结果
    # 打印结果如下

    结果:0
    结果:4
    结果:16
    结果:1
    结果:9

     

    参考文档:https://docs.python.org/3/library/concurrent.futures.html

  • 相关阅读:
    Django(app的概念、ORM介绍及编码错误问题)
    Django(完整的登录示例、render字符串替换和redirect跳转)
    Construct Binary Tree from Preorder and Inorder Traversal
    Single Number II
    Single Number
    Binary Tree Level Order Traversal II
    Binary Tree Level Order Traversal
    Binary Tree Zigzag Level Order Traversal
    Recover Binary Search Tree
    Add Binary
  • 原文地址:https://www.cnblogs.com/caesar-id/p/11215910.html
Copyright © 2011-2022 走看看