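    [Source Code Analysis] Celery, a Parallel Distributed Task Queue: What Is a Task

    0x00 Abstract

    Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It focuses on asynchronous task queues for real-time processing and also supports task scheduling. This article looks at what a Celery task actually is, and at what we would need to watch out for, and how to handle it, if we wanted to build a task mechanism from scratch.

    Because tasks are closely tied to how the Consumer consumes messages, parts of this article repeat the previous one for the sake of a clearer explanation.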

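    0x01 Starting Questions

    Here are some questions worth asking up front. They are the starting points and the things to watch for in the analysis below.

    • What exactly is a task?
    • What kinds of tasks are there?
    • Are there built-in tasks?
    • How is a task registered with the system?
    • How is a user-defined task registered with the system?

    We answer these questions one by one below.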

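    0x02 Sample Code

    The worker-side sample code is shown below. It uses a decorator to wrap the function to be executed.

    A task is the user's own business logic. Here the task is simply an addition function.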

    from celery import Celery
    
    app = Celery('myTest', broker='redis://localhost:6379')
    
    @app.task
    def add(x,y):
        print(x+y)
        return x+y
    
    if __name__ == '__main__':
        app.worker_main(argv=['worker'])
    
    

    The sending code is as follows:

    from myTest import add
    re = add.apply_async((2,17))
    

    0x03 What Is a Task

    To understand what a task is, we first print the runtime variable. Only the main member variables are shown here:

    self = {add} <@task: myTest.add of myTest at 0x7faf35f0a208>
     Request = {str} 'celery.worker.request:Request'
     Strategy = {str} 'celery.worker.strategy:default'
     app = {Celery} <Celery myTest at 0x7faf35f0a208>
     backend = {DisabledBackend} <celery.backends.base.DisabledBackend object at 0x7faf364aea20>
     from_config = {tuple: 9} (('serializer', 'task_serializer'), ('rate_limit', 'task_default_rate_limit'), ('priority', 'task_default_priority'), ('track_started', 'task_track_started'), ('acks_late', 'task_acks_late'), ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'), ('reject_on_worker_lost', 'task_reject_on_worker_lost'), ('ignore_result', 'task_ignore_result'), ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'))
     name = {str} 'myTest.add'
     priority = {NoneType} None
     request = {Context} <Context: {}>
     request_stack = {_LocalStack: 0} <celery.utils.threads._LocalStack object at 0x7faf36405e48>
     serializer = {str} 'json'
    

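    As we can see, 'myTest.add' is a Task instance.

    So we need to look at what Task is. Historically, Task has been defined in two places in Celery:

    • celery/app/task.py

    • celery/task/base.py

    The two are related: celery/app/task.py holds the actual implementation, while celery/task/base.py was a backward-compatibility layer for the old task API (it appears to have been removed in Celery 5.0).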

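    0x04 The Celery Application and Tasks

    Tasks are an indispensable part of Celery, and a task can be built from any callable. Each task is identified by a unique name, and the worker uses that name to look the task up. Tasks can be registered with the app.task decorator. Note that when a function has several decorators, app.task must be the outermost one for Celery to work correctly.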
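
    To see why the decorator order matters, here is a minimal sketch in plain Python. It does not use Celery: `registry`, `task`, and `audited` are hypothetical stand-ins, and `task` simply records whatever callable it receives. When `task` is not the outermost decorator, the registry ends up holding the undecorated function, so the other decorator is silently bypassed for calls made through the registry.

```python
import functools

registry = {}  # hypothetical stand-in for the app's task registry


def task(fun):
    """Register whatever callable is passed in, keyed by its name."""
    registry[fun.__name__] = fun
    return fun


def audited(fun):
    """An ordinary decorator that counts calls."""
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        wrapper.calls += 1
        return fun(*args, **kwargs)
    wrapper.calls = 0
    return wrapper


@task       # outermost: the registry receives the audited wrapper
@audited
def good(x):
    return x * 2


@audited
@task       # innermost: the registry receives the bare function
def bad(x):
    return x * 2


# A worker would look the task up by name and call it.
registry["good"](1)
registry["bad"](1)
```

    Calling through the registry increments `good.calls` but leaves `bad.calls` at zero, which is the kind of surprise the "outermost" rule is meant to prevent.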

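    Within a Celery application, a Task is the unit of work that the corresponding message consumer is started to handle.

    The most basic form of a task is a function. The most direct way to publish a task is for the client to package up the code of the function to execute and publish it to the broker. The distributed computing framework Spark works this way (Spark's idea is simple: move the computation, not the data). Celery before 2.0 also supported this style of task publishing.

    An obvious drawback of this approach is that the payload sent to the broker can be large. The fix is equally easy to think of: tell the worker about the task code ahead of time. That is exactly what the global set and decorator-based registration are for.

    When we "tell the worker about our custom task in advance", the task is defined like this: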

    @app.task(name='hello_task')
    def hello():
      print('hello')
    

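    Here app is the application inside the worker, and the task function is registered through the decorator.

    The app maintains a dictionary whose keys are task names (hello_task here) and whose values are the task objects built from the functions. Task names must be unique, but the name argument is optional: if it is omitted, Celery generates a name automatically from the module path and the function name.

    With this approach, the client only needs to supply the task name and its arguments to publish a task. It does not need to ship the task code:

    # client side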
    app.send_task('hello_task')
    

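    Note that once the client publishes a task, it is written to the broker queue as a message carrying the task name and related parameters, where it waits for a worker to fetch it. Publishing is completely independent of the worker: the message is written to the queue even if no worker is running.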
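
    The name-based lookup described above can be sketched without Celery. In this minimal sketch, `registry`, `gen_task_name`, `task`, and `dispatch` are hypothetical names chosen for illustration, and the "message" is just a dict. The point is only that the sender ships a name plus arguments, and the receiving side resolves the name to code it already has.

```python
# Minimal sketch of a name -> task registry (hypothetical names, not Celery's code).
registry = {}


def gen_task_name(module, func_name):
    """Build a default task name from the module path and the function name."""
    return f"{module}.{func_name}"


def task(name=None):
    """Decorator factory: register the function under `name` or a generated name."""
    def decorator(fun):
        task_name = name or gen_task_name(fun.__module__, fun.__name__)
        # Keep the first registration for a given name, so names stay unique.
        if task_name not in registry:
            registry[task_name] = fun
        return fun
    return decorator


@task(name="hello_task")
def hello():
    return "hello"


@task()
def add(x, y):
    return x + y


def dispatch(message):
    """What a worker would do with a received message: look up by name, then call."""
    fun = registry[message["task"]]
    return fun(*message.get("args", ()), **message.get("kwargs", {}))


# The client only sends the task name and the arguments, never the code.
result = dispatch({"task": "hello_task"})
```

    If the worker side never registered "hello_task", the lookup in `dispatch` would fail with a KeyError, which mirrors the coupling between client and worker discussed in this section.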

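    This approach has an obvious drawback too: all task code must be registered on the worker side in advance, so the client and the worker become more tightly coupled.

    So we need to start from the moment the Celery application starts up.

    4.1 Global Callback Set and Built-in Tasks

    Celery startup first reaches celery/_state.py.

    A global set is created here. It collects the callbacks that will create the tasks when an app is finalized.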

    #: Global set of functions to call whenever a new app is finalized.
    #: Shared tasks, and built-in tasks are created by adding callbacks here.
    _on_app_finalizers = set()
    

    During startup, the system adds entries to the set by calling the following function.

    def connect_on_app_finalize(callback):
        """Connect callback to be called when any app is finalized."""
        _on_app_finalizers.add(callback)
        return callback
    

    First, celery/app/builtins.py defines a number of built-in tasks, each of which is added to the global callback set.

    @connect_on_app_finalize
    def add_map_task(app):
        from celery.canvas import signature
    
        @app.task(name='celery.map', shared=False, lazy=False)
        def xmap(task, it):
            task = signature(task, app=app).type
            return [task(item) for item in it]
        return xmap
    

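    Next, execution reaches our custom task, which is registered in the global callback set as well.

    Put differently: when Celery imports our module, each function decorated with @app.task runs through the decorator, which registers a callback for it in the global callback set.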

    @app.task
    def add(x,y):
        print(x+y)
        return x+y
    

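    4.2 The @app.task Decorator

    Following @app.task leads us to the Celery application itself.

    The code is in celery/app/base.py.

    @app.task returns _create_task_cls, which builds a task proxy, appends it to the application's pending queue (_pending), and uses connect_on_app_finalize(cons) to add a callback to the global callback set.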

    _create_task_cls = {function} <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls at 0x7ff1a7b118c8>
    

    The code is as follows:

    def task(self, *args, **opts):
        if USING_EXECV and opts.get('lazy', True):
            from . import shared_task
            return shared_task(*args, lazy=False, **opts)
    
        def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
            _filt = filter
    
            def _create_task_cls(fun):
                if shared:
                    def cons(app):
                        return app._task_from_fun(fun, **opts)
                    cons.__name__ = fun.__name__
                    connect_on_app_finalize(cons)   # key point: add to the global callback set
                if not lazy or self.finalized:
                    ret = self._task_from_fun(fun, **opts)
                else:
                    # return a proxy object that evaluates on first use
                    ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                       __doc__=fun.__doc__)
                    self._pending.append(ret) # add to the app's pending queue
                if _filt:
                    return _filt(ret)
                return ret
    
            return _create_task_cls
    
        if len(args) == 1:
            if callable(args[0]):
                return inner_create_task_cls(**opts)(*args)
    
        return inner_create_task_cls(**opts)
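
    The last few lines of task() are what let the decorator be written both as @app.task and as @app.task(name=...). Below is a minimal sketch of that pattern in isolation. The names `task`, `create_task`, and the `task_options` attribute are hypothetical, and the sketch merely attaches the options to the function where Celery would build a Task.

```python
# Sketch of the "bare or parameterised" decorator pattern (hypothetical names).
def task(*args, **opts):
    def create_task(**options):
        def wrap(fun):
            # Attach the options so the effect is visible; Celery builds a Task here.
            fun.task_options = dict(options)
            return fun
        return wrap

    if len(args) == 1 and callable(args[0]):
        # Used as @task: args[0] is the decorated function itself.
        return create_task(**opts)(args[0])
    # Used as @task(...): return the real decorator.
    return create_task(**opts)


@task
def add(x, y):
    return x + y


@task(name="hello_task")
def hello():
    return "hello"
```

    Both spellings end up in the same inner function. The only difference is whether the outer call already received the function to decorate.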
    

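    4.2.1 Creating the Proxy Instance

    With the call in our example, Celery returns a PromiseProxy instance. The arguments passed in are self._task_from_fun, (fun,), and opts.

    Now look at the Proxy class it derives from, which lives in celery/local.py.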

    class Proxy(object):
        """Proxy to another object."""
    
        # Code stolen from werkzeug.local.Proxy.
        __slots__ = ('__local', '__args', '__kwargs', '__dict__')
    
        def __init__(self, local,
                     args=None, kwargs=None, name=None, __doc__=None):
            object.__setattr__(self, '_Proxy__local', local)            # store the passed-in local as _Proxy__local
            object.__setattr__(self, '_Proxy__args', args or ())        # positional arguments
            object.__setattr__(self, '_Proxy__kwargs', kwargs or {})    # keyword arguments
            if name is not None:
                object.__setattr__(self, '__custom_name__', name)
            if __doc__ is not None:
                object.__setattr__(self, '__doc__', __doc__)
        ...
        def _get_current_object(self):
            """Get current object.
    
            This is useful if you want the real
            object behind the proxy at a time for performance reasons or because
            you want to pass the object into a different context.
            """
            loc = object.__getattribute__(self, '_Proxy__local')        # fetch the local passed at construction
            if not hasattr(loc, '__release_local__'):                   # no __release_local__ attribute
                return loc(*self.__args, **self.__kwargs)               # call it with the stored arguments
            try:  # pragma: no cover
                # not sure what this is about
                return getattr(loc, self.__name__)                      # look up the attribute named __name__
            except AttributeError:  # pragma: no cover
                raise RuntimeError('no object bound to {0.__name__}'.format(self))
        ...
        def __getattr__(self, name):
            if name == '__members__':
                return dir(self._get_current_object())
            return getattr(self._get_current_object(), name)            # forward attribute access to the real object
    
        def __setitem__(self, key, value):
            self._get_current_object()[key] = value                     # set item by key
    
        def __delitem__(self, key):
            del self._get_current_object()[key]                         # delete item by key
    
        def __setslice__(self, i, j, seq):
            self._get_current_object()[i:j] = seq                       # slice assignment
    
        def __delslice__(self, i, j):
            del self._get_current_object()[i:j]
    
        def __setattr__(self, name, value):
            setattr(self._get_current_object(), name, value)            # set attribute
    
        def __delattr__(self, name):
            delattr(self._get_current_object(), name)                   # delete attribute
    
    

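    Only some of the methods are shown. As annotated above, the proxy checks whether the local it was given has a __release_local__ attribute. If not, it calls local as a function to obtain the real object. Otherwise it fetches the object as an attribute of local. Every other operation is then forwarded to that object.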

    4.2.2 Adding to the Pending Queue

    In the code above, the following line appends the task to the Celery application's pending queue.

    self._pending.append(ret)
    

    _pending is defined as follows. It is simply a deque:

    class Celery:
        """Celery application.
        """
    
        def __init__(self, main=None, loader=None, backend=None,
                     amqp=None, events=None, log=None, control=None,
                     set_as_current=True, tasks=None, broker=None, include=None,
                     changes=None, config_source=None, fixups=None, task_cls=None,
                     autofinalize=True, namespace=None, strict_typing=True,
                     **kwargs):
    
            self._pending = deque()
    

    At this point the global set looks like this:

    _on_app_finalizers = {set: 10} 
     {function} <function add_chunk_task at 0x7fc200a81400>
     {function} <function add_backend_cleanup_task at 0x7fc200a81048>
     {function} <function add_starmap_task at 0x7fc200a81488>
     {function} <function add_group_task at 0x7fc200a812f0>
     {function} <function add_map_task at 0x7fc200a81510>
     {function} <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons at 0x7fc200af4510>
     {function} <function add_accumulate_task at 0x7fc200aa0158>
     {function} <function add_chain_task at 0x7fc200a81378>
     {function} <function add_unlock_chord_task at 0x7fc200a81598>
     {function} <function add_chord_task at 0x7fc200aa01e0>
    

    The logic is shown in the diagram:

                               +------------------------------+
                               |  _on_app_finalizers = set()  |
                               |                              |
                               +--------------+---------------+
                                              |
                     connect_on_app_finalize  |
     +------------+                           |
     | builtins.py| +-----------------------> |
     +------------+                           |
                                              |
                     connect_on_app_finalize  |
    +-------------+                           |
    |User Function| +---------------------->  |
    +-------------+                           |
                                              v
    
                 +----------------------------------------------------------------------------------------------------+
                 |                                        _on_app_finalizers                                          |
                 |                                                                                                    |
                 |                                                                                                    |
                 |    <function add_chunk_task>                                                                       |
                 |    <function add_backend_cleanup_task>                                                             |
                 |    <function add_starmap_task>                                                                     |
                 |    <function add_group_task>                                                                       |
                 |    <function add_map_task>                                                                         |
                 |    <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>   |
                 |    <function add_accumulate_task>                                                                  |
                 |    <function add_chain_task>                                                                       |
                 |    <function add_unlock_chord_task>                                                                |
                 |    <function add_chord_task>                                                                       |
                 |                                                                                                    |
                 +----------------------------------------------------------------------------------------------------+
    
    

    At this point we have a global set, _on_app_finalizers, that collects the callbacks for all tasks.

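    4.3 Celery Worker Startup

    Celery now knows which tasks exist and has collected them, but it does not yet know their logical meaning. Put another way, Celery only knows which classes exist. It has no instances of them yet.

    Consuming tasks is Celery's core function, so we inevitably have to revisit worker startup, this time focusing on the parts of the worker that relate to tasks.

    In essence, startup processes the global set _on_app_finalizers described above and associates these so-far meaningless tasks with the Celery application.

    Specifically:

    • create a task instance from each task's concrete class;
    • associate these task instances with the Celery app, so that, for example, an instance can be found by its task name;
    • configure the various attributes of each instance.

    4.3.1 The Worker Instance

    The Worker here is the worker instance Celery uses to consume tasks.

    So let's go straight to the worker.

    The code is in celery/bin/worker.py.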

    @click.pass_context
    @handle_preload_options
    def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
               loglevel=None, logfile=None, pidfile=None, statedb=None,
               **kwargs):
        """Start worker instance."""
        app = ctx.obj.app
    
        worker = app.Worker(
            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
            logfile=logfile,  # node format handled by celery.app.log.setup
            pidfile=node_format(pidfile, hostname),
            statedb=node_format(statedb, hostname),
            no_color=ctx.obj.no_color,
            **kwargs)  # execution reaches here
        
        worker.start()
        
        return worker.exitcode
    

    4.3.2 WorkController

    Inside worker = app.Worker(...), WorkController is invoked indirectly.

    Execution is now in celery/worker/worker.py.

    Some initialization happens here. Let's keep digging.

    class WorkController:
        """Unmanaged worker instance."""
    
        def __init__(self, app=None, hostname=None, **kwargs):
            self.app = app or self.app
            self.hostname = default_nodename(hostname)
            self.startup_time = datetime.utcnow()
            self.app.loader.init_worker()
            self.on_before_init(**kwargs) # execution reaches here
    

    4.3.3 Worker(WorkController)

    Execution is now in celery/apps/worker.py.

    This calls trace.setup_worker_optimizations, which is where tasks come into view.

    class Worker(WorkController):
        """Worker as a program."""
    
        def on_before_init(self, quiet=False, **kwargs):
            self.quiet = quiet
            trace.setup_worker_optimizations(self.app, self.hostname)
    

    4.3.4 trace: Entering Task Association

    Execution is now in celery/app/trace.py.

    This calls app.finalize(), whose purpose is to get every task fully set up before the worker starts.

    def setup_worker_optimizations(app, hostname=None):
        """Setup worker related optimizations."""
        global trace_task_ret
    
        hostname = hostname or gethostname()
    
        # make sure custom Task.__call__ methods that calls super
        # won't mess up the request/task stack.
        _install_stack_protection()
    
        app.set_default()
    
        # evaluate all task classes by finalizing the app.
        app.finalize()
    

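    4.3.5 Associating Tasks with the Application

    After all that effort, we finally reach the key logic.

    app.finalize() adds the tasks to the Celery application.

    That is: the system has already collected all tasks into the global set _on_app_finalizers, but the entries in that set have no logical meaning yet. They must be tied to a Celery application, and that association is established here.

    The call stack is: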

    _task_from_fun, base.py:450
    _create_task_cls, base.py:425
    add_chunk_task, builtins.py:128
    _announce_app_finalized, _state.py:52
    finalize, base.py:511
    setup_worker_optimizations, trace.py:643
    on_before_init, worker.py:90
    __init__, worker.py:95
    worker, worker.py:326
    caller, base.py:132
    new_func, decorators.py:21
    invoke, core.py:610
    invoke, core.py:1066
    invoke, core.py:1259
    main, core.py:782
    start, base.py:358
    worker_main, base.py:374
    

    The code is:

    def finalize(self, auto=False):
        """Finalize the app.
    
        This loads built-in tasks, evaluates pending task decorators,
        reads configuration, etc.
        """
        with self._finalize_mutex:
            if not self.finalized:
                if auto and not self.autofinalize:
                    raise RuntimeError('Contract breach: app not finalized')
                self.finalized = True
                
                _announce_app_finalized(self) # key step: establish the association
    
                pending = self._pending
                while pending:
                    maybe_evaluate(pending.popleft()) 
    
                for task in self._tasks.values():
                    task.bind(self)
    
                self.on_after_finalize.send(sender=self)
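
    The interplay between a pending queue and a one-time finalize step can be sketched on its own. The `MiniApp` class below is a hypothetical illustration that assumes a plain function can stand in for a task. It keeps only three ideas from finalize(): the lock, the `finalized` flag, and draining the deque from the left.

```python
import threading
from collections import deque


class MiniApp:
    """Hypothetical app showing deferred task creation; not Celery's implementation."""

    def __init__(self):
        self.finalized = False
        self.tasks = {}
        self._pending = deque()
        self._finalize_mutex = threading.Lock()

    def task(self, fun):
        """Register immediately if already finalized, otherwise queue for later."""
        if self.finalized:
            self._register(fun)
        else:
            self._pending.append(fun)
        return fun

    def _register(self, fun):
        self.tasks[fun.__name__] = fun

    def finalize(self):
        with self._finalize_mutex:
            if self.finalized:
                return  # finalizing a second time is a no-op
            self.finalized = True
            # Drain the queue in registration order.
            while self._pending:
                self._register(self._pending.popleft())


app = MiniApp()


@app.task
def add(x, y):
    return x + y


pending_before = len(app._pending)   # one decorator has run, nothing registered yet
app.finalize()
```

    Until `finalize()` runs, `app.tasks` stays empty and the function only sits in the queue. Afterwards the queue is empty and the task can be found by name.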
    
    4.3.5.1 Adding Tasks

    _announce_app_finalized(self) runs the callbacks in the global callback set _on_app_finalizers to obtain task instances and adds them to Celery's task registry, so that a task instance can be looked up by its task name.

    def _announce_app_finalized(app):
        callbacks = set(_on_app_finalizers)
        for callback in callbacks:
            callback(app)
    

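    For our user-defined task, the callback is cons, the closure created inside _create_task_cls, so running it calls app._task_from_fun to add the task. The built-in tasks are declared with lazy=False, which is why the stack above passes through _create_task_cls directly.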

    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        _filt = filter
    
        def _create_task_cls(fun):
            if shared:
                def cons(app):
                    return app._task_from_fun(fun, **opts)
                
                cons.__name__ = fun.__name__
                connect_on_app_finalize(cons)
                
            if not lazy or self.finalized:
                ret = self._task_from_fun(fun, **opts) # here
    

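    Thus, during initialization, adding the task to each app ends up calling app._task_from_fun(fun, **opts).

    Inside _task_from_fun, the following line adds the task to the Celery app, which establishes the association: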

    self._tasks[task.name] = task
    

    self._tasks then becomes:

    _tasks = {TaskRegistry: 10} 
     NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
     'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x25da0ca0d88>
     'celery.chord' = {chord} <@task: celery.chord of myTest at 0x25da0ca0d88>
     'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x25da0ca0d88>
     'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x25da0ca0d88>
     'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x25da0ca0d88>
     'celery.group' = {group} <@task: celery.group of myTest at 0x25da0ca0d88>
     'celery.map' = {xmap} <@task: celery.map of myTest at 0x25da0ca0d88>
     'myTest.add' = {add} <@task: myTest.add of myTest at 0x25da0ca0d88>
     'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x25da0ca0d88>
     'celery.chain' = {chain} <@task: celery.chain of myTest at 0x25da0ca0d88>
     __len__ = {int} 10
    

    The full code is:

    def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
        if not self.finalized and not self.autofinalize:
            raise RuntimeError('Contract breach: app not finalized')
        name = name or self.gen_task_name(fun.__name__, fun.__module__)
        base = base or self.Task
    
        if name not in self._tasks:
            run = fun if bind else staticmethod(fun)
            task = type(fun.__name__, (base,), dict({
                'app': self,
                'name': name,
                'run': run,
                '_decorated': True,
                '__doc__': fun.__doc__,
                '__module__': fun.__module__,
                '__annotations__': fun.__annotations__,
                '__header__': staticmethod(head_from_fun(fun, bound=bind)),
                '__wrapped__': run}, **options))()
    
            self._tasks[task.name] = task
            task.bind(self)  # connects task to this app
            add_autoretry_behaviour(task, **options)
        else:
            task = self._tasks[name]
        return task
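
    The core trick in _task_from_fun is building a new class at runtime with the three-argument form of type() and instantiating it right away. The following is a minimal sketch of that technique only. `BaseTask`, `task_from_fun`, and the "demo.*" names are hypothetical, and the registry is a plain dict passed in as an argument.

```python
class BaseTask:
    """Hypothetical base class; Celery's real base is celery.app.task.Task."""
    name = None

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


def task_from_fun(registry, fun, name=None, bind=False, **options):
    """Create (or reuse) a task instance whose run() is the given function."""
    name = name or f"{fun.__module__}.{fun.__name__}"
    if name not in registry:
        # bind=True passes the task instance as the first argument; otherwise
        # staticmethod keeps the function from receiving `self`.
        run = fun if bind else staticmethod(fun)
        attrs = {"name": name, "run": run,
                 "__doc__": fun.__doc__, "__module__": fun.__module__}
        attrs.update(options)
        task_cls = type(fun.__name__, (BaseTask,), attrs)
        registry[name] = task_cls()
    return registry[name]


def add(x, y):
    return x + y


def whoami(self):
    return self.name


tasks = {}
add_task = task_from_fun(tasks, add, name="demo.add")
who_task = task_from_fun(tasks, whoami, name="demo.whoami", bind=True)
```

    Each decorated function therefore gets its own class, named after the function, and the registry stores a single instance of it. Asking for the same name again returns the existing instance.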
    
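    4.3.5.2 bind

    By default the task's base class is celery.app.task:Task. After the instance is created dynamically, task.bind(self) is called, which sets the app-related attributes on the task class.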

    @classmethod
    def bind(cls, app):
        was_bound, cls.__bound__ = cls.__bound__, True
        cls._app = app                                          # set the class's _app attribute
        conf = app.conf                                         # get the app's configuration
        cls._exec_options = None  # clear option cache
    
        if cls.typing is None:
            cls.typing = app.strict_typing
    
        for attr_name, config_name in cls.from_config:          # fill in class-level defaults
            if getattr(cls, attr_name, None) is None:           # the attribute is not set yet
                setattr(cls, attr_name, conf[config_name])      # use the default from the app config
    
        # decorate with annotations from config.
        if not was_bound:
            cls.annotate()
    
            from celery.utils.threads import LocalStack
            cls.request_stack = LocalStack()                    # per-thread stack holding request contexts
    
        # PeriodicTask uses this to add itself to the PeriodicTask schedule.
        cls.on_bound(app)
    
        return app
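
    The from_config loop is the part of bind that turns app-wide settings into per-task defaults. Here is a small sketch of just that loop, assuming a plain dict as the configuration. The `Task` and `Add` classes are hypothetical, and only the two (attribute, config key) pairs are borrowed from the from_config tuple printed earlier.

```python
class Task:
    """Hypothetical task base with two configurable attributes."""
    serializer = None
    rate_limit = None
    # (attribute name, configuration key) pairs, as in the from_config tuple
    from_config = (
        ("serializer", "task_serializer"),
        ("rate_limit", "task_default_rate_limit"),
    )

    @classmethod
    def bind(cls, conf):
        """Fill every attribute that is still None from the configuration."""
        for attr_name, config_name in cls.from_config:
            if getattr(cls, attr_name, None) is None:
                setattr(cls, attr_name, conf[config_name])
        return cls


class Add(Task):
    rate_limit = "10/m"   # set explicitly, so the configured default must not win


conf = {"task_serializer": "json", "task_default_rate_limit": None}
Add.bind(conf)
```

    An attribute set on the task class takes precedence, and the configuration only fills the gaps. Because setattr targets the subclass, the base class keeps its None placeholders.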
    
    
    
    4.3.5.3 Processing the Pending Queue

    Execution returns to Celery. The code is now in celery/app/base.py.

    The variables are:

    pending = {deque: 1} deque([<@task: myTest.add of myTest at 0x7fd907623550>])
    self = {Celery} <Celery myTest at 0x7fd907623550>
    

    After a task is taken from pending, it is evaluated. The deferred work mentioned earlier for some tasks is carried out here.

    The code is in celery/local.py.

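    def __maybe_evaluate__(self):
        return self._get_current_object()

    def _get_current_object(self):
        try:
            # return the cached object if the promise was already evaluated
            return object.__getattribute__(self, '__thing')
        except AttributeError:
            # first use: evaluate the promise now
            return self.__evaluate__()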
    

    self is now the task itself:

    self = {add} <@task: myTest.add of myTest at 0x7fa09ee1e320>
    

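    What is returned is the myTest.add task itself.

    4.3.6 Multiprocessing vs. Tasks

    We now have all the tasks, and each has its own instance that can be called.

    Since consuming tasks relies on multiple processes, we first need a rough look at how those processes are started.

    Let's continue with the Celery worker startup.

    During startup the worker starts a series of bootsteps. For the Worker, the corresponding steps are [<step: Hub>, <step: Pool>, <step: Consumer>].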

    start, bootsteps.py:116
    start, worker.py:204
    worker, worker.py:327
    caller, base.py:132
    new_func, decorators.py:21
    invoke, core.py:610
    invoke, core.py:1066
    invoke, core.py:1259
    main, core.py:782
    start, base.py:358
    worker_main, base.py:374
    

    The code is in celery/bootsteps.py.

    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)
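
    The start loop above can be tried out with stand-in objects. In this sketch `Step`, `Blueprint`, and `Worker` are hypothetical minimal classes, and real bootsteps also resolve dependencies between steps, which is left out here. A None entry plays the role of a disabled step.

```python
class Step:
    """Hypothetical step; real bootsteps also declare dependencies."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def start(self, parent):
        self.log.append(f"start {self.name}")


class Blueprint:
    def __init__(self):
        self.started = 0

    def start(self, parent):
        # Skip disabled (None) steps and count the ones that were started.
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)


class Worker:
    def __init__(self, steps):
        self.steps = steps


log = []
worker = Worker([Step("Hub", log), None, Step("Pool", log), Step("Consumer", log)])
blueprint = Blueprint()
blueprint.start(worker)
```

    The steps start strictly in list order, so anything Pool sets up on the worker is already in place by the time Consumer starts.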
    

    The variables are:

    parent.steps = {list: 3} 
     0 = {Hub} <step: Hub>
     1 = {Pool} <step: Pool>
     2 = {Consumer} <step: Consumer>
     __len__ = {int} 3
    

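    The task-processing logic is started in Pool.

    In Pool(bootsteps.StartStopStep), the line w.process_task = w._process_task configures the callback for the concrete pool. That is, when the pool is notified that it has a chance to run, it knows which callback to use to fetch and execute the task.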

    class Pool(bootsteps.StartStopStep):
        """Bootstep managing the worker pool.
    
        Describes how to initialize the worker pool, and starts and stops
        the pool during worker start-up/shutdown.
    
        Adds attributes:
    
            * autoscale
            * pool
            * max_concurrency
            * min_concurrency
        """
     
        def create(self, w):
    
            procs = w.min_concurrency
            
            w.process_task = w._process_task # configure the callback here
    

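    The method is shown below. We can expect that req.execute_using_pool(self.pool) is where multiprocessing will eventually be invoked.

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        # (error handling omitted in this excerpt)
        req.execute_using_pool(self.pool)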
    
    

    The variables at this point are:

    self = {Pool} <step: Pool>
    semaphore = {NoneType} None
    threaded = {bool} False
    w = {Worker} celery
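
    The callback wiring can be sketched with the standard library. This is an illustration under assumptions, not Celery's pool: it uses a thread pool from concurrent.futures to stay self-contained, whereas the article's worker uses processes, and `Request` and `MiniWorker` are hypothetical classes. Only the method name `execute_using_pool` and the assignment `process_task = _process_task` follow the source.

```python
from concurrent.futures import ThreadPoolExecutor


class Request:
    """Hypothetical request: a task callable plus its arguments."""

    def __init__(self, fun, args):
        self.fun = fun
        self.args = args

    def execute_using_pool(self, pool):
        # Hand the actual work to the pool and return a future.
        return pool.submit(self.fun, *self.args)


class MiniWorker:
    def __init__(self, pool):
        self.pool = pool
        # Same idea as `w.process_task = w._process_task`: expose the
        # pool-backed handler under the name the consumer will call.
        self.process_task = self._process_task

    def _process_task(self, req):
        return req.execute_using_pool(self.pool)


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    worker = MiniWorker(pool)
    future = worker.process_task(Request(add, (2, 17)))
    outcome = future.result()
```

    The consumer side only ever calls `process_task`. Which pool does the work, and how, is decided once when the callback is assigned.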
    

    4.3.7 Summary

    We end up with the following structure. This TaskRegistry is used when tasks are executed.

    self._tasks = {TaskRegistry: 10} 
     NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
     'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fb652da5fd0>
     'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fb652da5fd0>
     'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fb652da5fd0>
     'celery.group' = {group} <@task: celery.group of myTest at 0x7fb652da5fd0>
     'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fb652da5fd0>
     'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fb652da5fd0>
     'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fb652da5fd0>
     'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fb652da5fd0>
     'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fb652da5fd0>
     'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fb652da5fd0>
     __len__ = {int} 10
    

    As a diagram:

                               +------------------------------+
                               |  _on_app_finalizers = set()  |
                               |                              |
                               +--------------+---------------+
                                              |
                     connect_on_app_finalize  |
     +------------+                           |
     | builtins.py| +-----------------------> |
     +------------+                           |
                                              |
                     connect_on_app_finalize  |
    +-------------+                           |
    |User Function| +---------------------->  |
    +-------------+                           |
                                              v
    
                 +----------------------------------------------------------------------------------------------------+
                 |                                        _on_app_finalizers                                          |
                 |                                                                                                    |
                 |                                                                                                    |
                 |    <function add_chunk_task>                                                                       |
                 |    <function add_backend_cleanup_task>                                                             |
                 |    <function add_starmap_task>                                                                     |
                 |    <function add_group_task>                                                                       |
                 |    <function add_map_task>                                                                         |
                 |    <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>   |
                 |    <function add_accumulate_task>                                                                  |
                 |    <function add_chain_task>                                                                       |
                 |    <function add_unlock_chord_task>                                                                |
                 |    <function add_chord_task>                                                                       |
                 |                                                                                                    |
                 +----------------------------+-----------------------------------------------------------------------+
                                              |
                                              |
                                              |                           +--------------------------------------------------------------------------------------------+
                                  finalize    v                           |                                                                                            |
                                                                          |                          TaskRegistry                                                      |
                               +---------------------------+              |                                                                                            |
                               |                           |              |                                                                                            |
                               |           Celery          |              |                                                                                            |
                               |                           |              |   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                         |
        _process_task   <-------------------+  process_task|              |   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                              |
                               |                           |              |   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest >  |
                               |                           |              |   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>            |
                               |                _tasks  +------------->   |   'celery.group' = {group} <@task: celery.group of myTest>                                 |
                               |                           |              |   'celery.map' = {xmap} <@task: celery.map of myTest>                                      |
                               |                           |              |   'celery.chain' = {chain} <@task: celery.chain of myTest>                                 |
                               +---------------------------+              |   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                          |
                                                                          |   'celery.chord' = {chord} <@task: celery.chord of myTest>                                 |
                                                                          |   'myTest.add' = {add} <@task: myTest.add of myTest>                                       |
                                                                          |   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                  |
                                                                          |                                                                                            |
                                                                          +--------------------------------------------------------------------------------------------+
    
    

    Alternatively, we can rearrange the diagram and look at it from another angle.

                +------------------------------+
                |  _on_app_finalizers = set()  |
                |                              |
                +--------------+---------------+
                               |
                               |
                               |       connect_on_app_finalize     +------------+
                               |   <----------------------------+  | builtins.py|
                               |                                   +------------+
                               |
                               |       connect_on_app_finalize
                               |                                  +-------------+
                               |   <---------------------------+  |User Function|
                               |                                  +-------------+
                               v
    
    +------------------------------------------------------------------------------------------------+
    |                                      _on_app_finalizers                                        |
    |                                                                                                |
    |                                                                                                |
    |  <function add_chunk_task>                                                                     |
    |  <function add_backend_cleanup_task>                                                           |
    |  <function add_starmap_task>                                                                   |
    |  <function add_group_task>                                                                     |
    |  <function add_map_task>                                                                       |
    |  <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons> |
    |  <function add_accumulate_task>                                                                |
    |  <function add_chain_task>                                                                     |
    |  <function add_unlock_chord_task>                                                              |
    |  <function add_chord_task>                                                                     |
    |                                                                                                |
    +--------------------------+---------------------------------------------------------------------+
                               |
                               |
                   finalize    |
                               |
                               |
                               v
                 +-------------+-------------+
                 |                           |
                 |           Celery          |
                 |                           |
                 |                 _tasks    |
                 |                    +      |
                 |                    |      |
                 +---------------------------+
                                      |
                                      |
                                      |
                                      v
    
      +--------------------------------------------------------------------------------------------+
      |                                                                                            |
      |                          TaskRegistry                                                      |
      |                                                                                            |
      |   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                         |
      |   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                              |
      |   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest >  |
      |   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>            |
      |   'celery.group' = {group} <@task: celery.group of myTest>                                 |
      |   'celery.map' = {xmap} <@task: celery.map of myTest>                                      |
      |   'celery.chain' = {chain} <@task: celery.chain of myTest>                                 |
      |   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                          |
      |   'celery.chord' = {chord} <@task: celery.chord of myTest>                                 |
      |   'myTest.add' = {add} <@task: myTest.add of myTest>                                       |
      |   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                  |
      |                                                                                            |
      +--------------------------------------------------------------------------------------------+
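
    The flow in the diagram above (callbacks collected first, the registry filled on finalize) can be sketched in plain Python. This is a toy model, not Celery's implementation: `MiniApp`, its `register` method, and the two example finalizers are hypothetical; only the names `_on_app_finalizers`, `connect_on_app_finalize`, and `finalize` mirror those in the diagram.

```python
# Toy model of deferred task registration; not Celery's actual implementation.
_on_app_finalizers = set()          # pending callbacks, shared at module level


def connect_on_app_finalize(callback):
    """Remember a callback that registers a task once an app is finalized."""
    _on_app_finalizers.add(callback)
    return callback


class MiniApp:
    """Hypothetical stand-in for the Celery app: holds a name -> task registry."""

    def __init__(self, main):
        self.main = main
        self._tasks = {}
        self.finalized = False

    def register(self, name, fun):
        self._tasks[name] = fun
        return fun

    def finalize(self):
        """Run every pending callback once, which fills the registry."""
        if not self.finalized:
            self.finalized = True
            for callback in _on_app_finalizers:
                callback(self)


@connect_on_app_finalize
def add_chunk_task(app):
    # Built-in task: registered by a module-level finalizer.
    def chunks(items, n):
        return [items[i:i + n] for i in range(0, len(items), n)]
    app.register('celery.chunks', chunks)


@connect_on_app_finalize
def add_user_task(app):
    # User task: a decorator would create a finalizer similar to this one.
    app.register(f'{app.main}.add', lambda x, y: x + y)


app = MiniApp('myTest')
assert app._tasks == {}             # nothing is registered before finalize()
app.finalize()
print(sorted(app._tasks))           # ['celery.chunks', 'myTest.add']
```

    The point of the indirection is that built-in and user tasks can be declared before any app is fully configured; the registry is only populated when the app is finalized.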
    
    


    0x05 Task definition

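    The Task class is defined in celery/app/task.py.

    Its member variables fall roughly into the following functional groups.

    Basic information, for example:

    • the Celery application the task belongs to;
    • the task name;
    • helper class information;

    Error handling, for example:

    • rate limiting;
    • the maximum number of retries;
    • the delay between retries;
    • error handling on retry;

    Business control, for example:

    • whether to acknowledge late (acks_late);
    • acknowledgement behavior on failure or timeout;
    • automatic registration;
    • result backend information;
    • what to do when the worker is lost;

    Task control, for example:

    • the request stack;
    • the default request;
    • priority;
    • expiry time;
    • execution options;

    The definition is as follows: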

    @abstract.CallableTask.register
    class Task:
        __trace__ = None
        __v2_compat__ = False  # set by old base in celery.task.base
    
        MaxRetriesExceededError = MaxRetriesExceededError
        OperationalError = OperationalError
    
        Strategy = 'celery.worker.strategy:default'
        Request = 'celery.worker.request:Request'
    
        _app = None
        name = None
        typing = None
    
        max_retries = 3
        default_retry_delay = 3 * 60
    
        rate_limit = None
        ignore_result = None
    
        trail = True
        send_events = True
        store_errors_even_if_ignored = None
        serializer = None
        time_limit = None
        soft_time_limit = None
    
        backend = None
        autoregister = True
        track_started = None
        acks_late = None
        acks_on_failure_or_timeout = None
        reject_on_worker_lost = None
        throws = ()
    
        expires = None
        priority = None
        resultrepr_maxsize = 1024
        request_stack = None
        _default_request = None
        abstract = True
        _exec_options = None
        __bound__ = False
    
        from_config = (
            ('serializer', 'task_serializer'),
            ('rate_limit', 'task_default_rate_limit'),
            ('priority', 'task_default_priority'),
            ('track_started', 'task_track_started'),
            ('acks_late', 'task_acks_late'),
            ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'),
            ('reject_on_worker_lost', 'task_reject_on_worker_lost'),
            ('ignore_result', 'task_ignore_result'),
            ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'),
        )
        _backend = None  # set by backend property.
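
    Many attributes in the class above default to None and are paired with a setting name in from_config. The sketch below illustrates the idea that an attribute still set to None is filled from the app configuration when the task is bound, while an explicit value on the class wins. It is a simplified model: `MiniTask`, `LateAckTask`, the `bind(conf)` signature, and the plain `conf` dict are hypothetical stand-ins.

```python
class MiniTask:
    """Toy task class: None means 'not set here, fall back to app config'."""
    serializer = None
    acks_late = None
    max_retries = 3                 # a concrete default, never read from config

    from_config = (
        ('serializer', 'task_serializer'),
        ('acks_late', 'task_acks_late'),
    )

    @classmethod
    def bind(cls, conf):
        """Fill every attribute that is still None from the config mapping."""
        for attr_name, config_name in cls.from_config:
            if getattr(cls, attr_name, None) is None:
                setattr(cls, attr_name, conf[config_name])
        return cls


class LateAckTask(MiniTask):
    acks_late = True                # explicit value wins over the config


conf = {'task_serializer': 'json', 'task_acks_late': False}
MiniTask.bind(conf)
LateAckTask.bind(conf)
print(MiniTask.serializer, MiniTask.acks_late)        # json False
print(LateAckTask.serializer, LateAckTask.acks_late)  # json True
```

    This matches the runtime dump shown earlier, where serializer is 'json' even though the class-level default is None.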
    
    

    0x06 consumer

    Because a task is invoked through the Consumer, we need to look at the task-related parts of the Consumer, that is, how tasks are wired to the Consumer so that it can actually call them.

    6.1 Consumer steps

    When the Consumer starts, it also runs a series of steps.

    parent.steps = {list: 8} 
     0 = {Connection} <step: Connection>
     1 = {Events} <step: Events>
     2 = {Heart} <step: Heart>
     3 = {Mingle} <step: Mingle>
     4 = {Gossip} <step: Gossip>
     5 = {Tasks} <step: Tasks>
     6 = {Control} <step: Control>
     7 = {Evloop} <step: event loop>
     __len__ = {int} 8
    

    6.2 Tasks steps

    The Consumer starts the Tasks bootstep, which does the following:

    • update_strategies: configures the callback for each task, for example: 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
    • task_consumer = c.app.amqp.TaskConsumer: this connects tasks to amqp.Consumer;
    • sets up QoS;
    • sets the prefetch count;

    As a result, each task's callback is tied to amqp.Consumer, and the message path is complete.

    The code is in celery/worker/consumer/tasks.py:

    class Tasks(bootsteps.StartStopStep):
        """Bootstep starting the task message consumer."""
    
        requires = (Mingle,)
    
        def __init__(self, c, **kwargs):
            c.task_consumer = c.qos = None
            super().__init__(c, **kwargs)
    
        def start(self, c):
            """Start task consumer."""
            c.update_strategies() # configure the callback for each task
    
            # - RabbitMQ 3.3 completely redefines how basic_qos works..
            # This will detect if the new qos semantics is in effect,
            # and if so make sure the 'apply_global' flag is set on qos updates.
            qos_global = not c.connection.qos_semantics_matches_spec
    
            # set initial prefetch count
            c.connection.default_channel.basic_qos(
                0, c.initial_prefetch_count, qos_global,
            )
    
            c.task_consumer = c.app.amqp.TaskConsumer(
                c.connection, on_decode_error=c.on_decode_error,
            ) # this connects tasks to amqp.Consumer
    
            def set_prefetch_count(prefetch_count):
                return c.task_consumer.qos(
                    prefetch_count=prefetch_count,
                    apply_global=qos_global,
                )
            c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
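
    The last lines of start() wrap a callback in a QoS object so that the prefetch count can be adjusted later. A minimal sketch of that pattern follows. `MiniQoS` is a simplified toy written for this article, not kombu's QoS class, and the `sent` list stands in for the broker call.

```python
import threading


class MiniQoS:
    """Simplified prefetch-count tracker; a toy, not kombu's QoS class."""

    def __init__(self, callback, initial_value):
        self.callback = callback    # called with prefetch_count=<int> on update()
        self.value = initial_value  # desired prefetch count
        self.prev = None            # last value actually sent to the broker
        self._mutex = threading.Lock()

    def increment_eventually(self, n=1):
        """Raise the desired count; the broker is only told on update()."""
        with self._mutex:
            self.value += n
        return self.value

    def decrement_eventually(self, n=1):
        """Lower the desired count, never below 1."""
        with self._mutex:
            self.value = max(self.value - n, 1)
        return self.value

    def update(self):
        """Push the desired count to the broker if it changed."""
        with self._mutex:
            if self.value != self.prev:
                self.callback(prefetch_count=self.value)
                self.prev = self.value
        return self.value


sent = []                           # records what would be sent to the broker


def set_prefetch_count(prefetch_count):
    sent.append(prefetch_count)


qos = MiniQoS(set_prefetch_count, initial_value=4)
qos.update()                        # the initial value goes out
qos.increment_eventually()          # e.g. an ETA task arrived: widen the window
qos.increment_eventually()
qos.update()                        # one broker call covers both increments
qos.update()                        # unchanged, so nothing is sent
print(sent)                         # [4, 6]
```

    Separating "change the desired value" from "tell the broker" lets several adjustments be batched into a single update.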
    

    6.2.1 Strategy

    Running tasks requires a scheduling strategy, which can also be seen as a form of load balancing. The available strategies are:

    SCHED_STRATEGY_FCFS = 1
    SCHED_STRATEGY_FAIR = 4
    
    SCHED_STRATEGIES = {
        None: SCHED_STRATEGY_FAIR,
        'default': SCHED_STRATEGY_FAIR,
        'fast': SCHED_STRATEGY_FCFS,
        'fcfs': SCHED_STRATEGY_FCFS,
        'fair': SCHED_STRATEGY_FAIR,
    }
    

    6.2.2 Updating strategies

    update_strategies configures the callback strategy and the callback function for each task, for example: 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>

    The call stack is:

    update_strategies, consumer.py:523
    start, tasks.py:26
    start, bootsteps.py:116
    start, consumer.py:311
    start, bootsteps.py:365
    start, bootsteps.py:116
    start, worker.py:204
    worker, worker.py:327
    caller, base.py:132
    new_func, decorators.py:21
    invoke, core.py:610
    invoke, core.py:1066
    invoke, core.py:1259
    main, core.py:782
    start, base.py:358
    worker_main, base.py:374
    

    The code is in celery/worker/consumer/consumer.py:

    def update_strategies(self):
        loader = self.app.loader                                        # the app's loader
        for name, task in self.app.tasks.items():                       # iterate over every registered task
            # key: the task name; value: the handler returned by task.start_strategy
            self.strategies[name] = task.start_strategy(self.app, self)
            # build the tracer that runs the task and processes its result
            task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                          app=self.app)
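
    The core idea of update_strategies, one handler per task name, each produced by a factory, can be sketched as follows. This is a toy illustration under stated assumptions: `default_strategy` here is a hypothetical, much reduced factory, `tasks` is a plain dict standing in for app.tasks, and 'myTest.mul' is an invented second task.

```python
def default_strategy(task_name, task_fun, handled):
    """Hypothetical strategy factory: returns one handler bound to one task."""

    def task_message_handler(args):
        # The closure remembers which task it belongs to.
        result = task_fun(*args)
        handled.append((task_name, result))
        return result

    return task_message_handler


tasks = {                            # stand-in for app.tasks (name -> callable)
    'myTest.add': lambda x, y: x + y,
    'myTest.mul': lambda x, y: x * y,
}

handled = []
strategies = {
    name: default_strategy(name, fun, handled)
    for name, fun in tasks.items()
}

# Every task gets its own handler object, although all share one definition.
assert strategies['myTest.add'] is not strategies['myTest.mul']
print(strategies['myTest.add']((2, 17)))    # 19
print(handled)                              # [('myTest.add', 19)]
```

    This is why the strategies dump later in this section shows the same function name, task_message_handler, at a different address for every task.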
    
    

    app.tasks looks like this; these are all the tasks currently registered with Celery:

    self.app.tasks = {TaskRegistry: 10} 
     NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
     'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fc5a36e8160>
     'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fc5a36e8160>
     'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fc5a36e8160>
     'celery.group' = {group} <@task: celery.group of myTest at 0x7fc5a36e8160>
     'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fc5a36e8160>
     'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fc5a36e8160>
     'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fc5a36e8160>
     'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fc5a36e8160>
     'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fc5a36e8160>
     'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fc5a36e8160>
     __len__ = {int} 10
    

    Next, let's look at task.start_strategy:

    def start_strategy(self, app, consumer, **kwargs):
        return instantiate(self.Strategy, self, app, consumer, **kwargs)    # call the strategy; it returns the task's message handler
    

    The default value of self.Strategy is celery.worker.strategy:default:

    def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
            proto1_to_proto2=proto1_to_proto2):
        """Default task execution strategy.
    
        Note:
            Strategies are here as an optimization, so sadly
            it's not very easy to override.
        """
        hostname = consumer.hostname                                # hostname of this consumer
        connection_errors = consumer.connection_errors              # exception types that count as connection errors
        _does_info = logger.isEnabledFor(logging.INFO)
    
        # task event related
        # (optimized to avoid calling request.send_event)
        eventer = consumer.event_dispatcher                                             
        events = eventer and eventer.enabled
        send_event = eventer.send
        task_sends_events = events and task.send_events
    
        call_at = consumer.timer.call_at
        apply_eta_task = consumer.apply_eta_task
        rate_limits_enabled = not consumer.disable_rate_limits
        get_bucket = consumer.task_buckets.__getitem__
        handle = consumer.on_task_request
        limit_task = consumer._limit_task
        body_can_be_buffer = consumer.pool.body_can_be_buffer
        
        Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)       # returns a request class
    
        revoked_tasks = consumer.controller.state.revoked
    
        def task_message_handler(message, body, ack, reject, callbacks,
                                 to_timestamp=to_timestamp):
            if body is None:
                body, headers, decoded, utc = (
                    message.body, message.headers, False, True,
                )
                if not body_can_be_buffer:
                    body = bytes(body) if isinstance(body, buffer_t) else body
            else:
                body, headers, decoded, utc = proto1_to_proto2(message, body)           # convert a protocol 1 message to the protocol 2 layout
    
            req = Req(
                message,
                on_ack=ack, on_reject=reject, app=app, hostname=hostname,
                eventer=eventer, task=task, connection_errors=connection_errors,
                body=body, headers=headers, decoded=decoded, utc=utc,
            )                                                                           # instantiate the request
    
            if (req.expires or req.id in revoked_tasks) and req.revoked():
                return
    
            if task_sends_events:
                send_event(
                    'task-received',
                    uuid=req.id, name=req.name,
                    args=req.argsrepr, kwargs=req.kwargsrepr,
                    root_id=req.root_id, parent_id=req.parent_id,
                    retries=req.request_dict.get('retries', 0),
                    eta=req.eta and req.eta.isoformat(),
                    expires=req.expires and req.expires.isoformat(),
                )                                                                       # send a task-received event when events are enabled
    
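            if req.eta:                                                                 # ETA (scheduled execution) handling
                try:
                    if req.utc:
                        eta = to_timestamp(to_system_tz(req.eta))
                    else:
                        eta = to_timestamp(req.eta, timezone.local)
                except (OverflowError, ValueError) as exc:                              # branch restored; the excerpt had dropped it
                    error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                          req.eta, exc, req.info(safe=True), exc_info=True)
                    req.reject(requeue=False)
                else:
                    consumer.qos.increment_eventually()
                    call_at(eta, apply_eta_task, (req,), priority=6)
            else:
                if rate_limits_enabled:                                                 # rate limiting
                    bucket = get_bucket(task.name)
                    if bucket:
                        return limit_task(req, bucket, 1)
                task_reserved(req)                                                      # mark the request as reserved by this worker
                if callbacks:
                    [callback(req) for callback in callbacks]
                handle(req)                                                             # hand the received request over for execution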
    
        return task_message_handler
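
    The branches of task_message_handler (drop revoked requests, schedule ETA requests, hold back rate-limited ones, otherwise execute) can be condensed into a small standalone sketch. Everything here is a simplification for illustration: `TokenBucket` is a minimal class written for this example, requests are plain dicts, and `make_handler` and the returned labels are hypothetical.

```python
import time


class TokenBucket:
    """Minimal token bucket used only for this sketch."""

    def __init__(self, rate, capacity=1):
        self.rate = rate            # tokens added per second
        self.capacity = capacity
        self.tokens = capacity
        self.stamp = time.monotonic()

    def can_consume(self, n=1):
        now = time.monotonic()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.stamp) * self.rate)
        self.stamp = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


def make_handler(revoked, scheduled, delayed, executed, bucket=None):
    """Build a handler that mirrors the branches of task_message_handler."""

    def handler(req):
        if req['id'] in revoked:
            return 'revoked'                 # drop revoked requests early
        if req.get('eta') is not None:
            scheduled.append(req)            # run later via a timer
            return 'scheduled'
        if bucket is not None and not bucket.can_consume():
            delayed.append(req)              # rate limit hit: hold it back
            return 'rate-limited'
        executed.append(req)                 # hand over for execution now
        return 'executed'

    return handler


revoked, scheduled, delayed, executed = {'r1'}, [], [], []
handler = make_handler(revoked, scheduled, delayed, executed,
                       bucket=TokenBucket(rate=0.001))
outcomes = [
    handler({'id': 'r1'}),
    handler({'id': 'e1', 'eta': time.time() + 60}),
    handler({'id': 'a1'}),
    handler({'id': 'a2'}),
]
print(outcomes)  # ['revoked', 'scheduled', 'executed', 'rate-limited']
```

    With a refill rate of 0.001 tokens per second, the bucket allows the first immediate request and holds back the second, which is why a1 and a2 end up in different branches.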
    
    

    The handler here (handle) is w.process_task, which was passed in when the consumer was initialized:

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        req.execute_using_pool(self.pool)
    

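    After this step, each task has its own callback strategy, so the worker knows how to dispatch a task to the process pool. In other words, for each of our current tasks, once a task message arrives from the broker, the corresponding task_message_handler is called.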

    strategies = {dict: 10} 
     'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
     'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878400>
     'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878598>
     'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878840>
     'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878ae8>
     'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878d90>
     'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b0d0>
     'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b378>
     'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b620>
     'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b8c8>
     __len__ = {int} 10
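
    Given such a strategies mapping, routing an incoming message comes down to a dictionary lookup on the task name. The sketch below shows that lookup with a fallback for unknown tasks. It is a simplified model: messages are plain dicts, and `make_on_task_received` and the `on_unknown_task(name, message)` signature are hypothetical.

```python
def make_on_task_received(strategies, on_unknown_task):
    """Return a callback that routes each message to its task's handler."""

    def on_task_received(message):
        try:
            type_ = message['headers']['task']   # the task name travels in the headers
        except KeyError:
            return on_unknown_task(None, message)
        try:
            strategy = strategies[type_]
        except KeyError:
            return on_unknown_task(type_, message)
        return strategy(message)

    return on_task_received


unknown = []
strategies = {'myTest.add': lambda message: sum(message['body'])}
on_task_received = make_on_task_received(
    strategies, lambda name, message: unknown.append(name))

print(on_task_received({'headers': {'task': 'myTest.add'}, 'body': (2, 17)}))  # 19
on_task_received({'headers': {'task': 'myTest.missing'}, 'body': ()})
print(unknown)  # ['myTest.missing']
```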
    

    6.2.3 Request

    In celery.worker.strategy:default, this line deserves a closer look:

    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # returns a request class
    

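    In the strategy, the purpose of the line below is to build a Request class for the task instance, which ties together the broker message, the consumer, and the process pool.

    Specifically, Request.execute_using_pool is where multiprocessing comes in, for example through the consumer's process pool.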

    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
    

    At runtime, a request for the task looks like this:

    myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    

    The code that creates the Request class is:

    def create_request_cls(base, task, pool, hostname, eventer,
                           ref=ref, revoked_tasks=revoked_tasks,
                           task_ready=task_ready, trace=trace_task_ret):
        default_time_limit = task.time_limit
        default_soft_time_limit = task.soft_time_limit
        apply_async = pool.apply_async
        acks_late = task.acks_late
        events = eventer and eventer.enabled
    
        class Request(base):
    
            def execute_using_pool(self, pool, **kwargs):
                task_id = self.task_id
                if (self.expires or task_id in revoked_tasks) and self.revoked():
                    raise TaskRevokedError(task_id)
    
                time_limit, soft_time_limit = self.time_limits
                result = apply_async(
                    trace,
                    args=(self.type, task_id, self.request_dict, self.body,
                          self.content_type, self.content_encoding),
                    accept_callback=self.on_accepted,
                    timeout_callback=self.on_timeout,
                    callback=self.on_success,
                    error_callback=self.on_failure,
                    soft_timeout=soft_time_limit or default_soft_time_limit,
                    timeout=time_limit or default_time_limit,
                    correlation_id=task_id,
                )
                # cannot create weakref to None
                # pylint: disable=attribute-defined-outside-init
                self._apply_result = maybe(ref, result)
                return result
    
            def on_success(self, failed__retval__runtime, **kwargs):
                failed, retval, runtime = failed__retval__runtime
                if failed:
                    if isinstance(retval.exception, (
                            SystemExit, KeyboardInterrupt)):
                        raise retval.exception
                    return self.on_failure(retval, return_ok=True)
                task_ready(self)
    
                if acks_late:
                    self.acknowledge()
    
                if events:
                    self.send_event(
                        'task-succeeded', result=retval, runtime=runtime,
                    )
    
        return Request
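
    create_request_cls is a class factory: it captures per-task values in a closure and returns a subclass that uses them as defaults. The sketch below isolates that pattern with the time-limit fallback (`time_limit or default_time_limit`). The names `BaseRequest`, `make_request_cls`, and `effective_limits` are hypothetical and exist only for this illustration.

```python
class BaseRequest:
    """Toy request: per-message limits may be missing (None)."""

    def __init__(self, task_id, time_limits=(None, None)):
        self.task_id = task_id
        self.time_limits = time_limits      # (hard, soft), taken from the message


def make_request_cls(base, task_time_limit, task_soft_time_limit):
    """Return a subclass of `base` specialised for one task."""
    default_time_limit = task_time_limit
    default_soft_time_limit = task_soft_time_limit

    class Request(base):
        def effective_limits(self):
            time_limit, soft_time_limit = self.time_limits
            # A limit sent with the message wins; otherwise use the task default.
            return (time_limit or default_time_limit,
                    soft_time_limit or default_soft_time_limit)

    return Request


AddRequest = make_request_cls(BaseRequest, 300, 240)
print(AddRequest('id-1').effective_limits())              # (300, 240)
print(AddRequest('id-2', (10, None)).effective_limits())  # (10, 240)
```

    Resolving the task-level values once, when the class is created, avoids repeating attribute lookups on the task for every incoming message.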
    

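    6.2.4 How the process pool is reached

    The callback task_message_handler shown earlier contains req = Req(...), and this is where dispatching to multiple processes starts, namely in the Request class.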

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        # (message decoding omitted)
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )                                                                     # instantiate the request
    
        if req.eta:                                                           # ETA handling
            ...                                                               # (omitted)
        else:
            task_reserved(req)                                                # mark the request as reserved by this worker
            if callbacks:
                [callback(req) for callback in callbacks]
            handle(req)                                                       # hand the received request over for execution
    
    # default() finally returns task_message_handler
    

    Note:

    The handle function in handle(req) is w.process_task, which was passed in when the consumer was initialized:

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        req.execute_using_pool(self.pool)
    

    So handle(req) effectively calls the Request's execute_using_pool method, which takes us into the process pool.

    The code is:

    class Request(base):
    
        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id                                              # the task id
            if (self.expires or task_id in revoked_tasks) and self.revoked():   # skip requests that have expired or been revoked
                raise TaskRevokedError(task_id)
    
            time_limit, soft_time_limit = self.time_limits                      # hard and soft time limits
            result = apply_async(                                               # run the task in the pool via the trace function
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result
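
    The chain handle(req) -> _process_task -> execute_using_pool -> pool callback can be tried out end to end with a small stand-in. Note the assumptions: a thread pool from the standard library replaces the worker's process pool to keep the sketch short, and `trace`, `MiniRequest`, and `MiniWorker` are hypothetical, heavily reduced counterparts of the real objects.

```python
from concurrent.futures import ThreadPoolExecutor


def trace(fun, args):
    """Run the task body and report (failed, retval), loosely like a tracer."""
    try:
        return False, fun(*args)
    except Exception as exc:        # the sketch reports errors instead of raising
        return True, exc


class MiniRequest:
    def __init__(self, task_id, fun, args):
        self.task_id, self.fun, self.args = task_id, fun, args
        self.state = 'RECEIVED'
        self.result = None

    def execute_using_pool(self, pool):
        """Submit the task body to the pool and wire up the completion callback."""
        future = pool.submit(trace, self.fun, self.args)
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future):
        failed, retval = future.result()
        self.state = 'FAILURE' if failed else 'SUCCESS'
        self.result = retval


class MiniWorker:
    def __init__(self, pool):
        self.pool = pool

    def process_task(self, req):
        """Counterpart of _process_task: send the request to the pool."""
        return req.execute_using_pool(self.pool)


with ThreadPoolExecutor(max_workers=2) as pool:
    worker = MiniWorker(pool)
    handle = worker.process_task    # what the strategy calls as handle(req)
    ok = MiniRequest('id-1', lambda x, y: x + y, (2, 17))
    bad = MiniRequest('id-2', lambda x, y: x / y, (1, 0))
    handle(ok)
    handle(bad)
# Leaving the `with` block waits for both submissions to finish.
print(ok.state, ok.result)                   # SUCCESS 19
print(bad.state, type(bad.result).__name__)  # FAILURE ZeroDivisionError
```

    As in the excerpt above, the request itself never runs the task body; it only submits work and reacts to the pool's callbacks.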
    

    6.3 Summary

    Because there is a lot of information, it is shown in three diagrams.

    6.3.1 Strategy

    The strategy logic is:

                                          +-----------------------+                      +---------------------------+
                                          | Celery                |                      | Consumer                  |
                                          |                       |                      |                           |
                                          |            consumer +--------------------->  |                           |            +---------------+
                                          |                       |                      |        task_consumer +---------------> | amqp.Consumer |
                                          |             _tasks    |                      |                           |            +---------------+
                                          |                +      |                      |                           |
                                          |                |      |                      |        strategies +----------------+
                                          +-----------------------+                      |                           |        |
                                                           |                             |                           |        |
                                                           |                             +---------------------------+        |
                                                           |                                                                  v
                                                           v
    +------------------------------------------------------+-------------------------------------+  +-----------------------------------------------------------------------------+
    |                                                                                            |  | strategies = {dict: 10}                                                     |
    |                          TaskRegistry                                                      |  |  'celery.chunks' = function default.<locals>.task_message_handler           |
    |                                                                                            |  |  'celery.backend_cleanup' = function default.<locals>.task_message_handler  |
    |   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                         |  |  'celery.chord_unlock' = function default.<locals>.task_message_handler     |
    |   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                              |  |  'celery.group' = function default.<locals>.task_message_handler            |
    |   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest >  |  |  'celery.map' = function default.<locals>.task_message_handler              |
    |   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>            |  |  'celery.chain' = function default.<locals>.task_message_handler            |
    |   'celery.group' = {group} <@task: celery.group of myTest>                                 |  |  'celery.starmap' = function default.<locals>.task_message_handler          |
    |   'celery.map' = {xmap} <@task: celery.map of myTest>                                      |  |  'celery.chord' = function default.<locals>.task_message_handler            |
    |   'celery.chain' = {chain} <@task: celery.chain of myTest>                                 |  |  'myTest.add' = function default.<locals>.task_message_handler              |
    |   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                          |  |  'celery.accumulate' = function default.<locals>.task_message_handler       |
    |   'celery.chord' = {chord} <@task: celery.chord of myTest>                                 |  |                                                                             |
    |   'myTest.add' = {add} <@task: myTest.add of myTest>                                       |  +-----------------------------------------------------------------------------+
    |   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                  |
    |                                                                                            |
    +--------------------------------------------------------------------------------------------+
    
    


    6.3.2 Task registration

    The logic for registering tasks in a Celery application is:

                               +------------------------------+
                               |  _on_app_finalizers = set()  |
                               |                              |
                               +--------------+---------------+
                                              |
                     connect_on_app_finalize  |
     +------------+                           |
     | builtins.py| +-----------------------> |
     +------------+                           |
                                              |
                     connect_on_app_finalize  |
    +-------------+                           |
    |User Function| +---------------------->  |
    +-------------+                           |
                                              v
    
                 +----------------------------------------------------------------------------------------------------+
                 |                                        _on_app_finalizers                                          |
                 |                                                                                                    |
                 |                                                                                                    |
                 |    <function add_chunk_task>                                                                       |
                 |    <function add_backend_cleanup_task>                                                             |
                 |    <function add_starmap_task>                                                                     |
                 |    <function add_group_task>                                                                       |
                 |    <function add_map_task>                                                                         |
                 |    <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>   |
                 |    <function add_accumulate_task>                                                                  |
                 |    <function add_chain_task>                                                                       |
                 |    <function add_unlock_chord_task>                                                                |
                 |    <function add_chord_task>                                                                       |
                 |                                                                                                    |
                 +----------------------------+-----------------------------------------------------------------------+
                                              |
                                              |
                                              |                           +--------------------------------------------------------------------------------------------+
                                  finalize    v                           |                                                                                            |
                                                                          |                          TaskRegistry                                                      |
                               +---------------------------+              |                                                                                            |
                               |                           |              |                                                                                            |
                               |           Celery          |              |                                                                                            |
                               |                           |              |   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                         |
        _process_task   <-------------------+  process_task|              |   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                              |
                               |                           |              |   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest >  |
                               |                           |              |   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>            |
                               |                _tasks  +------------->   |   'celery.group' = {group} <@task: celery.group of myTest>                                 |
     +---------------+         |                           |              |   'celery.map' = {xmap} <@task: celery.map of myTest>                                      |
     | amqp.Consumer |  <--------+  task_consumer          |              |   'celery.chain' = {chain} <@task: celery.chain of myTest>                                 |
     +---------------+         |                           |              |   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                          |
                               +---------------------------+              |   'celery.chord' = {chord} <@task: celery.chord of myTest>                                 |
                                                                          |   'myTest.add' = {add} <@task: myTest.add of myTest>                                       |
                                                                          |   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                  |
                                                                          |                                                                                            |
                                                                          +--------------------------------------------------------------------------------------------+
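
    The registry on the right of the diagram can be pictured as a dictionary keyed by task name. The sketch below is a simplified stand-in, not Celery's actual code: `TaskRegistry`, `NotRegistered`, `register`, and `lookup_and_run` are written here only for illustration and assume nothing beyond the names shown in the diagram.

```python
class NotRegistered(KeyError):
    """Stand-in for celery.exceptions.NotRegistered (illustration only)."""


class TaskRegistry(dict):
    """Simplified stand-in for the app's task registry: task name -> task."""

    NotRegistered = NotRegistered

    def __missing__(self, key):
        # Looking up an unknown task name fails with NotRegistered.
        raise self.NotRegistered(key)

    def register(self, name, func):
        # Hypothetical helper: store the callable under its task name.
        self[name] = func


def add(x, y):
    # Same business logic as the article's example task.
    return x + y


tasks = TaskRegistry()
tasks.register('myTest.add', add)


def lookup_and_run(registry, name, args):
    # Hypothetical helper: find the task by name, then call it.
    return registry[name](*args)
```

    Calling `lookup_and_run(tasks, 'myTest.add', (2, 17))` finds the task by name and runs it, while indexing an unknown name raises `NotRegistered`.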
    
    


    5.3.3 Task processing logic

    After a message is fetched from the broker, the task is processed as follows:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
     +------------------------------------------------------------------------------------+
     strategy                |
                             |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +------------+-------------+                       +---------------------+
                             | _process_task_sem
                             |
    +--------------------------------------------------------------------------------------+
     Worker                  | req[{Request} myTest.add]
                             v
                    +--------+-----------+
                    | WorkController     |
                    |                    |
                    |            pool +-------------------------+
                    +--------+-----------+                      |
                             |                                  |
                             |               apply_async        v
                 +-----------+----------+                   +---+-------+
                 |{Request} myTest.add  | +---------------> | TaskPool  |
                 +----------------------+                   +-----------+
                                            myTest.add
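
    The flow in the diagram can be sketched in plain Python. This is a toy model under stated assumptions, not Celery's implementation: the `Message` shape, `make_task_message_handler`, and the inline `TaskPool` are hypothetical, and only the names `on_task_received`, `task_message_handler`, `strategies`, `Request`, and `apply_async` are taken from the diagram.

```python
from collections import namedtuple

# Hypothetical message shape: just a task name and positional arguments.
Message = namedtuple('Message', ['task_name', 'args'])


class Request:
    """Toy request: bundles the task callable with its arguments."""

    def __init__(self, task, args):
        self.task = task
        self.args = args


class TaskPool:
    """Stand-in pool that runs the call inline instead of in a child process."""

    def apply_async(self, func, args):
        return func(*args)


def add(x, y):
    return x + y


tasks = {'myTest.add': add}
pool = TaskPool()


def process_task(req):
    # Worker layer: hand the request over to the pool.
    return pool.apply_async(req.task, req.args)


def make_task_message_handler(task):
    # Strategy layer: build one handler per task (hypothetical factory).
    def task_message_handler(message):
        req = Request(task, message.args)
        return process_task(req)
    return task_message_handler


# One strategy per registered task name, e.g. myTest.add -> task_message_handler.
strategies = {name: make_task_message_handler(t) for name, t in tasks.items()}


def on_task_received(message):
    # Consumer layer: pick the strategy by task name and invoke it.
    strategy = strategies[message.task_name]
    return strategy(message)


result = on_task_received(Message('myTest.add', (2, 17)))
```

    The three functions mirror the three bands of the diagram: the Consumer selects a strategy, the strategy builds a `Request`, and the Worker passes it to the pool.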
    
    


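    This concludes the analysis of Celery's startup. Next, we will walk through a complete example: how a message travels from being sent to being consumed.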


  • Original article: https://www.cnblogs.com/rossiXYZ/p/14613110.html