zoukankan      html  css  js  c++  java
  • python之线程池

    #!/user/bin/evn python
    # -*- coding:utf-8 -*-
    
    import threading
    import queue,time
    
    '''
    线程池的思路:
    将任务依次放在队列里面
    然后从队列取出任务交给线程执行
    '''
    stopEvent=object()#任务完了的标志---下面我们将任务包封装到元组中
    class ThreadPool(object):
        def __init__(self,max_num):
            #创建队列
            self.q=queue.Queue()
            #创建线程最大的数量(线程池的最大容量)
            self.max_num=max_num
            #空闲线程列表(数量)
            self.free_list=[]
            #真实创建的线程列表(数量)
            self.gemerate_list=[]
            #中断任务标志
            self.terminal=False
    
            self.num=0
        def run(self,func,args,callback=None):
    
            #func:任务函数
            #args:任务函数的参数
            #callback:线程执行成功或者失败后执行的回调函数
            task=(func,args,callback)#将任务封装到元组中 ==任务包
            #将任务包放到队列中
            self.q.put(task)
            #创建线程
            if len(self.free_list)==0 and len(self.gemerate_list)<self.max_num :
                self.generate_thread()
    
    
        #创建线程
        def generate_thread(self):
            #创建一个线程
            t=threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            '''
            循环去获取任务函数并执行任务函数
            '''
            #获取当前线程
            current_thread=threading.currentThread
            #将当前线程添加到列表中
            self.gemerate_list.append(current_thread)
    
            #获取任务
            Event=self.q.get()
    
            while Event!=stopEvent :#表示是任务
                #分解任务包
                func,args,callable=Event
                status=True #标志执行成功
                try:
                     #执行任务
                    ret=func(*args)
                except Exception as e:
                    status=False
                    ret=e
                #执行回调函数callback
                if callback==None:
                    pass
                else:
                    callback(status,ret)
    
                if self.terminal :#不终止任务
                    Event=stopEvent
                else:
                    #标记:我空闲了
                    self.free_list.append(current_thread)
                    #再从队列去取任务
                    Event=self.q.get()
                    #将线程从空闲列表中移除
                    self.free_list.remove(current_thread)
    
            else:#表示不是任务
                self.gemerate_list.remove(current_thread)
    
    
    
        #任务执行完毕后 停止运行
        def close(self):
             time.sleep(2)
             num=len(self.gemerate_list)
             print(num)
             while num:
                 self.q.put(stopEvent)#往队列添加停止标志(创建了多少线程,就添加多少)
                 num-=1
    
        def terminals(self):
            self.terminal=True#标记任务终止
            while self.gemerate_list:
                self.q.put(stopEvent)
            self.q.empty()#清空队列
    
    
    
    #回调函数
    def callback(statue,result):
        # print(statue)
        # print(result)
        pass
    
    #任务函数
    def action(args):
        time.sleep(1)
        print(args)
        return args
    
    #创建线程池对象
    pool=ThreadPool(10)
    for item in range(100):
        '''
        #将任务放在队列中
        #着手开始处理任务(线程处理)
            --创建线程
                  有空闲线程,则不再创建线程
                  没有空闲线程,开始创建线程
                        1.不能高于线程池的限制
                        2.根据任务来判断
            --去队列取任务
        '''
        pool.run(func=action,args=(item,),callback=callback)
    # pool.close()#任务执行完后
    #pool.terminals()终止任务
  • 相关阅读:
    Nginx编译安装及平滑升级
    alertmanager 分组,抑制, 静默
    alertmanager 邮件告警&自定义告警模板
    alertmanager 高可用
    程序运行报错UnicodeDecodeError: 'utf8' codec can't decode byte 0x89 in position 0: invalid start byte
    Postman 导入curl 、导出成curl、导出成对应语言代码
    Python 字符串操作(截取/替换/查找/分割)
    In testLogin: indirect fixture *** doesn‘t exist
    postman 传参传递二进制流文件
    开发经验01
  • 原文地址:https://www.cnblogs.com/wangbinbin/p/7495975.html
Copyright © 2011-2022 走看看