zoukankan      html  css  js  c++  java
  • 一个别人的线程池的编写

    自定义了线程的start方法,当启动线程的时候才初始化线程池,并根据线程池定义的数量和任务数量取min,

    而不是先开启定义的线程数等待命令,在一定程度上避免了空线程对内存的消耗。

    并且引入了contextlib.contextmanager ,这个装饰器使得函数可用使用with方法

    # -*- coding:utf-8 -*-
    from queue import Queue
    import contextlib
    import threading
    
    WorkerStop = object()
    
    
    class ThreadPool:
        workers = 0
    
        threadFactory = threading.Thread
        currentThread = staticmethod(threading.currentThread)
    
        def __init__(self, maxthreads=20, name=None):
    
            self.q = Queue(0)
            self.max = maxthreads
            self.name = name
            self.waiters = []
            self.working = []
    
        def start(self):
            needsize = self.q.qsize()
            while self.workers < min(self.max, needsize):
                self.startAWorker()
    
        def startAWorker(self):
            self.workers += 1
            name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
            newThread = self.threadFactory(target=self._worker, name=name)
            newThread.start()
    
        def callInThread(self, func, *args, **kw):
            self.callInThreadWithCallback(None, func, *args, **kw)
    
        def callInThreadWithCallback(self, onResult, func, *args, **kw):
            o = (func, args, kw, onResult)
            self.q.put(o)
    
        @contextlib.contextmanager
        def _workerState(self, stateList, workerThread):
            stateList.append(workerThread)
            try:
                yield
            finally:
                stateList.remove(workerThread)
    
        def _worker(self):
            ct = self.currentThread
            o = self.q.get()
            while o is not WorkerStop:
                with self._workerState(self.working, ct):
                    function, args, kwargs, onResult = o
                    del o
                    try:
                        result = function(*args, **kwargs)
                        success = True
                    except:
                        success = False
                        if onResult is None:
                            pass
    
                        else:
                            pass
    
                    del function, args, kwargs
    
                    if onResult is not None:
                        try:
                            onResult(success, result)
                        except:
                            # context.call(ctx, log.err)
                            pass
    
                    del onResult, result
    
                with self._workerState(self.waiters, ct):
                    o = self.q.get()
    
        def stop(self):
            while self.workers:
                self.q.put(WorkerStop)
                self.workers -= 1
    
    #下面使用例子
    def show(arg):
        import time
        time.sleep(1)
        print(arg)
    
    
    pool = ThreadPool(20)
    
    for i in range(3):
        pool.callInThread(show, i)
    
    pool.start()
    pool.stop()
  • 相关阅读:
    Map 循环出key 和 value
    Jquery Validate
    Cookie/Session机制详解
    Java根据sessionId获取Session对象
    在线用户统计二
    页面在线访问人数统计&&在线登录人数统计一
    在线会话管理
    oneworld元数据配置
    java map遍历
    黑马程序员——C语言位运算符
  • 原文地址:https://www.cnblogs.com/mmyy-blog/p/9568209.html
Copyright © 2011-2022 走看看