zoukankan      html  css  js  c++  java
  • Python菜鸟之路:Python基础-线程池注释

    import sys
    import threading
    import Queue
    import traceback
    
    # 定义一些Exception,用于自定义异常处理
    
    class NoResultsPending(Exception):
        """Raised when every submitted work request has already been processed."""
    
    class NoWorkersAvailable(Exception):
        """Raised when no worker threads are left to process remaining requests."""
    
    def _handle_thread_exception(request, exc_info):
        """默认的异常处理函数,只是简单的打印"""
        traceback.print_exception(*exc_info)
    
    #classes 
    
    class WorkerThread(threading.Thread):
        """Background daemon thread that executes work requests.

        Requests are taken from ``requestQueue``; for each one a
        ``(request, result)`` tuple is put on ``resultQueue``.  When the
        callable raises, ``request.exception`` is set to True and the second
        item of the tuple is the ``sys.exc_info()`` triple instead of a result.
        The thread starts itself as soon as it is constructed.
        """

        def __init__(self, requestQueue, resultQueue, poll_timeout=5, **kwds):
            """Create and immediately start the worker.

            :param requestQueue: queue the worker takes requests from
            :param resultQueue: queue the worker puts ``(request, result)`` on
            :param poll_timeout: how long to block on ``requestQueue`` before
                re-checking whether the worker has been dismissed
            :param kwds: extra keyword arguments for ``threading.Thread``
            """
            threading.Thread.__init__(self, **kwds)
            # Run as a daemon thread (attribute form replaces setDaemon()).
            self.daemon = True
            self._requestQueue = requestQueue
            self._resultQueue = resultQueue
            self._poll_timeout = poll_timeout
            # Flag telling the thread to exit; cleared by default.
            self._dismissed = threading.Event()
            self.start()

        def run(self):
            """Process requests in a loop until the worker is dismissed."""
            while not self._dismissed.is_set():
                try:
                    # Block until a request arrives or the poll timeout expires.
                    request = self._requestQueue.get(True, self._poll_timeout)
                except Queue.Empty:
                    continue
                # The worker may have been dismissed while it was waiting; if
                # so, hand the request back for another worker and exit.
                if self._dismissed.is_set():
                    self._requestQueue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    # Single-argument print(): same output on Python 2 and 3.
                    print(self.name)
                    self._resultQueue.put((request, result))
                except:  # noqa: E722
                    # Deliberately catch everything so that any failure of the
                    # job is reported on the result queue instead of silently
                    # killing the worker.
                    request.exception = True
                    self._resultQueue.put((request, sys.exc_info()))

        def dismiss(self):
            """Ask the thread to exit once its current request is finished."""
            self._dismissed.set()
    
    
    class WorkRequest:
        """A unit of work: a callable, its arguments and optional callbacks.

        :param callable_: function that performs the work
        :param args: positional arguments for ``callable_`` (default: ``[]``)
        :param kwds: keyword arguments for ``callable_`` (default: ``{}``)
        :param requestID: optional hashable identifier; its hash is stored as
            ``self.requestID``.  When omitted, ``id(self)`` is used.
        :param callback: optional function called as
            ``callback(request, result)`` when the result is collected
        :param exc_callback: function called as
            ``exc_callback(request, exc_info)`` when the work raised
        :raises TypeError: if ``requestID`` is given but is not hashable
        """

        def __init__(self, callable_, args=None, kwds=None, requestID=None,
                     callback=None, exc_callback=_handle_thread_exception):
            if requestID is None:
                self.requestID = id(self)
            else:
                try:
                    self.requestID = hash(requestID)
                except TypeError:
                    raise TypeError("requestId must be hashable")
            # Set to True by the worker when the callable raises.
            self.exception = False
            self.callback = callback
            self.exc_callback = exc_callback
            self.callable = callable_
            self.args = args or []
            self.kwds = kwds or {}

        def __str__(self):
            # Parenthesised so the expression may span two lines; the original
            # broke the line after ``%`` without a continuation, which is a
            # SyntaxError.
            return "WorkRequest id=%s args=%r kwargs=%r exception=%s" % (
                self.requestID, self.args, self.kwds, self.exception)
                
    class ThreadPool:
        """Pool of WorkerThread objects sharing a request and a result queue.

        :param num_workers: number of worker threads created up front
        :param q_size: size passed to the request queue constructor
        :param resq_size: size passed to the result queue constructor
        :param poll_timeout: timeout each worker uses while waiting on the
            request queue
        """

        def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
            self._requestQueue = Queue.Queue(q_size)
            self._resultQueue = Queue.Queue(resq_size)
            self.workers = []
            self.dismissedWorkers = []
            # Requests still awaiting a result, keyed by requestID.
            self.workRequests = {}
            self.createWorkers(num_workers, poll_timeout)

        def createWorkers(self, num_workers, poll_timeout=5):
            """Add ``num_workers`` new worker threads to the pool."""
            for _ in range(num_workers):
                thread = WorkerThread(self._requestQueue, self._resultQueue,
                                      poll_timeout=poll_timeout)
                self.workers.append(thread)

        def dismissWorkers(self, num_workers, do_join=False):
            """Dismiss up to ``num_workers`` workers.

            With ``do_join`` the dismissed threads are joined right away;
            otherwise they are remembered in ``dismissedWorkers`` so they can
            be joined later.
            """
            how_many = min(num_workers, len(self.workers))
            retired = []
            for _ in range(how_many):
                thread = self.workers.pop()
                thread.dismiss()
                retired.append(thread)
            if not do_join:
                self.dismissedWorkers.extend(retired)
                return
            for thread in retired:
                thread.join()

        def joinAllDismissedWorkers(self):
            """Join every previously dismissed worker and forget them."""
            for thread in self.dismissedWorkers:
                thread.join()
            self.dismissedWorkers = []

        def putRequest(self, request, block=True, timeout=None):
            """Queue ``request`` and record it as pending.

            ``block`` and ``timeout`` are forwarded to the request queue's
            ``put``.
            """
            assert isinstance(request, WorkRequest)
            assert not getattr(request, 'exception', None)
            self._requestQueue.put(request, block, timeout)
            self.workRequests[request.requestID] = request

        def poll(self, block=False):
            """Collect available results and invoke the requests' callbacks.

            :raises NoResultsPending: when no request is awaiting a result
            :raises NoWorkersAvailable: when ``block`` is true and the pool
                has no workers
            """
            while True:
                if not self.workRequests:
                    raise NoResultsPending
                if block and not self.workers:
                    raise NoWorkersAvailable
                try:
                    request, result = self._resultQueue.get(block=block)
                    if request.exception and request.exc_callback:
                        request.exc_callback(request, result)
                    # The normal callback runs unless the failure was already
                    # routed to an exception callback.
                    if request.callback and (not request.exception
                                             or not request.exc_callback):
                        request.callback(request, result)
                    del self.workRequests[request.requestID]
                except Queue.Empty:
                    break

        def wait(self):
            """Keep polling in blocking mode until nothing is pending."""
            try:
                while True:
                    self.poll(True)
            except NoResultsPending:
                pass

        def workersize(self):
            """Return the number of active (not dismissed) workers."""
            return len(self.workers)

        def stop(self):
            """Dismiss and join all workers, including earlier dismissed ones."""
            self.dismissWorkers(self.workersize(), True)
            self.joinAllDismissedWorkers()
    
    if __name__=='__main__':
        # Demo: submit 40 jobs to a 3-worker pool, grow the pool after a few
        # polls, shrink it later, and stop once every result was collected.
        # Every print() receives exactly one pre-formatted string so the
        # output is the same on Python 2 and Python 3.
        import random
        import time
        import datetime

        def do_work(data):
            """Sleep 1-3 seconds, then return a timestamp followed by data."""
            time.sleep(random.randint(1,3))
            return str(datetime.datetime.now()) + "" + str(data)

        def print_result(request,result):
            """Callback: report the result of a finished request."""
            print("---Result from request %s : %r" % (request.requestID,result))

        def print_worker_count(pool):
            """Print the current number of workers between two dashed rules."""
            print("%s %s %s" % ('-'*20, pool.workersize(), '-'*20))

        main = ThreadPool(3)
        for i in range(40):
            req = WorkRequest(do_work,args=[i],kwds={},callback=print_result)
            main.putRequest(req)
            print("work request #%s added." % req.requestID)

        print_worker_count(main)

        counter = 0
        while True:
            try:
                time.sleep(0.5)
                # Non-blocking poll; raises NoResultsPending once all results
                # have been collected, which ends the loop.
                main.poll()
                if counter == 5:
                    print("Add 3 more workers threads")
                    main.createWorkers(3)
                    print_worker_count(main)
                if counter == 10:
                    print("dismiss 2 workers threads")
                    main.dismissWorkers(2)
                    print_worker_count(main)
                counter += 1
            except NoResultsPending:
                print("no pending results")
                break

        main.stop()
        print("Stop")
    线程池1-带注释
    # 线程池注释
    
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()  # sentinel put on the queue to tell one worker to exit


    class ThreadPool(object):
        """Thread pool that creates worker threads lazily, up to ``max_num``.

        Tasks are ``(func, args, callback)`` tuples placed on ``self.q``.
        ``generate_list`` holds the Thread objects of all live workers and
        ``free_list`` those currently waiting for a task.
        """

        def __init__(self, max_num, max_task_num = None):
            """
            :param max_num: maximum number of worker threads
            :param max_task_num: optional bound on the task queue; a falsy
                value means the queue is unbounded
            """
            if max_task_num:
                self.q = queue.Queue(max_task_num)
            else:
                self.q = queue.Queue()
            self.max_num = max_num
            self.cancel = False    # set by close(): refuse new tasks
            self.terminal = False  # set by terminate(): stop after current task
            self.generate_list = []  # worker threads created and still alive
            self.free_list = []      # worker threads currently idle

        def run(self, func, args, callback=None):
            """Submit one task to the pool.

            :param func: task function
            :param args: positional arguments for ``func``
            :param callback: optional function called after the task as
                ``callback(success, result)``; ``result`` is None on failure
            :return: always None; the task is silently dropped once
                ``close()`` has been called
            """
            if self.cancel:
                return
            # Spawn a new worker only when nobody is idle and the limit has
            # not been reached.
            if not self.free_list and len(self.generate_list) < self.max_num:
                self.generate_thread()
            self.q.put((func, args, callback,))

        def generate_thread(self):
            """Create, register and start one worker thread."""
            t = threading.Thread(target=self.call)
            # Register before starting: if the worker registered itself later
            # on, a burst of run() calls could spawn more than max_num threads
            # and close() could send too few stop sentinels.
            self.generate_list.append(t)
            try:
                t.start()
            except Exception:
                self.generate_list.remove(t)
                raise

        def call(self):
            """Worker loop: take tasks from the queue and execute them."""
            current_thread = threading.current_thread()
            # Only needed when call() is not running in a thread created by
            # generate_thread(), which registers the thread itself.
            if current_thread not in self.generate_list:
                self.generate_list.append(current_thread)
            try:
                event = self.q.get()
                while event is not StopEvent:
                    func, arguments, callback = event
                    try:
                        result = func(*arguments)
                        success = True
                    except Exception:
                        # A failing task is reported through the callback.
                        success = False
                        result = None

                    if callback is not None:
                        try:
                            callback(success, result)
                        except Exception:
                            # Best effort: a broken callback must not kill
                            # the worker.
                            pass
                    # Count as idle while waiting for the next task.
                    with self.worker_state(self.free_list, current_thread):
                        if self.terminal:
                            event = StopEvent
                        else:
                            event = self.q.get()
            finally:
                # Deregister on every exit path so terminate() cannot wait
                # forever for a thread that is already gone.
                self.generate_list.remove(current_thread)

        def close(self):
            """Refuse new tasks and stop workers once queued tasks are done.

            One stop sentinel per live worker is queued behind the pending
            tasks; this method does not wait for the workers to exit.
            """
            self.cancel = True
            for _ in range(len(self.generate_list)):
                self.q.put(StopEvent)

        def terminate(self):
            """Stop all workers after their current task and drop the rest.

            Blocks until every worker has exited, so it must not be called
            from a worker thread (for example from a task or callback).
            """
            self.terminal = True
            while self.generate_list:
                # An empty queue may have idle workers blocked on it; wake one
                # with a sentinel.  Busy workers notice ``terminal`` on their
                # own, so the queue is not flooded with sentinels.
                if self.q.empty():
                    try:
                        self.q.put_nowait(StopEvent)
                    except queue.Full:
                        pass
                time.sleep(0.01)
            # Discard unprocessed tasks and leftover sentinels.
            while True:
                try:
                    self.q.get_nowait()
                except queue.Empty:
                    break

        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """Keep ``worker_thread`` in ``state_list`` for the with-block."""
            state_list.append(worker_thread)
            try:
                yield
            finally:
                state_list.remove(worker_thread)
    
    
    
    # How to use
    
    
    pool = ThreadPool(5)

    def callback(status, result):
        """Task callback.

        :param status: True if the task ran without raising, else False
        :param result: return value of the task (None on failure)
        """
        pass


    def action(i):
        """Example task: print its argument."""
        print(i)

    for i in range(30):
        pool.run(action, (i,), callback)

    time.sleep(5)
    # Number of live worker threads and of idle worker threads.
    print(len(pool.generate_list), len(pool.free_list))
    # The workers are non-daemon threads blocked on the task queue; without
    # shutting the pool down the interpreter would never exit.  Use
    # pool.terminate() instead to stop without finishing queued tasks.
    pool.close()
  • 相关阅读:
    RabbitMQ
    虚拟化解决方案
    如何制作Windows镜像
    2018-2019-2 网络对抗技术 20165202 Exp9 Web安全基础
    2018-2019-2 网络对抗技术 20165202 Exp8 Web基础
    2018-2019-2 网络对抗技术 20165202 Exp7 网络欺诈防范
    2018-2019-2 网络对抗技术 20165202 Exp6 信息搜集与漏洞扫描
    2018-2019-2 网络对抗技术 20165202 Exp5 MSF基础应用
    2018-2019-2 网络对抗技术 20165202 Exp4 恶意代码分析
    2018-2019-2 网络对抗技术 20165202 Exp3 免杀原理与实践
  • 原文地址:https://www.cnblogs.com/jishuweiwang/p/5697610.html
Copyright © 2011-2022 走看看