方法一 :low版的线程池,没有重复利用创建的线程
from multiprocessing import Pool
import queue,threading,time
class ThreadPool(object):
def __init__(self,max_num=20):
self.queue = queue.Queue(max_num)#创建一个长度最大为20的队列
for i in range(max_num):
# 创建一个由20个位置的队列,队列中每个位置都放了个类名(threading.Thread类),并且这个类名都指向同一个类存地址
#类只有一个
self.queue.put(threading.Thread)
#拿走一个位置
def get_thread(self):
return self.queue.get()#把队列中的元素取完了,就等待队列中重新有元素
#把拿走的还回去
def add_thread(self):
self.queue.put(threading.Thread)
上面是主干
pool = ThreadPool(10)
#在队列中创建了类,只是类
def func(args,p):
print(args)
time.sleep(2)
p.add_thread()#增加了threading.Thread类
#拿走的去干活
for i in range(10):
#获得类
ret = pool.get_thread()#获取threading.Thread的一个位置后队列变为19个长度
#对象=类(),对象是线程,对象去哪了(没了,去内存了,等待被销毁【没人用了去垃圾回收站】)
t =ret(target=func,args=(i,pool))
t.start()
#方法二 :队列,放任务;线程一次次的去取任务
主要部分:
def call(self):#自己拿任务(元组),执行完再去拿
"""
循环去获取任务函数并执行任务函数
:return:
"""
current_thread = threading.currentThread()#获取当前线程
self.generate_list.append(current_thread)
#取任务并执行
event = self.q.get()#第一个任务
while event != StopEvent:#如果不是停止标志,就是任务或元组包
#是元组==》是任务
# 解开任务包
#执行任务
#再去取任务(没有任务就等待)
#标记:我空闲了
self.free_list.append(current_thread)#空闲,添加当前任务
event = self.q.get()#第二个任务 ,有任务就执行,没有任务就标志:我空闲了
self.free_list.remove(current_thread)
else:
#不是元组,不是任务,就是停止标志
self.generate_list.remove(current_thread)#线程对象就扔内存地址里,变成了垃圾
from multiprocessing import Pool
import queue,threading,time
import contextlib
StopEvent = object()#全局变量 停止标志
class ThreadPool(object):
def __init__(self, max_num):
self.q = queue.Queue()#对象中创建了队列,可放无限多的任务
self.max_num = max_num#最多创建的线程数(线程池最大容量)
self.terminal = False
self.generate_list =[]#真实创建的线程列表
self.free_list = []#空闲线程数量
def run(self,func,args,callback=None):
"""
线程池执行一个任务
:param func:任务函数
:param args:任务函数所需要的参数
:param callback:任务执行失败或成功后执行回调函数,回调函数有两个参数1、任务函数执行状态;
2、任务函数返回值(默认为None,即:不执行回调函数)
:return:如果线程池已经终止,则返回True否则None
"""
if len(self.free_list) == 0 and len(self.generate_list)<self.max_num:
self.generate_thread()
w = (func,args,callback,)
self.q.put(w)
def generate_thread(self):
"""
创建一个线程
:return:
"""
t =threading.Thread(target=self.call)#每个线程执行call方法
t.start()
def call(self):#自己拿任务(元组),执行完再去拿
"""
循环去获取任务函数并执行任务函数
:return:
"""
current_thread = threading.currentThread()#获取当前线程
self.generate_list.append(current_thread)
#取任务并执行
event = self.q.get()#第一个任务
while event != StopEvent:#如果不是停止标志,就是任务或元组包
#是元组==》是任务
# 解开任务包
#执行任务
#再去取任务(没有任务就等待)
func,arguments,callback = event#解开任务包
ret = func(arguments)#如果出错就提示
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
ret = e
if callback is not None:#如果没有回调函数直接pass,否则就执行,
try:
callback(success, ret)#回调函数出现的错误与否,进行下一步操作
except Exception as e:
pass
self.free_list.append(current_thread)
event = self.q.get()#第二个任务 ,标记为:我空闲了
self.free_list.remove(current_thread)
# #相当于上面三句,利用上下文管理
# with self.worker_state(self.free_list,current_thread):
# if self.terminal:
# event = StopEvent
# else:
# event =self.q.get()#有任务就执行,没有任务就标志:我空闲了
else:
#不是元组,不是任务,就是停止标志
self.generate_list.remove(current_thread)#线程对象就扔内存地址里,变成了垃圾
# def close(self):
# """
# 执行完所有的任务后,所有线程停止
# """
# self.cancel = True
# full_size = len(self.generate_list)
# while full_size:
# self.q.put(StopEvent)
# full_size -= 1
#
# def terminate(self):
# """
# 无论是否还有任务,终止线程
# """
# self.terminal = True
#
# while self.generate_list:
# self.q.put(StopEvent)
#
# self.q.queue.clear()
#
# @contextlib.contextmanager
# def worker_state(self, state_list, worker_thread):
# """
# 用于记录线程中正在等待的线程数
# """
# state_list.append(worker_thread)
# try:
# yield
# finally:
# state_list.remove(worker_thread)
#执行init方法
pool = ThreadPool(5)
# def callback(status, result):
# # status, execute action status
# # result, execute action return value
# pass
def action(i):
print(i)
for i in range(10):#300个循环就是300个任务
#将任务放在队列中
#着手开始处理任务:
# —1.创建线程(创建线程是有限制的)
# - 有空闲线程,则不再创建线程
# - 不能高于线程池的限制
# - 根据任务个数去判断
# —2.线程去队列中取任务
pool.run(func=action,args=(i,),)#任务的集合
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
程序没有停下,挂起在,红色部分关闭线程。
上下文管理:https://docs.python.org/2/library/contextlib.html