
    [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (1)

    0x00 Summary

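    Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. Celery relies on its Worker component to do the actual task processing. A worker is usually started from the command line, for example: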

    $ celery --app=proj worker -l INFO
    $ celery -A proj worker -l INFO -Q hipri,lopri
    $ celery -A proj worker --concurrency=4
    $ celery -A proj worker --concurrency=1000 -P eventlet
    $ celery worker --autoscale=10,0
    

    In this article we walk through how the worker starts up.

    0x01 Celery Architecture

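    In several earlier articles we analyzed Kombu, which lays the groundwork for analyzing Celery:

    [Source Code Analysis] Message Queue Kombu: Mailbox

    [Source Code Analysis] Message Queue Kombu: Hub

    [Source Code Analysis] Message Queue Kombu: Consumer

    [Source Code Analysis] Message Queue Kombu: Producer

    [Source Code Analysis] Message Queue Kombu: Startup Process

    [Source Code Analysis] Message Queue Kombu: Basic Architecture

    as well as

    [Source Code Analysis] Parallel Distributed Framework Celery: Architecture (2)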

    Let's briefly revisit Celery's structure. The architecture diagram is shown below:

     +-----------+            +--------------+
     | Producer  |            |  Celery Beat |
     +-------+---+            +----+---------+
             |                     |
             |                     |
             v                     v
    
           +-------------------------+
           |          Broker         |
           +------------+------------+
                        |
                        |
                        |
         +-------------------------------+
         |              |                |
         v              v                v
    +----+-----+   +----+------+   +-----+----+
    | Exchange |   |  Exchange |   | Exchange |
    +----+-----+   +----+------+   +----+-----+
         |              |               |
         v              v               v
    
      +-----+       +-------+       +-------+
      |queue|       | queue |       | queue |
      +--+--+       +---+---+       +---+---+
         |              |               |
         |              |               |
         v              v               v
    
    +---------+     +--------+     +----------+
    | worker  |     | Worker |     |  Worker  |
    +-----+---+     +---+----+     +----+-----+
          |             |               |
          |             |               |
          +-----------------------------+
                        |
                        |
                        v
                    +---+-----+
                    | backend |
                    +---------+
    
    

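    0x02 Sample Code

    It is hard to find guidance online on how to debug a Celery worker. Looking through Celery's own test code, however, we find the following: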

    # def test_worker_main(self):
    #     from celery.bin import worker as worker_bin
    #
    #     class worker(worker_bin.worker):
    #
    #         def execute_from_commandline(self, argv):
    #             return argv
    #
    #     prev, worker_bin.worker = worker_bin.worker, worker
    #     try:
    #         ret = self.app.worker_main(argv=['--version'])
    #         assert ret == ['--version']
    #     finally:
    #         worker_bin.worker = prev
    

    Following that example, we can start a worker programmatically as shown below and then debug it:

    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379')
    
    @app.task()
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        app.worker_main(argv=['worker'])
    

    0x03 Logic Overview

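    When a worker starts, it opens a connection to the broker (a long-lived TCP connection). When there is data to transfer, it creates channels on that connection, and one connection can carry multiple channels. The worker then pulls tasks from the broker's queues and consumes them. This is the classic producer-consumer pattern.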
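The producer-consumer relationship can be sketched using only the standard library. This is a simplified in-process analogy and not Celery's or Kombu's implementation. A `queue.Queue` stands in for a broker queue. The message format and the `producer`/`consumer`/`run` helpers are hypothetical.

```python
import queue
import threading


def producer(q: queue.Queue, n: int) -> None:
    """Publish n task messages, then a None sentinel meaning 'no more work'."""
    for i in range(n):
        q.put({"task": "add", "args": (i, i)})
    q.put(None)


def consumer(q: queue.Queue, results: list) -> None:
    """Take messages off the queue and 'execute' them until the sentinel arrives."""
    while True:
        msg = q.get()
        if msg is None:
            break
        x, y = msg["args"]
        results.append(x + y)


def run(n: int = 5) -> list:
    q: queue.Queue = queue.Queue()  # stands in for a broker queue
    results: list = []
    worker = threading.Thread(target=consumer, args=(q, results))
    worker.start()
    producer(q, n)
    worker.join()
    return results


if __name__ == "__main__":
    print(run())  # [0, 2, 4, 6, 8]
```

There is a single consumer thread, so results come back in publish order. A real worker runs many consumers in parallel, and ordering is then not guaranteed.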

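    Conceptually, a worker is made up of four parts: task_pool, consumer, scheduler, and mediator. This breakdown comes from older Celery versions, and current versions no longer have a separate mediator component. The task_pool holds the pool's execution units. When a worker is started with a concurrency setting, that many execution units are placed in the pool.

    Celery's default concurrency model is prefork, which means multiple processes. Prefork is simply a lightly modified version of the multiprocessing pool (implemented in billiard, Celery's fork of multiprocessing) under a new name. What sets it apart from an ordinary process pool is that this task_pool only holds the running worker processes.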

    The consumer receives messages from the broker and converts each one into an instance of celery.worker.request.Request.
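Here is a toy sketch of the consumer's conversion step. The `Request` class is a hypothetical, heavily simplified stand-in for `celery.worker.request.Request`. The message layout is an assumption loosely modeled on Celery's task message protocol version 2: the task name and id travel in the headers, and the body is `(args, kwargs, embed)`.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    """Hypothetical, simplified stand-in for celery.worker.request.Request."""
    task_name: str
    task_id: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


def on_message(headers: dict, body: tuple) -> Request:
    """Convert a raw (headers, body) message into a Request object."""
    args, kwargs, _embed = body  # assumed protocol-v2 body layout
    return Request(
        task_name=headers["task"],
        task_id=headers["id"],
        args=tuple(args),
        kwargs=dict(kwargs),
    )


if __name__ == "__main__":
    print(on_message({"task": "tasks.add", "id": "abc"}, ([2, 3], {}, {})))
```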

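    At the appropriate point, Celery attaches this request to the Task. Task is the class generated from a function decorated with app_celery.task(). Inside your own task function you can therefore use this request to obtain key information about the current invocation, for example through self.request in a task declared with bind=True. That covers task_pool and consumer.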

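    The worker also maintains two data structures that operate in parallel: the ETA schedule and the ready queue.

    ETA schedule: tasks that carry an eta or countdown are held here until their scheduled time arrives.

    Ready queue: tasks that need to run immediately go into the ready queue when they reach the worker. They wait there until the consumer dispatches them for execution.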
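The interplay of the two structures can be modeled with a heap ordered by ETA and a FIFO ready queue. This is a toy model and not Celery's timer code. `MiniScheduler`, `receive`, and `tick` are hypothetical names, and time is passed in explicitly as a number so the behavior is deterministic.

```python
import heapq
from collections import deque


class MiniScheduler:
    """Toy model of a worker's ETA schedule plus ready queue (names are hypothetical)."""

    def __init__(self):
        self.eta_heap = []    # (eta, seq, task) entries, earliest ETA first
        self.ready = deque()  # tasks that may run right now
        self._seq = 0         # tie-breaker so tasks themselves are never compared

    def receive(self, task, eta=None, now=0.0):
        """Route an incoming task: run-now tasks go straight to the ready queue."""
        if eta is None or eta <= now:
            self.ready.append(task)
        else:
            heapq.heappush(self.eta_heap, (eta, self._seq, task))
            self._seq += 1

    def tick(self, now):
        """Move every task whose ETA has been reached into the ready queue."""
        while self.eta_heap and self.eta_heap[0][0] <= now:
            _, _, task = heapq.heappop(self.eta_heap)
            self.ready.append(task)
```

A heap keeps the earliest ETA at index 0, so each `tick` only inspects the front entries instead of scanning all scheduled tasks.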

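    Now let's look at how Celery starts.

    0x04 The Celery Application

    Execution first reaches the Celery class, which represents the Celery application.

    The class mainly consists of three things: class names for its components, thread-local storage (TLS), and signals tied to its lifecycle (loading configuration, after configuration, after finalization, after fork).

    It lives in celery/app/base.py and is defined as follows: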

    class Celery:
        """Celery application."""
    
        amqp_cls = 'celery.app.amqp:AMQP'
        backend_cls = None
        events_cls = 'celery.app.events:Events'
        loader_cls = None
        log_cls = 'celery.app.log:Logging'
        control_cls = 'celery.app.control:Control'
        task_cls = 'celery.app.task:Task'
        registry_cls = 'celery.app.registry:TaskRegistry'
    
        #: Thread local storage.
        _local = None
        _fixups = None
        _pool = None
        _conf = None
        _after_fork_registered = False
    
        #: Signal sent when app is loading configuration.
        on_configure = None
    
        #: Signal sent after app has prepared the configuration.
        on_after_configure = None
    
        #: Signal sent after app has been finalized.
        on_after_finalize = None
    
        #: Signal sent by every new process after fork.
        on_after_fork = None
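The `_local` attribute above is a slot for thread-local storage. The sketch below illustrates the idea with `threading.local` and shows why per-thread state does not leak between threads. The `App` class and its `current_task` helpers are hypothetical and are not Celery's API.

```python
import threading


class App:
    """Hypothetical minimal app that keeps per-thread state in threading.local."""

    def __init__(self):
        self._local = threading.local()

    def set_current_task(self, name):
        self._local.current_task = name

    @property
    def current_task(self):
        # A thread that never set the value falls back to None.
        return getattr(self._local, "current_task", None)


def demo():
    app = App()
    app.set_current_task("main-task")
    seen = {}

    def in_thread():
        seen["before"] = app.current_task  # the main thread's value is not visible here
        app.set_current_task("thread-task")
        seen["after"] = app.current_task

    t = threading.Thread(target=in_thread)
    t.start()
    t.join()
    return app.current_task, seen
```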
    

    For our sample code, the entry point is:

    def worker_main(self, argv=None):
        if argv is None:
            argv = sys.argv
    
        if 'worker' not in argv:
            raise ValueError(
                "The worker sub-command must be specified in argv.\n"
                "Use app.start() to programmatically start other commands."
            )
    
        self.start(argv=argv)
    

    4.1 Adding Sub-commands

    celery/bin/celery.py registers the sub-commands, as shown below.

    Each of these commands can be used directly as a sub-command on the command line:

    celery.add_command(purge)
    celery.add_command(call)
    celery.add_command(beat)
    celery.add_command(list_)
    celery.add_command(result)
    celery.add_command(migrate)
    celery.add_command(status)
    celery.add_command(worker)
    celery.add_command(events)
    celery.add_command(inspect)
    celery.add_command(control)
    celery.add_command(graph)
    celery.add_command(upgrade)
    celery.add_command(logtool)
    celery.add_command(amqp)
    celery.add_command(shell)
    celery.add_command(multi)
    

    Each of these is a command object. Taking worker as an example, it looks like this in the debugger:

    worker = {CeleryDaemonCommand} <CeleryDaemonCommand worker>
     add_help_option = {bool} True
     allow_extra_args = {bool} False
     allow_interspersed_args = {bool} True
     context_settings = {dict: 1} {'allow_extra_args': True}
     epilog = {NoneType} None
     name = {str} 'worker'
     options_metavar = {str} '[OPTIONS]'
     params = {list: 32} [<CeleryOption hostname>, ...... , <CeleryOption executable>]
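Celery's command line is built on click. A click group keeps a registry of named sub-commands and dispatches on the first argument. The stdlib-only sketch below imitates that registration-and-dispatch idea. `Group`, `worker`, and `list_` here are hypothetical stand-ins, not the click or Celery API.

```python
from typing import Callable, Dict, List, Optional

Command = Callable[[List[str]], str]


class Group:
    """Minimal stand-in for a click Group: a name -> command registry (hypothetical)."""

    def __init__(self) -> None:
        self.commands: Dict[str, Command] = {}

    def add_command(self, cmd: Command, name: Optional[str] = None) -> None:
        self.commands[name or cmd.__name__] = cmd

    def main(self, args: List[str]) -> str:
        """Dispatch to the sub-command named by the first argument."""
        name, *rest = args
        if name not in self.commands:
            raise SystemExit(f"No such command: {name}")
        return self.commands[name](rest)


def worker(rest: List[str]) -> str:
    return "worker " + " ".join(rest)


def list_(rest: List[str]) -> str:
    # The trailing underscore avoids shadowing the builtin, so register it as "list".
    return "list"


celery = Group()
celery.add_command(worker)
celery.add_command(list_, name="list")
```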
    

    4.2 Entry Point

    Next, app.start() imports the Celery command entry point, celery, and calls it:

    def start(self, argv=None):
        from celery.bin.celery import celery
    
        celery.params[0].default = self
    
        try:
            celery.main(args=argv, standalone_mode=False)
        except Exit as e:
            return e.exit_code
        finally:
            celery.params[0].default = None
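start() temporarily installs the app as the default of the first CLI parameter (celery.params[0]) and resets it in a finally clause. This leaves the shared celery command object clean even if the command raises. The same set-then-always-restore pattern can be packaged as a context manager. The `Param` class and the `temporary_default` helper below are hypothetical illustrations, not Celery code.

```python
from contextlib import contextmanager


class Param:
    """Hypothetical stand-in for a CLI option whose default can be patched."""

    def __init__(self, default=None):
        self.default = default


@contextmanager
def temporary_default(param, value):
    """Install value as param.default for the block, then always reset it to None."""
    param.default = value
    try:
        yield param
    finally:
        param.default = None  # mirrors `celery.params[0].default = None`
```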
    

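    4.3 Cached Properties: cached_property

    In Celery, many member attributes are decorated with cached_property.

    A function decorated with cached_property becomes an attribute of the object. The first time the attribute is accessed, the function is called. On later accesses the value is read directly from the instance's dictionary (__dict__). In other words, it "caches the return value of the get method on first call".

    Many well-known Python projects, such as Werkzeug and Django, ship their own cached_property implementation.

    Because it is so useful, Python 3.8 added a cached_property class to the functools module, giving it an official implementation.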
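The caching behavior described above can be shown with the standard-library version. The `App` class and its `calls` counter are illustrative only. The getter returns a throwaway class that loosely echoes Celery's `Worker` property.

```python
from functools import cached_property  # Python 3.8+


class App:
    def __init__(self):
        self.calls = 0  # counts how many times the getter really runs

    @cached_property
    def Worker(self):
        self.calls += 1
        # Build a throwaway class, loosely echoing Celery's Worker property.
        return type("Worker", (object,), {"app": self})


if __name__ == "__main__":
    app = App()
    assert app.Worker is app.Worker  # the second access comes from app.__dict__
    print(app.calls)  # 1
```

functools.cached_property only defines __get__, so the value stored in the instance's __dict__ takes precedence on later lookups. Deleting that entry with `del app.Worker` forces the getter to run again.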

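    Examples from Celery's code follow. Note that pool is a plain @property that caches its value by hand in self._pool: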

        @cached_property
        def Worker(self):
            """Worker application.
            """
            return self.subclass_with_self('celery.apps.worker:Worker')
    
        @cached_property
        def Task(self):
            """Base task class for this app."""
            return self.create_task_cls()
    
        @property
        def pool(self):
            """Broker connection pool: :class:`~@pool`.
            """
            if self._pool is None:
                self._ensure_after_fork()
                limit = self.conf.broker_pool_limit
                pools.set_limit(limit)
                self._pool = pools.connections[self.connection_for_write()]
            return self._pool
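pool is not a cached_property. It caches by hand, checking self._pool and building the pool only on first access. Below is a minimal sketch of that lazy-initialization pattern. `App`, `limit`, `created`, and `_make_pool` are hypothetical, and a list of strings stands in for a real kombu connection pool.

```python
class App:
    _pool = None  # class-level default, like `_pool = None` on the Celery class

    def __init__(self, limit):
        self.limit = limit
        self.created = 0  # how many pools have been built

    def _make_pool(self):
        # Hypothetical factory standing in for pools.connections[self.connection_for_write()]
        self.created += 1
        return [f"conn-{i}" for i in range(self.limit)]

    @property
    def pool(self):
        if self._pool is None:  # build lazily, on first access only
            self._pool = self._make_pool()
        return self._pool
```

The cache lives in an ordinary attribute, so setting it back to None is enough to make the next access build a fresh pool.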
    

    So, in the end, the Celery application object looks like this in the debugger:

    app = {Celery} <Celery tasks at 0x7fb8e1538400>
     AsyncResult = {type} <class 'celery.result.AsyncResult'>
     Beat = {type} <class 'celery.apps.beat.Beat'>
     GroupResult = {type} <class 'celery.result.GroupResult'>
     Pickler = {type} <class 'celery.app.utils.AppPickler'>
     ResultSet = {type} <class 'celery.result.ResultSet'>
     Task = {type} <class 'celery.app.task.Task'>
     WorkController = {type} <class 'celery.worker.worker.WorkController'>
     Worker = {type} <class 'celery.apps.worker.Worker'>
     amqp = {AMQP} <celery.app.amqp.AMQP object at 0x7fb8e2444860>
     annotations = {tuple: 0} ()
     autofinalize = {bool} True
     backend = {DisabledBackend} <celery.backends.base.DisabledBackend object at 0x7fb8e25fd668>
     builtin_fixups = {set: 1} {'celery.fixups.django:fixup'}
     clock = {LamportClock} 1
     conf = {Settings: 163} Settings({'broker_url': 'redis://localhost:6379', 'deprecated_settings': set(), 'cache_...
     configured = {bool} True
     control = {Control} <celery.app.control.Control object at 0x7fb8e2585f98>
     current_task = {NoneType} None
     current_worker_task = {NoneType} None
     events = {Events} <celery.app.events.Events object at 0x7fb8e25ecb70>
     loader = {AppLoader} <celery.loaders.app.AppLoader object at 0x7fb8e237a4a8>
     main = {str} 'tasks'
     on_after_configure = {Signal} <Signal: app.on_after_configure providing_args={'source'}>
     on_after_finalize = {Signal} <Signal: app.on_after_finalize providing_args=set()>
     on_after_fork = {Signal} <Signal: app.on_after_fork providing_args=set()>
     on_configure = {Signal} <Signal: app.on_configure providing_args=set()>
     pool = {ConnectionPool} <kombu.connection.ConnectionPool object at 0x7fb8e26e9e80>
     producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7fb8e26f02b0>
     registry_cls = {type} <class 'celery.app.registry.TaskRegistry'>
     set_as_current = {bool} True
     steps = {defaultdict: 2} defaultdict(<class 'set'>, {'worker': set(), 'consumer': set()})
     tasks = {TaskRegistry: 10} {'celery.chain': <@task: celery.chain of tasks at 0x7fb8e1538400>, 'celery.starmap': <@task: celery.starmap of tasks at 0x7fb8e1538400>, 'celery.chord': <@task: celery.chord of tasks at 0x7fb8e1538400>, 'celery.backend_cleanup': <@task: celery.backend_clea
     user_options = {defaultdict: 0} defaultdict(<class 'set'>, {})
    

    Some of these member attributes are shown in the diagram below:

    +---------------------------------------+
    |  Celery                               |
    |                                       |
    |                              Beat+-----------> celery.apps.beat.Beat
    |                                       |
    |                              Task+-----------> celery.app.task.Task
    |                                       |
    |                     WorkController+----------> celery.worker.worker.WorkController
    |                                       |
    |                            Worker+-----------> celery.apps.worker.Worker
    |                                       |
    |                              amqp +----------> celery.app.amqp.AMQP
    |                                       |
    |                           control +----------> celery.app.control.Control
    |                                       |
    |                            events  +---------> celery.app.events.Events
    |                                       |
    |                            loader +----------> celery.loaders.app.AppLoader
    |                                       |
    |                              pool +----------> kombu.connection.ConnectionPool
    |                                       |
    |                     producer_pool +----------> kombu.pools.ProducerPool
    |                                       |
    |                             tasks +----------> TaskRegistry
    |                                       |
    |                                       |
    +---------------------------------------+
    

    0x05 The Celery Command

    The top-level entry point for Celery commands is the celery function in celery/bin/celery.py.

    An abridged version of the code:

    @click.pass_context
    def celery(ctx, app, broker, result_backend, loader, config, workdir,
               no_color, quiet, version):
        """Celery command entrypoint."""
    
        if loader:
            # Default app takes loader from this env (Issue #1066).
            os.environ['CELERY_LOADER'] = loader
        if broker:
            os.environ['CELERY_BROKER_URL'] = broker
        if result_backend:
            os.environ['CELERY_RESULT_BACKEND'] = result_backend
        if config:
            os.environ['CELERY_CONFIG_MODULE'] = config
        ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir,
                             quiet=quiet)
    
        # User options
        worker.params.extend(ctx.obj.app.user_options.get('worker', []))
        beat.params.extend(ctx.obj.app.user_options.get('beat', []))
        events.params.extend(ctx.obj.app.user_options.get('events', []))
    
        for command in celery.commands.values():
            command.params.extend(ctx.obj.app.user_options.get('preload', []))
    

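    Inside the function, the app's user options for worker, beat, and events are appended to those commands. The function then iterates over celery.commands and appends the 'preload' user options to every command's params. These commands are the same sub-commands registered earlier: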

    celery.commands = 
     'report' = {CeleryCommand} <CeleryCommand report>
     'purge' = {CeleryCommand} <CeleryCommand purge>
     'call' = {CeleryCommand} <CeleryCommand call>
     'beat' = {CeleryDaemonCommand} <CeleryDaemonCommand beat>
     'list' = {Group} <Group list>
     'result' = {CeleryCommand} <CeleryCommand result>
     'migrate' = {CeleryCommand} <CeleryCommand migrate>
     'status' = {CeleryCommand} <CeleryCommand status>
     'worker' = {CeleryDaemonCommand} <CeleryDaemonCommand worker>
     'events' = {CeleryDaemonCommand} <CeleryDaemonCommand events>
     'inspect' = {CeleryCommand} <CeleryCommand inspect>
     'control' = {CeleryCommand} <CeleryCommand control>
     'graph' = {Group} <Group graph>
     'upgrade' = {Group} <Group upgrade>
     'logtool' = {Group} <Group logtool>
     'amqp' = {Group} <Group amqp>
     'shell' = {CeleryCommand} <CeleryCommand shell>
     'multi' = {CeleryCommand} <CeleryCommand multi>
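The option-merging step in celery() can be mimicked with plain lists. In this sketch `Command` and `apply_user_options` are hypothetical. It uses lists instead of the sets seen in app.user_options so that the resulting order is deterministic.

```python
from collections import defaultdict


class Command:
    """Hypothetical command object holding a list of option names."""

    def __init__(self, name, params=None):
        self.name = name
        self.params = list(params or [])


def apply_user_options(commands, user_options):
    """Mimic celery(): per-command options first, then 'preload' options for all."""
    for name in ("worker", "beat", "events"):
        if name in commands:
            commands[name].params.extend(user_options.get(name, []))
    for command in commands.values():
        command.params.extend(user_options.get("preload", []))


if __name__ == "__main__":
    cmds = {"worker": Command("worker", ["--hostname"]), "purge": Command("purge")}
    opts = defaultdict(list, {"worker": ["--my-flag"], "preload": ["--env"]})
    apply_user_options(cmds, opts)
    print(cmds["worker"].params)  # ['--hostname', '--my-flag', '--env']
```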
    

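    0x06 The worker Sub-command

    The worker sub-command is one of the sub-commands of the top-level celery command. It is what gets invoked when we pass worker on the command line: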

    $ celery -A proj worker -l INFO -Q hipri,lopri
    

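    The worker sub-command is a click command defined in celery/bin/worker.py. It is a CeleryDaemonCommand, which ultimately inherits from click.BaseCommand.

    Therefore the following call indirectly invokes the worker command: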

    celery.main(args=argv, standalone_mode=False)
    

    It is defined as follows (the click decorators are omitted):

    def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
               loglevel=None, logfile=None, pidfile=None, statedb=None,
               **kwargs):
        """Start worker instance.
    
        Examples
        --------
        $ celery --app=proj worker -l INFO
        $ celery -A proj worker -l INFO -Q hipri,lopri
        $ celery -A proj worker --concurrency=4
        $ celery -A proj worker --concurrency=1000 -P eventlet
        $ celery worker --autoscale=10,0
    
        """
        app = ctx.obj.app
        maybe_drop_privileges(uid=uid, gid=gid)
        worker = app.Worker(
            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
            logfile=logfile,  # node format handled by celery.app.log.setup
            pidfile=node_format(pidfile, hostname),
            statedb=node_format(statedb, hostname),
            no_color=ctx.obj.no_color,
            **kwargs)
        worker.start()
        return worker.exitcode
    

    The flow at this point is shown below. Execution moves from the Celery application into the concrete worker command:

          +----------+
          |   User   |
          +----+-----+
               |
               |  worker_main
               |
               v
     +---------+------------+
     |        Celery        |
     |                      |
     |  Celery application  |
     |  celery/app/base.py  |
     |                      |
     +---------+------------+
               |
               |  celery.main
               |
               v
     +---------+------------+
     |  @click.pass_context |
     |       celery         |
     |                      |
     |                      |
     |    CeleryCommand     |
     | celery/bin/celery.py |
     |                      |
     +---------+------------+
               |
               |
               |
               v
    +----------+------------+
    |   @click.pass_context |
    |        worker         |
    |                       |
    |                       |
    |     WorkerCommand     |
    | celery/bin/worker.py  |
    +-----------------------+
    

    0x07 Worker application

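    Inside this function, app.Worker is instantiated. The Worker application is the worker instance, and app here is the instance of the Celery class we defined earlier.

    Worker is defined in celery/app/base.py: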

    @cached_property
    def Worker(self):
        """Worker application.
    
        See Also:
            :class:`~@Worker`.
        """
        return self.subclass_with_self('celery.apps.worker:Worker')
    

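    subclass_with_self uses Python's built-in type() to dynamically create a subclass of the given class, with the app attribute bound to the current application: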

    def subclass_with_self(self, Class, name=None, attribute='app',
                           reverse=None, keep_reduce=False, **kw):
        """Subclass an app-compatible class.
        """
        Class = symbol_by_name(Class)                               # import the class from its dotted name
        reverse = reverse if reverse else Class.__name__            # use the given name, else fall back to the class name
    
        def __reduce__(self):                                       # called when an instance is pickled
            return _unpickle_appattr, (reverse, self.__reduce_args__())
    
        attrs = dict(
            {attribute: self},                                      # bind the 'app' attribute to this app by default
            __module__=Class.__module__,
            __doc__=Class.__doc__,
            **kw)                                                   # merge in any extra attributes
        if not keep_reduce:
            attrs['__reduce__'] = __reduce__                        # by default, give the new class this __reduce__
    
        return type(bytes_if_py2(name or Class.__name__), (Class,), attrs)  # create the subclass with type()
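If you strip away symbol_by_name, the pickling support, and the Python 2 shim, subclass_with_self comes down to a single three-argument type() call. The sketch below is a simplified reimplementation under that assumption. `Worker` and `App` are hypothetical stand-ins, and `Class` is passed as a class object rather than a dotted path string.

```python
class Worker:
    """Hypothetical stand-in for celery.apps.worker.Worker."""
    app = None

    def describe(self):
        return f"worker of {self.app.main}"


class App:
    def __init__(self, main):
        self.main = main

    def subclass_with_self(self, Class, name=None, attribute="app", **kw):
        """Simplified: create a subclass of Class whose `attribute` is bound to self."""
        attrs = dict({attribute: self},
                     __module__=Class.__module__,
                     __doc__=Class.__doc__,
                     **kw)
        return type(name or Class.__name__, (Class,), attrs)
```

The original Worker class is left untouched. Each app gets its own subclass, so several apps can coexist in one process without overwriting each other's `app` attribute.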
    
    

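    At this point the worker command has obtained an instance of celery.apps.worker:Worker and will call its start method. First, let's look at how the Worker class is instantiated.

    Let's recap:

    Execution starts at the worker_main entry point and reaches the Celery application. From there it enters the Celery command and then the worker sub-command, as shown below.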

                                         +----------------------+
          +----------+                   |  @cached_property    |
          |   User   |                   |      Worker          |
          +----+-----+            +--->  |                      |
               |                  |      |                      |
               |  worker_main     |      |  Worker application  |
               |                  |      |  celery/app/base.py  |
               v                  |      +----------------------+
     +---------+------------+     |
     |        Celery        |     |
     |                      |     |
     |  Celery application  |     |
     |  celery/app/base.py  |     |
     |                      |     |
     +---------+------------+     |
               |                  |
               |  celery.main     |
               |                  |
               v                  |
     +---------+------------+     |
     |  @click.pass_context |     |
     |       celery         |     |
     |                      |     |
     |                      |     |
     |    CeleryCommand     |     |
     | celery/bin/celery.py |     |
     |                      |     |
     +---------+------------+     |
               |                  |
               |                  |
               |                  |
               v                  |
    +----------+------------+     |
    |   @click.pass_context |     |
    |        worker         |     |
    |                       |     |
    |                       |     |
    |     WorkerCommand     |     |
    | celery/bin/worker.py  |     |
    +-----------+-----------+     |
                |                 |
                +-----------------+
    
    

    Next we formally enter the worker. Celery calls the worker's main logic "work as a program".

    In the next article we will continue with the startup process of work as a program.

    Original article: https://www.cnblogs.com/rossiXYZ/p/14563763.html