    [Source Code Analysis] The Parallel Distributed Task Queue Celery: EventDispatcher & Event Components

    0x00 Abstract

    Celery is a simple, flexible, and reliable distributed system that processes large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.

    This article explains how the EventDispatcher and Event components are implemented.

    0x01 Approach

    The EventDispatcher and Event components are responsible for handling Celery's internal events (Event).

    From the name alone we can tell that the EventDispatcher component dispatches events (Event), which gives us the following known facts:

    • Event dispatching implies producers and consumers; EventDispatcher acts as the event producer;
    • Since production and consumption are involved, a broker is needed to hold events in transit;
    • Celery is built on top of Kombu, and Kombu itself already has producer and consumer concepts, so these two concepts can be reused directly;
    • Kombu also provides a Mailbox implementation, which lets different instances send and handle events among themselves, via unicast or broadcast;

    So we can roughly infer: EventDispatcher can make use of kombu's producer and consumer, or its Mailbox.
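
    To make these Kombu concepts concrete, here is a minimal, self-contained sketch of a kombu producer/consumer pair. The exchange/queue names and the broker URL are made up for illustration; this is not Celery's own code:

    from kombu import Connection, Consumer, Exchange, Producer, Queue

    # Illustrative names only -- not the exchange/queue Celery uses.
    demo_exchange = Exchange('demo_events', type='direct')
    demo_queue = Queue('demo_events_q', exchange=demo_exchange,
                       routing_key='demo')

    def on_message(body, message):
        # Consumer callback: invoked once per received message.
        print('received:', body)
        message.ack()

    with Connection('redis://localhost:6379//') as conn:
        # Producer side: publish a JSON-serialized dict to the exchange.
        producer = Producer(conn.channel(), exchange=demo_exchange,
                            serializer='json')
        producer.publish({'type': 'demo-online'}, routing_key='demo',
                         declare=[demo_queue])

        # Consumer side: drain the message we just published.
        with Consumer(conn.channel(), queues=[demo_queue],
                      callbacks=[on_message]):
            conn.drain_events(timeout=2)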

    Events, in turn, is responsible for receiving events (Event), so we can likewise infer:

    • Events uses a Kombu consumer to process events;
    • how exactly an event is handled depends on Celery's current state, which brings in the State functionality;

    Let's now see how this is actually implemented.

    To aid understanding, the overall logic diagram is given at the end of this article; the sections below build it up step by step.

    0x02 Definition

    The EventDispatcher code lives in: celery/events/dispatcher.py

    The definition shows which member variables an event dispatcher needs in order to do its job:

    • connection (kombu.Connection): the connection used to interact with the broker;
    • channel (kombu.Channel): a Channel can be understood as one of several lightweight connections sharing a single Connection; it is the actual connection.
      • Connection is AMQP's abstraction of a connection;
      • Channel is AMQP's abstraction of operations on the message queue;
      • Taking the "lightweight connection to redis" case concretely, a Channel can be seen as a wrapper around redis operations and the redis connection. Each Channel can establish a connection to redis and operate on redis over it; each connection has a socket, each socket has a file, and that file can be polled.
    • producer: the event producer, using the kombu producer concept;
    • exchange: when the producer publishes an event, the event is first sent to the Exchange, which routes it to queues according to the exchange-queue binding rules;
    • hostname: identifies this dispatcher, so that users of the EventDispatcher can know about it and use it;
    • groups: event-group support;
    • _outbound_buffer: the event buffer;
    • clock: a Lamport logical clock, the timing mechanism used in distributed systems to order events (a toy sketch follows this list);
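
    Celery's clock actually comes from kombu (kombu.clocks.LamportClock, which exposes forward() and adjust()). As a toy illustration of the idea only, not kombu's real code:

    import threading

    class ToyLamportClock:
        """Toy Lamport clock: forward() ticks for a local event;
        adjust() merges in a timestamp received from another node."""

        def __init__(self):
            self.value = 0
            self._lock = threading.Lock()

        def forward(self):
            with self._lock:
                self.value += 1
                return self.value

        def adjust(self, other):
            with self._lock:
                self.value = max(self.value, other) + 1
                return self.value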

    The class definition is:

    class EventDispatcher:
        """Dispatches event messages.
        """
    
        DISABLED_TRANSPORTS = {'sql'}
    
        app = None
    
        def __init__(self, connection=None, hostname=None, enabled=True,
                     channel=None, buffer_while_offline=True, app=None,
                     serializer=None, groups=None, delivery_mode=1,
                     buffer_group=None, buffer_limit=24, on_send_buffered=None):
            self.app = app_or_default(app or self.app)
            self.connection = connection
            self.channel = channel
            self.hostname = hostname or anon_nodename()
            self.buffer_while_offline = buffer_while_offline
            self.buffer_group = buffer_group or frozenset()
            self.buffer_limit = buffer_limit
            self.on_send_buffered = on_send_buffered
            self._group_buffer = defaultdict(list)
            self.mutex = threading.Lock()
            self.producer = None
            self._outbound_buffer = deque()
            self.serializer = serializer or self.app.conf.event_serializer
            self.on_enabled = set()
            self.on_disabled = set()
            self.groups = set(groups or [])
            self.tzoffset = [-time.timezone, -time.altzone]
            self.clock = self.app.clock
            self.delivery_mode = delivery_mode
            if not connection and channel:
                self.connection = channel.connection.client
            self.enabled = enabled
            conninfo = self.connection or self.app.connection_for_write()
            self.exchange = get_exchange(conninfo,
                                         name=self.app.conf.event_exchange)
            if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
                self.enabled = False
            if self.enabled:
                self.enable()
            self.headers = {'hostname': self.hostname}
            self.pid = os.getpid()
    

    Here are the variable contents at this point, so you can get a first impression.

    self = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x000001D37765B308>
     DISABLED_TRANSPORTS = {set: 1} {'sql'}
     app = {Celery} <Celery myTest at 0x1d375a69e88>
     buffer_group = {frozenset: 0} frozenset()
     buffer_limit = {int} 24
     buffer_while_offline = {bool} True
     channel = {NoneType} None
     clock = {LamportClock} 0
     connection = {Connection} <Connection: redis://localhost:6379// at 0x1d37765b388>
     delivery_mode = {int} 1
     enabled = {bool} True
     exchange = {Exchange} Exchange celeryev(fanout)
     groups = {set: 1} {'worker'}
     headers = {dict: 1} {'hostname': 'celery@DESKTOP-0GO3RPO'}
     hostname = {str} 'celery@DESKTOP-0GO3RPO'
     mutex = {lock} <unlocked _thread.lock object at 0x000001D377623A20>
     on_disabled = {set: 1} {<bound method Heart.stop of <celery.worker.heartbeat.Heart object at 0x000001D377636408>>}
     on_enabled = {set: 1} {<bound method Heart.start of <celery.worker.heartbeat.Heart object at 0x000001D377636408>>}
     on_send_buffered = {NoneType} None
     pid = {int} 26144
     producer = {Producer} <Producer: <promise: 0x1d37761cf78>>
     publisher = {Producer} <Producer: <promise: 0x1d37761cf78>>
     serializer = {str} 'json'
     tzoffset = {list: 2} [28800, 32400]
      _group_buffer = {defaultdict: 0} defaultdict(<class 'list'>, {})
      _outbound_buffer = {deque: 0} deque([])
    

    0x03 Producer

    We find that EventDispatcher does use Kombu's Producer (Celery wraps Kombu through its amqp layer here), so the focus is on how the Producer is configured.

    Specifically, the following need to be configured:

    • a Connection, so the producer knows which Redis to talk to;

    • an Exchange, so the producer knows which Queue the events go to;

    Let's analyze each in turn.

    3.1 Connection

    As the code shows, the Connection simply uses Celery's connection_for_write:

    conninfo = self.connection or self.app.connection_for_write()
    

    The variables at this point are:

    connection = {Connection} <Connection: redis://localhost:6379// at 0x1be931de148>
    conninfo = {Connection} <Connection: redis://localhost:6379// at 0x1be931de148>
    

    3.2 Exchange

    The Exchange concept is as follows:

    • Exchange: the exchange, or router. Event senders send events to the Exchange, which is responsible for delivering them to queues;
    • Queue: the event queue, holding events about to be consumed by the application; the Exchange delivers events to Queues, and consumers receive events from Queues;

    Concretely, the Exchange routes events (events are sent to the exchange, and the exchange forwards them to the corresponding queues).

    The exchange forwards an event by matching the event's routing_key against binding_keys; a binding_key is the binding relationship declared between a queue and the exchange when the consumer declares the queue.

    Routing, then, is comparing the routing-key (supplied by the message) with the binding-key (supplied when the queue was registered with the exchange).

    When using an exchange, you specify its name and type (direct, topic, or fanout). As you can see, this is the same exchange concept as in RabbitMQ: events are sent to exchanges, exchanges are named, and they can be configured with a routing algorithm.

    Now back to the code.

    def get_exchange(conn, name=EVENT_EXCHANGE_NAME):
        """Get exchange used for sending events.
    
        Arguments:
            conn (kombu.Connection): Connection used for sending/receiving events.
            name (str): Name of the exchange. Default is ``celeryev``.
    
        Note:
            The event type changes if Redis is used as the transport
            (from topic -> fanout).
        """
        ex = copy(event_exchange)
        if conn.transport.driver_type == 'redis':
            # quick hack for Issue #436
            ex.type = 'fanout'
        if name != ex.name:
            ex.name = name
        return ex
    

    The variables at this point are:

    EVENT_EXCHANGE_NAME = 'celeryev'
        
    self.exchange = {Exchange} Exchange celeryev(fanout)
    

    So we know that the default Exchange here is celeryev, of type fanout.
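
    As a hedged sketch of what this default amounts to when declared by hand with kombu (the broker URL here is illustrative):

    from kombu import Connection, Exchange

    # Mirrors the default above: a fanout exchange named 'celeryev'.
    event_exchange = Exchange('celeryev', type='fanout')

    with Connection('redis://localhost:6379//') as conn:
        bound = event_exchange(conn.default_channel)  # bind to a channel
        bound.declare()  # declare the exchange on the broker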

    3.3 Construction

    And so we arrive at the Producer:

        def enable(self):
            self.producer = Producer(self.channel or self.connection,
                                     exchange=self.exchange,
                                     serializer=self.serializer,
                                     auto_declare=False)
            self.enabled = True
            for callback in self.on_enabled:
                callback()
    

    0x04 Dispatching Events

    Now that the Producer is in place, we can send events.

    4.1 Send

    Sending an event comes down to whether it needs to be sent as part of a group:

    • if group buffering applies, the event is appended to an internal buffer and the group is sent as one batch;
    • otherwise the Producer publish API is called directly.

    Which group an event belongs to is determined by the following code:

    groups, group = self.groups, group_from(type)
    

    The relevant variables are:

    group = {str} 'worker'
    groups = {set: 1} {'worker'}
    type = {str} 'worker-online'
    
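    For reference, group_from simply takes the part of the event type before the first dash. A minimal sketch consistent with the values above (the real helper lives in celery.events):

    def group_from(type):
        """Get the group an event type belongs to, e.g.
        'worker-online' -> 'worker', 'task-succeeded' -> 'task'."""
        return type.split('-', 1)[0]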

    The send code is as follows:

        def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
                 retry_policy=None, Event=Event, **fields):
            """Send event.
            """
            if self.enabled:
                groups, group = self.groups, group_from(type)
                if groups and group not in groups:
                    return
                if group in self.buffer_group:
                    clock = self.clock.forward()
                    event = Event(type, hostname=self.hostname,
                                  utcoffset=utcoffset(),
                                  pid=self.pid, clock=clock, **fields)
                    buf = self._group_buffer[group]
                    buf.append(event)
                    if len(buf) >= self.buffer_limit:
                        self.flush()
                    elif self.on_send_buffered:
                        self.on_send_buffered()
                else:
                    return self.publish(type, fields, self.producer, blind=blind,
                                        Event=Event, retry=retry,
                                        retry_policy=retry_policy)
    
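    A hedged usage sketch of the sending side (the worker does this internally; the broker URL and the extra freq field are illustrative):

    from celery import Celery

    app = Celery('myTest', broker='redis://localhost:6379//')

    with app.connection_for_write() as conn:
        # app.events.Dispatcher is the EventDispatcher class bound to app.
        dispatcher = app.events.Dispatcher(conn)
        # Extra keyword fields become part of the event dict.
        dispatcher.send('worker-online', freq=2.0)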

    4.2 publish: interacting with the broker

    send ends up calling into here.

    A routing_key is built here:

    routing_key=type.replace('-', '.')
    

    This yields a routing_key of 'worker.online'.

    An Event is also built:

    event = {dict: 13} 
     'hostname' = {str} 'celery@DESKTOP-0GO3RPO'
     'utcoffset' = {int} -8
     'pid' = {int} 24320
     'clock' = {int} 1
     'freq' = {float} 2.0
     'active' = {int} 0
     'processed' = {int} 0
     'loadavg' = {tuple: 3} (0.0, 0.0, 0.0)
     'sw_ident' = {str} 'py-celery'
     'sw_ver' = {str} '5.0.5'
     'sw_sys' = {str} 'Windows'
     'timestamp' = {float} 1611464767.3456059
     'type' = {str} 'worker-online'
     __len__ = {int} 13
    
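    Event itself is just a small helper that builds a plain, JSON-serializable dict. A simplified sketch (the real helper in celery.events also accepts a pre-built field dict):

    import time

    def Event(type, **fields):
        """Simplified sketch: an event is a dict with at least
        'type' and 'timestamp' keys plus arbitrary extra fields."""
        fields.setdefault('timestamp', time.time())
        fields['type'] = type
        return fields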

    The publish code is as follows:

        def publish(self, type, fields, producer,
                    blind=False, Event=Event, **kwargs):
            """Publish event using custom :class:`~kombu.Producer`.
    
            Arguments:
                type (str): Event type name, with group separated by dash (`-`).
                    fields: Dictionary of event fields, must be json serializable.
                producer (kombu.Producer): Producer instance to use:
                    only the ``publish`` method will be called.
                retry (bool): Retry in the event of connection failure.
                retry_policy (Mapping): Map of custom retry policy options.
                    See :meth:`~kombu.Connection.ensure`.
                blind (bool): Don't set logical clock value (also don't forward
                    the internal logical clock).
                Event (Callable): Event type used to create event.
                    Defaults to :func:`Event`.
                utcoffset (Callable): Function returning the current
                    utc offset in hours.
            """
            clock = None if blind else self.clock.forward()
            event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                          pid=self.pid, clock=clock, **fields)
            with self.mutex:
                return self._publish(event, producer,
                                     routing_key=type.replace('-', '.'), **kwargs)
    
        def _publish(self, event, producer, routing_key, retry=False,
                     retry_policy=None, utcoffset=utcoffset):
            exchange = self.exchange
            try:
                producer.publish(
                    event,
                    routing_key=routing_key,
                    exchange=exchange.name,
                    retry=retry,
                    retry_policy=retry_policy,
                    declare=[exchange],
                    serializer=self.serializer,
                    headers=self.headers,
                    delivery_mode=self.delivery_mode,
                )
            except Exception as exc:  # pylint: disable=broad-except
                if not self.buffer_while_offline:
                    raise
                self._outbound_buffer.append((event, routing_key, exc))
    
    

    Because this goes through pub/sub, the event content is not visible in redis at this point.

    The redis contents at this point are as follows (no events visible):

    redis-cli.exe -p 6379
    127.0.0.1:6379> keys *
    1) "_kombu.binding.celery.pidbox"
    2) "_kombu.binding.celery"
    3) "_kombu.binding.celeryev"
    127.0.0.1:6379> smembers _kombu.binding.celeryev
     1) "worker.#\x06\x16\x06\x16celeryev.64089900-d397-4564-b343-742664c1b214"
    127.0.0.1:6379> smembers _kombu.binding.celery
    1) "celery\x06\x16\x06\x16celery"
    127.0.0.1:6379> smembers _kombu.binding.celery.pidbox
    1) "\x06\x16\x06\x16celery@DESKTOP-0GO3RPO.celery.pidbox"
    127.0.0.1:6379>
    
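    If you want to observe the traffic anyway, you can subscribe on the Redis pub/sub side. A hedged sketch with redis-py; the exact channel naming is internal to kombu's redis transport, so we subscribe to a broad pattern:

    import redis

    r = redis.Redis(host='localhost', port=6379)
    p = r.pubsub()
    # kombu maps the fanout exchange onto pub/sub channels whose names
    # contain the exchange name; match broadly to catch them.
    p.psubscribe('*celeryev*')
    for msg in p.listen():
        print(msg)  # raw (serialized) event messages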

    At this point, the EventDispatcher component has sent the event out.

    How will this event be handled? For that we need to look at the Events component.

    0x05 The Events Component

    5.1 What Events Are For

    As mentioned earlier, Celery emits an Event whenever the state of a Task/Worker changes. The most obvious application is monitoring those events; flower, the well-known WebUI-based management tool for Celery, is built on Events. Beyond monitoring, we can also use Events to snapshot Tasks, or even react to Task state transitions in real time, e.g. raising an alert when a task fails, or launching dependent tasks when a task succeeds. To sum up, Events let us (a code sketch follows this list):

    • take snapshots of Task state;
    • process Task state changes in real time;
    • monitor the execution state of Celery (Workers/Tasks);
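
    As promised, here is a sketch of the real-time case, following the standard Celery monitoring pattern: a Receiver with one custom handler plus the catch-all '*': state.event (the broker URL is illustrative):

    from celery import Celery

    def my_monitor(app):
        state = app.events.State()

        def announce_failed_tasks(event):
            # Let State integrate the event first, then look the task up.
            state.event(event)
            task = state.tasks.get(event['uuid'])
            print(f'TASK FAILED: {task.name}[{task.uuid}]')

        with app.connection() as connection:
            recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
            })
            recv.capture(limit=None, timeout=None, wakeup=True)

    if __name__ == '__main__':
        my_monitor(Celery(broker='redis://localhost:6379//'))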

    5.2 Debugging

    celery events can be used to start a snapshot camera, or to dump events to standard output.

    For example:

    celery -A proj events -c myapp.DumpCam --frequency=2.0
    
    celery -A proj events --camera=<camera-class> --frequency=1.0
    
    celery -A proj events --dump 
    

    For debugging, we start it as follows:

    app.start(argv=['events'])
    

    The command is implemented as:

    def events(ctx, dump, camera, detach, frequency, maxrate, loglevel, **kwargs):
        """Event-stream utilities."""
        app = ctx.obj.app
        if dump:
            return _run_evdump(app)
    
        if camera:
            return _run_evcam(camera, app=app, freq=frequency, maxrate=maxrate,
                              loglevel=loglevel,
                              detach=detach,
                              **kwargs)
    
        return _run_evtop(app)
    

    5.3 Entry Point

    The Events entry point is:

    def _run_evtop(app):
        try:
            from celery.events.cursesmon import evtop
            _set_process_status('top')
            return evtop(app=app)
        except ModuleNotFoundError:
            # remainder elided here: the real command reports that the
            # curses module is required when it is unavailable
            raise
    

    Let's trace onwards.

    def evtop(app=None):  # pragma: no cover
        """Start curses monitor."""
        app = app_or_default(app)
        state = app.events.State()
        display = CursesMonitor(state, app)
        display.init_screen()
        refresher = DisplayThread(display)
        refresher.start()
       
        capture_events(app, state, display)
    

    5.4 The Event Loop

    Now we come to the event loop.

    An app.events.Receiver is created here.

    Note that the handlers={'*': state.event} passed to the Receiver are the handler functions used later when events are processed.

    def capture_events(app, state, display):  # pragma: no cover
    
        while 1:
            with app.connection_for_read() as conn:
                try:
                    conn.ensure_connection(on_connection_error,
                                           app.conf.broker_connection_max_retries)
                    
                    recv = app.events.Receiver(conn, handlers={'*': state.event})
                    
                    display.resetscreen()
                    display.init_screen()
                    
                    recv.capture()
                    
                except conn.connection_errors + conn.channel_errors as exc:
                    print(f'Connection lost: {exc!r}', file=sys.stderr)
    

    So it turns out to be a loop that keeps calling recv.capture().

    As shown below:

    Events
    
    
       +--------------------+
       |      loop          |
       |                    |
       |                    |
       |                    |
       |                    |
       |                    v
       |
       |        EventReceiver.capture()
       |
       |                    +
       |                    |
       |                    |
       |                    |
       |                    |
       |                    |
       |                    |
       +--------------------+
    

    5.5 EventReceiver

    EventReceiver receives Events and processes them. Note that EventReceiver inherits from ConsumerMixin.

    class EventReceiver(ConsumerMixin):
        """Capture events.
    
        Arguments:
            connection (kombu.Connection): Connection to the broker.
            handlers (Mapping[Callable]): Event handlers.
                This is  a map of event type names and their handlers.
                The special handler `"*"` captures all events that don't have a
                handler.
        """
    
    

    Its code is as follows:

        def capture(self, limit=None, timeout=None, wakeup=True):
            """Open up a consumer capturing events.
    
            This has to run in the main process, and it will never stop
            unless :attr:`EventDispatcher.should_stop` is set to True, or
            forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
            """
            for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup):
                pass
    
    

    The corresponding variables are:

    self.consume = {method} <bound method ConsumerMixin.consume of <celery.events.receiver.EventReceiver object at 0x000001CA8C22AB08>>
    
    self = {EventReceiver} <celery.events.receiver.EventReceiver object at 0x000001CA8C22AB08>
    

    We can see that ConsumerMixin is used to handle events. Indeed, as anticipated at the start of this article: where there is a kombu producer, there must be a kombu consumer.

    There may in fact be multiple EventReceivers bound to this Connection, with ConsumerMixin coordinating them. Every Receiver can receive the Events, but whether a Receiver can actually handle one depends on how its routing_key is set up.

    So we have:

    Events
    
    
       +--------------------+
       |      loop          |
       |                    |
       |                    |
       |                    |
       |                    |
       |                    v
       |
       |     EventReceiver(ConsumerMixin).capture()
       |
       |                    +
       |                    |
       |                    |
       |                    |
       |                    |
       |                    |
       |                    |
       +--------------------+
    
    

    5.6 ConsumerMixin

    ConsumerMixin is a convenience mixin class provided by Kombu for implementing consumer programs.

    class ConsumerMixin:
        """Convenience mixin for implementing consumer programs.
    
        It can be used outside of threads, with threads, or greenthreads
        (eventlet/gevent) too.
    
        The basic class would need a :attr:`connection` attribute
        which must be a :class:`~kombu.Connection` instance,
        and define a :meth:`get_consumers` method that returns a list
        of :class:`kombu.Consumer` instances to use.
        Supporting multiple consumers is important so that multiple
        channels can be used for different QoS requirements.
    	"""
    

    The code is in: kombu/mixins.py

        def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
            elapsed = 0
            with self.consumer_context(**kwargs) as (conn, channel, consumers):
                for i in limit and range(limit) or count():
                    if self.should_stop:
                        break
                    self.on_iteration()
                    try:
                        conn.drain_events(timeout=safety_interval)
                    except socket.timeout:
                        conn.heartbeat_check()
                        elapsed += safety_interval
                        if timeout and elapsed >= timeout:
                            raise
                    except OSError:
                        if not self.should_stop:
                            raise
                    else:
                        yield
                        elapsed = 0
    
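    A minimal consumer program built on ConsumerMixin looks roughly like this (queue and exchange names are illustrative):

    from kombu import Connection, Exchange, Queue
    from kombu.mixins import ConsumerMixin

    class DemoWorker(ConsumerMixin):
        """Minimal ConsumerMixin program: one queue, one callback."""

        def __init__(self, connection, queues):
            self.connection = connection  # required attribute
            self.queues = queues

        def get_consumers(self, Consumer, channel):
            # Required hook: return the kombu.Consumer instances to use.
            return [Consumer(queues=self.queues,
                             callbacks=[self.on_message],
                             accept=['json'])]

        def on_message(self, body, message):
            print('got:', body)
            message.ack()

    # Usage sketch:
    # with Connection('redis://localhost:6379//') as conn:
    #     queues = [Queue('demo_q', Exchange('demo'), routing_key='demo')]
    #     DemoWorker(conn, queues).run()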

    5.6.1 Consumer

    Internally, ConsumerMixin builds its Consumer as follows:

        @contextmanager
        def Consumer(self):
            with self.establish_connection() as conn:
                self.on_connection_revived()
    
                channel = conn.default_channel
                cls = partial(Consumer, channel,
                              on_decode_error=self.on_decode_error)
                with self._consume_from(*self.get_consumers(cls, channel)) as c:
                    yield conn, channel, c
    
                self.on_consume_end(conn, channel)
    

    When the Consumer is built, self._receive is set as the Consumer callback.

        def get_consumers(self, Consumer, channel):
            return [Consumer(queues=[self.queue],
                             callbacks=[self._receive], no_ack=True,
                             accept=self.accept)]
    

    The call stack is:

    get_consumers, receiver.py:72
    Consumer, mixins.py:230
    __enter__, contextlib.py:112
    consumer_context, mixins.py:181
    __enter__, contextlib.py:112
    consume, mixins.py:188
    capture, receiver.py:91
    evdump, dumper.py:95
    _run_evdump, events.py:21
    events, events.py:87
    caller, base.py:132
    new_func, decorators.py:21
    invoke, core.py:610
    invoke, core.py:1066
    invoke, core.py:1259
    main, core.py:782
    start, base.py:358
    <module>, myEvent.py:18
    

    The variables at this point are:

    self.consume = {method} <bound method ConsumerMixin.consume of <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>>
    self.queue = {Queue} <unbound Queue celeryev.6e24485e-9f27-46e1-90c9-6b52f44b9902 -> <unbound Exchange celeryev(fanout)> -> #>
    self._receive = {method} <bound method EventReceiver._receive of <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>>
    Consumer = {partial} functools.partial(<class 'kombu.messaging.Consumer'>, <kombu.transport.redis.Channel object at 0x000001FE1080CC08>, on_decode_error=<bound method ConsumerMixin.on_decode_error of <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>>)
    channel = {Channel} <kombu.transport.redis.Channel object at 0x000001FE1080CC08>
    self = {EventReceiver} <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>
    

    At this point we have:

     Events
    
    
    +-----------------------------------------+
    | EventReceiver(ConsumerMixin)            |
    |                                         |
    |                                         |
    |                                         |  consume
    |                                         |               +------------------+
    |                            capture  +-----------------> | Consumer         |
    |                                         |               |                  |
    |                                         |               |                  |
    |                                         |               |                  |
    |                           _receive  <----------------------+ callbacks     |
    |                                         |               |                  |
    |                                         |               |                  |
    |                                         |               +------------------+
    +-----------------------------------------+
    
    

    5.7 Receiving

    When an event arrives, _receive is called to receive it.

        def _receive(self, body, message, list=list, isinstance=isinstance):
            if isinstance(body, list):  # celery 4.0+: List of events
                process, from_message = self.process, self.event_from_message
                [process(*from_message(event)) for event in body]
            else:
                self.process(*self.event_from_message(body))
    

    5.8 Processing

    Once an event is received, it can be processed.

        def process(self, type, event):
            """Process event by dispatching to configured handler."""
            handler = self.handlers.get(type) or self.handlers.get('*')
            handler and handler(event)
    

    The picture at this point is shown below.

    The Receiver.handlers here is the handlers={'*': state.event} mapping passed in when the Receiver was created; these are the functions that later process events.

    Events
    
    
    +-----------------------------------------+
    | EventReceiver(ConsumerMixin)            |
    |                                         |
    |                                         |
    |                                         |  consume
    |                                         |               +------------------+
    |                            capture  +-----------------> | Consumer         |
    |                                         |               |                  |
    |                                         |               |                  |
    |                                         |               |                  |
    |                           _receive  <----------------------+ callbacks     |
    |                                         |               |                  |
    |                                         |               |                  |
    |                                         |               +------------------+
    |                                         |
    |                            handlers +------------+
    |                                         |        |      +------------------+
    +-----------------------------------------+        |      |state             |
                                                       |      |                  |
                                                       |      |                  |
                                                       +-------->event           |
                                                              |                  |
                                                              |                  |
                                                              +------------------+
    

    5.9 The state Handler

    Specifically:

        @cached_property
        def _event(self):
            return self._create_dispatcher()
    

    In summary:

    1. First look for a handler for the event's group; if one exists, use it. Otherwise fall through to the cases below. By default there is no group handler, so this step can be skipped.
    2. If it is a worker Event, run the worker-specific handling.
    3. If it is a task Event, run the task-specific handling.

    The _create_dispatcher code is as follows:

        def _create_dispatcher(self):
            # noqa: C901
            # pylint: disable=too-many-statements
            # This code is highly optimized, but not for reusability.
            get_handler = self.handlers.__getitem__
            event_callback = self.event_callback
            wfields = itemgetter('hostname', 'timestamp', 'local_received')
            tfields = itemgetter('uuid', 'hostname', 'timestamp',
                                 'local_received', 'clock')
            taskheap = self._taskheap
            th_append = taskheap.append
            th_pop = taskheap.pop
            # Removing events from task heap is an O(n) operation,
            # so easier to just account for the common number of events
            # for each task (PENDING->RECEIVED->STARTED->final)
            #: an O(n) operation
            max_events_in_heap = self.max_tasks_in_memory * self.heap_multiplier
            add_type = self._seen_types.add
            on_node_join, on_node_leave = self.on_node_join, self.on_node_leave
            tasks, Task = self.tasks, self.Task
            workers, Worker = self.workers, self.Worker
            # avoid updating LRU entry at getitem
            get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__
    
            get_task_by_type_set = self.tasks_by_type.__getitem__
            get_task_by_worker_set = self.tasks_by_worker.__getitem__
    
            def _event(event,
                       timetuple=timetuple, KeyError=KeyError,
                       insort=bisect.insort, created=True):
                self.event_count += 1
                if event_callback:
                    event_callback(self, event)
                group, _, subject = event['type'].partition('-')
                try:
                    handler = get_handler(group)
                except KeyError:
                    pass
                else:
                    return handler(subject, event), subject
    
                if group == 'worker':
                    try:
                        hostname, timestamp, local_received = wfields(event)
                    except KeyError:
                        pass
                    else:
                        is_offline = subject == 'offline'
                        try:
                            worker, created = get_worker(hostname), False
                        except KeyError:
                            if is_offline:
                                worker, created = Worker(hostname), False
                            else:
                                worker = workers[hostname] = Worker(hostname)
                        worker.event(subject, timestamp, local_received, event)
                        if on_node_join and (created or subject == 'online'):
                            on_node_join(worker)
                        if on_node_leave and is_offline:
                            on_node_leave(worker)
                            workers.pop(hostname, None)
                        return (worker, created), subject
                elif group == 'task':
                    (uuid, hostname, timestamp,
                     local_received, clock) = tfields(event)
                    # task-sent event is sent by client, not worker
                    is_client_event = subject == 'sent'
                    try:
                        task, task_created = get_task(uuid), False
                    except KeyError:
                        task = tasks[uuid] = Task(uuid, cluster_state=self)
                        task_created = True
                    if is_client_event:
                        task.client = hostname
                    else:
                        try:
                            worker = get_worker(hostname)
                        except KeyError:
                            worker = workers[hostname] = Worker(hostname)
                        task.worker = worker
                        if worker is not None and local_received:
                            worker.event(None, local_received, timestamp)
    
                    origin = hostname if is_client_event else worker.id
    
                    # remove oldest event if exceeding the limit.
                    heaps = len(taskheap)
                    if heaps + 1 > max_events_in_heap:
                        th_pop(0)
    
                    # most events will be dated later than the previous.
                    timetup = timetuple(clock, timestamp, origin, ref(task))
                    if heaps and timetup > taskheap[-1]:
                        th_append(timetup)
                    else:
                        insort(taskheap, timetup)
    
                    if subject == 'received':
                        self.task_count += 1
                    task.event(subject, timestamp, local_received, event)
                    task_name = task.name
                    if task_name is not None:
                        add_type(task_name)
                        if task_created:  # add to tasks_by_type index
                            get_task_by_type_set(task_name).add(task)
                            get_task_by_worker_set(hostname).add(task)
                    if task.parent_id:
                        try:
                            parent_task = self.tasks[task.parent_id]
                        except KeyError:
                            self._add_pending_task_child(task)
                        else:
                            parent_task.children.add(task)
                    try:
                        _children = self._tasks_to_resolve.pop(uuid)
                    except KeyError:
                        pass
                    else:
                        task.children.update(_children)
    
                    return (task, task_created), subject
            return _event
    
    

    Concretely:

     Events
    
    
    +-----------------------------+
    | EventReceiver(ConsumerMixin |
    |                             |
    |                             |               +------------------+
    |                             |  consume      | Consumer         |
    |                             |               |                  |
    |                capture  +-----------------> |                  |
    |                             |               |                  |
    |                             |               |                  |
    |                             |               |                  |
    |               _receive  <----------------------+ callbacks     |
    |                             |               |                  |
    |                             |               |                  |
    |                             |               +------------------+
    |                             |
    |                handlers +------------+
    |                             |        |      +------------------------+
    +-----------------------------+        |      |state                   |
                                           |      |                        |
                                           |      |                        |
                                           +---------> event +---+         |
                                                  |              |         |
                                                  |              |         |
                                                  |              v         |
                                                  |     _create_dispatcher |
                                                  |              +         |
                                                  |              |         |
                                                  |              |         |
                                                  |              |         |
                                                  +------------------------+
                                                                 |
                                                                 |
                                                        +--------+------+
                                    group == 'worker'   |               | group == 'task'
                                                        |               |
                                                        v               v
                                              worker.event          task.event
    
    

    Finally, the overall logic is as follows:

                         Producer Scope   +         Broker      +   Consumer Scope
                                          |                     |
    +-----------------------------+       |     Redis pubsub    |     Events
    | EventDispatcher             |       |                     |
    |                             |       |                     |     +-----------------------------+
    |                             |       |                     |     | EventReceiver(ConsumerMixin |
    |                             |       |                     |     |                             |
    |        connection           |       |                     |     |                             |               +------------------+
    |                             |       |                     |     |                             |  consume      | Consumer         |
    |        channel              |       |                     |     |                             |               |                  |
    |                             |       |                     |     |                capture  +-----------------> |                  |
    |        producer  +----------------------->  Event +-----------> |                             |               |                  |
    |                             |       |                     |     |                             |               |                  |
    |        exchange             |       |                     |     |                             |               |                  |
    |                             |       |                     |     |               _receive  <----------------------+ callbacks     |
    |        hostname             |       |                     |     |                             |               |                  |
    |                             |       |                     |     |                             |               |                  |
    |        groups               |       |                     |     |                             |               +------------------+
    |                             |       |                     |     |                             |
    |        _outbound_buffer     |       |                     |     |                handlers +------------+
    |                             |       |                     |     |                             |        |      +------------------------+
    |        clock                |       |                     |     +-----------------------------+        |      |state                   |
    |                             |       |                     |                                            |      |                        |
    +-----------------------------+       |                     |                                            |      |                        |
                                          |                     |                                            +---------> event +---+         |
                                          |                     |                                                   |              |         |
                                          |                     |                                                   |              |         |
                                          |                     |                                                   |              v         |
                                          |                     |                                                   |     _create_dispatcher |
                                          |                     |                                                   |              +         |
                                          |                     |                                                   |              |         |
                                          |                     |                                                   |              |         |
                                          |                     |                                                   |              |         |
                                          |                     |                                                   +------------------------+
                                          |                     |                                                                  |
                                          |                     |                                                                  |
                                          |                     |                                                         +--------+------+
                                          |                     |                                     group == 'worker'   |               | group == 'task'
                                          |                     |                                                         |               |
                                          |                     |                                                         v               v
                                          +                     +                                               worker.event          task.event
    
    


    This concludes our walkthrough of Celery's two internal components for sending and for receiving/processing events.


