import threading,queue,time StopEvent = object() class Mythreadpool: def __init__(self,max_num): self.q = queue.Queue() self.max_num = max_num self.generate_list = [] self.free_list = [] def run(self,func,args,callback=None): w = func,args,callback self.q.put(w) if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() def generate_thread(self): t = threading.Thread(target=self.call) t.start() def call(self): current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func,args,callback = event try: ret = func(args) except Exception as e: ret = e try: callback(ret) except Exception as b: pass self.free_list.append(current_thread) event = self.q.get() self.free_list.remove(current_thread) else: self.generate_list.remove(current_thread) def close(self): mun = len(self.generate_list) while mun: self.q.put(StopEvent) mun -= 1 def fun(i): time.sleep(0.5) print(i) pool = Mythreadpool(15) for i in range(100): pool.run(func=fun,args=i,) pool.close()
#with 版本 import threading,queue,time,contextlib StopEvent = object() class Mythreadpool: def __init__(self,max_num): self.q = queue.Queue() self.max_num = max_num self.generate_list = [] self.free_list = [] def run(self,func,args,callback=None): w = func,args,callback self.q.put(w) if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() def generate_thread(self): t = threading.Thread(target=self.call) t.start() def call(self): current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func,args,callback = event try: ret = func(args) except Exception as e: ret = e try: callback(ret) except Exception as b: pass with self.with_func(self.free_list,current_thread): # self.free_list.append(current_thread) event = self.q.get() # self.free_list.remove(current_thread) else: self.generate_list.remove(current_thread) @contextlib.contextmanager def with_func(self,list_1,args): list_1.append(args) try: yield finally: list_1.remove(args) def close(self): mun = len(self.generate_list) while mun: self.q.put(StopEvent) mun -= 1 def fun(i): time.sleep(0.5) print(i) pool = Mythreadpool(15) for i in range(100): pool.run(func=fun,args=i,) pool.close()