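    [Source Code Analysis] Distributed Task Queue Celery: The Child Process in the Multi-Threading Model

    0x00 Abstract

    Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.

    In the previous article we introduced Celery's multi-threading model but skipped the child process stage. This article looks at how a child process continues its startup.

    As before, we begin with a few questions:

    • What preparation is needed before a child process starts?
      • How does the child know which command to run?
      • How is the parent-child communication channel built?
      • How is the parent's information passed to the child?
    • At this point, the Celery application lives in the parent process.
      • How does the child obtain the Celery application?
      • How is the Celery application restored in the child?
    • How does the parent know that a child is ready, so that it can assign work to it?
    • How does the child receive the tasks assigned by the parent?

    To make things easier to follow, we first present the final relationship diagram for this article.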

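    0x01 Recap of the Previous Article

    1.1 Role of the Base Class

    As described in the previous article, Pool(object), the base class of AsynPool, sets up the various message-handling functions and creates the child processes.

    Code location: billiard/pool.py

    The code is shown below. _create_worker_process is what creates each child process.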

    class Pool(object):
        def __init__(self, processes=None, initializer=None, initargs=(),
                     maxtasksperchild=None, timeout=None, soft_timeout=None,
                     lost_worker_timeout=None,
                     max_restarts=None, max_restart_freq=1,
                     on_process_up=None,
                     on_process_down=None,
                     on_timeout_set=None,
                     on_timeout_cancel=None,
                     threads=True,
                     semaphore=None,
                     putlocks=False,
                     allow_restart=False,
                     synack=False,
                     on_process_exit=None,
                     context=None,
                     max_memory_per_child=None,
                     enable_timeouts=False,
                     **kwargs):
            # ... (other initialization omitted) ...
            for i in range(self._processes):
                self._create_worker_process(i)
    

    1.2 The Child Process Abstraction

    The following code creates the child process abstractions.

    for i in range(self._processes):
        self._create_worker_process(i)
    

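    _create_worker_process does the following:

    • inq, outq, synq = self.get_process_queues() returns abstract objects wrapping pipes that have read and write ends. These pipes are created in advance (they are the ones created earlier by self.create_process_queues()). They are mainly intended for the child that is about to be forked: the child listens for read events on these pipe objects and can also write data back through the write end.

    • w, an instance of self.WorkerProcess, is an abstract wrapper around the forked child. It makes the child easy to manage as part of a process pool. w records metadata about the child, such as its pid and the file descriptors of its pipes, and it is registered in the main process, which uses it to dispatch tasks.

    • The WorkerProcess instance is recorded in self._pool.

    • w.start() contains the actual fork logic.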

    def _create_worker_process(self, i):
        # ... (sentinel and on_ready_counter are also created here; omitted) ...
        inq, outq, synq = self.get_process_queues()
    
        w = self.WorkerProcess(self.Worker(
            inq, outq, synq, self._initializer, self._initargs,
            self._maxtasksperchild, sentinel, self._on_process_exit,
            # Need to handle all signals if using the ipc semaphore,
            # to make sure the semaphore is released.
            sigprotection=self.threads,
            wrap_exception=self._wrap_exception,
            max_memory_per_child=self._max_memory_per_child,
            on_ready_counter=on_ready_counter,
        ))
        # ... (w is appended to self._pool and further configured; omitted) ...
    
        w.start()  # the child process is started here
    
        return w
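
    The bookkeeping described in the list above can be imitated with the standard library: pipes exist before the child starts, a handle object remembers the child's pid, and the parent keeps a pool list that it uses for dispatch. This is a minimal sketch, not billiard's implementation. WorkerHandle, create_worker, dispatch and shutdown are hypothetical names, the child is a trivial python -c program that upper-cases lines, and its stdin/stdout pipes stand in for billiard's inq/outq.

```python
import subprocess
import sys
from dataclasses import dataclass

# Hypothetical child program: one task per line on stdin (our "inq"),
# one result per line on stdout (our "outq").
CHILD_PROG = (
    "import sys\n"
    "for line in iter(sys.stdin.readline, ''):\n"
    "    print(line.strip().upper(), flush=True)\n"
)


@dataclass
class WorkerHandle:
    """Parent-side record of one child, loosely analogous to WorkerProcess."""
    index: int
    proc: subprocess.Popen

    @property
    def pid(self):
        return self.proc.pid


def create_worker(index):
    # The pipes exist before the child runs and are handed to it at startup.
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_PROG],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    return WorkerHandle(index, proc)


def dispatch(worker, task):
    """Write one task to the worker's input pipe and read its one-line reply."""
    worker.proc.stdin.write(task + "\n")
    worker.proc.stdin.flush()
    return worker.proc.stdout.readline().strip()


def shutdown(pool):
    for w in pool:
        w.proc.stdin.close()  # EOF ends the child's read loop
        w.proc.wait()
        w.proc.stdout.close()


if __name__ == "__main__":
    pool = [create_worker(i) for i in range(2)]  # plays the role of self._pool
    print({w.pid: dispatch(w, "task-%d" % w.index) for w in pool})
    shutdown(pool)
```

    The sketch leaves out the per-child synq pipe and the readiness counter (on_ready_counter) that billiard also sets up.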
    

    1.3 The Fork Process

    The fork-related code is as follows:

    class BaseProcess(object):
        '''
        Process objects represent activity that is run in a separate process
    
        The class is analagous to `threading.Thread`
        '''
    
        def run(self):
            '''
            Method to be run in sub-process; can be overridden in sub-class
            '''
            if self._target:
                self._target(*self._args, **self._kwargs)
    
        def start(self):
            '''
            Start child process
            '''
            assert self._popen is None, 'cannot start a process twice'
            assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
            _cleanup()
            self._popen = self._Popen(self)
            self._sentinel = self._popen.sentinel
            _children.add(self)
    

    The key line is self._popen = self._Popen(self).

    Code location: billiard/context.py.

    As the code shows, the implementation depends on the operating system.

    Below are the various fork-like start methods on *nix systems.

        class ForkProcess(process.BaseProcess):
            _start_method = 'fork'
    
            @staticmethod
            def _Popen(process_obj):
                from .popen_fork import Popen
                return Popen(process_obj)
    
        class SpawnProcess(process.BaseProcess):
            _start_method = 'spawn'
    
            @staticmethod
            def _Popen(process_obj):
                from .popen_spawn_posix import Popen
                return Popen(process_obj)
    
        class ForkServerProcess(process.BaseProcess):
            _start_method = 'forkserver'
    
            @staticmethod
            def _Popen(process_obj):
                from .popen_forkserver import Popen
                return Popen(process_obj)
    

    Below is the Windows version.

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
    
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)
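
    billiard is a fork of CPython's multiprocessing, and its context.py mirrors multiprocessing.context. The standard library therefore offers a quick way to see which start methods, and which Process subclasses, a given OS provides. This small sketch uses only public multiprocessing functions (get_all_start_methods, get_context). The helper describe_start_methods is a hypothetical name.

```python
import multiprocessing as mp


def describe_start_methods():
    """Map each start method available on this OS to the Process class it uses."""
    return {
        method: mp.get_context(method).Process.__name__
        for method in mp.get_all_start_methods()
    }


if __name__ == "__main__":
    print(describe_start_methods())
```

    On Linux the result typically lists fork, spawn and forkserver. On Windows only spawn is available, which is why this article ends up in SpawnProcess.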
    

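    Now let's look at how the child process is handled.

    0x02 Preparation

    Before the child process starts, Celery does a good deal of preparation: it builds the command the child will run, sets up the pipe, and passes the parent's information to the child.

    2.1 Overall Preparation Flow

    While debugging, we found that both Windows and *nix have their own inconveniences, so the analysis below uses Windows as the example.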

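    Note:
    A reader pointed out that Celery's multi-threading fails on Windows, so a word of explanation is in order.

    My environment is mixed (Mac, Linux, and Windows, with several machines for some of them), and constantly switching between them is a hassle.

    When I analyzed part of this code, only a Windows machine was at hand, so I can only show the debug variables from Windows.

    The specific OS does not really matter. What matters is using the code to dissect Celery's design.

    Debugging Celery on a Mac is error-prone in its own ways, and for code analysis it has no great advantage over Windows.

    The previous article ran on *nix, so the child process abstraction was ForkProcess. This article runs on Windows, so it is SpawnProcess instead.

    Since we are on Windows, the call goes to: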

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
    
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)
    

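    Therefore from .popen_spawn_win32 import Popen is used.

    The following code is in billiard/popen_spawn_win32.py.

    Its main steps are:

    • First, it calls _winapi.CreatePipe(None, 0) to create an anonymous pipe, obtaining its read handle rhandle and write handle whandle.

    • Next, it calls get_command_line to assemble the command the child will execute. Note that pipe_handle is the read handle and parent_pid is the parent's pid. In the child, *nix uses pipe_handle directly as the file descriptor, while Windows uses parent_pid together with pipe_handle to take over the read handle (see spawn_main below).

    • Then it wraps the write handle in a file object, to_child. This step matters because everything sent to the child goes through it.

    • It calls the Windows API CreateProcess to start the child.

    • With the pipe in place, reduction.dump(prep_data, to_child) sends the parent's key auxiliary (preparation) data to the child. The child needs this data to interpret what the parent sends next.

    • reduction.dump(process_obj, to_child) then sends the process object itself to the child. Here that object is the SpawnProcess instance.

    • The parent closes rhandle with _winapi.CloseHandle(rhandle) only if CreateProcess fails. Otherwise, as the code comment says, the read end is "stolen" by the child. From then on, parent and child are connected through the pipe: the parent writes to the write end and the child reads from the read end.

    The code: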

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        method = 'spawn'
        sentinel = None
    
        def __init__(self, process_obj):
            os.environ["MULTIPROCESSING_FORKING_DISABLE"] = "1"
            spawn._Django_old_layout_hack__save()
            prep_data = spawn.get_preparation_data(process_obj._name)
    
            # read end of pipe will be "stolen" by the child process
            # -- see spawn_main() in spawn.py.
            rhandle, whandle = _winapi.CreatePipe(None, 0)
            wfd = msvcrt.open_osfhandle(whandle, 0)
            cmd = spawn.get_command_line(parent_pid=os.getpid(),
                                         pipe_handle=rhandle)
            cmd = ' '.join('"%s"' % x for x in cmd)
    
            with io.open(wfd, 'wb', closefd=True) as to_child:
                # start process
                try:
                    hp, ht, pid, tid = CreateProcess(
                        spawn.get_executable(), cmd,
                        None, None, False, 0, None, None, None)
                    close_thread_handle(ht)
                except:
                    _winapi.CloseHandle(rhandle)
                    raise
    
                # set attributes of self
                self.pid = pid
                self.returncode = None
                self._handle = hp
                self.sentinel = int(hp)
    
                # send information to child
                context.set_spawning_popen(self)
                try:
                    reduction.dump(prep_data, to_child)
                    reduction.dump(process_obj, to_child)
                finally:
                    context.set_spawning_popen(None)
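
    The parent-side protocol above can be imitated portably: start the child, then write two pickles into the pipe, preparation data first and the process object second. This sketch makes simplifying assumptions. The child's stdin serves as the read end instead of a Windows handle that the child steals. Plain dicts stand in for prep_data and the SpawnProcess object. CHILD_PROG and spawn_child are hypothetical names.

```python
import pickle
import subprocess
import sys

# Hypothetical child: read two pickles from its read end, in the order written.
CHILD_PROG = (
    "import pickle, sys\n"
    "from_parent = sys.stdin.buffer\n"
    "prep = pickle.load(from_parent)\n"
    "obj = pickle.load(from_parent)\n"
    "print(prep['name'], obj['target'])\n"
)


def spawn_child(prep_data, process_obj):
    """Start a child, send it two pickles over a pipe, return what it printed."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_PROG],
        stdin=subprocess.PIPE,   # stand-in for the rhandle/whandle pipe
        stdout=subprocess.PIPE,
    )
    with proc.stdin as to_child:
        # Same order as Popen.__init__: preparation data, then the object.
        pickle.dump(prep_data, to_child)
        pickle.dump(process_obj, to_child)
    out = proc.stdout.read().decode().strip()
    proc.stdout.close()
    proc.wait()
    return out


if __name__ == "__main__":
    print(spawn_child({"name": "PoolWorker-1"}, {"target": "Worker"}))
```

    The order matters. Each pickle.load call consumes exactly one pickled object from the stream, so the child must load the objects in the same order the parent dumped them.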
    

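    Let's look at the key points of this preparation flow in turn.

    2.2 Getting the Command

    The first key point is get_command_line, which assembles the command the child process will run.

    Code location: billiard/spawn.py.

    It builds a Python command line that, when run in the child, imports billiard.spawn and calls spawn_main.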

    def get_command_line(**kwds):
        '''
        Returns prefix of command line used for spawning a child process
        '''
        if getattr(sys, 'frozen', False):
            return ([sys.executable, '--billiard-fork'] +
                    ['%s=%r' % item for item in kwds.items()])
        else:
            prog = 'from billiard.spawn import spawn_main; spawn_main(%s)'
            prog %= ', '.join('%s=%r' % item for item in kwds.items())
            opts = util._args_from_interpreter_flags()
            return [_python_exe] + opts + ['-c', prog, '--billiard-fork']
    

    The resulting command line looks roughly like this:

    python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard-fork ..
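
    The following sketch shows how such a command line carries keyword arguments into the child. It builds a list of the same shape as get_command_line ([python, -c, prog, --billiard-fork]), but it calls a hypothetical main defined inline instead of billiard.spawn.spawn_main. build_command_line, run_child and DEMO_ENTRY are illustrative names, not billiard APIs, and the pipe_handle value is arbitrary.

```python
import os
import subprocess
import sys

# Hypothetical entry point standing in for billiard.spawn.spawn_main.
DEMO_ENTRY = (
    "import sys\n"
    "def main(**kwds):\n"
    "    assert '--billiard-fork' in sys.argv\n"
    "    print(sorted(kwds.items()))\n"
)


def build_command_line(**kwds):
    """Return [python, '-c', prog, '--billiard-fork'] with kwds embedded via %r."""
    call = "main(%s)" % ", ".join("%s=%r" % item for item in kwds.items())
    return [sys.executable, "-c", DEMO_ENTRY + call, "--billiard-fork"]


def run_child(cmd):
    """Run the command and return the child's stdout."""
    return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout.strip()


if __name__ == "__main__":
    # pipe_handle=7 is an arbitrary illustrative value.
    cmd = build_command_line(parent_pid=os.getpid(), pipe_handle=7)
    print(run_child(cmd))
```

    Each value is embedded with %r, so the child receives parent_pid and pipe_handle as Python literals when the -c program runs. The trailing --billiard-fork stays in sys.argv, which is what spawn_main's is_forking(sys.argv) assertion relies on.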

    2.3 Calling the Windows API

    Then the Windows API is called to start the child process.

    hp, ht, pid, tid = CreateProcess(
        spawn.get_executable(), cmd,
        None, None, False, 0, None, None, None)
    

    So the logic is as follows:

                     +-----------------------------+
                     |        SpawnProcess         |
                     |                             |
                     |                             |
                     |               os.getpid() +-----------------+
                     |                             |               |
                     |                   rhandle +---------------+ |
                     |           Popen             |             | |
                     |             +     whandle   |             | |
                     |             |               |             | |
                     +-----------------------------+             | |
                                   |                             | |
                                   |                             | |
                                   |  get_command_line           | |
                                   |                             | |
                                   |                             | |
                                   v                             | |
                                                                 v v
    python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard-fork ..
                                   +
                                   |
                                   |
                                   |
                                   |  CreateProcess
                                   |
                                   |
                                   |
                                   v
                          +--------+--------+
                          | windows kernel  |
                          +-----------------+
    
    

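    2.4 Passing the Parent's Information

    With the pipe open, reduction.dump(prep_data, to_child) first sends the parent's key auxiliary data to the child, which the child needs in order to interpret what follows. reduction.dump(process_obj, to_child) then sends the process object, which here is the SpawnProcess itself.

    This is done with pickle, as follows: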

    def dump(obj, file, protocol=None):
        '''Replacement for pickle.dump() using ForkingPickler.'''
        ForkingPickler(file, protocol).dump(obj)
    

    And:

    if PY3:
        import copyreg
    
        class ForkingPickler(pickle.Pickler):
            '''Pickler subclass used by multiprocessing.'''
            _extra_reducers = {}
            _copyreg_dispatch_table = copyreg.dispatch_table
    
            def __init__(self, *args):
                super(ForkingPickler, self).__init__(*args)
                self.dispatch_table = self._copyreg_dispatch_table.copy()
                self.dispatch_table.update(self._extra_reducers)
    
            @classmethod
            def register(cls, type, reduce):
                '''Register a reduce function for a type.'''
                cls._extra_reducers[type] = reduce
    
            @classmethod
            def dumps(cls, obj, protocol=None):
                buf = io.BytesIO()
                cls(buf, protocol).dump(obj)
                return buf.getbuffer()
    
            @classmethod
            def loadbuf(cls, buf, protocol=None):
                return cls.loads(buf.getbuffer())
    
            loads = pickle.loads
    
    else:
    
        class ForkingPickler(pickle.Pickler):  # noqa
            '''Pickler subclass used by multiprocessing.'''
            dispatch = pickle.Pickler.dispatch.copy()
    
            @classmethod
            def register(cls, type, reduce):
                '''Register a reduce function for a type.'''
                def dispatcher(self, obj):
                    rv = reduce(obj)
                    self.save_reduce(obj=obj, *rv)
                cls.dispatch[type] = dispatcher
    
            @classmethod
            def dumps(cls, obj, protocol=None):
                buf = io.BytesIO()
                cls(buf, protocol).dump(obj)
                return buf.getvalue()
    
            @classmethod
            def loadbuf(cls, buf, protocol=None):
                return cls.loads(buf.getvalue())
    
            @classmethod
            def loads(cls, buf, loads=pickle.loads):
                if isinstance(buf, io.BytesIO):
                    buf = buf.getvalue()
                return loads(buf)
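
    The Python 3 branch of ForkingPickler customizes pickling per instance. It copies copyreg.dispatch_table and merges in the reducers added through register(). The sketch below reproduces that mechanism in isolation to show why it is useful. An object holding a lock cannot be pickled normally, but a registered reducer tells the pickler how to rebuild it. MiniForkingPickler, Resource and reduce_resource are hypothetical names.

```python
import copyreg
import io
import pickle
import threading


class MiniForkingPickler(pickle.Pickler):
    """Minimal re-creation of the Python 3 ForkingPickler pattern shown above."""
    _extra_reducers = {}

    def __init__(self, *args):
        super().__init__(*args)
        # Per-instance table: global copyreg reducers plus our registered ones.
        self.dispatch_table = copyreg.dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type_, reduce):
        cls._extra_reducers[type_] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getvalue()


class Resource:
    """Hypothetical object that plain pickle rejects because it holds a lock."""

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()


def reduce_resource(res):
    # Rebuild from the name only; the receiving side gets a fresh lock.
    return Resource, (res.name,)


MiniForkingPickler.register(Resource, reduce_resource)

if __name__ == "__main__":
    clone = pickle.loads(MiniForkingPickler.dumps(Resource("db")))
    print(clone.name)
```

    The output of MiniForkingPickler.dumps is ordinary pickle data, so the receiving side only needs pickle.loads. This matches the loads = pickle.loads line in the Python 3 branch above.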
    

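    This completes the preparation. Next we move into the child process.

    0x03 Child Process Startup

    Once CreateProcess has been called, Windows creates and starts the child process.

    3.1 Entering from the Command Line

    The command line built earlier explicitly calls spawn_main:

    python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard-fork ..

    So the child process starts in spawn_main.

    Code location: billiard/spawn.py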

    def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)
        if sys.platform == 'win32':
            import msvcrt
            from .reduction import steal_handle
            new_handle = steal_handle(parent_pid, pipe_handle)
            fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
        else:
            from . import semaphore_tracker
            semaphore_tracker._semaphore_tracker._fd = tracker_fd
            fd = pipe_handle
        exitcode = _main(fd)  # execution continues here
        sys.exit(exitcode)
    

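    Note:

    Here pipe_handle is the read handle passed in from the parent, and parent_pid is the parent's process ID. On *nix, the child uses pipe_handle directly as the file descriptor. On Windows, it calls steal_handle(parent_pid, pipe_handle) to obtain the parent's read handle in its own process.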
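
    The *nix branch (fd = pipe_handle) works only because the child inherits the pipe's read end under the same descriptor number. The following POSIX-only sketch relies on subprocess's pass_fds to keep that descriptor open in the child. The function name send_via_inherited_fd is hypothetical, and the Windows branch (steal_handle) is not reproduced.

```python
import os
import subprocess
import sys

# Child program: the fd number arrives on the command line, like pipe_handle.
CHILD_PROG = (
    "import os, sys\n"
    "fd = int(sys.argv[1])\n"
    "with os.fdopen(fd, 'rb') as from_parent:\n"
    "    sys.stdout.write(from_parent.read().decode())\n"
)


def send_via_inherited_fd(payload):
    """POSIX only: the child inherits the pipe's read end and uses its number as an fd."""
    r, w = os.pipe()
    try:
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD_PROG, str(r)],
            pass_fds=(r,),  # keep the read end open, with the same number, in the child
            stdout=subprocess.PIPE,
        )
    except BaseException:
        os.close(w)
        raise
    finally:
        os.close(r)  # the parent only needs the write end
    with os.fdopen(w, "wb") as to_child:
        to_child.write(payload)
    out, _ = proc.communicate()
    return out.decode()


if __name__ == "__main__" and os.name == "posix":
    print(send_via_inherited_fd(b"hello from the parent"))
```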

    The logic at this point:

                                                                                              +
                                                                    parent process            |                child process
                     +-----------------------------+                                          |
                     |        SpawnProcess         |                                          |
                     |                             |                                          |
                     |                os.getpid()+-----------------+                          |
                     |                             |               |                          |
                     |                   rhandle +---------------+ |                          |                          +---------------+
                     |           Popen             |             | |                          |                          |  spawn_main   |
                     |             +     whandle   |             | |                          |           parent_pid     |               |
                     |             |               |             | |                          |                          |               |
                     +---+-------------------------+             | |                          |       +--------------->  |               |
                         |         |                             | |                          |       |                  |          fd   |
                         |         |                             | |                          |       |   +----------->  |           ^   |
                         |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |
                         |         |                             | |                          |       |   |              +---------------+
                         |         |                             | |                          |       |   |                          |
                         |         v                             | |                          |       |   |                     ^    |
                         |                                       v v                          |       |   |                     |    |
    python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |
                         |         +                              + +                         |       |   |                     |    |
                         |         |                              | |                         |       |   |                     |    |
                         |         |                              | |                         |       |   |                     |    |
                         |         |                              | |                         |       |   |                     |    |
                         |         |  CreateProcess               | |                         |       |   |                     |    |
                         |         |                              | +---------------------------------+   |                     |    |
                         |         |                              +---------------------------------------+                     |    |
                         |         |                                                                                            |    |
                         |         |                 1                               +-----------------+              2         |    |
                         |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |
                         |                                                           +-----------------+                             |
                         |                                                                                                           |
                         |                                                                                                           |
                         +-----------------------------------------------------------------------------------------------------------+
                                       3  reduction.dump(process_obj, to_child)
    
    


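    So the program goes on to call _main.

    3.2 _main Reads the Parent's Key Information

    As mentioned earlier, the parent writes its key information into the pipe, so the child opens the read end and reads it back. The parent information here is the SpawnProcess object itself, so the child can then operate on SpawnProcess.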

    def _main(fd):
        _Django_old_layout_hack__load()
        with io.open(fd, 'rb', closefd=True) as from_parent:
            process.current_process()._inheriting = True
            try:
                preparation_data = pickle.load(from_parent)
                prepare(preparation_data)
                _setup_logging_in_child_hack()
                self = pickle.load(from_parent)  # read the parent's key information, i.e. the SpawnProcess
            finally:
                del process.current_process()._inheriting
        return self._bootstrap()
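
    The first object _main loads is the preparation data. prepare() uses it to rebuild the parent's environment (for example sys.path and the working directory) before the SpawnProcess object is unpickled. billiard is a fork of CPython's multiprocessing, and CPython's multiprocessing.spawn has a function of the same name, get_preparation_data, so we can peek at what such a dict typically holds. That function is private and undocumented, its keys vary across Python versions, and billiard's version may differ. Treat this sketch as an inspection aid only. preparation_keys is a hypothetical helper name.

```python
import multiprocessing.spawn as mp_spawn


def preparation_keys(name="demo-child"):
    """Sorted keys of the preparation dict CPython builds for a spawned child.

    get_preparation_data is a private CPython helper; it is used here only to
    inspect the data, and its keys may change between Python versions.
    """
    return sorted(mp_spawn.get_preparation_data(name))


if __name__ == "__main__":
    # Typically includes entries such as 'name', 'sys_path', 'sys_argv' and 'dir'.
    print(preparation_keys())
```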
    

    The logic is as follows:

                                                                                              +
                                                                    parent process            |                child process
                     +-----------------------------+                                          |
                     |        SpawnProcess         |                                          |
                     |                             |                                          |
                     |                os.getpid()+-----------------+                          |
                     |                             |               |                          |
                     |                   rhandle +---------------+ |                          |                          +---------------+
                     |           Popen             |             | |                          |                          |  spawn_main   |
                     |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+
                     |             |               |             | |                          |                          |        self+--------> |SpawnProcess |
                     +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+
                         |         |                             | |                          |       |                  |          fd   |              |
                         |         |                             | |                          |       |   +----------->  |           ^   |              |
                         |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()
                         |         |                             | |                          |       |   |              +---------------+              |
                         |         |                             | |                          |       |   |                          |                  v
                         |         v                             | |                          |       |   |                     ^    |
                         |                                       v v                          |       |   |                     |    |
    python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |
                         |         +                              + +                         |       |   |                     |    |
                         |         |                              | |                         |       |   |                     |    |
                         |         |                              | |                         |       |   |                     |    |
                         |         |                              | |                         |       |   |                     |    |
                         |         |  CreateProcess               | |                         |       |   |                     |    |
                         |         |                              | +---------------------------------+   |                     |    |
                         |         |                              +---------------------------------------+                     |    |
                         |         |                                                                                            |    |
                         |         |                 1                               +-----------------+              2         |    |
                         |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |
                         |                                                           +-----------------+                             |
                         |                                                                                                           |
                         |                                                                                                           |
                         +-----------------------------------------------------------------------------------------------------------+
                                       3  reduction.dump(process_obj, to_child)
    
    


    3.3 Starting SpawnProcess

    Now that the child has the SpawnProcess object, execution moves into SpawnProcess's base class.

    Code location: billiard/process.py.

    class BaseProcess(object):
        '''
        Process objects represent activity that is run in a separate process
    
        The class is analagous to `threading.Thread`
        '''
    

    3.3.1 _bootstrap Configures the Necessary State

    In the base class in billiard/process.py, _bootstrap configures the necessary state, such as stdin, and then calls run.

    def _bootstrap(self):
        from . import util, context
        global _current_process, _process_counter, _children
    
        try:
            # set up the start method, stdin, etc.
            if self._start_method is not None:
                context._force_start_method(self._start_method)
            _process_counter = itertools.count(1)
            _children = set()
            if sys.stdin is not None:
                try:
                    sys.stdin.close()
                    sys.stdin = open(os.devnull)
                except (EnvironmentError, OSError, ValueError):
                    pass
            old_process = _current_process
            _set_current_process(self)
    
            # reset logging locks, etc.
            loggerDict = logging.Logger.manager.loggerDict
            logger_names = list(loggerDict.keys())
            logger_names.append(None)  # for root logger
            for name in logger_names:
                if not name or not isinstance(loggerDict[name],
                                              logging.PlaceHolder):
                    for handler in logging.getLogger(name).handlers:
                        handler.createLock()
            logging._lock = threading.RLock()
    
            try:
                util._finalizer_registry.clear()
                util._run_after_forkers()
            finally:
                # delay finalization of the old process object until after
                # _run_after_forkers() is executed
                del old_process
            util.info('child process %s calling self.run()', self.pid)
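            try:
                self.run()  # execution continues here
                exitcode = 0
            finally:
                util._exit_function()
        except:  # simplified: the full method handles SystemExit and other errors separately
            exitcode = 1
    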
        return exitcode
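
    Stripped of bookkeeping, _bootstrap does three things that matter here. It detaches stdin so that the child does not read the parent's input, it calls run(), and it turns the outcome into an exit code. The following is a simplified sketch under a few assumptions. bootstrap is a hypothetical helper. The stream owner is a parameter, so the sketch can be exercised without closing the real sys.stdin. The SystemExit mapping follows recent CPython multiprocessing, and billiard may handle it slightly differently.

```python
import os
import sys


def bootstrap(run, streams=sys):
    """Simplified sketch of _bootstrap: detach stdin, call run(), return an exit code.

    `streams` defaults to the sys module; any object with a `stdin` attribute works,
    which lets the sketch be exercised without closing the real sys.stdin.
    """
    # 1. Point stdin at the null device so the child does not read the parent's input.
    if streams.stdin is not None:
        try:
            streams.stdin.close()
            streams.stdin = open(os.devnull)
        except (OSError, ValueError):
            pass
    # 2. Run the target and map the outcome to an exit code
    #    (mapping modeled on recent CPython multiprocessing; an assumption for billiard).
    try:
        run()
        return 0
    except SystemExit as exc:
        if exc.code is None:
            return 0
        return exc.code if isinstance(exc.code, int) else 1
    except Exception:
        return 1
```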
    

    3.3.2 Starting the Worker Service

    SpawnProcess then calls run.

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)
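
    Under the spawn start method, run() executes on an object that was unpickled in the child, so _target and _args must survive pickling. A module-level function is pickled by reference, whereas a lambda cannot be pickled that way. The sketch below uses a hypothetical MiniProcess class (not billiard's BaseProcess) and a hypothetical work target. Unlike the original run(), it stores the target's return value so that the result can be checked.

```python
import pickle


def work(x, y):
    """Hypothetical task; module-level, so pickle can reference it by name."""
    return x + y


class MiniProcess:
    """Hypothetical stand-in for BaseProcess: stores a target and calls it in run()."""

    def __init__(self, target=None, args=(), kwargs=None):
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs or {})
        self.result = None

    def run(self):
        if self._target:
            self.result = self._target(*self._args, **self._kwargs)


if __name__ == "__main__":
    # Simulate what the child sees: an unpickled copy of the parent's object.
    child_copy = pickle.loads(pickle.dumps(MiniProcess(target=work, args=(2, 3))))
    child_copy.run()
    print(child_copy.result)  # 5
```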
    

    From the previous article we know that

     _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>
    

    So execution arrives at celery.concurrency.asynpool.Worker, which is the child process's work loop.

    As shown below:

                                                                                              +
                                                                    parent process            |                child process
                     +-----------------------------+                                          |
                     |        SpawnProcess         |                                          |
                     |                             |                                          |
                     |                os.getpid()+-----------------+                          |
                     |                             |               |                          |
                     |                   rhandle +---------------+ |                          |                          +---------------+
                     |           Popen             |             | |                          |                          |  spawn_main   |
                     |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+
                     |             |               |             | |                          |                          |        self+--------> |SpawnProcess |
                     +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+
                         |         |                             | |                          |       |                  |          fd   |              |
                         |         |                             | |                          |       |   +----------->  |           ^   |              |
                         |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()
                         |         |                             | |                          |       |   |              +---------------+              |
                         |         |                             | |                          |       |   |                          |                  v
                         |         v                             | |                          |       |   |                     ^    |
                         |                                       v v                          |       |   |                     |    |             +----------+
    python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |             | Worker   |
                         |         +                              + +                         |       |   |                     |    |             |          |
                         |         |                              | |                         |       |   |                     |    |             +-----+----+
                         |         |                              | |                         |       |   |                     |    |                   |
                         |         |                              | |                         |       |   |                     |    |                   |
                         |         |  CreateProcess               | |                         |       |   |                     |    |                   |
                         |         |                              | +---------------------------------+   |                     |    |                   |
                         |         |                              +---------------------------------------+                     |    |                   |
                         |         |                                                                                            |    |                   v
                         |         |                 1                               +-----------------+              2         |    |
                         |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |
                         |                                                           +-----------------+                             |
                         |                                                                                                           |
                         |                                                                                                           |
                         +-----------------------------------------------------------------------------------------------------------+
                                       3  reduction.dump(process_obj, to_child)
    
    


    3.4 The Worker Service

    Code location: billiard/pool.py

    After entering Worker, execution arrives at __call__, whose main functions are as follows:

                                                                                              +
                                                                    parent process            |                child process
                     +-----------------------------+                                          |
                     |        SpawnProcess         |                                          |
                     |                             |                                          |
                     |                os.getpid()+-----------------+                          |
                     |                             |               |                          |
                     |                   rhandle +---------------+ |                          |                          +---------------+
                     |           Popen             |             | |                          |                          |  spawn_main   |
                     |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+
                     |             |               |             | |                          |                          |        self+--------> |SpawnProcess |
                     +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+
                         |         |                             | |                          |       |                  |          fd   |              |
                         |         |                             | |                          |       |   +----------->  |           ^   |              |
                         |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()
                         |         |                             | |                          |       |   |              +---------------+              |
                         |         |                             | |                          |       |   |                          |                  v
                         |         v                             | |                          |       |   |                     ^    |
                         |                                       v v                          |       |   |                     |    |             +----------+
    python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |             | Worker   |
                         |         +                              + +                         |       |   |                     |    |             |          |
                         |         |                              | |                         |       |   |                     |    |             +-----+----+
                         |         |                              | |                         |       |   |                     |    |                   |
                         |         |                              | |                         |       |   |                     |    |                   |
                         |         |  CreateProcess               | |                         |       |   |                     |    |                   |
                         |         |                              | +---------------------------------+   |                     |    |                   |
                         |         |                              +---------------------------------------+                     |    |                   |
                         |         |                                                                                            |    |                   v
                         |         |                 1                               +-----------------+              2         |    |
                         |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |               __call__
                         |                                                           +-----------------+                             |
                         |                                                                                                           |
                         |                                                                                                           |
                         +-----------------------------------------------------------------------------------------------------------+
                                       3  reduction.dump(process_obj, to_child)
                                                                                              +
                                                                                              |
                                                                                              |
                                                                                              |
                                                                                              +
    
    


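    The main steps of __call__ are:

    • _make_child_methods sets up the methods that wait for tasks and for synchronization messages;

    • after_fork restores the application information;

    • on_loop_start sends a WORKER_UP message to notify the parent process;

    • sys.exit(self.workloop(pid=pid)) enters the main loop.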

    class Worker(object):
    
        def __call__(self):
            _exit = sys.exit
            _exitcode = [None]
    
            def exit(status=None):
                _exitcode[0] = status
                return _exit(status)
            sys.exit = exit
    
            pid = os.getpid()
    
            self._make_child_methods()
            self.after_fork()
            self.on_loop_start(pid=pid)  # callback on loop start
            try:
                sys.exit(self.workloop(pid=pid))
            except Exception as exc:
                error('Pool process %r error: %r', self, exc, exc_info=1)
                self._do_exit(pid, _exitcode[0], exc)
            finally:
                self._do_exit(pid, _exitcode[0], None)
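
At the top of __call__, sys.exit is wrapped so that the finally: clause can hand _do_exit the status that workloop exited with. Below is a minimal sketch of the same pattern outside billiard. run_with_exit_capture and on_exit are hypothetical names. Unlike the real child process, which exits right afterwards, the sketch restores the real sys.exit.

```python
import sys


def run_with_exit_capture(body, on_exit):
    """Call sys.exit(body()) while remembering the exit status.

    Hypothetical helper mirroring the sys.exit wrapping in Worker.__call__:
    the wrapper stores the status in a mutable cell before raising, so the
    finally: clause can pass it to on_exit while SystemExit propagates.
    """
    real_exit = sys.exit
    exitcode = [None]                  # mutable cell written by the wrapper

    def exit(status=None):
        exitcode[0] = status           # remember the requested status
        return real_exit(status)       # then raise SystemExit as usual

    sys.exit = exit
    try:
        sys.exit(body())
    finally:
        sys.exit = real_exit           # restore; a real pool child just exits
        on_exit(exitcode[0])           # None if body() raised before sys.exit


seen = []
try:
    run_with_exit_capture(lambda: 3, seen.append)
except SystemExit as exc:
    print(exc.code, seen)              # prints: 3 [3]
```

The mutable one-element list is needed because the nested exit function must write a value that the enclosing finally: block can read.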
    

    We analyze each of these steps in detail below.

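    3.4.1 Setting up the task and synchronization listeners

    The child process uses _make_child_methods to set up the methods that wait for tasks (on inq) and for synchronization messages (on synq, only if one is configured):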

    def _make_child_methods(self, loads=pickle_loads):
        self.wait_for_job = self._make_protected_receive(self.inq)
        self.wait_for_syn = (self._make_protected_receive(self.synq)
                             if self.synq else None)
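
_make_protected_receive turns a blocking pipe read into a function the work loop can call repeatedly. Such a function should not hang forever, and it should stop the worker when the parent goes away. The sketch below is a simplified, hypothetical analogue built on multiprocessing.Pipe. It is not billiard's implementation, whose details and return values differ.

```python
from multiprocessing import Pipe


def make_protected_receive(conn, timeout=1.0):
    """Build a wait_for_job-style function reading from `conn`.

    Hypothetical, simplified analogue of billiard's _make_protected_receive:
    * returns None if nothing arrives within `timeout` seconds, so the
      caller's loop can re-check its state and try again;
    * raises SystemExit(1) if the connection is closed (EOFError/OSError);
    * raises SystemExit(0) when it receives a None sentinel.
    """
    def receive():
        try:
            if not conn.poll(timeout):
                return None            # timed out: nothing to do yet
            req = conn.recv()
        except (EOFError, OSError):
            raise SystemExit(1)        # parent went away: stop the worker
        if req is None:
            raise SystemExit(0)        # explicit shutdown sentinel
        return req
    return receive


# Usage: one end plays the parent's inq writer, the other the child's reader.
parent_end, child_end = Pipe()
wait_for_job = make_protected_receive(child_end, timeout=0.1)
print(wait_for_job())                  # prints: None (nothing sent yet)
parent_end.send(('TASK', (1, 0, abs, (-5,), {})))
print(wait_for_job())                  # the ('TASK', ...) tuple sent above
```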
    

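    3.4.2 Configuring the application

    This raises a question: the Celery application lives in the parent process, so how does the child process get it?

    With the fork start method, the parent's variables are copied into the child, but that is not guaranteed in general: a spawned child, such as the one traced in this article, starts a fresh interpreter. So there must be a mechanism through which the parent hands the Celery application to the child.

    We therefore need to trace how the parent configures the Celery application for the child, and how the child picks it up.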

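    3.4.2.1 Where the application information comes from

    Back when the pool was started in the parent process, class Pool(object) was configured as follows (in billiard/pool.py).

    Note that:

    • we are back in the parent process here;
    • the initializer argument is process_initializer, and initargs carries the Celery application itself (see the values below).

    The code is: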

    class Pool(object):
        '''
        Class which supports an async version of applying functions to arguments.
        '''
        _wrap_exception = True
        Worker = Worker
        Supervisor = Supervisor
        TaskHandler = TaskHandler
        TimeoutHandler = TimeoutHandler
        ResultHandler = ResultHandler
        SoftTimeLimitExceeded = SoftTimeLimitExceeded
    
        def __init__(self, processes=None, initializer=None, initargs=(),
                     ......
                     **kwargs):
            self._ctx = context or get_context()
            self.synack = synack
            self._setup_queues()
            self._taskqueue = Queue()
            self._cache = {}
            self._state = RUN
            self.timeout = timeout
            self.soft_timeout = soft_timeout
            self._maxtasksperchild = maxtasksperchild
            self._max_memory_per_child = max_memory_per_child
            self._initializer = initializer
            self._initargs = initargs
    

    The relevant Pool attributes then hold the following values; <Celery myTest> is the Celery application itself:

    self._initializer = {function} <function process_initializer at 0x7f90c9387488>
    self._initargs = {tuple: 2} (<Celery myTest at 0x7f90c8812f98>, 'celery')
    self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f90c97379b0>
    

    These values are then passed to class Worker(object) in the parent process, which stores initializer and initargs:

    class Worker(object):
    
        def __init__(self, inq, outq, synq=None, initializer=None, initargs=(),
                     maxtasks=None, sentinel=None, on_exit=None,
                     sigprotection=True, wrap_exception=True,
                     max_memory_per_child=None, on_ready_counter=None):
            assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
            self.initializer = initializer
            self.initargs = initargs
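
Under fork, the child inherits the Worker object, with its initializer and initargs, directly from memory. Under spawn, step 3 in the diagrams above (reduction.dump(process_obj, to_child)) serializes the process object. Its target is the Worker instance, so initargs, and the app inside it, travel to the child as bytes. The round trip can be sketched in one process with pickle. MiniWorker, FakeApp and fake_initializer are hypothetical stand-ins, not billiard or Celery classes.

```python
import pickle

CURRENT_APP = None                     # module-level "current app" in the child


class FakeApp:
    """Hypothetical stand-in for a Celery app; only a name is kept."""
    def __init__(self, main):
        self.main = main


def fake_initializer(app, hostname):
    """Hypothetical stand-in for process_initializer: install the app."""
    global CURRENT_APP
    CURRENT_APP = app


class MiniWorker:
    """Hypothetical stripped-down Worker: store now, call after start-up."""
    def __init__(self, initializer=None, initargs=()):
        self.initializer = initializer  # parent side: only remembered
        self.initargs = initargs

    def after_fork(self):
        if self.initializer is not None:
            self.initializer(*self.initargs)  # child side: replayed


# Parent: build the worker, then serialize it as spawn would.
worker = MiniWorker(fake_initializer, (FakeApp('myTest'), 'celery'))
payload = pickle.dumps(worker)

# "Child": rebuild the worker from bytes and run the initializer.
child_worker = pickle.loads(payload)
child_worker.after_fork()
print(CURRENT_APP.main)                # prints: myTest
```

The child ends up with a copy of the app rebuilt from bytes, not the parent's object. This is why anything placed in initargs has to be picklable when spawn is used.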
    
    3.4.2.2 Triggering the restore

    As mentioned above, once the child process starts, it calls after_fork to restore the application. The call stack is:

    process_initializer, prefork.py:44
    after_fork, pool.py:421
    __call__, pool.py:289
    run, process.py:114
    _bootstrap, process.py:327
    _main, spawn.py:210
    spawn_main, spawn.py:165
    <frame not available>
    

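    Looking closer, after_fork restores the application information by calling self.initializer(*self.initargs). Before that, it closes the pipe ends the child does not use (the write end of inq and the read end of outq). Afterwards, it resets signal handlers, installs the soft-timeout handler, and ignores SIGINT: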

    def after_fork(self):
        if hasattr(self.inq, '_writer'):
            self.inq._writer.close()
        if hasattr(self.outq, '_reader'):
            self.outq._reader.close()
    
        if self.initializer is not None:
            self.initializer(*self.initargs)
    
        # Make sure all exiting signals call finally: blocks.
        # This is important for the semaphore to be released.
        reset_signals(full=self.sigprotection)
    
        # install signal handler for soft timeouts.
        if SIG_SOFT_TIMEOUT is not None:
            signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)
    
        try:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        except AttributeError:
            pass
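
Closing inq._writer in the child matters because a pipe reader only sees end-of-file once every copy of the write end is closed. After a fork, the child holds its own copies of the pipe descriptors. If it kept inq's write end open, its reads on inq could never observe EOF after the parent closed its side. Below is a minimal illustration of that rule with os.pipe and os.dup; no billiard code is involved.

```python
import os


def drain(read_fd):
    """Read until EOF and return everything read."""
    chunks = []
    while True:
        chunk = os.read(read_fd, 1024)
        if not chunk:                  # b'' means EOF: every writer is closed
            return b''.join(chunks)
        chunks.append(chunk)


r, w = os.pipe()
w_dup = os.dup(w)                      # a second copy of the write end,
                                       # like the one a forked child inherits
os.write(w, b'job')
os.close(w)                            # the "parent" closes its writer...
os.close(w_dup)                        # ...and the "child" drops its unused copy
print(drain(r))                        # prints: b'job'
os.close(r)
```

Remove the os.close(w_dup) line and drain() blocks forever. after_fork avoids that hang by closing the ends the child never uses.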
    
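    3.4.2.3 Restoring the application information

    The actual restoring is done in process_initializer.

    It lives in celery/concurrency/prefork.py.

    The key call is app.set_current(), made in the branch taken when FORKED_BY_MULTIPROCESSING is not set. It installs the Celery application passed in as the current app of the child process.

    The code is: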

    def process_initializer(app, hostname):
        """Pool child process initializer.
    
        Initialize the child pool process to ensure the correct
        app instance is used and things like logging works.
        """
        _set_task_join_will_block(True)
        platforms.signals.reset(*WORKER_SIGRESET)
        platforms.signals.ignore(*WORKER_SIGIGNORE)
        platforms.set_mp_process_title('celeryd', hostname=hostname)
        # This is for Windows and other platforms not supporting
        # fork().  Note that init_worker makes sure it's only
        # run once per process.
        app.loader.init_worker()
        app.loader.init_worker_process()
    
        if os.environ.get('FORKED_BY_MULTIPROCESSING'):
            # pool did execv after fork
            trace.setup_worker_optimizations(app, hostname)
        else:
            app.set_current()  # the app is configured here
            set_default_app(app)
            app.finalize()
            trace._tasks = app._tasks  # enables fast_trace_task optimization.
        # rebuild execution handler for all tasks.
        from celery.app.trace import build_tracer
        for name, task in app.tasks.items():
            task.__trace__ = build_tracer(name, task, app.loader, hostname,
                                          app=app)
        from celery.worker import state as worker_state
        worker_state.reset_state()
        signals.worker_process_init.send(sender=None)
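
The comment in process_initializer says init_worker makes sure it is only run once per process. One common way to get that guarantee is a flag checked on entry. Because each pool child has its own copy of the object, the flag is effectively per process. Below is a hypothetical minimal version; MiniLoader is illustrative, not Celery's loader class.

```python
class MiniLoader:
    """Hypothetical loader with an idempotent init_worker()."""

    def __init__(self):
        self.worker_initialized = False
        self.init_calls = 0            # counts how often the real work ran

    def init_worker(self):
        if not self.worker_initialized:  # only the first call does the work
            self.worker_initialized = True
            self.on_worker_init()

    def on_worker_init(self):
        self.init_calls += 1           # placeholder for real start-up work


loader = MiniLoader()
loader.init_worker()
loader.init_worker()                   # second call is a no-op
print(loader.init_calls)               # prints: 1
```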
    
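    Configuring Celery itself

    In the child process, set_current (in celery/app/base.py) hands the app to _set_current_app, which stores it in a TLS (thread-local storage) object: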

    def set_current(self):
        """Make this the current app for this thread."""
        _set_current_app(self)
        
    def _set_current_app(app):
        _tls.current_app = app
    
    def _get_current_app():
        if default_app is None:
            #: creates the global fallback app instance.
            from celery.app.base import Celery
            set_default_app(Celery(
                'default', fixups=[], set_as_current=False,
                loader=os.environ.get('CELERY_LOADER') or 'default',
            ))
        return _tls.current_app or default_app
    
    TLS

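    TLS is defined in celery/_state.py.

    _TLS subclasses threading.local, so every thread sees its own current_app. Pool children are separate processes with separate memory, so each child also has its own copy.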

    class _TLS(threading.local):
        #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
        #: sets this, so it will always contain the last instantiated app,
        #: and is the default app returned by :func:`app_or_default`.
        current_app = None
    
    
    _tls = _TLS()
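
The lookup order can be reproduced with the standard library. _TLS subclasses threading.local, so set_current() only affects the calling thread, and other threads fall back to default_app. Below is a simplified sketch of set_current and _get_current_app. FakeApp is a hypothetical stand-in for a Celery app.

```python
import threading


class _TLS(threading.local):
    current_app = None                 # class default, seen by threads that never set it


_tls = _TLS()
default_app = None


class FakeApp:
    """Hypothetical stand-in for a Celery app."""
    def __init__(self, main):
        self.main = main

    def set_current(self):
        _tls.current_app = self        # visible only in the calling thread


def set_default_app(app):
    global default_app
    default_app = app


def get_current_app():
    """Thread-local app first, then the process-wide default."""
    if default_app is None:
        set_default_app(FakeApp('default'))  # lazily create a fallback
    return _tls.current_app or default_app


app = FakeApp('myTest')
app.set_current()

seen_in_thread = []
t = threading.Thread(target=lambda: seen_in_thread.append(get_current_app().main))
t.start()
t.join()
print(get_current_app().main, seen_in_thread)   # prints: myTest ['default']
```

This suggests why process_initializer calls both app.set_current() and set_default_app(app). The first covers the current thread, and the second presumably makes the same app the fallback for any other thread in the child.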
    
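    Later use

    Later code can then call get_current_app to fetch the Celery application and its configuration.

    celery/_state.py wraps this further, choosing the get_current_app implementation from environment variables, as shown below. How it is used is covered in a later article.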

    if os.environ.get('C_STRICT_APP'):  # pragma: no cover
        def get_current_app():
            """Return the current app."""
            raise RuntimeError('USES CURRENT APP')
    elif os.environ.get('C_WARN_APP'):  # pragma: no cover
        def get_current_app():  # noqa
            import traceback
            print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
            traceback.print_stack(file=sys.stderr)
            return _get_current_app()
    else:
        # default: no checks, use the TLS lookup directly
        get_current_app = _get_current_app
    

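    3.4.3 Notifying the parent process

    At the end of its startup, the child calls on_loop_start to send a WORKER_UP message. As the code shows, the message goes over the outq pipe.

    In the parent, ResultHandler responds to it through on_process_alive.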

    class Worker(_pool.Worker):
        """Pool worker process."""
    
        def on_loop_start(self, pid):
            # our version sends a WORKER_UP message when the process is ready
            # to accept work, this will tell the parent that the inqueue fd
            # is writable.
            self.outq.put((WORKER_UP, (pid,)))
    

    When the parent starts, it registers a handler for this message. This is how it knows when a child is ready and can start assigning work to it:

    class ResultHandler(_pool.ResultHandler):
        """Handles messages from the pool processes."""
    
        def __init__(self, *args, **kwargs):
            self.fileno_to_outq = kwargs.pop('fileno_to_outq')
            self.on_process_alive = kwargs.pop('on_process_alive')
            super().__init__(*args, **kwargs)
            # add our custom message handler
            self.state_handlers[WORKER_UP] = self.on_process_alive
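
Together, on_loop_start and state_handlers form a small tagged-message protocol over outq. The child sends a (tag, args) tuple, and the parent looks up a handler by tag and calls it with args. Below is a hypothetical sketch using a one-way multiprocessing.Pipe inside a single process. MiniResultHandler and the string tag are illustrative, not Celery's definitions.

```python
from multiprocessing import Pipe

WORKER_UP = 'WORKER_UP'                # illustrative tag; Celery defines its own constant


class MiniResultHandler:
    """Hypothetical parent-side dispatcher keyed on the message tag."""

    def __init__(self, on_process_alive):
        self.state_handlers = {WORKER_UP: on_process_alive}

    def handle(self, message):
        state, args = message
        handler = self.state_handlers.get(state)
        if handler is not None:        # unknown tags are ignored in this sketch
            handler(*args)


alive = []
outq_reader, outq_writer = Pipe(duplex=False)  # one-way pipe: child -> parent

# Child side, like on_loop_start(): announce readiness with its pid.
outq_writer.send((WORKER_UP, (1234,)))

# Parent side: read the message and dispatch it.
handler = MiniResultHandler(on_process_alive=alive.append)
handler.handle(outq_reader.recv())
print(alive)                           # prints: [1234]
```

A dictionary of handlers keeps the reading loop generic. Adding a new message type, as Celery does for WORKER_UP on top of billiard's ResultHandler, only needs a new entry.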
    

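    3.4.4 Entering the main work loop

    The child process enters its main loop via sys.exit(self.workloop(pid=pid)).

    Code location: billiard/pool.py

    It calls req = wait_for_job() to wait for a task message and then runs the task.

    The call stack is: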

    workloop, pool.py:351
    __call__, pool.py:292
    run, process.py:114
    _bootstrap, process.py:327
    _main, spawn.py:210
    spawn_main, spawn.py:165
    <frame not available>
    

    The code is as follows:

    def workloop(self, debug=debug, now=monotonic, pid=None):
        pid = pid or os.getpid()
        put = self.outq.put
        inqW_fd = self.inqW_fd
        synqW_fd = self.synqW_fd
        maxtasks = self.maxtasks
        max_memory_per_child = self.max_memory_per_child or 0
        prepare_result = self.prepare_result
    
        wait_for_job = self.wait_for_job
        _wait_for_syn = self.wait_for_syn
    
        def wait_for_syn(jid):
            i = 0
            while 1:
                if i > 60:
                    error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                          jid, self.synq._reader.fileno(), exc_info=1)
                req = _wait_for_syn()
                if req:
                    type_, args = req
                    if type_ == NACK:
                        return False
                    assert type_ == ACK
                    return True
                i += 1
    
        completed = 0
        try:
            while maxtasks is None or (maxtasks and completed < maxtasks):
                req = wait_for_job()
                if req:
                    type_, args_ = req
                    assert type_ == TASK
                    job, i, fun, args, kwargs = args_
                    put((ACK, (job, i, now(), pid, synqW_fd)))
                    if _wait_for_syn:
                        confirm = wait_for_syn(job)
                        if not confirm:
                            continue  # received NACK
                    try:
                        result = (True, prepare_result(fun(*args, **kwargs)))
                    except Exception:
                        result = (False, ExceptionInfo())
                    try:
                        put((READY, (job, i, result, inqW_fd)))
                    except Exception as exc:
                        _, _, tb = sys.exc_info()
                        try:
                            wrapped = MaybeEncodingError(exc, result[1])
                            einfo = ExceptionInfo((
                                MaybeEncodingError, wrapped, tb,
                            ))
                            put((READY, (job, i, (False, einfo), inqW_fd)))
                        finally:
                            del(tb)
                    completed += 1
                    if max_memory_per_child > 0:
                        used_kb = mem_rss()
                        if used_kb <= 0:
                            error('worker unable to determine memory usage')
                        if used_kb > 0 and used_kb > max_memory_per_child:
                            warning(MAXMEM_USED_FMT.format(
                                used_kb, max_memory_per_child))
                            return EX_RECYCLE
    
            if maxtasks:
                return EX_RECYCLE if completed == maxtasks else EX_FAILURE
            return EX_OK
        finally:
            # Before exiting the worker, we want to ensure that that all
            # messages produced by the worker have been consumed by the main
            # process. This prevents the worker being terminated prematurely
            # and messages being lost.
            self._ensure_messages_consumed(completed=completed)
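
Without the memory checks and error wrapping, each iteration of workloop follows a fixed message sequence:

1. receive TASK;
2. send ACK;
3. if a synq is configured, wait for the parent's ACK or NACK;
4. run the function;
5. send READY.

The sketch below replays that sequence with in-process queue.Queue objects standing in for the pipes. mini_workloop, the string tags and the exit-code values are hypothetical, not billiard's.

```python
from queue import Queue

# Illustrative message tags and exit codes; billiard defines its own values.
TASK, ACK, READY = 'TASK', 'ACK', 'READY'
EX_OK, EX_RECYCLE = 0, 1


def mini_workloop(wait_for_job, put, maxtasks=None, pid=0, wait_for_syn=None):
    """Simplified, hypothetical version of the workloop protocol above.

    A None request is treated as a shutdown sentinel (in billiard the
    protected receive makes the worker exit instead).
    """
    completed = 0
    while maxtasks is None or completed < maxtasks:
        req = wait_for_job()
        if req is None:
            return EX_OK
        type_, (job, i, fun, args, kwargs) = req
        assert type_ == TASK
        put((ACK, (job, i, pid)))          # tell the parent we took the job
        if wait_for_syn is not None and not wait_for_syn(job):
            continue                       # parent answered NACK: skip it
        try:
            result = (True, fun(*args, **kwargs))   # run the task
        except Exception as exc:
            result = (False, exc)          # billiard wraps this in ExceptionInfo
        put((READY, (job, i, result)))     # send the result back
        completed += 1
    return EX_RECYCLE                      # maxtasks reached: ask to be replaced


inq, outq = Queue(), Queue()
inq.put((TASK, (1, 0, pow, (2, 10), {})))
inq.put((TASK, (2, 0, int, ('x',), {})))   # raises ValueError inside the task
code = mini_workloop(inq.get, outq.put, maxtasks=2, pid=42)
while not outq.empty():
    print(outq.get())
print('exit code:', code)
```

A NACKed job does not count toward maxtasks, which matches the continue in the original loop.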
    

    The overall logic is:

                                                                                              +
                                                                    parent process            |                child process
                     +-----------------------------+                                          |
                     |        SpawnProcess         |                                          |                                                                     +-----------+
                     |                             |                                          |                                                                     |  Celery   |
                     |                os.getpid()+-----------------+                          |                                                                     |           |
                     |                             |               |                          |                                                                     +-----------+
                     |                   rhandle +---------------+ |                          |                          +---------------+
                     |           Popen             |             | |                          |                          |  spawn_main   |                                  ^
                     |             +     whandle   |             | |                          |           parent_pid     |               |  4    +-------------+            |
                     |             |               |             | |                          |                          |        self+--------> |SpawnProcess |            |
                     +---+-------------------------+             | |                          |       +--------------->  |               |       +------+------+            |
                         |         |                             | |                          |       |                  |          fd   |              |                   |
                         |         |                             | |                          |       |   +----------->  |           ^   |              |                   |
                         |         |  get_command_line           | |                          |       |   | pipe_handle  |           |   |           5  | _bootstrap()      |
                         |         |                             | |                          |       |   |              +---------------+              |                   |
                         |         |                             | |                          |       |   |                          |                  v         +---------+
                         |         v                             | |                          |       |   |                     ^    |                            |
                         |                                       v v                          |       |   |                     |    |      +---------------------------+
    python -c 'from billi|rd.spawn import spawn_main; spawn_main(....)' --billiard-fork ...   |       |   |                     |    |      |        Worker       |     |
                         |         +                              + +                         |       |   |                     |    |      |                     |     | <---------+
                         |         |                              | |                         |       |   |                     |    |      |                     +     |           |
                         |         |                              | |                         |       |   |                     |    |      |      _tls.current_app     |           |
                         |         |                              | |                         |       |   |                     |    |      |                           |           |
                         |         |  CreateProcess               | |                         |       |   |                     |    |      +------------+--------------+           |
                         |         |                              | +---------------------------------+   |                     |    |                   |                          |
                         |         |                              +---------------------------------------+                     |    |                   |                          |
                         |         |                                                                                            |    |                   |                          |
                         |         |                 1                               +-----------------+              2         |    |                   |                          |
                         |         +---------------------------------------------->  | windows kernel  |  +---------------------+    |                   |                          |
                         |                                                           +-----------------+                             |                   |                          |
                         |                                                                                                           |                   |                          |
                         |                                                                                                           |                   |                          |
                         +-----------------------------------------------------------------------------------------------------------+                   |                          |
                                       3  reduction.dump(process_obj, to_child)                                                                          |           6    __call__  |
                                                                                              +                                                          |                          |
                                                                                              |                                                          +------------------------->+
                                                                                              |
                                                                                              |
                                                                                              +
    
    


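    At this point, the child process has finished starting up. How it runs the tasks sent by the parent will be covered in the next article.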


  • Original article: https://www.cnblogs.com/rossiXYZ/p/14672168.html