zoukankan      html  css  js  c++  java
  • Python 线程池的原理和实现及subprocess模块

    最近由于项目需要一个与linux shell交互的多线程程序,需要用python实现,之前从没接触过python,这次匆匆忙忙的使用python,发现python确实语法非常简单,功能非常强大,因为自己是从零开始使用python,连语法都是现学的,所以将一些使用记录下来,希望能帮到大家。

    使用python的需求简单的说是调用liunux下的ffmpeg获取音频的一些信息,需要用多线程实现

    一、subprocess

    因为是多线程,首先想到的是subprocess模块(官方文档说明此模块将会代替os模块和Popen2模块和command模块),这是python提供的开启子线程的标准库。可以通过pipe将子线程的stdin、stdout和stderr与主线程交互。

    subprocess.call(["ls", "-l"])
    
    subprocess.check_call(["ls", "-l"])

    这是两个非常简单的例子,主线程都会等待子线程命令的完成,然后获取返回值,两个方法的唯一区别就是check_call会检查返回值,如果返回值不为0(即正确执行),则会抛出CalledProcessError异常。

    Popen

    那么我用的是Popen方法,实际上subprocess模块中其它的方法都是对Popen的封装,为了更方便的使用,如果我们自己需要定制某些功能,最后还是会回到Popen。

    Popen具体的参数使用可以参考Python document

    Popen接受元组为参数

    child = subprocess.Popen(["ping","-c","5","www.google.com"])

    与上述方法不同的是,使用Popen,主线程不会等待子线程完成,如果要等待,需要使用wait()方法。

    先上我使用的代码:

    command = ["ffmpeg","-i",songPath];
    
    stdoutData,stderrData = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate();

        上面也提到了,使用Popen可以自定义标准输入、标准输出和标准错误输出。

    那么在这行代码中,我定义了stdoutData和stderrData分别接受命令行程序的标准输出和标准错误输出(即stdout=subprocess.PIPE,stderr=subprocess.PIPE),都是通过管道(Pipe)实现。大家要注意,如果我们要在接下来的主线程中使用Popen方法执行命令行程序后的输出数据,如print(stdoutData),那么记住使用Popen.wait(),因为主线程不会等待Popen执行完成。为什么我这里没有用呢?因为我这里使用了communicate方法,communicate方法代表子线程与主线程之间的通信,是阻塞式的,如果使用了communicate方法,主线程会等待子线程的完成

        其实从另一个层面上来说,也是linux标准输入输出的管道重定向,只不过是把标准输入输出重定向到程序而已。

        按照这种方法,我就用ffmpeg获取到了音频的输出。

    那么问题来了,说好的多线程呢?因为communicate方法是阻塞式的,并不能开启多线程。所以在思考过后,我决定用在主线程中开启多个子线程,分别调用subprocess模块去获取音频的信息。既然都是用到了多线程,考虑到效率问题,自然联想到了线程池。

    二、线程池

        为什么需要线程池呢?

            设想一下,如果我们使用有任务就开启一个子线程处理,处理完成后,销毁子线程或等得子线程自然死亡,那么如果我们的任务所需时间比较短,但是任务数量比较多,那么更多的时间是花在线程的创建和结束上面,效率肯定就低了。

        线程池的原理:

            既然是线程池(Thread pool),其实名字很形象,就是把指定数量的可用子线程放进一个"池里",有任务时取出一个线程执行,任务执行完后,并不立即销毁线程,而是放进线程池中,等待接收下一个任务。这样内存和cpu的开销也比较小,并且我们可以控制线程的数量。

        线程池的实现:

            线程池有很多种实现方式,在python中,已经给我们提供了一个很好的实现方式:Queue-队列。因为python中Queue本身就是同步的,所以也就是线程安全的,所以我们可以放心的让多个线程共享一个Queue。

            那么说到线程池,那么理应也得有一个任务池,任务池中存放着待执行的任务,各个线程到任务池中取任务执行,那么用Queue来实现任务池是最好不过的。

    先上代码:

    class TaskManager():
    
        def __init__(self,maxTasks,maxThreads):
            #最大任务书,也就是Queue的容量
            self._maxTasks = maxTasks;
            #线程池中线程数量    
            self._maxThreads = maxThreads;
            #业务代码
            ….
            ….
    
            #任务池
            self._taskQueue = Queue.Queue(maxTasks);
            #线程池,使用列表实现
            self._threads = [];
    
            #在__init__中调用方法
            self.initThreads();
            self.initTaskQueue();
    
        #初始化任务池
        def initTaskQueue(self):
            while True:
            #业务代码
                if not self._taskQueue.full():
                    getTasks(self._maxTasks - self._taskQueue.qsize());
                    for task in taskMap["tasks"]:
                    self._taskQueue.put(task);
                    time.sleep(1);
    
        #初始化线程池
        def initThreads(self):
            for i in range(self._maxThreads):
            #调用每个线程执行的具体任务
            self._threads.append(Work(self,self._reportUrl));
    
        def getTask(self):
            return self._taskQueue.get();
    
    #具体执行的任务
    class Work(threading.Thread):
        def __init__(self,taskmgr):
        threading.Thread.__init__(self);
        self._logger = logging.getLogger("");
        self.start();
    
        def run(self):
            while True:
                try:
                    #取出任务并执行相关操作
                    self._taskmgr.getTask();
                    ……
                    ……
    
                    time.sleep(1);
                except Exception,e:
                    self._logger.exception(e);            

    线程池的实现主要分两部分,一部分是TaskMagager,即任务管理类,用来调度任务,一部分是Work,即具体需要执行的业务代码。线程池的这种设计模式在很多地方都可以借鉴

    TaskManager

        先来看TaskManager,主要包含四个方法,一个构造方法,接受传进来的参数,执行任务池和线程池的大小等初始化信息,然后调用initTaskQueue和initThread方法初始化任务池和线程池。

        最后一个方法getTask返回TaskManager类的一个实例。

    Work

        执行具体的业务

    过程分析

    1. TaskManager的__init__方法初始化线程池和任务池
    2. initTaskQueue方法,初始化任务池,将任务填充到任务队列。
    3. initThreads方法,初始化线程池,调用Work类执行任务。
    4. getTask方法,返回TaskManager实例,主要作用是传给Work类,让子线程从任务队列中取出任务执行。
    5. Work类的__init__方法初始化线程,并启动线程。
    6. Run方法,执行任务,并且从任务队列中取出任务。

    关键点:

    1. 在主线程,也即TaskManager的initTaskQueue方法中获取任务并填充任务池
    2. 在各个子线程中,也即Work类的run方法中获取任务池中的任务并执行。

      这里需要注意的是,前面提到过,Python中的Queue是线程安全的,Queue的get方法是阻塞式,也即,如果Queue为空,子线程取不到任务,会进行等待,直到Queue中有任务可取

        三、在TaskManager的__init__方法中,最好先启动线程,在启动任务池

    self.initThreads();
    
    self.initTaskQueue();

         

    否则在initTaskQueue(主线程)中的while循环会一直执行,将会阻塞线程池的执行。在第二点中说明过,先启动线程池,就算任务池没有任务,子线程也会阻塞等待任务池中出现新任务。

     

    就写到这里,如果错误,请大家指正:)

  • 相关阅读:
    sh_02_判断年龄改进版
    面向对象-内置函数和内置方法
    python面向对象-三大特性
    面向对象编程
    面向对象编程-回合制游戏
    练习Dream-购物车
    bytes和str的区别与转换
    字符串的操作和使用方法。
    程序员必须掌握的600个单词
    移动端的网页设计流程有哪些?
  • 原文地址:https://www.cnblogs.com/edwinchen/p/4061786.html
Copyright © 2011-2022 走看看