zoukankan      html  css  js  c++  java
  • Python 线程池的原理和实现及subprocess模块

    最近由于项目需要一个与linux shell交互的多线程程序,需要用python实现,之前从没接触过python,这次匆匆忙忙的使用python,发现python确实语法非常简单,功能非常强大,因为自己是从零开始使用python,连语法都是现学的,所以将一些使用记录下来,希望能帮到大家。

    使用python的需求简单的说是调用liunux下的ffmpeg获取音频的一些信息,需要用多线程实现

    一、subprocess

    因为是多线程,首先想到的是subprocess模块(官方文档说明此模块将会代替os模块和Popen2模块和command模块),这是python提供的开启子线程的标准库。可以通过pipe将子线程的stdin、stdout和stderr与主线程交互。

    subprocess.call(["ls", "-l"])
    
    subprocess.check_call(["ls", "-l"])

    这是两个非常简单的例子,主线程都会等待子线程命令的完成,然后获取返回值,两个方法的唯一区别就是check_call会检查返回值,如果返回值不为0(即正确执行),则会抛出CalledProcessError异常。

    Popen

    那么我用的是Popen方法,实际上subprocess模块中其它的方法都是对Popen的封装,为了更方便的使用,如果我们自己需要定制某些功能,最后还是会回到Popen。

    Popen具体的参数使用可以参考Python document

    Popen接受元组为参数

    child = subprocess.Popen(["ping","-c","5","www.google.com"])

    与上述方法不同的是,使用Popen,主线程不会等待子线程完成,如果要等待,需要使用wait()方法。

    先上我使用的代码:

    command = ["ffmpeg","-i",songPath];
    
    stdoutData,stderrData = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate();

        上面也提到了,使用Popen可以自定义标准输入、标准输出和标准错误输出。

    那么在这行代码中,我定义了stdoutData和stderrData分别接受命令行程序的标准输出和标准错误输出(即stdout=subprocess.PIPE,stderr=subprocess.PIPE),都是通过管道(Pipe)实现。大家要注意,如果我们要在接下来的主线程中使用Popen方法执行命令行程序后的输出数据,如print(stdoutData),那么记住使用Popen.wait(),因为主线程不会等待Popen执行完成。为什么我这里没有用呢?因为我这里使用了communicate方法,communicate方法代表子线程与主线程之间的通信,是阻塞式的,如果使用了communicate方法,主线程会等待子线程的完成

        其实从另一个层面上来说,也是linux标准输入输出的管道重定向,只不过是把标准输入输出重定向到程序而已。

        按照这种方法,我就用ffmpeg获取到了音频的输出。

    那么问题来了,说好的多线程呢?因为communicate方法是阻塞式的,并不能开启多线程。所以在思考过后,我决定用在主线程中开启多个子线程,分别调用subprocess模块去获取音频的信息。既然都是用到了多线程,考虑到效率问题,自然联想到了线程池。

    二、线程池

        为什么需要线程池呢?

            设想一下,如果我们使用有任务就开启一个子线程处理,处理完成后,销毁子线程或等得子线程自然死亡,那么如果我们的任务所需时间比较短,但是任务数量比较多,那么更多的时间是花在线程的创建和结束上面,效率肯定就低了。

        线程池的原理:

            既然是线程池(Thread pool),其实名字很形象,就是把指定数量的可用子线程放进一个"池里",有任务时取出一个线程执行,任务执行完后,并不立即销毁线程,而是放进线程池中,等待接收下一个任务。这样内存和cpu的开销也比较小,并且我们可以控制线程的数量。

        线程池的实现:

            线程池有很多种实现方式,在python中,已经给我们提供了一个很好的实现方式:Queue-队列。因为python中Queue本身就是同步的,所以也就是线程安全的,所以我们可以放心的让多个线程共享一个Queue。

            那么说到线程池,那么理应也得有一个任务池,任务池中存放着待执行的任务,各个线程到任务池中取任务执行,那么用Queue来实现任务池是最好不过的。

    先上代码:

    class TaskManager():
    
        def __init__(self,maxTasks,maxThreads):
            #最大任务书,也就是Queue的容量
            self._maxTasks = maxTasks;
            #线程池中线程数量    
            self._maxThreads = maxThreads;
            #业务代码
            ….
            ….
    
            #任务池
            self._taskQueue = Queue.Queue(maxTasks);
            #线程池,使用列表实现
            self._threads = [];
    
            #在__init__中调用方法
            self.initThreads();
            self.initTaskQueue();
    
        #初始化任务池
        def initTaskQueue(self):
            while True:
            #业务代码
                if not self._taskQueue.full():
                    getTasks(self._maxTasks - self._taskQueue.qsize());
                    for task in taskMap["tasks"]:
                    self._taskQueue.put(task);
                    time.sleep(1);
    
        #初始化线程池
        def initThreads(self):
            for i in range(self._maxThreads):
            #调用每个线程执行的具体任务
            self._threads.append(Work(self,self._reportUrl));
    
        def getTask(self):
            return self._taskQueue.get();
    
    #具体执行的任务
    class Work(threading.Thread):
        def __init__(self,taskmgr):
        threading.Thread.__init__(self);
        self._logger = logging.getLogger("");
        self.start();
    
        def run(self):
            while True:
                try:
                    #取出任务并执行相关操作
                    self._taskmgr.getTask();
                    ……
                    ……
    
                    time.sleep(1);
                except Exception,e:
                    self._logger.exception(e);            

    线程池的实现主要分两部分,一部分是TaskMagager,即任务管理类,用来调度任务,一部分是Work,即具体需要执行的业务代码。线程池的这种设计模式在很多地方都可以借鉴

    TaskManager

        先来看TaskManager,主要包含四个方法,一个构造方法,接受传进来的参数,执行任务池和线程池的大小等初始化信息,然后调用initTaskQueue和initThread方法初始化任务池和线程池。

        最后一个方法getTask返回TaskManager类的一个实例。

    Work

        执行具体的业务

    过程分析

    1. TaskManager的__init__方法初始化线程池和任务池
    2. initTaskQueue方法,初始化任务池,将任务填充到任务队列。
    3. initThreads方法,初始化线程池,调用Work类执行任务。
    4. getTask方法,返回TaskManager实例,主要作用是传给Work类,让子线程从任务队列中取出任务执行。
    5. Work类的__init__方法初始化线程,并启动线程。
    6. Run方法,执行任务,并且从任务队列中取出任务。

    关键点:

    1. 在主线程,也即TaskManager的initTaskQueue方法中获取任务并填充任务池
    2. 在各个子线程中,也即Work类的run方法中获取任务池中的任务并执行。

      这里需要注意的是,前面提到过,Python中的Queue是线程安全的,Queue的get方法是阻塞式,也即,如果Queue为空,子线程取不到任务,会进行等待,直到Queue中有任务可取

        三、在TaskManager的__init__方法中,最好先启动线程,在启动任务池

    self.initThreads();
    
    self.initTaskQueue();

         

    否则在initTaskQueue(主线程)中的while循环会一直执行,将会阻塞线程池的执行。在第二点中说明过,先启动线程池,就算任务池没有任务,子线程也会阻塞等待任务池中出现新任务。

     

    就写到这里,如果错误,请大家指正:)

  • 相关阅读:
    HDU 1495 非常可乐
    ja
    Codeforces Good Bye 2016 E. New Year and Old Subsequence
    The 2019 Asia Nanchang First Round Online Programming Contest
    Educational Codeforces Round 72 (Rated for Div. 2)
    Codeforces Round #583 (Div. 1 + Div. 2, based on Olympiad of Metropolises)
    AtCoder Regular Contest 102
    AtCoder Regular Contest 103
    POJ1741 Tree(点分治)
    洛谷P2634 [国家集训队]聪聪可可(点分治)
  • 原文地址:https://www.cnblogs.com/edwinchen/p/4061786.html
Copyright © 2011-2022 走看看