zoukankan      html  css  js  c++  java
  • python进程池剖析(一)

     python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序。如果需要实现调用外部程序的功能,python的psutil模块是更好的选择,它不仅支持subprocess提供的功能,而且还能对当前主机或者启动的外部程序进行监控,比如获取网络、cpu、内存等信息使用情况,在做一些自动化运维工作时支持的更加全面。multiprocessing是python的多进程模块,主要通过启动python进程,调用target回调函数来处理任务,与之对应的是python的多线程模块threading,它们拥有类似的接口,通过定义multiprocessing.Process、threading.Thread,指定target方法,调用start()运行进程或者线程。

      在python中由于全局解释锁(GIL)的存在,使用多线程,并不能大大提高程序的运行效率【1】。因此,用python处理并发问题时,尽量使用多进程而非多线程。并发编程中,最简单的模式是,主进程等待任务,当有新任务到来时,启动一个新的进程来处理当前任务。这种每个任务一个进程的处理方式,每处理一个任务都会伴随着一个进程的创建、运行、销毁,如果进程的运行时间越短,创建和销毁的时间所占的比重就越大,显然,我们应该尽量避免创建和销毁进程本身的额外开销,提高进程的运行效率。我们可以用进程池来减少进程的创建和开销,提高进程对象的复用。

      实际上,python中已经实现了一个功能强大的进程池(multiprocessing.Pool),这里我们来简单剖析下python自带的进程池是如何实现的。


      要创建进程池对象,需要调用Pool函数,函数的声明如下:

    Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
        Returns a process pool object
    processes表示工作进程的个数,默认为None,表示worker进程数为cpu_count()
    initializer表示工作进程start时调用的初始化函数,initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用initializer(*initargs)
    maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。
    函数返回一个进程池(Pool)对象

      Pool函数返回的进程池对象中有下面一些数据结构:

    self._inqueue  接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程
    self._outqueue  发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程
    self._taskqueue  同步的任务队列,保存线程池分配给主进程的任务
    self._cache = {}  任务缓存
    self._processes  worker进程个数
    self._pool = []  woker进程队列

      进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:

    • _work_handler线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。
      复制代码
      self._worker_handler = threading.Thread(
                  target=Pool._handle_workers,
                  args=(self, )
      )
      Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法: def _maintain_pool(self): if self._join_exited_workers(): self._repopulate_pool()
      _join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程: w = self.Process(target=worker,    args=(self._inqueue, self._outqueue,   self._initializer, self._initargs,           
      self._maxtasksperchild) ) self._pool.append(w)
      w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数: def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break if task is None: debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed) 所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出:
      它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds),
      再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。
      复制代码
    • _task_handler线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe),
      self._task_handler = threading.Thread(
                  target=Pool._handle_tasks,
                  args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
      )
      Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时,
      表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。
    • _handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,
      self._result_handler = threading.Thread(
              target=Pool._handle_results,
              args=(self._outqueue, self._quick_get, self._cache)
      )
    • _terminate,这里的_terminate并不是一个线程,而是一个Finalize对象
      复制代码
      self._terminate = Finalize(
                  self, self._terminate_pool,
                  args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                        self._worker_handler, self._task_handler,
                        self._result_handler, self._cache),
                  exitpriority=15
      )
      Finalize类的构造函数与线程构造函数类似,_terminate_pool是它的回调函数,args回调函数的参数。
      _terminate_pool函数负责终止进程池的工作:终止上述的三个线程,终止进程池中的worker进程,清除队列中的数据。
      _terminate是个对象而非线程,那么它如何像线程调用start()方法一样,来执行回调函数_terminate_pool呢?查看Pool源码,发现进程池的终止函数:
      def terminate(self):
          debug('terminating pool')
          self._state = TERMINATE
          self._worker_handler._state = TERMINATE
          self._terminate()
      函数中最后将_terminate对象当做一个方法来执行,而_terminate本身是一个Finalize对象,我们看一下Finalize类的定义,发现它实现了__call__方法:
      def __call__(self, wr=None):
          try:
              del _finalizer_registry[self._key]
          except KeyError:
              sub_debug('finalizer no longer registered')
          else:
              if self._pid != os.getpid():
                  res = None
              else:
                  res = self._callback(*self._args, **self._kwargs)
              self._weakref = self._callback = self._args =
                              self._kwargs = self._key = None
              return res
      而方法中 self._callback(*self._args, **self._kwargs) 这条语句,就执行了_terminate_pool函数,进而将进程池终止。
      复制代码

      进程池中的数据结构、各个线程之间的合作关系如下图所示:

      【1】这里针对的是CPU密集型程序,多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

  • 相关阅读:
    ACM-ICPC 2018 徐州赛区网络预赛 G题
    ACM-ICPC 2018 沈阳赛区网络预赛 K题
    ACM-ICPC 2018 沈阳赛区网络预赛 K题
    数据结构实验之栈与队列七:出栈序列判定
    数据结构实验之栈与队列七:出栈序列判定
    Python isspace()方法
    Python isnumeric()方法
    Python islower()方法
    Python isdigit()方法
    Python isdecimal()方法
  • 原文地址:https://www.cnblogs.com/jinan1/p/10649730.html
Copyright © 2011-2022 走看看