低版本:
1 #!/usr/bin/env python 2 import threading 3 import time 4 import queue 5 6 7 class TreadPool: 8 """ 9 将线程加入到队列中作为资源去完成任务 10 优点:简单好写容易理解 11 缺点:太尼玛多了..... 12 """ 13 def __init__(self, maxsize): 14 self.maxsize = maxsize 15 self._q = queue.Queue(maxsize) 16 for i in range(maxsize): 17 self._q.put(threading.Thread) 18 19 def get_thread(self): 20 return self._q.get() 21 22 def add_thread(self): 23 self._q.put(threading.Thread) 24 25 26 def task(arg, p): 27 print(arg) 28 time.sleep(1) 29 p.add_thread() 30 31 pool = TreadPool(5) 32 33 for i in range(100): 34 t = pool.get_thread() 35 obj = t(target=task, args=(i, pool)) 36 obj.start()
高级版本:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import queue # 队列模块 5 import threading # 线程模块 6 import contextlib # 上下文模块 7 import time # 时间模块 8 9 StopEvent = object() # 创建一个停止时所需要用到的对象 10 11 12 class ThreadPool(object): 13 """ 14 线程池(用于放置任务,将任务作为队列中元素让线程去取得,可以复用线程减少开销) 15 """ 16 def __init__(self, max_num, max_task_num=None): 17 """ 18 构造方法 19 :param max_num: 20 :param max_task_num:所创建的队列内最大支持的任务个数 21 """ 22 if max_task_num: 23 self.q = queue.Queue(max_task_num) # 指定队列任务数量则创建有限队列 24 else: 25 self.q = queue.Queue() # 未指定队列任务数量则创建无限队列 26 self.max_num = max_num # 每次使用的最大线程个数 27 self.cancel = False # 任务取消,默认False,用于线程停止的判断 28 self.terminal = False # 任务终止,默认False,用于线程池终止的判断 29 self.generate_list = [] # 定义一个已生成任务列表 30 self.free_list = [] # 定义一个空闲任务列表 31 32 def run(self, func, args, callback=None): 33 """ 34 线程池执行一个任务方法 35 :param func: 传递进来的任务函数 36 :param args: 任务函数使用的参数 37 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数 38 1、任务函数执行状态; 39 2、任务函数返回值(默认为None,即:不执行回调函数) 40 :return: 如果线程池已经终止,则返回True否则None 41 """ 42 if self.cancel: # 如果条件为真则不会继续执行 43 return 44 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 45 self.generate_thread() # 如果现有空闲列表无元素并且已生成任务列表内 46 # 元素个数小于队列支持的最大数量则创建一个线程 47 w = (func, args, callback,) # 具体任务 48 self.q.put(w) # 将任务放入队列当中 49 50 def generate_thread(self): 51 """ 52 创建一个线程方法 53 """ 54 t = threading.Thread(target=self.call) # 调用线程类创建一个线程,参数传递self.call方法 55 t.start() # 线程开始 56 57 def call(self): 58 """ 59 循环去获取任务函数并执行任务函数 60 """ 61 current_thread = threading.currentThread() # 创建当前任务 62 self.generate_list.append(current_thread) # 生成任务列表添加当前任务 63 64 event = self.q.get() # 事件获取 65 while event != StopEvent: # 当前事件不是停止时执行 66 67 func, arguments, callback = event # 任务具体函数,参数获取 68 try: 69 result = func(*arguments) # 结果为任务处理的出的结果 70 success = True # 任务处理成功 71 except Exception as e: 72 success = False # 任务处理失败 73 result = None # 结果为None 74 75 if callback is not None: # 回调不为空 76 try: 77 callback(success, result) # 将刚才执行结果返回 78 except Exception as e: 79 pass 80 81 with self.worker_state(self.free_list, current_thread): 82 if self.terminal: # 如果线程池已经被终止 83 event = StopEvent # 事件变为空任务 84 else: 85 event = self.q.get() # 事件为正常任务 86 else: 87 88 self.generate_list.remove(current_thread) # 生成任务列表移除当前任务 89 90 def close(self): 91 """ 92 执行完所有的任务后,所有线程停止 93 """ 94 self.cancel = True # 线程停止,判定条件变为真 95 full_size = len(self.generate_list) # 获取还有几个在执行任务的线程 96 while full_size: # 向队列中添加相应个数的空任务 97 self.q.put(StopEvent) 98 full_size -= 1 99 100 def terminate(self): 101 """ 102 无论是否还有任务,终止线程 103 """ 104 self.terminal = True # 线程池关闭,判定条件变为真 105 106 while self.generate_list: # 当还有线程存在时放置空任务 107 self.q.put(StopEvent) 108 109 self.q.queue.clear() # 将队列中所有任务清空 110 111 @contextlib.contextmanager 112 def worker_state(self, state_list, worker_thread): 113 """ 114 用于记录线程中正在等待的线程数 115 """ 116 state_list.append(worker_thread) # 等待状态列表中添加正在等待的线程数 117 try: 118 yield 119 finally: 120 state_list.remove(worker_thread) # 移除正在等待的线程数 121 122 123 124 # How to use 125 126 127 pool = ThreadPool(5) # 创建一个每次支持5线程的线程池 128 129 def callback(status, result): 130 # status, execute action status 131 # result, execute action return value 132 pass 133 134 135 def action(i): # 任务函数 136 print(i) 137 138 for i in range(30): # 使用线程池执行30次任务 139 ret = pool.run(action, (i,), callback) 140 141 time.sleep(1) # 1秒等待 142 print(len(pool.generate_list), len(pool.free_list)) # 打印线程池内当前任务个数及空任务个数 143 pool.close() # 线程停止 144 pool.terminate() # 线程池终止