zoukankan      html  css  js  c++  java
  • Python 实现线程池

    前言:
    关于线程池(thread pool)的概念请参考http://en.wikipedia.org/wiki/Thread_pool_pattern。在Python中使用线程是有硬伤的,因为Python(这里指C语言实现的Python)的基本调用都最后生成对应C语言的函 数调用,因此Python中使用线程 的开销太大,不过可以使用Stackless Python(Python的一个修改 版)来增强Python中使用线程 的表现。
    同时由于Python中 GIL的存在,导制在使用多CPU时Python无 法充分利用多个CPU,目前pysco这个模块可以针对多CPU提高Python的效率。

    在C语言里要实现个线程池,就要面对一堆的指针,还有pthread这个库中那些看起来很让人头痛的一些函数:
    int pthread_create(pthread_t *restrict thread,
    const pthread_attr_t *restrict attr,
    void *(*start_routine)(void*), void *restrict arg);

    而如果用Python来实现一个 线程池的话就好多了,不仅结构十分清晰,而且代码看起来会很优美


    PYTHON:
    代码
      1 
      2 
      3    1.
      4       import  threading
      5    2.
      6       from  time  import  sleep
      7    3.
      8        
      9    4.
     10       class  ThreadPool:
     11    5.
     12        
     13    6.
     14            """Flexible  thread  pool  class.   Creates  a  pool  of  threads,  then
     15    7.
     16           accepts  tasks  that  will  be  dispatched  to  the  next  available
     17    8.
     18           thread."""
     19    9.
     20            
     21   10.
     22            def  __init__(self,  numThreads):
     23   11.
     24        
     25   12.
     26                """Initialize  the  thread  pool  with  numThreads  workers."""
     27   13.
     28                
     29   14.
     30                self.__threads  =  []
     31   15.
     32                self.__resizeLock  =  threading.Condition(threading.Lock())
     33   16.
     34                self.__taskLock  =  threading.Condition(threading.Lock())
     35   17.
     36                self.__tasks  =  []
     37   18.
     38                self.__isJoining  =  False
     39   19.
     40                self.setThreadCount(numThreads)
     41   20.
     42        
     43   21.
     44            def  setThreadCount(self,  newNumThreads):
     45   22.
     46        
     47   23.
     48                """  External  method  to  set  the  current  pool  size.   Acquires
     49   24.
     50               the  resizing  lock,  then  calls  the  internal  version  to  do  real
     51   25.
     52               work."""
     53   26.
     54                
     55   27.
     56                #  Can't  change  the  thread  count  if  we're  shutting  down  the  pool!
     57   28.
     58                if  self.__isJoining:
     59   29.
     60                    return  False
     61   30.
     62                
     63   31.
     64                self.__resizeLock.acquire()
     65   32.
     66                try:
     67   33.
     68                    self.__setThreadCountNolock(newNumThreads)
     69   34.
     70                finally:
     71   35.
     72                    self.__resizeLock.release()
     73   36.
     74                return  True
     75   37.
     76        
     77   38.
     78            def  __setThreadCountNolock(self,  newNumThreads):
     79   39.
     80                
     81   40.
     82                """Set  the  current  pool  size,  spawning  or  terminating  threads
     83   41.
     84               if  necessary.   Internal  use  only;  assumes  the  resizing  lock  is
     85   42.
     86               held."""
     87   43.
     88                
     89   44.
     90                #  If  we  need  to  grow  the  pool,  do  so
     91   45.
     92                while  newNumThreads >  len(self.__threads):
     93   46.
     94                    newThread  =  ThreadPoolThread(self)
     95   47.
     96                    self.__threads.append(newThread)
     97   48.
     98                    newThread.start()
     99   49.
    100                #  If  we  need  to  shrink  the  pool,  do  so
    101   50.
    102                while  newNumThreads  <len(self.__threads):
    103   51.
    104                    self.__threads[0].goAway()
    105   52.
    106                    del  self.__threads[0]
    107   53.
    108        
    109   54.
    110            def  getThreadCount(self):
    111   55.
    112        
    113   56.
    114                """Return  the  number  of  threads  in  the  pool."""
    115   57.
    116                
    117   58.
    118                self.__resizeLock.acquire()
    119   59.
    120                try:
    121   60.
    122                    return  len(self.__threads)
    123   61.
    124                finally:
    125   62.
    126                    self.__resizeLock.release()
    127   63.
    128        
    129   64.
    130            def  queueTask(self,  task,  args=None,  taskCallback=None):
    131   65.
    132        
    133   66.
    134                """Insert  a  task  into  the  queue.   task  must  be  callable;
    135   67.
    136               args  and  taskCallback  can  be  None."""
    137   68.
    138                
    139   69.
    140                if  self.__isJoining  ==  True:
    141   70.
    142                    return  False
    143   71.
    144                if  not  callable(task):
    145   72.
    146                    return  False
    147   73.
    148                
    149   74.
    150                self.__taskLock.acquire()
    151   75.
    152                try:
    153   76.
    154                    self.__tasks.append((task,  args,  taskCallback))
    155   77.
    156                    return  True
    157   78.
    158                finally:
    159   79.
    160                    self.__taskLock.release()
    161   80.
    162        
    163   81.
    164            def  getNextTask(self):
    165   82.
    166        
    167   83.
    168                """  Retrieve  the  next  task  from  the  task  queue.   For  use
    169   84.
    170               only  by  ThreadPoolThread  objects  contained  in  the  pool."""
    171   85.
    172                
    173   86.
    174                self.__taskLock.acquire()
    175   87.
    176                try:
    177   88.
    178                    if  self.__tasks  ==  []:
    179   89.
    180                        return  (None,  None,  None)
    181   90.
    182                    else:
    183   91.
    184                        return  self.__tasks.pop(0)
    185   92.
    186                finally:
    187   93.
    188                    self.__taskLock.release()
    189   94.
    190            
    191   95.
    192            def  joinAll(self,  waitForTasks  =  True,  waitForThreads  =  True):
    193   96.
    194        
    195   97.
    196                """  Clear  the  task  queue  and  terminate  all  pooled  threads,
    197   98.
    198               optionally  allowing  the  tasks  and  threads  to  finish."""
    199   99.
    200                
    201  100.
    202                #  Mark  the  pool  as  joining  to  prevent  any  more  task  queueing
    203  101.
    204                self.__isJoining  =  True
    205  102.
    206        
    207  103.
    208                #  Wait  for  tasks  to  finish
    209  104.
    210                if  waitForTasks:
    211  105.
    212                    while  self.__tasks  !=  []:
    213  106.
    214                        sleep(.1)
    215  107.
    216        
    217  108.
    218                #  Tell  all  the  threads  to  quit
    219  109.
    220                self.__resizeLock.acquire()
    221  110.
    222                try:
    223  111.
    224                    self.__setThreadCountNolock(0)
    225  112.
    226                    self.__isJoining  =  True
    227  113.
    228        
    229  114.
    230                    #  Wait  until  all  threads  have  exited
    231  115.
    232                    if  waitForThreads:
    233  116.
    234                        for  t  in  self.__threads:
    235  117.
    236                            t.join()
    237  118.
    238                            del  t
    239  119.
    240        
    241  120.
    242                    #  Reset  the  pool  for  potential  reuse
    243  121.
    244                    self.__isJoining  =  False
    245  122.
    246                finally:
    247  123.
    248                    self.__resizeLock.release()
    249  124.
    250              
    251  125.
    252       class  ThreadPoolThread(threading.Thread):
    253  126.
    254             """  Pooled  thread  class.  """
    255  127.
    256            
    257  128.
    258            threadSleepTime  =  0.1
    259  129.
    260        
    261  130.
    262            def  __init__(self,  pool):
    263  131.
    264        
    265  132.
    266                """  Initialize  the  thread  and  remember  the  pool.  """
    267  133.
    268                
    269  134.
    270                threading.Thread.__init__(self)
    271  135.
    272                self.__pool  =  pool
    273  136.
    274                self.__isDying  =  False
    275  137.
    276                
    277  138.
    278            def  run(self):
    279  139.
    280        
    281  140.
    282                """  Until  told  to  quit,  retrieve  the  next  task  and  execute
    283  141.
    284               it,  calling  the  callback  if  any.   """
    285  142.
    286                
    287  143.
    288                while  self.__isDying  ==  False:
    289  144.
    290                    cmd,  args,  callback  =  self.__pool.getNextTask()
    291  145.
    292                    #  If  there's  nothing  to  do,  just  sleep  a  bit
    293  146.
    294                    if  cmd  is  None:
    295  147.
    296                        sleep(ThreadPoolThread.threadSleepTime)
    297  148.
    298                    elif  callback  is  None:
    299  149.
    300                        cmd(args)
    301  150.
    302                    else:
    303  151.
    304                        callback(cmd(args))
    305  152.
    306            
    307  153.
    308            def  goAway(self):
    309  154.
    310        
    311  155.
    312                """  Exit  the  run  loop  next  time  through."""
    313  156.
    314                
    315  157.
    316                self.__isDying  =  True
    317 
    318 


    这段100多行的代码完成了一个可动态改变的线程池,并且包含了详细的注释,这里是代码的出处。我觉得这段代码比 Python官方给出的那 个还要好些。他们实现的原理都是一样的,使用了一个队列(Queue)来存储任务。

    关于Python中线程同步的问题,这里有不错的介绍。

     

  • 相关阅读:
    Linux 下安装JDK1.8
    INSERT IGNORE 与INSERT INTO的区别
    linux安装redis 完整步骤
    从0开始 图论学习 广度优先搜索 链式前向星表示法
    从0开始 图论学习 深度优先遍历 链式前向星表示法
    从0开始 图论学习 链式前向星 最好的建图方法
    从0开始 图论学习 邻接表 STL vector
    从0开始 图论学习 前向星表示法
    数据结构实习
    数据结构实习
  • 原文地址:https://www.cnblogs.com/nsnow/p/1706794.html
Copyright © 2011-2022 走看看