zoukankan      html  css  js  c++  java
  • python 五——自定义线程池

    内容概要:

    1.low版线程池

    2.绝版线程池


    1.low版线程池

    设计思路:运用队列queue

    将线程类名放入队列中,执行一个就拿一个出来

     1 import queue
     2 import threading
     3 
     4 
     5 class ThreadPool(object):
     6 
     7     def __init__(self, max_num=20):
     8         self.queue = queue.Queue(max_num)  #创建队列,最大数为20
     9         for i in range(max_num):
    10             self.queue.put(threading.Thread) #将类名放入队列中
    11 
    12     def get_thread(self):
    13         return self.queue.get() #从队列中取出类名
    14 
    15     def add_thread(self):
    16         self.queue.put(threading.Thread) #进类名放入队列中
    17 
    18 def func(arg, p):  #定义一个函数
    19     print(arg)
    20     import time
    21     time.sleep(2)
    22     p.add_thread()
    23 
    24 
    25 pool = ThreadPool(10) #创建对象,并执行该类的构造方法,即将线程的类名放入队列中
    26 
    27 for i in range(30):
    28     thread = pool.get_thread() #调用该对象的get_thread方法,取出类名
    29     t = thread(target=func, args=(i, pool)) #创建对象,执行func,参数在args中
    30     t.start()

    由于此方法要求使用者修改原函数,并在原函数里传参数,且调用方法也发生了改变,并且有空闲线程浪费资源,实际操作中并不方便,故设计了下一版线程池。

     

    2.绝版线程池

    设计思路:运用队列queue

    a.队列里面放任务

    b.线程一次次去取任务,线程一空闲就去取任务

      1 import queue
      2 import threading
      3 import contextlib
      4 import time
      5 
      6 StopEvent = object()
      7 
      8 
      9 class ThreadPool(object):
     10 
     11     def __init__(self, max_num, max_task_num = None):
     12         if max_task_num:
     13             self.q = queue.Queue(max_task_num)
     14         else:
     15             self.q = queue.Queue()
     16         self.max_num = max_num
     17         self.cancel = False
     18         self.terminal = False
     19         self.generate_list = []
     20         self.free_list = []
     21 
     22     def run(self, func, args, callback=None):
     23         """
     24         线程池执行一个任务
     25         :param func: 任务函数
     26         :param args: 任务函数所需参数
     27         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
     28         :return: 如果线程池已经终止,则返回True否则None
     29         """
     30         if self.cancel:
     31             return
     32         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
     33             self.generate_thread()
     34         w = (func, args, callback,)
     35         self.q.put(w)
     36 
     37     def generate_thread(self):
     38         """
     39         创建一个线程
     40         """
     41         t = threading.Thread(target=self.call)
     42         t.start()
     43 
     44     def call(self):
     45         """
     46         循环去获取任务函数并执行任务函数
     47         """
     48         current_thread = threading.currentThread()
     49         self.generate_list.append(current_thread)
     50 
     51         event = self.q.get()
     52         while event != StopEvent:
     53 
     54             func, args, callback = event
     55             try:
     56                 result = func(*args)
     57                 success = True
     58             except Exception as e:
     59                 success = False
     60                 result = None
     61 
     62             if callback is not None:
     63                 try:
     64                     callback(success, result)
     65                 except Exception as e:
     66                     pass
     67 
     68             with self.worker_state(self.free_list, current_thread):
     69                 if self.terminal:
     70                     event = StopEvent
     71                 else:
     72                     event = self.q.get()
     73         else:
     74 
     75             self.generate_list.remove(current_thread)
     76 
     77     def close(self):
     78         """
     79         执行完所有的任务后,所有线程停止
     80         """
     81         self.cancel = True
     82         count = len(self.generate_list)
     83         while count:
     84             self.q.put(StopEvent)
     85             count -= 1
     86 
     87     def terminate(self):
     88         """
     89         无论是否还有任务,终止线程
     90         """
     91         self.terminal = True
     92 
     93         while self.generate_list:
     94             self.q.put(StopEvent)
     95 
     96         self.q.queue.clear()
     97 
     98     @contextlib.contextmanager
     99     def worker_state(self, state_list, worker_thread):
    100         """
    101         用于记录线程中正在等待的线程数
    102         """
    103         state_list.append(worker_thread)
    104         try:
    105             yield
    106         finally:
    107             state_list.remove(worker_thread)
    108 
    109 
    110 
    111 # How to use
    112 
    113 pool = ThreadPool(5)
    114 
    115 def callback(status, result):
    116     # status, execute action status
    117     # result, execute action return value
    118     pass
    119 
    120 def action(i):
    121     print(i)
    122 
    123 for i in range(30):
    124     ret = pool.run(action, (i,), callback)
    125 
    126 time.sleep(3)
    127 print(len(pool.generate_list), len(pool.free_list))
    128 print(len(pool.generate_list), len(pool.free_list))
    129 pool.close()
    130 # pool.terminate()

     

  • 相关阅读:
    第十二周进度表
    第一个冲刺周期-第十天
    第一个冲刺周期-第九天
    团队作业—第二阶段06
    团队作业—第二阶段05
    团队作业—第二阶段04
    团队作业—第二阶段03
    团队作业—第二阶段02
    团队作业—第二阶段01
    对于风行小组第一阶段冲刺成果的概括
  • 原文地址:https://www.cnblogs.com/tangtingmary/p/7729998.html
Copyright © 2011-2022 走看看