zoukankan      html  css  js  c++  java
  • python第十一天-----补:线程池

    低版本:

     1 #!/usr/bin/env python
     2 import threading
     3 import time
     4 import queue
     5 
     6 
     7 class TreadPool:
     8     """
     9     将线程加入到队列中作为资源去完成任务
    10     优点:简单好写容易理解
    11     缺点:太尼玛多了.....
    12     """
    13     def __init__(self, maxsize):
    14         self.maxsize = maxsize
    15         self._q = queue.Queue(maxsize)
    16         for i in range(maxsize):
    17             self._q.put(threading.Thread)
    18 
    19     def get_thread(self):
    20         return self._q.get()
    21 
    22     def add_thread(self):
    23         self._q.put(threading.Thread)
    24 
    25 
    26 def task(arg, p):
    27     print(arg)
    28     time.sleep(1)
    29     p.add_thread()
    30 
    31 pool = TreadPool(5)
    32 
    33 for i in range(100):
    34     t = pool.get_thread()
    35     obj = t(target=task, args=(i, pool))
    36     obj.start()

    高级版本:

      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 
      4 import queue        # 队列模块
      5 import threading    # 线程模块
      6 import contextlib   # 上下文模块
      7 import time         # 时间模块
      8 
      9 StopEvent = object()            # 创建一个停止时所需要用到的对象
     10 
     11 
     12 class ThreadPool(object):
     13     """
     14     线程池(用于放置任务,将任务作为队列中元素让线程去取得,可以复用线程减少开销)
     15     """
     16     def __init__(self, max_num, max_task_num=None):
     17         """
     18         构造方法
     19         :param max_num:
     20         :param max_task_num:所创建的队列内最大支持的任务个数
     21         """
     22         if max_task_num:
     23             self.q = queue.Queue(max_task_num)  # 指定队列任务数量则创建有限队列
     24         else:
     25             self.q = queue.Queue()              # 未指定队列任务数量则创建无限队列
     26         self.max_num = max_num                  # 每次使用的最大线程个数
     27         self.cancel = False                     # 任务取消,默认False,用于线程停止的判断
     28         self.terminal = False                   # 任务终止,默认False,用于线程池终止的判断
     29         self.generate_list = []                 # 定义一个已生成任务列表
     30         self.free_list = []                     # 定义一个空闲任务列表
     31 
     32     def run(self, func, args, callback=None):
     33         """
     34         线程池执行一个任务方法
     35         :param func: 传递进来的任务函数
     36         :param args: 任务函数使用的参数
     37         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
     38                             1、任务函数执行状态;
     39                             2、任务函数返回值(默认为None,即:不执行回调函数)
     40         :return: 如果线程池已经终止,则返回True否则None
     41         """
     42         if self.cancel:                         # 如果条件为真则不会继续执行
     43             return
     44         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
     45             self.generate_thread()              # 如果现有空闲列表无元素并且已生成任务列表内
     46                                                 # 元素个数小于队列支持的最大数量则创建一个线程
     47         w = (func, args, callback,)             # 具体任务
     48         self.q.put(w)                           # 将任务放入队列当中
     49 
     50     def generate_thread(self):
     51         """
     52         创建一个线程方法
     53         """
     54         t = threading.Thread(target=self.call)  # 调用线程类创建一个线程,参数传递self.call方法
     55         t.start()                               # 线程开始
     56 
     57     def call(self):
     58         """
     59         循环去获取任务函数并执行任务函数
     60         """
     61         current_thread = threading.currentThread()      # 创建当前任务
     62         self.generate_list.append(current_thread)       # 生成任务列表添加当前任务
     63 
     64         event = self.q.get()                            # 事件获取
     65         while event != StopEvent:                       # 当前事件不是停止时执行
     66 
     67             func, arguments, callback = event           # 任务具体函数,参数获取
     68             try:
     69                 result = func(*arguments)               # 结果为任务处理的出的结果
     70                 success = True                          # 任务处理成功
     71             except Exception as e:
     72                 success = False                         # 任务处理失败
     73                 result = None                           # 结果为None
     74 
     75             if callback is not None:                    # 回调不为空
     76                 try:
     77                     callback(success, result)            # 将刚才执行结果返回
     78                 except Exception as e:
     79                     pass
     80 
     81             with self.worker_state(self.free_list, current_thread):
     82                 if self.terminal:                       # 如果线程池已经被终止
     83                     event = StopEvent                   # 事件变为空任务
     84                 else:   
     85                     event = self.q.get()                # 事件为正常任务
     86         else:
     87 
     88             self.generate_list.remove(current_thread)   # 生成任务列表移除当前任务
     89 
     90     def close(self):
     91         """
     92         执行完所有的任务后,所有线程停止
     93         """
     94         self.cancel = True                      # 线程停止,判定条件变为真
     95         full_size = len(self.generate_list)     # 获取还有几个在执行任务的线程
     96         while full_size:                        # 向队列中添加相应个数的空任务
     97             self.q.put(StopEvent)
     98             full_size -= 1
     99 
    100     def terminate(self):
    101         """
    102         无论是否还有任务,终止线程
    103         """
    104         self.terminal = True                    # 线程池关闭,判定条件变为真
    105 
    106         while self.generate_list:               # 当还有线程存在时放置空任务
    107             self.q.put(StopEvent)
    108 
    109         self.q.queue.clear()                    # 将队列中所有任务清空
    110 
    111     @contextlib.contextmanager
    112     def worker_state(self, state_list, worker_thread):
    113         """
    114         用于记录线程中正在等待的线程数
    115         """
    116         state_list.append(worker_thread)        # 等待状态列表中添加正在等待的线程数
    117         try:
    118             yield
    119         finally:
    120             state_list.remove(worker_thread)    # 移除正在等待的线程数
    121 
    122 
    123 
    124 # How to use
    125 
    126 
    127 pool = ThreadPool(5)                            # 创建一个每次支持5线程的线程池
    128 
    129 def callback(status, result):
    130     # status, execute action status
    131     # result, execute action return value
    132     pass
    133 
    134 
    135 def action(i):                                          # 任务函数
    136     print(i)
    137 
    138 for i in range(30):                                     # 使用线程池执行30次任务
    139     ret = pool.run(action, (i,), callback)
    140 
    141 time.sleep(1)                                           # 1秒等待
    142 print(len(pool.generate_list), len(pool.free_list))     # 打印线程池内当前任务个数及空任务个数
    143 pool.close()                                            # 线程停止
    144 pool.terminate()                                        # 线程池终止
  • 相关阅读:
    MS SQLSERVER 第三天
    MS SQLSERVER 第二天
    今天开始我的 MSSQLSERVER 之旅
    从今天开始就正式我的博客之旅
    mac 本地搭建mybatisGenerator代码生成环境
    idea中git远程版本回退
    Junit调试解决本地多线程异步调用
    Lambda表达式总结
    JDK8函数式编程之Stream API
    MySql分页查询慢的解决方案
  • 原文地址:https://www.cnblogs.com/bfmq/p/5912240.html
Copyright © 2011-2022 走看看