zoukankan      html  css  js  c++  java
  • 线程池

    线程池

    #!/usr/bin/env python
    # coding=utf-8
    
    import queue
    import threading
    
    class ThreadPool:
        def __init__(self, maxsize):
            self.maxsize = maxsize
            self._q = queue.Queue(maxsize)
            for i in range(maxsize):
                self._q.put(threading.Thread)  # 队列里放5个类
    
        def get_thread(self):
            return self._q.get()
    
        def add_thread(self):
            self._q.put(threading.Thread)
    
    
    def task(arg, p):
        print(arg)
        p.add_thread()  # 执行完毕,在线程队列里再加一个线程,不然阻塞
    
    pool = ThreadPool(5)  # 最大个数5
    
    for i in range(100):   # 第六个会被阻塞,线程池里没线程了,所以线程推出后要新加线程,将创建的对象pool传入task,执行完毕后新增一个线程
        t = pool.get_thread()  # 获取线程类
        obj = t(target=task, args=(i, pool,))
        obj.start()
    
    low版线程池
    low版

    存在问题:

    • 线程没有被重用,原来线程执行完之后就死掉了,没有被重用,而是新建线程。
    • 线程池个数为5个,如果只创建两个线程,线程池利用率为五分之二,利用率不高
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # 队列里面放任务 不放类了
    # 每一个任务都是一个函数
    # 元组(函数名,参数1) (函数名,参数2), (函数名,参数4)
    # 将上面东西放到队列中
    # 创建线程,默认情况下创建一个线程,源源不断取任务执行,执行完,不让它销毁,再取任务,while循环实现,重复利用线程
    # 
    
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()  # 终止线程
    
    
    class ThreadPool(object):
    
        def __init__(self, max_num, max_task_num = None):
            if max_task_num:
                self.q = queue.Queue(max_task_num)  # 指定任务最大数
            else:
                self.q = queue.Queue()
            self.max_num = max_num  # 最多多少线程
            self.cancel = False
            self.terminal = False
            self.generate_list = []  # 已创建线程
            self.free_list = []  # 空闲多少线程
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
            if self.cancel:
                return
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:  # 没有空闲线程并且已创建线程小于最大线程数才创建线程, 也就是说有空闲线程不用创建,已创建线程数等于最大线程也不创建,达到重复利用线程
    # 最大线程数5, 第一个任务来,没有空闲线程(没有线程),创建一个,第二个来,再创建一个。已经有两个线程正在执行任务,这时第三个来了,第一二任务执行完了,空闲线程为2个,第三个任务不用重新创建线程。
    # 五个任务一块来了,创建五个线程,第六个不创建了,放到任务队列里面
                self.generate_thread()  # 满足创建线程,
            w = (func, args, callback,)
            #  函数,元组,函数 ,将这三个参数放在元组里面,当成一个整体传到队列里面
            self.q.put(w)  # 满足创建线程,不然把任务放队列里面
    
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)  # 每一个线程被创建,执行call方法
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread()
            self.generate_list.append(current_thread)  # 创建一个线程,在列表里将当前线程名加进已创建的线程列表
    
            event = self.q.get()  # 取出一个任务, 没任务子线程就阻塞,等待取到任务,主线程继续向下执行,执行run方法里面将任务put进对列, 然后取到任务继续向下执行
            while event != StopEvent:
    
                func, arguments, callback = event  # 取出
                try:
                    result = func(*arguments)  # 执行函数,并将参数传进去
                    success = True
                except Exception as e:
                    success = False
                    result = None
    
                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass
    
                with self.worker_state(self.free_list, current_thread):  # 如果当前线程执行完任务后,将当前线程置于空闲状态,这个线程等待队列中下一个任务到来,如果没来,一直处于空闲, 如果到来,去任务
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()   # 将当前任务加入到空闲列表后,如果有任务,取到,没有阻塞 取到后,移除当前线程
            else:
                self.generate_list.remove(current_thread)  # 如果是空值,移除线程 
    
        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            self.cancel = True   # 标志设置为True
            full_size = len(self.generate_list)  # 已生成线程个数
            while full_size:
                self.q.put(StopEvent)  # 
                full_size -= 1
    
        def terminate(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True
    
            while self.generate_list:
                self.q.put(StopEvent)  # 传空值
    
            self.q.queue.clear()
    
        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数
            """
            state_list.append(worker_thread)  # 将当前空闲线程加入空闲列表
            try:
                yield
            finally:
                state_list.remove(worker_thread)  # 取到任务后,将当前空闲线程从空闲线程里移除,
    
    
    
    # How to use
    
    
    pool = ThreadPool(5)  # 创建pool对象,最多创建5个线程
    
    def callback(status, result):
        # status, execute action status
        # result, execute action return value
        pass
    
    
    def action(i):
        return i
    
    for i in range(2):  # 30个任务
        ret = pool.run(action, (i,), callback)  # 将action函数,及action的参数,一级callback函数传给run()方法
    
    
    #print(len(pool.generate_list), len(pool.free_list))
    pool.close()
    #pool.terminate()
  • 相关阅读:
    Data Base mysql备份与恢复
    java 乱码问题解决方案
    【知识强化】第二章 物理层 2.1 通信基础
    【知识强化】第二章 进程管理 2.2 处理机调度
    【知识强化】第二章 进程管理 2.1 进程与线程
    【知识强化】第一章 操作系统概述 1.3 操作系统的运行环境
    【知识强化】第一章 网络体系结构 1.1 数据结构的基本概念
    【知识强化】第一章 网络体系结构 1.2 计算机网络体系结构与参考模型
    【知识强化】第一章 网络体系结构 1.1 计算机网络概述
    【知识强化】第一章 操作系统概述 1.1 操作系统的基本概念
  • 原文地址:https://www.cnblogs.com/xiaoming279/p/6121315.html
Copyright © 2011-2022 走看看