zoukankan      html  css  js  c++  java
  • [源码解析] 并行分布式框架 Celery 之架构 (1)

    [源码解析] 并行分布式框架 Celery 之架构 (1)

    0x00 摘要

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

    前面我们用几篇文章分析了 Kombu,为 Celery 的分析打下了基础。

    [源码分析] 消息队列 Kombu 之 mailbox

    [源码分析] 消息队列 Kombu 之 Hub

    [源码分析] 消息队列 Kombu 之 Consumer

    [源码分析] 消息队列 Kombu 之 Producer

    [源码分析] 消息队列 Kombu 之 启动过程

    [源码解析] 消息队列 Kombu 之 基本架构

    本系列将继续通过源码分析,和大家一起深入学习 Celery。本文是系列第一篇,借鉴了几位网友的大作,按照自己的理解再重新整理,遂得此文。

    0x01 Celery 简介

    1.1 什么是 Celery

    Celery是Python世界中最受欢迎的后台工作管理者之一。它是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

    利用多线程,如Eventlet,gevent等,Celery的任务能被并发地执行在单个或多个工作服务器(worker servers)上。任务能异步执行(后台运行)或同步执行(等待任务完成)。Celery用于生产系统时候每天可以处理数以百万计的任务。

    Celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。

    Celery建议的消息队列是RabbitMQ,但也支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。并且可以同时充当生产者和消费者。

    1.2 场景

    使用Celery的常见场景如下:

    • Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。

    • 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。

    • 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。

    1.3 特性

    Celery提供了如下的特性:

    • 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。

    • 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。

    • 方便把任务和配置管理相关联。

    • 可选多进程、Eventlet 和 Gevent 三种模式并发执行。

    • 提供错误处理机制。

    • 提供多种任务原语,方便实现任务分组、拆分和调用链。

    • 支持多种消息代理和存储后端。

    1.4 区别

    消息队列和任务队列,最大的不同之处就在于理念的不同 -- 消息队列传递的是“消息”,任务队列传递的是“任务”

    • 消息队列用来快速消费队列中的消息。消息队列更侧重于消息的吞吐、处理,具有有处理海量信息的能力。另外利用消息队列的生长者和消费者的概念,也可以实现任务队列的功能,但是还需要进行额外的开发。
    • 任务队列是用来执行一个耗时任务。任务队列则提供了执行任务所需的功能,比如任务的重试,结果的返回,任务状态记录等。虽然也有并发的处理能力,但一般不适用于高吞吐量快速消费的场景。

    0x02 Celery的架构

    Celery 的基本逻辑为:分布式异步消息任务队列。

    在 Celery 中,采用的是分布式的管理方式,每个节点之间都是通过广播/单播进行通信,从而达到协同效果。实际上,只有部分辅助管理功能才会协同,基础业务功能反而没有借助协同

    2.1 组件

    Celery包含如下组件:

    • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

    • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

    • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

    • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

    • Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。

    再理解一下:

    • 系统可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue。
    • 上述功能是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定 routing_key,Exchange 通过routing_key 来把消息路由(routes)到不同的"消息队列"中去(Celery的底层依赖Kombu,里面涉及Exchange)。
    • exchange 对应 一个消息队列(queue),即:通过 "消息路由" 的机制使exchange对应queue,每个queue对应每个worker。

    2.2 任务流程

    Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务的流程是:

    • 客户端向消息队列发送一条消息;
    • 然后中间人(Broker)将消息传递给一个职程(Worker),支持RabbitMQ、Redis等作为Broker。;
    • 最后由职程(Worker)进行执行中间人(Broker)分配的任务;

    2.3 架构图

    Celery的架构图如下所示:

     +-----------+            +--------------+
     | Producer  |            |  Celery Beat |
     +-------+---+            +----+---------+
             |                     |
             |                     |
             v                     v
    
           +-------------------------+
           |          Broker         |
           +------------+------------+
                        |
                        |
                        |
         +-------------------------------+
         |              |                |
         v              v                v
    +----+-----+   +----+------+   +-----+----+
    | Exchange |   |  Exchange |   | Exchange |
    +----+-----+   +----+------+   +----+-----+
         |              |               |
         v              v               v
    
      +-----+       +-------+       +-------+
      |queue|       | queue |       | queue |
      +--+--+       +---+---+       +---+---+
         |              |               |
         |              |               |
         v              v               v
    
    +---------+     +--------+     +----------+
    | worker  |     | Worker |     |  Worker  |
    +-----+---+     +---+----+     +----+-----+
          |             |               |
          |             |               |
          +-----------------------------+
                        |
                        |
                        v
                    +---+-----+
                    | backend |
                    +---------+
    
    

    0x03 Celery 设计推理

    目前我们得到如下信息:

    • Celery 的基本逻辑为:分布式异步消息任务队列;
    • Celery底层依赖 Kombu,基于 Kombu 完成基本功能;
    • 之前我们通过若干文章,基本了解了 Kombu 的大致逻辑;

    下面我们就需要依据 Kombu来推论 Celery 应该如何设计。

    3.1 Celery 基本功能

    首先,我们看看为了完成基本功能,Celery 应该具备哪些组件(模块),我们会提出一些问题,这些问题将在后续的分析中陆续得到解答

    因为Celery 的基本逻辑为:分布式异步消息任务队列,所以Celery包含如下基础组件:

    • Producer:需要有一个组件完成如下功能 :把用户定义的代码打包整合成任务提交给任务队列处理。问题就在于:
      • 对于任务,也就是task如何处理?
      • task的本质是什么?
      • task 应该包括哪些功能?
      • 如果task是函数,如何把task函数传递给服务端?如果task函数内容很大怎么办?
      • 如何把task相关信息从客户端传递到服务端?
    • Broker:为了解耦合,需要有一个中间组件来缓存消息。这就是 消息代理,或者叫作消息中间件。其作用是接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。问题在于:
      • 如何区分不同的消息来源,即如何路由?
      • 是否有容错机制?
    • Worker:需要有一个组件来执行任务,这就是 Worker:
      • Worker 需要从 broker 接受任务。这就需要一个consumer,问题就是:Consumer 如何从 broker 获取消息
      • 接受任务之后,Worker 需要了解任务,知道怎么执行任务,执行任务。所以有一个问题:Worker 怎么知道 client 端的任务?
      • 通常会在多台服务器运行多个 worker 来提高执行效率。这就涉及到一个问题:多个 worker 之间如何协调?如何在多个 Worker 之间分配任务?
    • Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

    3.2 Celery 辅助功能

    以上为基础功能,但是作为分布式异步消息任务队列,我们还需要辅助功能(以及相关问题),比如。

    • 用于执行定时任务的timer;

    • 需要处理监控事件;

    • 如何通过远程命令管理;

    • worker 出现问题,如何处理;

    • 如何提高并发?

    • 如何封装amqp?

    • 如何进行消息循环引擎?

    • 以上功能哪些属于带有分布式特点的?

    3.3 如何划分

    进一步问题是:这些辅助功能是作为基础功能模块的一部分?还是独立出来成为一个功能模块?

    这其实是一个哲学问题,每种实现都有其道理,或者说,很多决定其实就是作者灵光一现(临时拍脑袋)的产物。

    比如我们后面提到的 Consumer 组件,表面上看,就是一个从broker获取消息的功能模块,直接使用 kombu 的 consumer 就可以做到。

    但是实际上,celery Consumer 组件的概念远远要大于Kombu的Consumer,不只是利用了Kombu的Consumer从broker取得消息。也包括消息的消费,分发,监控,心跳等一系列功能。可以说,除了消息循环引擎 被 hub 承担,多进程被 Pool,Autoscaler 承担,定时任务被 timer,beat 承担之外,其他主要功能都被 Consumer 承担。

    因此,我们需要看看:

    • 哪些组件可以利用 Kombu直接完成,哪些需要Celery自己重新设计。

    • 若重新设计,哪些可以基于Kombu设计,如何调用相应Kombu模块。

    • 若使用Kombu模块作为Celery模块的变量,这些Kombu模块分别属于哪些Celery模块。

    0x04 对 AMQP / Kombu 的封装

    Celery如果想成为消息处理系统,首先需要解决消息协议和消息传输问题。

    • 消息协议由 AMQP(Advanced Message Queuing Protocol:高级消息队列协议)解决。Celery 支持所有AMQP路由机制,可以通过配置的方式,执行相关的消息路由。
    • 消息实现和传输由 Kombu 解决。由之前对 Kombu 的分析我们知道,Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,是一个把消息传递封装成统一接口的库。

    所以我们首先看看如何封装 AMQP / Kombu。

    具体封装是在 celery/app/amqp.py 文件中,其中主要有两个类:AMQP 和 Queues。

    4.1 封装

    AMQP类的功能是 发送/接受消息,是对amqp协议实现的再一次封装,在这里其实就是对 kombu 类的再一次封装。

    我们可以看到,其内部成员变量都是来自于 Kombu。比如 Connection, Consumer, Exchange, Producer, Queue, pools。

    from kombu import Connection, Consumer, Exchange, Producer, Queue, pools
    
    class AMQP:
        """App AMQP API: app.amqp."""
    
        Connection = Connection
        Consumer = Consumer
        Producer = Producer
    
        #: compat alias to Connection
        BrokerConnection = Connection
    
        queues_cls = Queues
    
        #: Cached and prepared routing table.
        _rtable = None
    
        #: Underlying producer pool instance automatically
        #: set by the :attr:`producer_pool`.
        _producer_pool = None
    
        # Exchange class/function used when defining automatic queues.
        # For example, you can use ``autoexchange = lambda n: None`` to use the
        # AMQP default exchange: a shortcut to bypass routing
        # and instead send directly to the queue named in the routing key.
        autoexchange = None
    

    为了更好的理解,我们打印出amqp类的具体内容来看看。

    amqp = {AMQP}  
     BrokerConnection = {type} <class 'kombu.connection.Connection'>
     Connection = {type} <class 'kombu.connection.Connection'>
     Consumer = {type} <class 'kombu.messaging.Consumer'>
     Producer = {type} <class 'kombu.messaging.Producer'>
     app = {Celery} <Celery myTest at 0x252bd2903c8>
     autoexchange = {NoneType} None
     default_exchange = {Exchange} Exchange celery(direct)
     default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
     producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
     publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
     queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
     queues_cls = {type} <class 'celery.app.amqp.Queues'>
     router = {Router} <celery.app.routes.Router object at 0x00000252BDC6B248>
     routes = {tuple: 0} ()
     task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>, 2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>}
      _event_dispatcher = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x00000252BE750348>
      _producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
      _rtable = {tuple: 0} ()
    

    具体逻辑如下:

    +---------+
    | Celery  |    +----------------------------+
    |         |    |   celery.app.amqp.AMQP     |
    |         |    |                            |
    |         |    |                            |
    |         |    |          BrokerConnection +----->  kombu.connection.Connection
    |         |    |                            |
    |   amqp+----->+          Connection       +----->  kombu.connection.Connection
    |         |    |                            |
    +---------+    |          Consumer         +----->  kombu.messaging.Consumer
                   |                            |
                   |          Producer         +----->  kombu.messaging.Producer
                   |                            |
                   |          producer_pool    +----->  kombu.pools.ProducerPool
                   |                            |
                   |          queues           +----->  celery.app.amqp.Queues
                   |                            |
                   |          router           +----->  celery.app.routes.Router
                   +----------------------------+
    

    4.2 Queues

    Queues 则是一个扩展,一个逻辑概念,可以认为是 Broker 概念的进一步缩减版

    Producer 把任务发送给 Queues,Worker 从 Queues 获取任务,进行消费。

    app.amqp.queues 就是 Queues 的一个实例,在其中存储了本 Worker 可以读取的所有 kombu.Queue。

    class Queues(dict):
        """Queue name⇒ declaration mapping.
    
        Arguments:
            queues (Iterable): Initial list/tuple or dict of queues.
            create_missing (bool): By default any unknown queues will be
                added automatically, but if this flag is disabled the occurrence
                of unknown queues in `wanted` will raise :exc:`KeyError`.
            max_priority (int): Default x-max-priority for queues with none set.
        """
    
        #: If set, this is a subset of queues to consume from.
        #: The rest of the queues are then used for routing only.
        _consume_from = None
    
        def __init__(self, queues=None, default_exchange=None,
                     create_missing=True, autoexchange=None,
                     max_priority=None, default_routing_key=None):
            dict.__init__(self)
            self.aliases = WeakValueDictionary()
            self.default_exchange = default_exchange
            self.default_routing_key = default_routing_key
            self.create_missing = create_missing
            self.autoexchange = Exchange if autoexchange is None else autoexchange
            self.max_priority = max_priority
            if queues is not None and not isinstance(queues, Mapping):
                queues = {q.name: q for q in queues}
            queues = queues or {}
            for name, q in queues.items():
                self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
    

    对于一个 Consumer,可以配置其 queue,一个 Consumer 可以有多个queue,比如:

    def add_consumer(state, queue, exchange=None, exchange_type=None,
                     routing_key=None, **options):
        """Tell worker(s) to consume from task queue by name."""
        state.consumer.call_soon(
            state.consumer.add_task_queue,
            queue, exchange, exchange_type or 'direct', routing_key, **options)
        return ok(f'add consumer {queue}')
    

    add_consumer 名字个人认为有一定误导,其实是添加 queue,但是名字看起来像添加 Consumer。

    而在 Consumer 之中,会对 queues 进行具体配置。

    def add_task_queue(self, queue, exchange=None, exchange_type=None,
                       routing_key=None, **options):
        cset = self.task_consumer
        queues = self.app.amqp.queues
        if queue in queues:
            q = queues[queue]
        else:
            exchange = queue if exchange is None else exchange
            exchange_type = ('direct' if exchange_type is None
                             else exchange_type)
            q = queues.select_add(queue,
                                  exchange=exchange,
                                  exchange_type=exchange_type,
                                  routing_key=routing_key, **options)
        if not cset.consuming_from(queue):
            cset.add_queue(q)
            cset.consume()
            info('Started consuming from %s', queue)
    

    0x05 TBC

    通过以上的分析,大家应该对 Celery 的架构有了初步的了解。在下篇文章中,我们将从几个方面做进一步思考,敬请期待。

    0xFF 参考

    Nginx资料之Master与Worker基础概念

    1: Worker 启动流程概述

    2: Worker 的执行引擎

    3: Task 对象的实现

    4: 定时任务的实现

    5: 远程控制管理

    6: Events 的实现

    7: Worker 之间的交互

    8: State 和 Result

    Spark分布式计算引擎的应用

    mfc 消息消息队列概念_消息队列和任务队列到底有什么不同?

  • 相关阅读:
    前端--HTML
    并发函数--线程
    并发编程--进程
    一个好用的网站,各种在线
    django Models与数据库关系
    流文件下载
    小白都能秒懂的各数据库在Django的配置
    关于django 内建缓存 信号 及自定义json的配置
    django批量创建数据
    关于drf的组件
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14562308.html
Copyright © 2011-2022 走看看