zoukankan      html  css  js  c++  java
  • 线程池的定义方法

    方法一 :low版的线程池,没有重复利用创建的线程

    from multiprocessing import Pool
    import queue,threading,time
    class ThreadPool(object):
        def __init__(self,max_num=20):
            self.queue = queue.Queue(max_num)#创建一个长度最大为20的队列
            for i in range(max_num):
                # 创建一个由20个位置的队列,队列中每个位置都放了个类名(threading.Thread类),并且这个类名都指向同一个类存地址
                #类只有一个
                self.queue.put(threading.Thread)
        #拿走一个位置
        def get_thread(self):
            return self.queue.get()#把队列中的元素取完了,就等待队列中重新有元素
        #把拿走的还回去
        def add_thread(self):
            self.queue.put(threading.Thread)
    
    上面是主干
    pool = ThreadPool(10)
    #在队列中创建了类,只是类
    def func(args,p):
        print(args)
        time.sleep(2)
        p.add_thread()#增加了threading.Thread类
    #拿走的去干活
    for i in range(10):
        #获得类
       ret = pool.get_thread()#获取threading.Thread的一个位置后队列变为19个长度
        #对象=类(),对象是线程,对象去哪了(没了,去内存了,等待被销毁【没人用了去垃圾回收站】)
       t =ret(target=func,args=(i,pool))
       t.start()

    #方法二 :队列,放任务;线程一次次的去取任务

      主要部分:  

    def call(self):#自己拿任务(元组),执行完再去拿 """ 循环去获取任务函数并执行任务函数 :return: """ current_thread = threading.currentThread()#获取当前线程 self.generate_list.append(current_thread) #取任务并执行 event = self.q.get()#第一个任务 while event != StopEvent:#如果不是停止标志,就是任务或元组包 #是元组==》是任务 # 解开任务包 #执行任务 #再去取任务(没有任务就等待) #标记:我空闲了 self.free_list.append(current_thread)#空闲,添加当前任务 event = self.q.get()#第二个任务 ,有任务就执行,没有任务就标志:我空闲了
                self.free_list.remove(current_thread)
            else:
                #不是元组,不是任务,就是停止标志
                self.generate_list.remove(current_thread)#线程对象就扔内存地址里,变成了垃圾
    from multiprocessing import Pool
    import queue,threading,time
    import  contextlib
    StopEvent = object()#全局变量 停止标志
    
    class ThreadPool(object):
        def __init__(self, max_num):
            self.q = queue.Queue()#对象中创建了队列,可放无限多的任务
            self.max_num = max_num#最多创建的线程数(线程池最大容量)
            self.terminal = False
            self.generate_list =[]#真实创建的线程列表
            self.free_list = []#空闲线程数量
        def run(self,func,args,callback=None):
            """
             线程池执行一个任务
            :param func:任务函数
            :param args:任务函数所需要的参数
            :param callback:任务执行失败或成功后执行回调函数,回调函数有两个参数1、任务函数执行状态;
            2、任务函数返回值(默认为None,即:不执行回调函数)
            :return:如果线程池已经终止,则返回True否则None
            """
            if len(self.free_list) == 0 and len(self.generate_list)<self.max_num:
                self.generate_thread()
            w  = (func,args,callback,)
            self.q.put(w)
        def  generate_thread(self):
            """
            创建一个线程
            :return:
            """
            t =threading.Thread(target=self.call)#每个线程执行call方法
            t.start()
        def call(self):#自己拿任务(元组),执行完再去拿
            """
            循环去获取任务函数并执行任务函数
            :return:
            """
            current_thread = threading.currentThread()#获取当前线程
            self.generate_list.append(current_thread)
            #取任务并执行
            event = self.q.get()#第一个任务
            while event != StopEvent:#如果不是停止标志,就是任务或元组包
                #是元组==》是任务
                # 解开任务包
                #执行任务
                #再去取任务(没有任务就等待)
                func,arguments,callback = event#解开任务包
                ret = func(arguments)#如果出错就提示
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    ret = e
                if callback is not None:#如果没有回调函数直接pass,否则就执行,
                    try:
                        callback(success, ret)#回调函数出现的错误与否,进行下一步操作
                    except Exception as e:
                        pass
                self.free_list.append(current_thread)
                event = self.q.get()#第二个任务 ,标记为:我空闲了
                self.free_list.remove(current_thread)
                #  #相当于上面三句,利用上下文管理
                # with self.worker_state(self.free_list,current_thread):
                #     if self.terminal:
                #         event = StopEvent
                #     else:
                #           event =self.q.get()#有任务就执行,没有任务就标志:我空闲了
            else:
                #不是元组,不是任务,就是停止标志
                self.generate_list.remove(current_thread)#线程对象就扔内存地址里,变成了垃圾
    
        # def close(self):
        #     """
        #     执行完所有的任务后,所有线程停止
        #     """
        #     self.cancel = True
        #     full_size = len(self.generate_list)
        #     while full_size:
        #         self.q.put(StopEvent)
        #         full_size -= 1
        #
        # def terminate(self):
        #     """
        #     无论是否还有任务,终止线程
        #     """
        #     self.terminal = True
        #
        #     while self.generate_list:
        #         self.q.put(StopEvent)
        #
        #     self.q.queue.clear()
        #
        # @contextlib.contextmanager
        # def worker_state(self, state_list, worker_thread):
        #     """
        #     用于记录线程中正在等待的线程数
        #     """
        #     state_list.append(worker_thread)
        #     try:
        #         yield
        #     finally:
        #         state_list.remove(worker_thread)
    
    #执行init方法
    pool = ThreadPool(5)
    
    # def callback(status, result):
    #     # status, execute action status
    #     # result, execute action return value
    #     pass
    def action(i):
        print(i)
    
    for i in range(10):#300个循环就是300个任务
        #将任务放在队列中
        #着手开始处理任务:
        #   —1.创建线程(创建线程是有限制的)
        #        - 有空闲线程,则不再创建线程
        #        - 不能高于线程池的限制
        #        - 根据任务个数去判断
        #   —2.线程去队列中取任务
         pool.run(func=action,args=(i,),)#任务的集合
    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()
    程序没有停下,挂起在,红色部分关闭线程。

    上下文管理:https://docs.python.org/2/library/contextlib.html

    人的思维,观点是一直都会进化的。如果现在的思维,观点看法;和十年前的一模一样的话。那就可以说,你这十年是白活的。
  • 相关阅读:
    如何通过命令行窗口查看sqlite数据库文件
    eclipse自动补全的设置
    文本装饰
    注释和特殊符号
    文本装饰
    网页背景
    通过ArcGIS Server admin 查看和删除已注册的 Web Adaptor
    通过 ArcGIS Server Manager 查看已安装的 Web Adaptor
    通过 ArcGIS Server Manager 验证 DataStore
    Windows上安装ArcGIS Enterprise——以 Windows Server 2012 R2上安装 ArcGIS 10.8为例
  • 原文地址:https://www.cnblogs.com/liuzhiyun/p/7439995.html
Copyright © 2011-2022 走看看