zoukankan      html  css  js  c++  java
  • 线程池的定义方法

    方法一 :low版的线程池,没有重复利用创建的线程

    from multiprocessing import Pool
    import queue,threading,time
    class ThreadPool(object):
        def __init__(self,max_num=20):
            self.queue = queue.Queue(max_num)#创建一个长度最大为20的队列
            for i in range(max_num):
                # 创建一个由20个位置的队列,队列中每个位置都放了个类名(threading.Thread类),并且这个类名都指向同一个类存地址
                #类只有一个
                self.queue.put(threading.Thread)
        #拿走一个位置
        def get_thread(self):
            return self.queue.get()#把队列中的元素取完了,就等待队列中重新有元素
        #把拿走的还回去
        def add_thread(self):
            self.queue.put(threading.Thread)
    
    上面是主干
    pool = ThreadPool(10)
    #在队列中创建了类,只是类
    def func(args,p):
        print(args)
        time.sleep(2)
        p.add_thread()#增加了threading.Thread类
    #拿走的去干活
    for i in range(10):
        #获得类
       ret = pool.get_thread()#获取threading.Thread的一个位置后队列变为19个长度
        #对象=类(),对象是线程,对象去哪了(没了,去内存了,等待被销毁【没人用了去垃圾回收站】)
       t =ret(target=func,args=(i,pool))
       t.start()

    #方法二 :队列,放任务;线程一次次的去取任务

      主要部分:  

    def call(self):#自己拿任务(元组),执行完再去拿 """ 循环去获取任务函数并执行任务函数 :return: """ current_thread = threading.currentThread()#获取当前线程 self.generate_list.append(current_thread) #取任务并执行 event = self.q.get()#第一个任务 while event != StopEvent:#如果不是停止标志,就是任务或元组包 #是元组==》是任务 # 解开任务包 #执行任务 #再去取任务(没有任务就等待) #标记:我空闲了 self.free_list.append(current_thread)#空闲,添加当前任务 event = self.q.get()#第二个任务 ,有任务就执行,没有任务就标志:我空闲了
                self.free_list.remove(current_thread)
            else:
                #不是元组,不是任务,就是停止标志
                self.generate_list.remove(current_thread)#线程对象就扔内存地址里,变成了垃圾
    from multiprocessing import Pool
    import queue,threading,time
    import  contextlib
    StopEvent = object()#全局变量 停止标志
    
    class ThreadPool(object):
        def __init__(self, max_num):
            self.q = queue.Queue()#对象中创建了队列,可放无限多的任务
            self.max_num = max_num#最多创建的线程数(线程池最大容量)
            self.terminal = False
            self.generate_list =[]#真实创建的线程列表
            self.free_list = []#空闲线程数量
        def run(self,func,args,callback=None):
            """
             线程池执行一个任务
            :param func:任务函数
            :param args:任务函数所需要的参数
            :param callback:任务执行失败或成功后执行回调函数,回调函数有两个参数1、任务函数执行状态;
            2、任务函数返回值(默认为None,即:不执行回调函数)
            :return:如果线程池已经终止,则返回True否则None
            """
            if len(self.free_list) == 0 and len(self.generate_list)<self.max_num:
                self.generate_thread()
            w  = (func,args,callback,)
            self.q.put(w)
        def  generate_thread(self):
            """
            创建一个线程
            :return:
            """
            t =threading.Thread(target=self.call)#每个线程执行call方法
            t.start()
        def call(self):#自己拿任务(元组),执行完再去拿
            """
            循环去获取任务函数并执行任务函数
            :return:
            """
            current_thread = threading.currentThread()#获取当前线程
            self.generate_list.append(current_thread)
            #取任务并执行
            event = self.q.get()#第一个任务
            while event != StopEvent:#如果不是停止标志,就是任务或元组包
                #是元组==》是任务
                # 解开任务包
                #执行任务
                #再去取任务(没有任务就等待)
                func,arguments,callback = event#解开任务包
                ret = func(arguments)#如果出错就提示
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    ret = e
                if callback is not None:#如果没有回调函数直接pass,否则就执行,
                    try:
                        callback(success, ret)#回调函数出现的错误与否,进行下一步操作
                    except Exception as e:
                        pass
                self.free_list.append(current_thread)
                event = self.q.get()#第二个任务 ,标记为:我空闲了
                self.free_list.remove(current_thread)
                #  #相当于上面三句,利用上下文管理
                # with self.worker_state(self.free_list,current_thread):
                #     if self.terminal:
                #         event = StopEvent
                #     else:
                #           event =self.q.get()#有任务就执行,没有任务就标志:我空闲了
            else:
                #不是元组,不是任务,就是停止标志
                self.generate_list.remove(current_thread)#线程对象就扔内存地址里,变成了垃圾
    
        # def close(self):
        #     """
        #     执行完所有的任务后,所有线程停止
        #     """
        #     self.cancel = True
        #     full_size = len(self.generate_list)
        #     while full_size:
        #         self.q.put(StopEvent)
        #         full_size -= 1
        #
        # def terminate(self):
        #     """
        #     无论是否还有任务,终止线程
        #     """
        #     self.terminal = True
        #
        #     while self.generate_list:
        #         self.q.put(StopEvent)
        #
        #     self.q.queue.clear()
        #
        # @contextlib.contextmanager
        # def worker_state(self, state_list, worker_thread):
        #     """
        #     用于记录线程中正在等待的线程数
        #     """
        #     state_list.append(worker_thread)
        #     try:
        #         yield
        #     finally:
        #         state_list.remove(worker_thread)
    
    #执行init方法
    pool = ThreadPool(5)
    
    # def callback(status, result):
    #     # status, execute action status
    #     # result, execute action return value
    #     pass
    def action(i):
        print(i)
    
    for i in range(10):#300个循环就是300个任务
        #将任务放在队列中
        #着手开始处理任务:
        #   —1.创建线程(创建线程是有限制的)
        #        - 有空闲线程,则不再创建线程
        #        - 不能高于线程池的限制
        #        - 根据任务个数去判断
        #   —2.线程去队列中取任务
         pool.run(func=action,args=(i,),)#任务的集合
    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()
    程序没有停下,挂起在,红色部分关闭线程。

    上下文管理:https://docs.python.org/2/library/contextlib.html

    人的思维,观点是一直都会进化的。如果现在的思维,观点看法;和十年前的一模一样的话。那就可以说,你这十年是白活的。
  • 相关阅读:
    6 全局锁和表锁
    oracle ogg--ogg搭建过程中遇到的错误及处理
    5 深入浅出索引(下)
    4 深入浅出索引(上)
    oracle ogg 单实例双向-新增表,修改表结构(oracle-oracle
    oracle ogg 单实例双向复制搭建(oracle-oracle)--Oracle GoldenGate
    Iview 中 获取 Menu 导航菜单 选中的值
    idea中git分支的使用
    vue使用axios进行ajax请求
    web前端_Vue框架_设置浏览器上方的标题和图标
  • 原文地址:https://www.cnblogs.com/liuzhiyun/p/7439995.html
Copyright © 2011-2022 走看看