concurrent.futures模块
这是一个python实现进程池和线程池的模块,由于频繁的创建线程或者进程是一件较为繁琐的操作,且浪费时间和资源。于是Python提供了线程池和进程池的模块。我们只需要将要执行的任务提交给池对象,池会自动创建线程或是进程对象异步执行这些任务,最后拿到返回结果。这些池中的进程和线程创建后不会立即自动销毁,而是等待下一次任务的提交,方便我们重复使用。
ThreadPoolExecutor对象
只需要定义一个池执行器对象,指定该池对象的最大任务数。第一次向池中提交任务时,池会对应创建与任务数对应的线程数,但不超过提供的最大任务数,超过最大任务数的任务会排队等待前一个任务执行完成。
每次提交一个任务可以得到这个任务的future对象。即使用submit()方法将会得到这个future对象,可以通过该对象的方法查看该任务是否完成,完成的结果,以及是否报错等信息,同时还支持绑定回调函数。
future对象方法 | 含义 |
done() | 该future对象对应的任务完成返回True,否则返回False |
cancelled() | 任务是否成功被取消,返回bool,且任务取消会标记对应future对象状态为CANCELED |
running() | 是否正在运行 |
cancel() | 尝试取消该任务,已经执行无法取消,返回false,否则返回True |
result(timeout=None) | 取任务执行结果,默认一直等待,或者设置timeout等待时间,超时抛出异常 |
exception(timeout=None) | 取任务执行时的错误信息,timeout等待时间,超时抛出异常 |
f.add_done_callback(fn) | 设置任务结束时的回调函数,该函数使用该future对象作为参数 |
示例:提交大量的任务,获取这些任务的结果
from concurrent import futures import threading def func1(): logging.info("this is func") time.sleep(3) return 1233 def main(): executer = futures.ThreadPoolExecutor(3) fs = [] for i in range(10): f = executer.submit(func1) fs.append(f) while True: for f in fs: # 遍历列表判断是否全部完成 if not f.done(): # 只要有一个任务没有完成,结束for循环,再一次从头判断 time.sleep(1) break else: # for 正常退出,全部执行完成 ret = [] for f in fs: ret.append((f.result())) # 将结果保存 print("任务执行完成后,shutdown前:", threading.active_count()) executer.shutdown() # 程序结束前清空执行器 break print("计算结果:", ret) # 打印结果,即任务函数的返回值 if __name__ == "__main__": main()
执行器对象支持上下文管理,所以可以使用with语句
with futures.ThreadPoolExecutor(3) as executor: fs = [] for i in range(10): f = executer.submit(func1) fs.append(f) # ....后代码相同,最后不需要执行executor.shutdown(),上下文自动执行
错误处理
执行池在执行提交的任务时候,如果该任务发生错误,将会停止该任务的执行,设置该任务的future对象状态为FINISHED
,并将future对象_exception属性设置为True。在我们获取future对象的结果前,需要判断这个任务是否是由于错误而终止。如果发生过异常,那么该异常在future._exception
属性上, 但是不宜直接获取,推荐使用future.exception()
接口获得这个错误。上面的代码部分修改后。
def func1():
logging.info("this is func")
time.sleep(3)
return 1233
def main():
executer = futures.ThreadPoolExecutor(3)
fs = []
for i in range(10):
f = executer.submit(func1)
fs.append(f)
while True: for f in fs: if not f.done(): time.sleep(1) break else: ret = [] for f in fs: if f._exception: # 该future对象的发生了异常 print(f.exception()) # 这里选择打印信息即可,使用excption()方法可以获取这个异常信息 continue ret.append((f.result())) # 所有正确的结果才能进入该列表 executer.shutdown() break
设置回调函数
任务执行完成时可以自动进行函数的回调,在提交任务时没有设置提交这个回调函数的接口,只能使用提交任务后获得future对象动态增加。future.add_done_callback(fn),fn函数必须为一参函数且未来的实参为该future对象。
def callbackfunc(future): global count with lock: count += 1 if future._exception: print(future.exception()) else: ret.append(future.result()) def funcx(i): if i > 9: raise IndexError("i > 9 error") return 1233 if __name__ == "__main__": lock = threading.Lock() with futures.ThreadPoolExecutor(3) as executor: ret = [] count = 0 for i in range(10): f = executor.submit(funcx, i+1) f.add_done_callback(callbackfunc) # 添加回调函数 while count < 10: time.sleep(0.2) print(ret)
这里尝试使用回调函数将任务计算的结果添加到ret类表中,各个线程分别添加。但事实并非完全如此,在添加回调函数时f.add_done_callback(callbackfunc)
,如果该任务已经被完成了,这个回调函数将会交被主线程调用,也就是说,如果该任务执行时间比添加任务的时间还短,那么该回调函数实际是被主线程执行,for循环中实际为一种串行执行。所以在提交任务之后,应该在尽量短的时间内给该任务的future对象提交callback。
源码:
def add_done_callback(self, fn): with self._condition: if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: self._done_callbacks.append(fn) # 添加,将来任务执行完成后自动调用 return fn(self) # 否则,任务已完成,线程自己调用
ProcessPoolExecutor
一个进程池执行器类,使用方式和线程池执行器基本一致。只是创建进程去处理这些任务。
from concurrent import futures def func(): sum = 0 for i in range(100000): sum += 1 # 同样支持上下文管理,结束时自动执行shutdown() with futures.ProcessPoolExecutor(3) as executor: future = exector.submit(func) while not future.done(): pass if future._exception: print(future.exceptiion()) else: print(future.result())
只提交了一个任务给进程池执行,将只会创建一个进程执行任务。一般为计算密集型的任务才使用进程池执行,IO密集型优先使用多线程。