A thread pool holds a fixed number of threads that wait for tasks. As soon as a task arrives, an idle thread picks it up and runs it.
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
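The concurrent.futures module provides a mechanism for asynchronous calls.
With concurrent.futures, tasks are always submitted with submit.
for + submit submits multiple tasks.
shutdown is equivalent to close + join on a Pool: no more tasks may be added to the pool,
and the parent process (or thread) then waits until the pool has finished every task (this is the default, wait=True).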
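The shutdown behaviour described above can be sketched as follows. This is a minimal illustration, not part of the original notes: the task `square` and the variable names are made up for the example. It also shows the `with` form, which calls shutdown on exit.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    # Hypothetical task used only for this illustration.
    return n * n


pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(square, n) for n in range(5)]
pool.shutdown()  # wait=True by default: blocks until every submitted task is done

all_done = all(f.done() for f in futures)
results = [f.result() for f in futures]

try:
    pool.submit(square, 10)  # a pool that was shut down rejects new tasks
    rejected = False
except RuntimeError:
    rejected = True

# The with statement calls shutdown(wait=True) when the block exits.
with ThreadPoolExecutor(max_workers=2) as managed_pool:
    managed_futures = [managed_pool.submit(square, n) for n in range(5)]
managed_results = [f.result() for f in managed_futures]
```

The `with` form is usually the safer choice, because the pool is shut down even if the body raises.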
Efficiency comparison: thread pool, ProcessPoolExecutor process pool, and Pool process pool
Thread pool:
from concurrent.futures import ThreadPoolExecutor
import time

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    print(total)

if __name__ == '__main__':
    tp = ThreadPoolExecutor(5)
    start = time.time()
    for i in range(1000):
        tp.submit(func, i)
    tp.shutdown()
    print(time.time() - start)
ProcessPoolExecutor process pool:
from concurrent.futures import ProcessPoolExecutor
import time

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    print(total)

if __name__ == '__main__':
    tp = ProcessPoolExecutor(5)
    start = time.time()
    for i in range(1000):
        tp.submit(func, i)
    tp.shutdown()
    print(time.time() - start)
Pool process pool:
from multiprocessing import Pool
import time

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    print(total)

if __name__ == '__main__':
    p = Pool(5)
    start = time.time()
    for i in range(1000):
        p.apply_async(func, args=(i,))
    p.close()
    p.join()
    print(time.time() - start)
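Result: for CPU-bound programs,
the Pool process pool and the ProcessPoolExecutor process pool perform about the same,
while ThreadPoolExecutor is much slower, because in standard CPython builds the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time.
So for CPU-bound work, use multiple processes.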
Submitting multiple tasks
from concurrent.futures import ThreadPoolExecutor
import time

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    print(total)

t = ThreadPoolExecutor(20)
start = time.time()
t.map(func, range(1000))
# map submits multiple tasks to the thread pool; for submission it is
# equivalent to: for i in range(1000): t.submit(func, i)
t.shutdown()
print(time.time() - start)
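One detail of map worth seeing in isolation: results come back in the order of the inputs, not in the order the tasks finish. The sketch below assumes a made-up task `slow_square` whose smaller inputs deliberately finish later.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    # Hypothetical task: smaller inputs sleep longer, so they finish later.
    time.sleep((3 - n) * 0.05)
    return n * n


with ThreadPoolExecutor(max_workers=4) as pool:
    # map yields results in input order, regardless of completion order.
    ordered = list(pool.map(slow_square, range(4)))
```

Here `ordered` follows `range(4)` even though the task for 3 finishes first.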
Return values from a thread pool
from concurrent.futures import ThreadPoolExecutor

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    return total

t = ThreadPoolExecutor(20)
The code below submits the tasks with for + submit; with this approach, results are retrieved with result().
lst = []
for i in range(1000):
    ret = t.submit(func, i)
    lst.append(ret)
t.shutdown()
for fut in lst:
    print(fut.result())
With a Pool process pool, results are retrieved with the get method.
With ThreadPoolExecutor, results are retrieved with the result method.
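To make the behaviour of result concrete, here is a small sketch with hypothetical tasks (`wait_then_add`, `fail`) and a `threading.Event` used only to hold the task back. It illustrates that submit returns a Future immediately, that result() blocks until the task has finished, and that an exception raised inside the task is re-raised by result().

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # illustration only: keeps the task from finishing early


def wait_then_add(a, b):
    gate.wait()
    return a + b


def fail():
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(wait_then_add, 1, 2)  # returns a Future at once
    done_before = future.done()                # task is still blocked on the gate
    gate.set()
    value = future.result()                    # blocks until the task has finished
    done_after = future.done()

    try:
        pool.submit(fail).result()             # the task's exception is re-raised here
        error = None
    except ValueError as exc:
        error = str(exc)
```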
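The code below submits the tasks with map instead. Run it in place of the submit version above, since a pool cannot accept new tasks after shutdown.
map returns a generator object, so results are retrieved with __next__() (or the built-in next()).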
res = t.map(func, range(1000))
t.shutdown()
# next(res) is the idiomatic spelling of res.__next__()
print(next(res))
print(next(res))
print(next(res))
print(next(res))
print(next(res))
print(next(res))
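Calling `__next__()` by hand works, but the object returned by map can also be consumed like any other iterator. The following sketch uses a hypothetical helper `sum_of_squares` that computes the same value as func above, and shows `next()`, draining the remainder with `list()`, and what happens once the results are exhausted.

```python
from concurrent.futures import ThreadPoolExecutor


def sum_of_squares(num):
    # Same computation as func above: 0**2 + 1**2 + ... + (num - 1)**2
    return sum(i ** 2 for i in range(num))


with ThreadPoolExecutor(max_workers=4) as pool:
    results = pool.map(sum_of_squares, range(6))

first = next(results)            # same as results.__next__()
second = next(results)
rest = list(results)             # list() or a for loop drains what is left
leftover = next(results, None)   # exhausted: returns the default instead of raising StopIteration
```

In everyday code a plain `for value in pool.map(...)` loop is usually the simplest choice.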
Callback functions
from concurrent.futures import ThreadPoolExecutor

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    return total

def call_back_fun(res):
    # res is the finished Future, not the return value itself
    print(res)
    print(res.result())

t = ThreadPoolExecutor(20)
for i in range(1000):
    t.submit(func, i).add_done_callback(call_back_fun)
t.shutdown()
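In a thread pool, the callback is not called by the parent thread: it runs in the worker thread that finished the task. The exception is a task that had already finished when add_done_callback was called; in that case the callback runs immediately in the calling thread.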
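One way to check which thread runs a thread-pool callback is to record the thread name inside it. The sketch below is an illustration under stated assumptions: the names `gated_square` and `record_thread` and the "worker" prefix are made up, and a `threading.Event` holds the tasks back so that every callback is registered before its task finishes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # illustration only: delays the tasks until callbacks are attached
callback_threads = []
late_threads = []


def gated_square(n):
    gate.wait()
    return n * n


def record_thread(future):
    # Store the result together with the name of the thread running this callback.
    callback_threads.append((future.result(), threading.current_thread().name))


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as pool:
    for n in range(4):
        pool.submit(gated_square, n).add_done_callback(record_thread)
    gate.set()  # let the tasks run; callbacks fire in the worker threads

    finished = pool.submit(gated_square, 5)
    finished.result()  # the task is done before the callback is attached
    finished.add_done_callback(
        lambda f: late_threads.append(threading.current_thread().name)
    )

caller_name = threading.current_thread().name
```

After this runs, every name in `callback_threads` starts with "worker", while `late_threads` holds the name of the calling thread.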
from concurrent.futures import ProcessPoolExecutor
import os

def func(num):
    total = 0
    for i in range(num):
        total += i ** 2
    return total

def call_back_fun(res):
    print(res.result(), os.getpid())

if __name__ == '__main__':
    print(os.getpid())
    t = ProcessPoolExecutor(20)
    for i in range(1000):
        t.submit(func, i).add_done_callback(call_back_fun)
    t.shutdown()
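For both the ProcessPoolExecutor process pool and the Pool process pool, the callback is called in the parent process, which is why the PID printed by the callback matches the one printed at startup.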