自己在项目的开发中,一般能避免在单个进程中使用多线程就尽量把每个线程包装成独立的进程执行,通过socket或者一些中间件比如redis进行通讯,工作,协调。
但有时候必须涉及到多线程操作,而且碰到的情况中,多个线程必须协调全部正常工作才能执行逻辑,但子线程有着自己的栈区,报错了并不影响其它的线程,导致整个进程无法退出。
我当时想到的有两种思路,一种是多个线程间进行通讯或者一个全局变量的标记,当报错的时候,就修改这个标记,所有的子线程定时去查询这个标记,但感觉这个思路的拓展性太差,而且每个子线程需要主动定期查询或者通讯,太麻烦了。
后面一种就是我准备上代码的思路, 将所有的子线程设计成守护线程,主线程循环查询子线程的状态值,当发现任意的子线程状态异常,获取该子线程的异常对象,并上浮,退出主线程,导致所有的子线程退出。
import threading, traceback
import time
class ExcThread(threading.Thread):
def __init__(self, call_obj, *args, **kwargs):
super(ExcThread, self).__init__(*args, **kwargs)
self.callable_obj = call_obj
# 自己设置的退出状态值
self.exit_code = 0
self.exception = None
self.exc_traceback = ''
# 主动设置为守护线程,必须条件
self.setDaemon(True)
def run(self):
try:
self._run()
except Exception as e:
self.exit_code = 1
# 存储异常对象保存在实例对象中
self.exception = e
self.exc_traceback = traceback.format_exc()
def _run(self):
try:
self.callable_obj(*self._args, **self._kwargs)
except Exception as e:
raise e
def t_func(name, age=18):
while 1:
print(name, age)
time.sleep(3)
if age == 1:
raise Exception('hee')
# 生成一份子线程列表对象,用于主线程轮询检查使用
def start_child_thread():
thread_task_list = []
for i in range(3):
f = ExcThread(call_obj=t_func, args=('sidian',), kwargs={'age': i})
f.start()
thread_task_list.append(f)
return thread_task_list
def check_thread():
t_list = start_child_thread()
while 1:
for task in t_list:
if not task.is_alive():
raise task.exception
time.sleep(1)
if __name__ == '__main__':
check_thread()