import time
from threading import Thread
import inspect
import ctypes
def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` inside the thread identified by ``tid``.

    Thin wrapper over the CPython C-API ``PyThreadState_SetAsyncExc``.

    Args:
        tid: Identifier of the target thread (e.g. ``Thread.ident``).
        exctype: Exception *class* to raise in that thread.

    Raises:
        TypeError: If ``exctype`` is not a class.
        ValueError: If the C call reports that no thread state was modified
            (returns 0).
        SystemError: If the C call reports that more than one thread state
            was modified; the pending exception is cleared before raising.
    """
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")

    # The C function takes an ``unsigned long`` id. Wrap it explicitly: a bare
    # Python int is passed by ctypes as a C ``int`` and may be truncated on
    # platforms where thread ids are wider than 32 bits.
    thread_id = ctypes.c_ulong(tid)
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

    res = set_async_exc(thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    if res != 1:
        # More than one thread state was affected: call again with a NULL
        # exception (None) on the *same* id to revert the effect.
        set_async_exc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Request termination of ``thread`` by raising ``SystemExit`` inside it.

    Args:
        thread: Thread object whose ``ident`` is handed to ``_async_raise``.
    """
    target_id = thread.ident
    _async_raise(target_id, SystemExit)
def func(name):
    """Print a one-second tick, labelled with ``name``, ten times.

    Args:
        name: Label printed in front of each elapsed-seconds count.
    """
    for elapsed in range(1, 11):
        # Sleep first so the printed number is the count of seconds elapsed.
        time.sleep(1)
        print(f"{name} {elapsed}秒")
def func2(name, t1):
    """Tick once per second for ten seconds and stop ``t1`` on the fourth tick.

    Args:
        name: Label printed in front of each elapsed-seconds count.
        t1: Thread passed to ``stop_thread`` when the count reaches 4.
    """
    kill_at = 4
    for elapsed in range(1, 11):
        time.sleep(1)
        if elapsed == kill_at:
            stop_thread(t1)
        print(f"{name} {elapsed}秒")
# def func2():
# i1 = InnerFunc()
# i1.func("类")
class InnerFunc:
    """Holder for a method version of the one-second ticking loop."""

    def func(self, name):
        """Print a ``Class``-prefixed one-second tick ten times.

        Args:
            name: Label printed after the ``Class`` prefix on each tick.
        """
        for elapsed in range(1, 11):
            time.sleep(1)
            print(f"Class {name} {elapsed}秒")
if __name__ == '__main__':
    # Demo: t1 ticks once per second; t2 ticks alongside it and stops t1
    # after four seconds.
    print("-" * 100)
    t1 = Thread(target=func, args=("线程1",))
    # Mark t2 as a daemon via the constructor; Thread.setDaemon() is
    # deprecated since Python 3.10.
    t2 = Thread(target=func2, args=("线程2击杀", t1), daemon=True)
    t1.start()
    t2.start()
    print("主线程结束")