python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序。如果需要实现调用外部程序的功能,python的psutil模块是更好的选择,它不仅支持subprocess提供的功能,而且还能对当前主机或者启动的外部程序进行监控,比如获取网络、cpu、内存等信息使用情况,在做一些自动化运维工作时支持的更加全面。multiprocessing是python的多进程模块,主要通过启动python进程,调用target回调函数来处理任务,与之对应的是python的多线程模块threading,它们拥有类似的接口,通过定义multiprocessing.Process、threading.Thread,指定target方法,调用start()运行进程或者线程。
在python中由于全局解释锁(GIL)的存在,使用多线程,并不能大大提高程序的运行效率【1】。因此,用python处理并发问题时,尽量使用多进程而非多线程。并发编程中,最简单的模式是,主进程等待任务,当有新任务到来时,启动一个新的进程来处理当前任务。这种每个任务一个进程的处理方式,每处理一个任务都会伴随着一个进程的创建、运行、销毁,如果进程的运行时间越短,创建和销毁的时间所占的比重就越大,显然,我们应该尽量避免创建和销毁进程本身的额外开销,提高进程的运行效率。我们可以用进程池来减少进程的创建和开销,提高进程对象的复用。
实际上,python中已经实现了一个功能强大的进程池(multiprocessing.Pool),这里我们来简单剖析下python自带的进程池是如何实现的。
要创建进程池对象,需要调用Pool函数,函数的声明如下:
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
Returns a process pool object
processes表示工作进程的个数,默认为None,表示worker进程数为cpu_count()
initializer表示工作进程start时调用的初始化函数,initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用initializer(*initargs)
maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。
函数返回一个进程池(Pool)对象
Pool函数返回的进程池对象中有下面一些数据结构:
self._inqueue 接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程
self._outqueue 发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程
self._taskqueue 同步的任务队列,保存线程池分配给主进程的任务
self._cache = {} 任务缓存
self._processes worker进程个数
self._pool = [] woker进程队列
进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:
- _work_handler线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。
self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) ) Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法: def _maintain_pool(self): if self._join_exited_workers(): self._repopulate_pool() _join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程: w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild) ) self._pool.append(w) w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数: def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break if task is None: debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed) 所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出: 它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds), 再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。
- _task_handler线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe),
self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) ) Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时, 表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。
- _handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,
self._result_handler = threading.Thread( target=Pool._handle_results, args=(self._outqueue, self._quick_get, self._cache) )
-
_terminate,这里的_terminate并不是一个线程,而是一个Finalize对象
self._terminate = Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority=15 ) Finalize类的构造函数与线程构造函数类似,_terminate_pool是它的回调函数,args回调函数的参数。 _terminate_pool函数负责终止进程池的工作:终止上述的三个线程,终止进程池中的worker进程,清除队列中的数据。 _terminate是个对象而非线程,那么它如何像线程调用start()方法一样,来执行回调函数_terminate_pool呢?查看Pool源码,发现进程池的终止函数: def terminate(self): debug('terminating pool') self._state = TERMINATE self._worker_handler._state = TERMINATE self._terminate() 函数中最后将_terminate对象当做一个方法来执行,而_terminate本身是一个Finalize对象,我们看一下Finalize类的定义,发现它实现了__call__方法: def __call__(self, wr=None): try: del _finalizer_registry[self._key] except KeyError: sub_debug('finalizer no longer registered') else: if self._pid != os.getpid(): res = None else: res = self._callback(*self._args, **self._kwargs) self._weakref = self._callback = self._args = self._kwargs = self._key = None return res 而方法中 self._callback(*self._args, **self._kwargs) 这条语句,就执行了_terminate_pool函数,进而将进程池终止。
进程池中的数据结构、各个线程之间的合作关系如下图所示:
【1】这里针对的是CPU密集型程序,多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。
未完待续……