zoukankan      html  css  js  c++  java
  • python ThreadPoolExecutor线程池和ProcessPoolExecutor进程池

    前言

    Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。
    相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:

    主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
    当一个线程完成的时候,主线程能够立即知道。
    让多线程和多进程的编码接口一致。

    线程池的使用

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def task(i):
          time.sleep(i)
          print(f'task{i} completed')
          pass
    
    thread_executor = ThreadPoolExecutor(max_workers=7)  # 创建一个最大容纳数量为7的线程池
    start = time.time()
    
    
    task1 = thread_executor.submit(task, 0)
    task1 = thread_executor.submit(task, 1)
    task1 = thread_executor.submit(task, 2)
    task1 = thread_executor.submit(task, 3)
    task1 = thread_executor.submit(task, 4)
    # 打印该任务是否执行完毕
    print(task1.done())
    # 只有未被提交的到线程池(在等待提交的队列中)的任务才能够取消
    print(task3.cancel())
    time.sleep(4)  # 休眠4秒钟之后,线程池中的任务全部执行完毕,可以打印状态
    print(task1.done())
    
    print(task1.result())  # 该任务的return 返回值  该方法是阻塞的。
    
    thread_executor.shutdown(wait=True)
    end = time.time()
    print(f'Time consuming {end - start}')
    
    • 执行结果
    task0 completed
    False
    False
    task1 completed
    task2 completed
    task3 completed
    True
    None
    task4 completed
    Time consuming 4.002375364303589
    
    • ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。
    • 使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。
    • 通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。
    • 使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。
    • 使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。

    as_completed

    • 上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。
    • as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。

    map

    • 除了上面的as_completed方法,还可以使用executor.map方法,但是有一点不同。
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def task(i):
          time.sleep(i)
          print(f'task{i} completed')
          pass
    
    
    thread_executor = ThreadPoolExecutor(max_workers=7)  # 创建一个最大容纳数量为7的线程池
    start = time.time()
    
    params = [i for i in range(5)]
    thread_executor.map(task, params)
    
    thread_executor.shutdown(wait=True)
    end = time.time()
    print(f'Time consuming {end - start}')
    
    • 执行结果
    task0 completed
    task1 completed
    task2 completed
    task3 completed
    task4 completed
    Time consuming 4.002383232116699
    

    ProcessPoolExecutor使用

    ProcessPoolExecutor在使用上和ThreadPoolExecutor大致是一样的,它们在futures中的方法也是相同的,但是对于map()方法ProcessPoolExecutor会多一个参数chunksize(ThreadPoolExecutor中这个参数没有任何作用),chunksize将迭代对象切成块,将其作为分开的任务提交给pool,对于很大的iterables,设置较大chunksize可以提高性能。

  • 相关阅读:
    61. 最长不含重复字符的子字符串
    60. 礼物的最大价值 (未理解)
    59. 把数字翻译成字符串
    58. 把数组排成最小的数
    57. 数字序列中某一位的数字 (不懂)
    spring data jpa 官方文档
    idea 编译报错 源发行版 1.8 需要目标发行版 1.8
    idea maven 依赖报错 invalid classes root
    solr
    spring boot 官方文档
  • 原文地址:https://www.cnblogs.com/hziwei/p/12896856.html
Copyright © 2011-2022 走看看