https://www.cnblogs.com/pdev/p/10685093.html
1. 以下为第一种,函数级的异步执行:
import time from concurrent.futures import ThreadPoolExecutor def task1_fn(url): time.sleep(10) return (url + " FINISHED") def task2_fn(url): time.sleep(3) return (url + " FINISHED") def RunPool(): pool = ThreadPoolExecutor(1) # 启动一个线程池 task1 = pool.submit(task1_fn, "CPU1") # 在另外的线程中运行RunBenchmark() task2 = pool.submit(task2_fn, "CPU2") # 在另外的线程中运行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') if task1.done() and task2.done(): print('all is over.') break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
2. 类级函数的的异步执行,添加了线程强制中断 pool.shutdown
import time from concurrent.futures import ThreadPoolExecutor class A(): def __init__(self): pass def test_ret(self): return 'hello world' def task1_fn(self, url): time.sleep(10) return (url + " FINISHED" + self.test_ret()) def task2_fn(self, url): time.sleep(3) return (url + " FINISHED" + self.test_ret()) def RunPool(): a = A() pool = ThreadPoolExecutor(2) # 启动一个线程池 task1 = pool.submit(a.task1_fn, "CPU1") # 在另外的线程中运行RunBenchmark() task2 = pool.submit(a.task2_fn, "CPU2") # 在另外的线程中运行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') else: task1.cancel() pool.shutdown(wait=False) print('task1 result AAAAAAAA %s' % task1.result()) break # if task1.done() and task2.done(): # print('all is over.') # break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
3. 第一个任务一旦完成,则强制终止线程
应用场景:如果某个任务一直处于执行中,无法退出,此时就需要强制退出,而强制退出一般需要重新线程run方法,但 模块 from concurrent.futures import ThreadPoolExecutor 重写run方法较麻烦,故采用另一种方式
wait(f_lst, return_when='FIRST_COMPLETED')
f_lst中存放的是提交到线程池中的线程对象列表
import time import inspect import ctypes from concurrent.futures import ThreadPoolExecutor, wait class A(): def __init__(self): pass def test_ret(self): return 'hello world' def task1_fn(self, url): time.sleep(200) return (url + " FINISHED " + self.test_ret() + ' ' + str(var)) def task2_fn(self, url): time.sleep(5) return (url + " FINISHED " + self.test_ret()) def RunPool(): a = A() pool = ThreadPoolExecutor(2) # 启动一个线程池 task1 = pool.submit(a.task1_fn, "CPU1") # 在另外的线程中运行RunBenchmark() task2 = pool.submit(a.task2_fn, "CPU2") # 在另外的线程中运行RunBenchmark() f_lst = [task1, task2] wait(f_lst, return_when='FIRST_COMPLETED') try: print('task status, task1: %s, task2: %s' % (task1.done(), task2.done())) # print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) except Exception as e: print('error: %s' % e) print("bye") if __name__ == '__main__': var = 123 RunPool()
注意:只有先完成的任务才可以使用 task.result() 方法,否则会一直卡着,等待 结果返回,而此时线程已被杀死,无法返回结果
4. 关键点说明:
1) task.cancel()当线程未运行之前可以取消,但是线程在线程池中启动后,无法再通过此方式取消
2) pool.shutdown()函数入参 wait=True表示中断线程前,等待直到线程执行完成才会中断, wait=False 表示 线程未执行完成,强制中断
3)传参方式和位置参数一样,不用使用元组
5. 适用业务:
多进程下的多线程,某个任务可能需要等待很长时间,此时就需要中断此任务,或者去执行其他的任务,如果要求线性处理,中断超时任务,就需要用此方法
针对上述情况下的单个进程,可以使用异步进程池 multiprocessing的apply_async方式,主进程和子进程同时执行,互不影响。
https://www.cnblogs.com/pdev/p/10685093.html
**1. 以下为第一种,函数级的异步执行:**`import timefrom concurrent.futures import ThreadPoolExecutor
import timefrom concurrent.futures import ThreadPoolExecutor def task1_fn(url): time.sleep(10) return (url + " FINISHED") def task2_fn(url): time.sleep(3) return (url + " FINISHED") def RunPool(): pool = ThreadPoolExecutor(1) # 启动一个线程池 task1 = pool.submit(task1_fn, "CPU1") # 在另外的线程中运行RunBenchmark() task2 = pool.submit(task2_fn, "CPU2") # 在另外的线程中运行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') if task1.done() and task2.done(): print('all is over.') break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
**2. 类级函数的的异步执行,添加了线程强制中断 pool.shutdown**
import timefrom concurrent.futures import ThreadPoolExecutor class A(): def __init__(self): pass def test_ret(self): return 'hello world' def task1_fn(self, url): time.sleep(10) return (url + " FINISHED" + self.test_ret()) def task2_fn(self, url): time.sleep(3) return (url + " FINISHED" + self.test_ret()) def RunPool(): a = A() pool = ThreadPoolExecutor(2) # 启动一个线程池 task1 = pool.submit(a.task1_fn, "CPU1") # 在另外的线程中运行RunBenchmark() task2 = pool.submit(a.task2_fn, "CPU2") # 在另外的线程中运行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') else: task1.cancel() pool.shutdown(wait=False) print('task1 result AAAAAAAA %s' % task1.result()) break # if task1.done() and task2.done(): # print('all is over.') # break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
**> 关键点说明:**1. task.cancel()当线程未运行之前可以取消,但是线程在线程池中启动后,无法再通过此方式取消2. pool.shutdown()函数入参 wait=True表示中断线程前,等待直到线程执行完成才会中断, wait=False 表示 线程未执行完成,强制中断
**> 适用业务:** 多进程下的多线程,某个任务可能需要等待很长时间,此时就需要中断此任务,或者去执行其他的任务,如果要求线性处理,中断超时任务,就需要用此方法 针对上述情况下的单个进程,可以使用异步进程池 multiprocessing的apply_async方式,主进程和子进程同时执行,互不影响。
总结:
#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前