zoukankan      html  css  js  c++  java
  • python线程池实现

    python 的线程池主要有threadpool,不过它并不是内置的库,每次使用都需要安装,而且使用起来也不是那么好用,所以自己写了一个线程池实现,每次需要使用直接import即可。其中还可以根据传入的特征量handlerkey来获取每个任务的结果。

    #!/bin/env python
    # -*- coding:utf-8 -*-
    
    """
    @lx
    created on 2016-04-14
    """
    
    import Queue
    import sys
    import threading
    import time
    import StringIO
    import traceback
    
    reload(sys)
    sys.setdefaultencoding("utf8")
    
    
    class MyThread(threading.Thread):
        """Background thread connected to the requests/results queues."""
        def __init__(self, workQueue, resultQueue, timeout=0.1, **kwds):
            threading.Thread.__init__(self, **kwds)
            self.setDaemon(True)
            self._workQueue = workQueue
            self._resultQueue = resultQueue
            self._timeout = timeout
            self._dismissed = threading.Event()
            self.start()
    
        def run(self):
            """Repeatedly process the job queue until told to exit."""
            while True:
                if self._dismissed.isSet():
                    break
    
                handlerKey = None  # unique key
                code = 0  # callback return code
                handlerRet = None
                errMsg = ""
    
                try:
                    callable, args, kwds = self._workQueue.get(True, self._timeout)
                except Queue.Empty:
                    continue
                except:
                    exceptMsg = StringIO.StringIO()
                    traceback.print_exc(file=exceptMsg)
                    errMsg = exceptMsg.getvalue()
                    code = 3301  # system error
                    self._resultQueue.put(
                            (handlerKey, code, (callable, args, kwds), errMsg))
                    break
    
                if self._dismissed.isSet():
                    self._workQueue.put((callable, args, kwds))
                    break
    
                try:
                    if "handlerKey" in kwds:
                        handlerKey = kwds["handlerKey"]
                    handlerRet = callable(*args, **kwds)  # block
                    self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
                except:
                    exceptMsg = StringIO.StringIO()
                    traceback.print_exc(file=exceptMsg)
                    errMsg = exceptMsg.getvalue()
                    code = 3303
                    self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
    
        def dismiss(self):
            """Sets a flag to tell the thread to exit when done with current job."""
            self._dismissed.set()
    
    
    class ThreadPool(object):
        def __init__(self, workerNums=3, timeout=0.1):
            self._workerNums = workerNums
            self._timeout = timeout
            self._workQueue = Queue.Queue()  # no maximum
            self._resultQueue = Queue.Queue()
            self.workers = []
            self.dismissedWorkers = []
            self._createWorkers(self._workerNums)
    
        def _createWorkers(self, workerNums):
            """Add num_workers worker threads to the pool."""
            for i in range(workerNums):
                worker = MyThread(self._workQueue, self._resultQueue,
                                  timeout=self._timeout)
                self.workers.append(worker)
    
        def _dismissWorkers(self, workerNums, _join=False):
            """Tell num_workers worker threads to quit after their current task."""
            dismissList = []
            for i in range(min(workerNums, len(self.workers))):
                worker = self.workers.pop()
                worker.dismiss()
                dismissList.append(worker)
    
            if _join:
                for worker in dismissList:
                    worker.join()
            else:
                self.dismissedWorkers.extend(dismissList)
    
        def _joinAllDissmissedWorkers(self):
            """
            Perform Thread.join() on all
            worker threads that have been dismissed.
            """
            for worker in self.dismissedWorkers:
                worker.join()
            self.dismissedWorkers = []
    
        def addJob(self, callable, *args, **kwds):
            self._workQueue.put((callable, args, kwds))
    
        def getResult(self, block=False, timeout=0.1):
            try:
                item = self._resultQueue.get(block, timeout)
                return item
            except Queue.Empty, e:
                return None
            except:
                raise
    
        def waitForComplete(self, timeout=0.1):
            """
            Last function. To dismiss all worker threads. Delete ThreadPool.
            :param timeout
            """
            while True:
                workerNums = self._workQueue.qsize()  # 释放掉所有线程
                runWorkers = len(self.workers)
    
                if 0 == workerNums:
                    time.sleep(timeout)  # waiting for thread to do job 
                    self._dismissWorkers(runWorkers)
                    break
                # if workerNums < runWorkers:  # 不能这样子乱取消
                #     self._dismissWorkers(runWorkers - workerNums)
                time.sleep(timeout)
            self._joinAllDissmissedWorkers()
    
    
    if "__main__" == __name__:
        test1 = """
        def doSomething(*args, **kwds):
            if "sleep" in kwds:
                sleep = kwds["sleep"]
            msgTxt = "sleep %fs.." % sleep
            time.sleep(sleep)
            return msgTxt
    
        for i in range(10):
            print doSomething(sleep=0.1, handlerKey="key-%d"%i)
    
        wm = ThreadPool(10)
        for i in range(10):
            wm.addJob(doSomething, sleep=1, handlerKey="key-%d"%i)
        wm.waitForComplete()
        for i in range(10):
            print wm.getResult()
        del wm
        """
        # test2 = """
    
        def doSomething_(*args, **kwds):
            sleep = int(args[0])
            msgTxt = "sleep %ds.." % sleep
            time.sleep(sleep)
            return msgTxt
    
    
        wm = ThreadPool(10)
        result = []
        for i in range(10):
            data = 5
            wm.addJob(doSomething_, data)
    
        while 1:
            res = wm.getResult()
            if res:
                result.append(res)
            if 10 == len(result):
                break
            print "sleep 0.1"
            time.sleep(0.1)
        print time.time()
        wm.waitForComplete()
        print time.time()
    # """

    原创文章,转载请备注原文地址 http://www.cnblogs.com/lxmhhy/p/6032924.html

    知识交流讨论请加qq群:180214441。谢谢合作

  • 相关阅读:
    ES6 class -- Class 的基本语法
    ES6 Promise --回调与Promise的对比、信任问题、错误处理、Promise的状态、以及Promise对象的常用方法
    移动端调试,手机缓存清不掉
    JAR 介绍-百度百科
    一致性哈希的基本概念
    Java线程池的配置
    java多线程面试题整理及答案(2018年)
    Java多线程面试题整理
    IntelliJ Idea 常用快捷键
    RESTful规范
  • 原文地址:https://www.cnblogs.com/lxmhhy/p/6032924.html
Copyright © 2011-2022 走看看