  • Python thread pool (ThreadPoolExecutor) and process pool (ProcessPoolExecutor)

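    In Python, ThreadPoolExecutor (a thread pool) and ProcessPoolExecutor (a process pool) are both provided by the concurrent.futures module. From the main thread (or process), you can query the state of any submitted task and retrieve its return value.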

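    submit() returns a Future object, which represents a result that will become available later. Through it you can check the state of the task running in the pool.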
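
A minimal sketch of the main Future methods, assuming a trivial hypothetical `square` task that is not part of the original article: done() checks the state without blocking, result() blocks until the value is ready, and add_done_callback() registers a function that is called with the finished future.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical task, used only for illustration
    return x * x


collected = []

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)
    # The callback receives the finished Future object as its only argument
    future.add_done_callback(lambda f: collected.append(f.result()))
    # result() blocks until the task has finished, then returns its value
    value = future.result()

print(value)          # 49
print(future.done())  # True
print(collected)      # [49]
```

Callbacks may run in a worker thread, so `collected` is only read after the `with` block, whose exit waits for the workers to finish.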

    ThreadPoolExecutor (thread pool)

    Submit functions to the thread pool with submit(), then use done() to check whether each task has finished:
    import time
    from concurrent.futures import ThreadPoolExecutor

    def get_thread_time(times):
        time.sleep(times)
        return times

    # Create a thread pool with at most 4 worker threads
    executor = ThreadPoolExecutor(max_workers=4)
    # submit() schedules the function in the pool and immediately returns a Future
    task1 = executor.submit(get_thread_time, 1)
    task2 = executor.submit(get_thread_time, 2)
    task3 = executor.submit(get_thread_time, 3)
    task4 = executor.submit(get_thread_time, 4)
    print("task1:{} ".format(task1.done()))
    print("task2:{}".format(task2.done()))
    print("task3:{} ".format(task3.done()))
    print("task4:{}".format(task4.done()))
    time.sleep(2.5)
    print('after 2.5s {}'.format('-'*20))

    done_map = {
        "task1": task1.done(),
        "task2": task2.done(),
        "task3": task3.done(),
        "task4": task4.done()
    }
    # State of the tasks after 2.5 seconds
    for task_name, done in done_map.items():
        if done:
            print("{}:completed".format(task_name))

    result:

    task1:False 
    task2:False
    task3:False 
    task4:False
    after 2.5s --------------------
    task1:completed
    task2:completed

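    Initially all 4 tasks are unfinished. After 2.5 seconds, task1 and task2 have completed, while task3 and task4, which call sleep(3) and sleep(4), are still sleeping and therefore not yet done.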
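
The examples in this article never shut the pool down. A common idiom, sketched here with the same get_thread_time function but shorter sleep times chosen for illustration, is to use the executor as a context manager: leaving the `with` block calls executor.shutdown(wait=True), which waits for every submitted task to finish.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def get_thread_time(times):
    time.sleep(times)
    return times


with ThreadPoolExecutor(max_workers=4) as executor:
    tasks = [executor.submit(get_thread_time, t) for t in [0.1, 0.2, 0.3]]

# Exiting the with block called shutdown(wait=True), so every task is done here
print([task.done() for task in tasks])    # [True, True, True]
print([task.result() for task in tasks])  # [0.1, 0.2, 0.3]
```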

    Using wait() to check task state:

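    wait(fs, timeout=None, return_when=ALL_COMPLETED) takes three arguments: fs is the sequence of futures to wait on; timeout is the maximum number of seconds to wait, after which wait returns even if some tasks have not finished; return_when specifies when wait should return and defaults to ALL_COMPLETED (return only after every task has finished). The other accepted values are FIRST_COMPLETED and FIRST_EXCEPTION. wait returns a named tuple holding two sets, done and not_done: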

    import time
    from concurrent.futures import ThreadPoolExecutor, wait


    def get_thread_time(times):
        time.sleep(times)
        return times


    executor = ThreadPoolExecutor(max_workers=4)
    task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]]
    for i, task in enumerate(task_list, 1):
        print("task{}:{}".format(i, task))
    # Wait at most 2.5 seconds, then report which futures are done
    print(wait(task_list, timeout=2.5))

    wait returns after 2.5 seconds and reports the state of each task. Output:

    task1:<Future at 0x7ff3c885f208 state=running>
    task2:<Future at 0x7ff3c885fb00 state=running>
    task3:<Future at 0x7ff3c764b2b0 state=running>
    task4:<Future at 0x7ff3c764b9b0 state=running>
    DoneAndNotDoneFutures(done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>}, not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>})

    At the 2.5-second timeout, task1 and task2 have finished, while task3 and task4 are still running.
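
A minimal sketch of return_when=FIRST_COMPLETED, using shorter sleep times than the article (chosen only for illustration): wait returns as soon as one future finishes, and its result unpacks directly into the done and not_done sets.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def get_thread_time(times):
    time.sleep(times)
    return times


with ThreadPoolExecutor(max_workers=4) as executor:
    task_list = [executor.submit(get_thread_time, t) for t in [0.1, 0.6, 0.8]]
    # Return as soon as at least one future has finished
    done, not_done = wait(task_list, return_when=FIRST_COMPLETED)
    first_results = [task.result() for task in done]
    print("done:", first_results)
    print("still running:", len(not_done))
```

The futures left in not_done keep running; here the `with` block waits for them before exiting.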

    Getting task results with map():

    import time
    from concurrent.futures import ThreadPoolExecutor


    def get_thread_time(times):
        time.sleep(times)
        return times


    executor = ThreadPoolExecutor(max_workers=4)

    for i, result in enumerate(executor.map(get_thread_time, [2, 3, 1, 4]), 1):
        print("task{}:{}".format(i, result))

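    map(fn, *iterables, timeout=None): the first argument fn is the function executed by the threads; the second takes one or more iterables whose elements are passed to fn; timeout works like the timeout of wait(), but because map returns the results themselves, iterating over it raises TimeoutError if a result is not available within timeout seconds of the call to map().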


    map() results are ordered: they come back in the order of the input iterable, regardless of which task finishes first:

    task1:2
    task2:3
    task3:1
    task4:4
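
A minimal sketch of map()'s timeout, with hypothetical sleep times: the first result arrives in time, but the second is not ready 0.5 seconds after the call to map(), so iteration raises TimeoutError.

```python
import time
# concurrent.futures.TimeoutError is an alias of the built-in TimeoutError on Python 3.11+
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def get_thread_time(times):
    time.sleep(times)
    return times


received = []
timed_out = False
with ThreadPoolExecutor(max_workers=4) as executor:
    try:
        # timeout is measured from the call to map(), not per item
        for result in executor.map(get_thread_time, [0.1, 1.0], timeout=0.5):
            received.append(result)
    except TimeoutError:
        timed_out = True

print(received, timed_out)  # [0.1] True
```

The timeout only stops the iteration. The task that is already running still finishes, and the `with` block waits for it.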

    Getting results with as_completed():
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed


    def get_thread_time(times):
        time.sleep(times)
        return times


    executor = ThreadPoolExecutor(max_workers=4)
    task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]]
    # Map each Future back to a readable task name
    task_map = dict(zip(task_list, ["task1", "task2", "task3", "task4"]))

    # as_completed yields each Future as soon as it finishes
    for future in as_completed(task_list):
        print("{}:{}".format(task_map[future], future.result()))

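    task1, task2, task3, and task4 sleep for 2 s, 3 s, 1 s, and 4 s respectively. as_completed(fs, timeout=None) yields each future as it finishes. It takes two arguments: the first is the collection of futures; the second, timeout, behaves like the timeout of map(): if not all futures have finished within timeout seconds, iteration raises TimeoutError. Output: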

    task3:1
    task1:2
    task2:3
    task4:4

    The output shows that as_completed returns futures in the order they finish: the task that finishes first is returned first.
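
A minimal sketch of as_completed() with a timeout, using hypothetical sleep times: the two short tasks are yielded in completion order, and the iteration gives up with TimeoutError once 0.8 seconds have passed without the long task finishing.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed


def get_thread_time(times):
    time.sleep(times)
    return times


finished = []
timed_out = False
with ThreadPoolExecutor(max_workers=4) as executor:
    task_list = [executor.submit(get_thread_time, t) for t in [0.3, 1.5, 0.1]]
    try:
        # Futures are yielded as they finish, not in submission order
        for future in as_completed(task_list, timeout=0.8):
            finished.append(future.result())
    except TimeoutError:
        timed_out = True

print(finished, timed_out)  # [0.1, 0.3] True
```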

    ProcessPoolExecutor

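    For CPU-bound work, the GIL allows only one thread at a time to execute Python bytecode, so multiple threads effectively use a single CPU. In this case, a process pool is more efficient than a thread pool.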

    Computing Fibonacci numbers with a thread pool:

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed


    def fib(n):
        if n < 3:
            return 1
        return fib(n - 1) + fib(n - 2)


    start_time = time.time()
    executor = ThreadPoolExecutor(max_workers=4)
    task_list = [executor.submit(fib, n) for n in range(3, 35)]
    # Results are collected in completion order, not submission order
    thread_results = [task.result() for task in as_completed(task_list)]
    print(thread_results)
    print("ThreadPoolExecutor time is: {}".format(time.time() - start_time))

    result:

    [8, 5, 3, 2, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 10946, 46368, 6765, 28657, 17711, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887]
    ThreadPoolExecutor time is: 4.998981237411499

    Computing Fibonacci numbers with a process pool:

    import time
    from concurrent.futures import ProcessPoolExecutor, as_completed


    def fib(n):
        if n < 3:
            return 1
        return fib(n - 1) + fib(n - 2)


    # The main-module guard is required where workers are started with "spawn" (e.g. Windows, macOS)
    if __name__ == "__main__":
        start_time = time.time()
        executor = ProcessPoolExecutor(max_workers=4)
        task_list = [executor.submit(fib, n) for n in range(3, 35)]
        process_results = [task.result() for task in as_completed(task_list)]
        print(process_results)
        print("ProcessPoolExecutor time is: {}".format(time.time() - start_time))

    result:

    [2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 75025, 28657, 46368, 196418, 121393, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887]
    ProcessPoolExecutor time is: 3.3585257530212402

    For this CPU-bound workload, processes clearly outperform threads: about 3.4 s versus about 5.0 s.

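    ProcessPoolExecutor is used in much the same way as ThreadPoolExecutor, and the futures they return have the same methods. One difference is map(): its chunksize parameter only has an effect with ProcessPoolExecutor (with ThreadPoolExecutor it does nothing). chunksize splits the iterable into chunks that are submitted to the pool as separate tasks; for very large iterables, a larger chunksize can improve performance.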
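
A minimal sketch of passing chunksize to ProcessPoolExecutor.map(). The `square` function, the input size, and the chunk size of 1000 are hypothetical values chosen for illustration, not tuned recommendations. Larger chunks mean fewer round trips between the main process and the workers.

```python
from concurrent.futures import ProcessPoolExecutor


def square(n):
    # Hypothetical, very cheap task: per-item overhead dominates, so batching helps
    return n * n


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Items are sent to the worker processes in batches of 1000
        results = list(executor.map(square, range(10_000), chunksize=1000))
    print(results[:5], len(results))  # [0, 1, 4, 9, 16] 10000
```

Results still come back in input order. chunksize only changes how the work is batched.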

  • Original article: https://www.cnblogs.com/FG123/p/9704233.html