zoukankan      html  css  js  c++  java
  • python进程池剖析(一)

      python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序。如果需要实现调用外部程序的功能,python的psutil模块是更好的选择,它不仅支持subprocess提供的功能,而且还能对当前主机或者启动的外部程序进行监控,比如获取网络、cpu、内存等信息使用情况,在做一些自动化运维工作时支持的更加全面。multiprocessing是python的多进程模块,主要通过启动python进程,调用target回调函数来处理任务,与之对应的是python的多线程模块threading,它们拥有类似的接口,通过定义multiprocessing.Process、threading.Thread,指定target方法,调用start()运行进程或者线程。

      在python中由于全局解释锁(GIL)的存在,使用多线程,并不能大大提高程序的运行效率【1】。因此,用python处理并发问题时,尽量使用多进程而非多线程。并发编程中,最简单的模式是,主进程等待任务,当有新任务到来时,启动一个新的进程来处理当前任务。这种每个任务一个进程的处理方式,每处理一个任务都会伴随着一个进程的创建、运行、销毁,如果进程的运行时间越短,创建和销毁的时间所占的比重就越大,显然,我们应该尽量避免创建和销毁进程本身的额外开销,提高进程的运行效率。我们可以用进程池来减少进程的创建和开销,提高进程对象的复用。

      实际上,python中已经实现了一个功能强大的进程池(multiprocessing.Pool),这里我们来简单剖析下python自带的进程池是如何实现的。


      要创建进程池对象,需要调用Pool函数,函数的声明如下:

    Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
        Returns a process pool object
    processes表示工作进程的个数,默认为None,表示worker进程数为cpu_count()
    initializer表示工作进程start时调用的初始化函数,initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用initializer(*initargs)
    maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。
    函数返回一个进程池(Pool)对象

      Pool函数返回的进程池对象中有下面一些数据结构:

    self._inqueue  接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程
    self._outqueue  发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程
    self._taskqueue  同步的任务队列,保存线程池分配给主进程的任务
    self._cache = {}  任务缓存
    self._processes  worker进程个数
    self._pool = []  woker进程队列

      进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:

    • _work_handler线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。
      复制代码
      self._worker_handler = threading.Thread(
                  target=Pool._handle_workers,
                  args=(self, )
      ) 
      Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法:
      def _maintain_pool(self):
          if self._join_exited_workers():
              self._repopulate_pool() 
      _join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程:
      w = self.Process(target=worker,
                      args=(self._inqueue, self._outqueue,
                            self._initializer, self._initargs,                                         self._maxtasksperchild)
                       )
      self._pool.append(w) 
      w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数:
      def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
          assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
          put = outqueue.put
          get = inqueue.get
          if hasattr(inqueue, '_writer'):
              inqueue._writer.close()
              outqueue._reader.close()
      
          if initializer is not None:
              initializer(*initargs)
      
          completed = 0
          while maxtasks is None or (maxtasks and completed < maxtasks):
              try:
                  task = get()
              except (EOFError, IOError):
                  debug('worker got EOFError or IOError -- exiting')
                  break
      
              if task is None:
                  debug('worker got sentinel -- exiting')
                  break
      
              job, i, func, args, kwds = task
              try:
                  result = (True, func(*args, **kwds))
              except Exception, e:
                  result = (False, e)
              try:
                  put((job, i, result))
              except Exception as e:
                  wrapped = MaybeEncodingError(e, result[1])
                  debug("Possible encoding error while sending result: %s" % (
                      wrapped))
                  put((job, i, (False, wrapped)))
              completed += 1
          debug('worker exiting after %d tasks' % completed)
      
      所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出: 它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds), 再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。
      复制代码
    • _task_handler线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe),
      self._task_handler = threading.Thread(
                  target=Pool._handle_tasks,
                  args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
      ) Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时, 表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。 
    • _handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,
      self._result_handler = threading.Thread(         target=Pool._handle_results,         args=(self._outqueue, self._quick_get, self._cache) )
    • _terminate,这里的_terminate并不是一个线程,而是一个Finalize对象
      复制代码
      self._terminate = Finalize(
                  self, self._terminate_pool,
                  args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                        self._worker_handler, self._task_handler,
                        self._result_handler, self._cache),
                  exitpriority=15
      ) Finalize类的构造函数与线程构造函数类似,_terminate_pool是它的回调函数,args回调函数的参数。 _terminate_pool函数负责终止进程池的工作:终止上述的三个线程,终止进程池中的worker进程,清除队列中的数据。 _terminate是个对象而非线程,那么它如何像线程调用start()方法一样,来执行回调函数_terminate_pool呢?查看Pool源码,发现进程池的终止函数: def terminate(self):     debug('terminating pool')     self._state = TERMINATE     self._worker_handler._state = TERMINATE     self._terminate() 函数中最后将_terminate对象当做一个方法来执行,而_terminate本身是一个Finalize对象,我们看一下Finalize类的定义,发现它实现了__call__方法: def __call__(self, wr=None):     try:         del _finalizer_registry[self._key]     except KeyError:         sub_debug('finalizer no longer registered')     else:         if self._pid != os.getpid():             res = None         else:             res = self._callback(*self._args, **self._kwargs)         self._weakref = self._callback = self._args =                          self._kwargs = self._key = None         return res 而方法中 self._callback(*self._args, **self._kwargs) 这条语句,就执行了_terminate_pool函数,进而将进程池终止。 
      复制代码

      进程池中的数据结构、各个线程之间的合作关系如下图所示:

      【1】这里针对的是CPU密集型程序,多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

      未完待续……

  • 相关阅读:
    语料和文本处理
    seq2seq+torch7聊天机器人bug处理
    unity3d inputfield标签控制台打印object
    多种语言tcp编程
    处理json中的空格
    安卓无法访问Azure服务器和微软API
    Xamarin/Unity3d无法访问Azure服务器或者微软API
    unity3d C# soket客户端接受失败
    unity3d之public变量引发错误
    unity3d,java,c#,python,rospy的socket通信测试
  • 原文地址:https://www.cnblogs.com/chengzhengfu/p/4574170.html
Copyright © 2011-2022 走看看