1 import queue 2 import threading 3 import contextlib 4 import time 5 6 StopEvent = object() 7 8 9 class ThreadPool(object): 10 11 def __init__(self, max_num): 12 13 self.q = queue.Queue() 14 #最多创建线程数 15 self.max_num = max_num 16 self.cancel = False 17 self.terminal = False 18 #真实创建的线程列表 19 self.generate_list = [] 20 #空闲的线程列表 21 self.free_list = [] 22 23 def run(self, func, args, callback=None): 24 """ 25 线程池执行一个任务 26 :param func: 任务函数 27 :param args: 任务函数所需参数 28 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 29 :return: 如果线程池已经终止,则返回True否则None 30 """ 31 if self.cancel: 32 return 33 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 34 self.generate_thread() 35 w = (func, args, callback,) 36 self.q.put(w) 37 38 def generate_thread(self): 39 """ 40 创建一个线程 41 """ 42 t = threading.Thread(target=self.call) 43 t.start() 44 45 def call(self): 46 """ 47 循环去获取任务函数并执行任务函数 48 """ 49 current_thread = threading.currentThread() 50 self.generate_list.append(current_thread) 51 52 event = self.q.get() 53 while event != StopEvent: 54 55 func, arguments, callback = event 56 try: 57 result = func(*arguments) 58 success = True 59 except Exception as e: 60 success = False 61 result = None 62 63 if callback is not None: 64 try: 65 callback(success, result) 66 except Exception as e: 67 pass 68 69 with self.worker_state(self.free_list, current_thread): 70 if self.terminal: 71 event = StopEvent 72 else: 73 event = self.q.get() 74 else: 75 76 self.generate_list.remove(current_thread) 77 78 def close(self): 79 """ 80 执行完所有的任务后,所有线程停止 81 """ 82 self.cancel = True 83 full_size = len(self.generate_list) 84 while full_size: 85 self.q.put(StopEvent) 86 full_size -= 1 87 88 def terminate(self): 89 """ 90 无论是否还有任务,终止线程 91 """ 92 self.terminal = True 93 94 while self.generate_list: 95 self.q.put(StopEvent) 96 97 self.q.queue.clear() 98 99 @contextlib.contextmanager 100 def worker_state(self, state_list, worker_thread): 101 """ 102 用于记录线程中正在等待的线程数 103 """ 104 state_list.append(worker_thread) 105 try: 106 yield 107 finally: 108 state_list.remove(worker_thread) 109 110 111 112 pool = ThreadPool(5) 113 114 def callback(status, result): 115 # status, execute action status 116 # result, execute action return value 117 pass 118 119 120 def action(i): 121 print(i) 122 123 for i in range(30): 124 ret = pool.run(action, (i,), callback) 125 126 time.sleep(5) 127 128 print(len(pool.generate_list), len(pool.free_list)) 129 print(len(pool.generate_list), len(pool.free_list)) 130 # pool.close() 131 pool.terminate()