需求
加入我们需要处理一串个位数(0~9),奇数时需要循环打印它;偶数则等待对应时长并完成所有任务;0则是错误,但不需要终止任务,可以自定义一些处理。
关键点
定义func函数处理需求
callback处理返回结果,只有偶数和0返回;奇数会一直执行;要控制线程池状态,则需要针对偶数和0时抛出异常,并捕获异常处理。
threadpool定义线程池并发
实现
# -*- coding: utf-8 -*-
from threadpool import makeRequests, ThreadPool
import time
from multiprocessing import Process
异常定义和特殊值(0)定义
class Finish(SyntaxWarning):
pass
class PauseInfo(SyntaxWarning):
pass
pause_num = 0
func函数定义
0时返回False,其他偶数返回True
def func(para):
if para == pause_num:
print('start for %d and wait %ds' % (para, 4))
time.sleep(4)
print('error bcs ',para)
return False
if para % 2 == 0:
print('start for %d and wait %ds' % (para, para))
time.sleep(para)
print('stop for', para)
return True
while True:
print('continue for', para)
time.sleep(para)
callback定义
def callback(request, result):
if result:
raise Finish
else:
raise PauseInfo
线程池处理
Finish标识任务完成,使用sys.exit退出线程池处理;
def main_thread(paras):
pool = ThreadPool(10)
requests = makeRequests(callable_=func, args_list=paras, callback=callback)
[pool.putRequest(req) for req in requests]
while True:
try:
pool.wait()
except Finish as e:
sys.exit(0)
except PauseInfo as e:
print('Pause bcs %d but will continue' % pause_num)
except Exception as e:
print('Unknown error so will quit')
sys.exit(1)
主函数起一个测试进程
if __name__ == '__main__':
while True:
s = input('Input number list to test and any other word to quit ')
paras = []
for para in s:
if para.isnumeric():
paras.append(int(para))
else:
break
try:
thread_test = Process(target=main_thread, args=(paras,))
thread_test.start()
thread_test.join(timeout=20)
except TimeoutError as e:
print('task timeout')
except Exception as e:
print('unknow error:',e)
结果验证
处理108,看打印可以看到,1被循环处理,0处理的时候报错;8处理完毕则任务结束
Input number list to test and any other word to quit
108
continue for 1
start for 0 and wait 4s
start for 8 and wait 8s
continue for 1
continue for 1
continue for 1
error bcs 0
continue for 1
Pause bcs 0 but will continue
continue for 1
continue for 1
continue for 1
continue for 1
stop for 8
Input number list to test and any other word to quit