zoukankan      html  css  js  c++  java
  • python进程池剖析(一)

      python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序。如果需要实现调用外部程序的功能,python的psutil模块是更好的选择,它不仅支持subprocess提供的功能,而且还能对当前主机或者启动的外部程序进行监控,比如获取网络、cpu、内存等信息使用情况,在做一些自动化运维工作时支持的更加全面。multiprocessing是python的多进程模块,主要通过启动python进程,调用target回调函数来处理任务,与之对应的是python的多线程模块threading,它们拥有类似的接口,通过定义multiprocessing.Process、threading.Thread,指定target方法,调用start()运行进程或者线程。

      在python中由于全局解释锁(GIL)的存在,使用多线程,并不能大大提高程序的运行效率【1】。因此,用python处理并发问题时,尽量使用多进程而非多线程。并发编程中,最简单的模式是,主进程等待任务,当有新任务到来时,启动一个新的进程来处理当前任务。这种每个任务一个进程的处理方式,每处理一个任务都会伴随着一个进程的创建、运行、销毁,如果进程的运行时间越短,创建和销毁的时间所占的比重就越大,显然,我们应该尽量避免创建和销毁进程本身的额外开销,提高进程的运行效率。我们可以用进程池来减少进程的创建和开销,提高进程对象的复用。

      实际上,python中已经实现了一个功能强大的进程池(multiprocessing.Pool),这里我们来简单剖析下python自带的进程池是如何实现的。


      要创建进程池对象,需要调用Pool函数,函数的声明如下:

    Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
        Returns a process pool object
    processes表示工作进程的个数,默认为None,表示worker进程数为cpu_count()
    initializer表示工作进程start时调用的初始化函数,initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用initializer(*initargs)
    maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。
    函数返回一个进程池(Pool)对象

      Pool函数返回的进程池对象中有下面一些数据结构:

    self._inqueue  接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程
    self._outqueue  发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程
    self._taskqueue  同步的任务队列,保存线程池分配给主进程的任务
    self._cache = {}  任务缓存
    self._processes  worker进程个数
    self._pool = []  woker进程队列

      进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:

    • _work_handler线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。
      self._worker_handler = threading.Thread(
                  target=Pool._handle_workers,
                  args=(self, )
      )
      Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法:
      def _maintain_pool(self): if self._join_exited_workers(): self._repopulate_pool()
      _join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程: w
      = self.Process(target=worker,    args=(self._inqueue, self._outqueue,   self._initializer, self._initargs,           
      self._maxtasksperchild) ) self._pool.append(w)
      w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数:
      def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break if task is None: debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed) 所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出:
      它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds),
      再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。
    • _task_handler线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe)
      self._task_handler = threading.Thread(
                  target=Pool._handle_tasks,
                  args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
      )
      Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时,
      表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。
    • _handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,
      self._result_handler = threading.Thread(
              target=Pool._handle_results,
              args=(self._outqueue, self._quick_get, self._cache)
      )
    • _terminate,这里的_terminate并不是一个线程,而是一个Finalize对象
      self._terminate = Finalize(
                  self, self._terminate_pool,
                  args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                        self._worker_handler, self._task_handler,
                        self._result_handler, self._cache),
                  exitpriority=15
      )
      Finalize类的构造函数与线程构造函数类似,_terminate_pool是它的回调函数,args回调函数的参数。
      _terminate_pool函数负责终止进程池的工作:终止上述的三个线程,终止进程池中的worker进程,清除队列中的数据。
      _terminate是个对象而非线程,那么它如何像线程调用start()方法一样,来执行回调函数_terminate_pool呢?查看Pool源码,发现进程池的终止函数:
      def terminate(self):
          debug('terminating pool')
          self._state = TERMINATE
          self._worker_handler._state = TERMINATE
          self._terminate()
      函数中最后将_terminate对象当做一个方法来执行,而_terminate本身是一个Finalize对象,我们看一下Finalize类的定义,发现它实现了__call__方法:
      def __call__(self, wr=None):
          try:
              del _finalizer_registry[self._key]
          except KeyError:
              sub_debug('finalizer no longer registered')
          else:
              if self._pid != os.getpid():
                  res = None
              else:
                  res = self._callback(*self._args, **self._kwargs)
              self._weakref = self._callback = self._args =
                              self._kwargs = self._key = None
              return res
      而方法中 self._callback(*self._args, **self._kwargs) 这条语句,就执行了_terminate_pool函数,进而将进程池终止。

      进程池中的数据结构、各个线程之间的合作关系如下图所示:

      【1】这里针对的是CPU密集型程序,多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

      未完待续……

  • 相关阅读:
    springcloud 项目源码 微服务 分布式 Activiti6 工作流 vue.js html 跨域 前后分离
    springcloud 项目源码 微服务 分布式 Activiti6 工作流 vue.js html 跨域 前后分离
    OA办公系统 Springboot Activiti6 工作流 集成代码生成器 vue.js 前后分离 跨域
    java企业官网源码 自适应响应式 freemarker 静态引擎 SSM 框架
    java OA办公系统源码 Springboot Activiti工作流 vue.js 前后分离 集成代码生成器
    springcloud 项目源码 微服务 分布式 Activiti6 工作流 vue.js html 跨域 前后分离
    java 视频播放 弹幕技术 视频弹幕 视频截图 springmvc mybatis SSM
    最后阶段总结
    第二阶段学习总结
    第一阶段学习总结
  • 原文地址:https://www.cnblogs.com/Tour/p/4537212.html
Copyright © 2011-2022 走看看