
    [Source Code Analysis] Parallel Distributed Task Queue Celery: The Dynamic Consumption Flow

    0x00 Summary

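    Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.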

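    Over several earlier articles (linked at the end) we covered how Celery starts up and how Task works. In this article we trace the consumption flow inside Celery (including Kombu) after a task message is received, up to the point where multiprocessing takes over.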

    The goal is a provisional summary that organizes my current understanding and prepares for the next stage: analyzing multiprocessing.

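    Because this is a detailed walkthrough of a concrete flow, it involves a lot of stack traces and runtime variables; please bear with them.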

    0x01 Background

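    When analyzing how the Celery worker starts, we saw that Celery finally enters a loop waiting for tasks to consume, and that the callback defined at startup is on_task_received. An abridged stack trace: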

    on_task_received, consumer.py:542
    _receive_callback, messaging.py:620
    _callback, base.py:630
    _deliver, base.py:980
    _brpop_read, redis.py:748
    on_readable, redis.py:358
    create_loop, hub.py:361
    asynloop, loops.py:81
    start, consumer.py:592
    start, bootsteps.py:116
    start, consumer.py:311
    start, bootsteps.py:365
    start, worker.py:204
    worker, worker.py:327
    caller, base.py:132
    new_func, decorators.py:21
    invoke, core.py:610
    main, core.py:782
    start, base.py:358
    worker_main, base.py:374
    

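    From this stack we can roughly outline the flow:

    • In the Kombu domain:
      • the message loop (hub) picks up the message;
      • after passing through the broker abstraction (Transport), the execution engine (MultiChannelPoller) and the Channel, the message is read and decoded;
      • Celery's callback is invoked.
    • In the Celery domain:
      • starting from the callback as the logical entry point, a Strategy is called first, which handles the message differently depending on conditions;
      • the work is then delegated to the Worker, which executes the user's task;
      • for throughput, tasks are executed concurrently by multiple pool processes.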

    A stack trace alone does not give an overall picture, so in this article we look at how Celery consumes messages.

    Concretely, we start from poll: a new task message arrives in Redis, and the FD of the connection on which Celery issued BRPOP becomes readable, so poll reports it.

    0x02 Logic in kombu

    We start with Kombu.

    First, here is an overall diagram of the Kombu part, to give an intuitive picture of the whole flow:

     +-------------+        +-------------------+                                +-------------------------+
     | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
     |             | fileno |                   |      cycle.on_readable(fileno) |                         |
     |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
     |             |        |                   |                                |                         |
     |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
     +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                            |        +          |                 |              +-------------------------+           |
                            |        |          |                 |                                                    |
                            +-------------------+                 |                                                    |
                                     |                            |                                                    |
                                     |                            |              +-----------------------+             |
                                     |                            |              | Channel               |      3      |
                                     |                            |              |                       | _brpop_read |
                                     |                            |              |                       |             |
                                     |                            +----------------+ connection          +<------------+
                                     |                   _deliver(message, queue)|                       |
                                     |        5                      4           |                       |
                                     |     callback(message)                     |                       |
                                     +----------------------------------------------> callback(message)+---------------+
                                                                                 +-----------------------+             |
                                                                                                                       |
                                                                                 +----------------------+              |
                                                                                 | Consumer             |              |
                                                                 on_m(message)   |                      |              |
                                                          +---------------------------+  on_message     | <------------+
                                                          |                      |                      |  _receive_callback
        kombu                                             |                      +----------------------+          6
                                                          |
    +-----------------------------------------------------------------------------------------------------------------------+
                                                          |
        Celery                                            |
                                             +---------------------------+
                                             | Consumer   |              |
                                             |            |              |
                                             |            v              |
                                             |      on_task_received     |
                                             |                           |
                                             |                           |
                                             +---------------------------+
    
    


    The diagram is divided into a Kombu domain and a Celery domain: the message starts in Kombu and then reaches Celery.

    2.1 Message loop -- hub in kombu

    We start with the message loop, the hub.

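    The loop lives in kombu/asynchronous/hub.py.

    When poll reports an event, the hub calls the callback cb registered in readers[fd], together with its arguments cbargs; here fd is the file descriptor of the Redis socket.

    A simplified version of the code: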

    def create_loop(self,
                    generator=generator, sleep=sleep, min=min, next=next,
                    Empty=Empty, StopIteration=StopIteration,
                    KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):

        readers, writers = self.readers, self.writers
        poll = self.poller.poll

        while 1:
            if readers or writers:
                to_consolidate = []
                try:
                    # poll_timeout is derived from the timer schedule (elided here)
                    events = poll(poll_timeout)
                except ValueError:
                    return

                for fd, event in events or ():
                    if event & READ:
                        # readers[fd] holds the (callback, args) registered for this fd
                        cb, cbargs = readers[fd]
                    # WRITE / ERR handling elided
                    cb(*cbargs)
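To make the fd-to-callback dispatch concrete, here is a minimal sketch built on Python's standard selectors module rather than kombu's own poller. MiniHub, add_reader and run_once are hypothetical names chosen to mirror the hub's readers dict; the real hub also handles writers, errors, timers and generator callbacks, all omitted here. A socketpair stands in for the Redis connection.

```python
import selectors
import socket


class MiniHub:
    """Hypothetical, stripped-down analogue of kombu's Hub (readers only)."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.readers = {}  # fd -> (callback, args), like Hub.readers

    def add_reader(self, fd, cb, *args):
        # Remember which callback to run and ask the selector to watch fd for reads.
        self.readers[fd] = (cb, args)
        self.selector.register(fd, selectors.EVENT_READ)

    def run_once(self, timeout=1.0):
        # One loop iteration: poll, then dispatch each readable fd to its callback.
        for key, events in self.selector.select(timeout):
            if events & selectors.EVENT_READ:
                cb, cbargs = self.readers[key.fd]
                cb(*cbargs)

    def close(self):
        self.selector.close()


received = []
reader_sock, writer_sock = socket.socketpair()


def on_readable(fileno):
    # Stand-in for Transport.on_readable(fileno): read whatever arrived.
    received.append(reader_sock.recv(1024))


hub = MiniHub()
hub.add_reader(reader_sock.fileno(), on_readable, reader_sock.fileno())
writer_sock.sendall(b"new task")
hub.run_once()
hub.close()
reader_sock.close()
writer_sock.close()
print(received)  # [b'new task']
```

The hub itself knows nothing about Redis or messages: it only maps "this fd is readable" to "call this function", which is why the real work is pushed down into Transport.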
    

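    2.2 Broker abstraction -- Transport in kombu

    The callback registered in readers[fd] is the Transport class's on_readable method, so execution moves into Transport.

    Its only job is to hand the event to MultiChannelPoller.

    The code is in kombu/transport/redis.py. Here self is the Transport, and self.cycle is its MultiChannelPoller.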

    def on_readable(self, fileno):
        """Handle AIO event for one of our file descriptors."""
        self.cycle.on_readable(fileno)
    

    The variables at this point:

    fileno = {int} 34
    
    self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
    

    As shown below, execution has reached Transport:

    +-------------+         +---------------+
    |   hub       |         |   Transport   |
    |             | fileno  |               |
    |       cb +--------------> on_readable |
    |             |         |               |
    |      poll   |         |               |
    +-------------+         +---------------+
    

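    2.3 Execution engine --- MultiChannelPoller in kombu

    Execution now reaches MultiChannelPoller. As earlier articles in this series showed, MultiChannelPoller ties Channels to poll. Here it looks up the Channel that owns the polled fd and lets that Channel do the rest.

    As the code shows, each fd maps to one Channel (plus a command type). poll only reports that an fd is readable; how to actually read the message is up to that Channel.

    Because Celery consumes tasks from Redis with the BRPOP command, the handler fetched here is the BRPOP callback, _brpop_read. It only runs if chan.qos.can_consume() allows it.

    The code is in kombu/transport/redis.py.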

    def on_readable(self, fileno):
        chan, type = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[type]()
    

    The variables are shown below; we can see each of the logical parts:

    chan.handlers[type] = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>
    
    chan = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
    
    fileno = {int} 34
    
    self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fcbaddfd048>
    
    type = {str} 'BRPOP'
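The two on_readable hops (Transport, then MultiChannelPoller) can be sketched as follows. Every class here is a hypothetical stand-in modeled on the snippets above, not kombu's API; in particular the can_consume rule (0 means unlimited, otherwise stay below the prefetch limit) is a simplified assumption about kombu's QoS, and the LISTEN entry only illustrates that the handler table is keyed by command type.

```python
class FakeQoS:
    """Hypothetical stand-in for a channel's QoS object."""

    def __init__(self, prefetch_limit=0):
        self.prefetch_limit = prefetch_limit
        self.unacked = 0

    def can_consume(self):
        # Simplified rule: 0 means unlimited, otherwise stay below the limit.
        return not self.prefetch_limit or self.unacked < self.prefetch_limit


class FakeChannel:
    """Hypothetical channel with one handler per Redis command type."""

    def __init__(self, qos):
        self.qos = qos
        self.calls = []
        self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive}

    def _brpop_read(self):
        self.calls.append("BRPOP")  # the real method reads and delivers a message

    def _receive(self):
        self.calls.append("LISTEN")


class MiniPoller:
    """Hypothetical analogue of MultiChannelPoller: fd -> (channel, command type)."""

    def __init__(self):
        self._fd_to_chan = {}

    def add(self, fileno, chan, type_):
        self._fd_to_chan[fileno] = (chan, type_)

    def on_readable(self, fileno):
        chan, type_ = self._fd_to_chan[fileno]
        if chan.qos.can_consume():  # respect the prefetch window
            chan.handlers[type_]()  # e.g. Channel._brpop_read


class MiniTransport:
    """Hypothetical Transport whose cycle attribute is the poller."""

    def __init__(self):
        self.cycle = MiniPoller()

    def on_readable(self, fileno):
        self.cycle.on_readable(fileno)  # the hub calls this with the ready fd


transport = MiniTransport()
ready = FakeChannel(FakeQoS())
busy = FakeChannel(FakeQoS(prefetch_limit=1))
busy.qos.unacked = 1  # prefetch window already full

transport.cycle.add(34, ready, "BRPOP")  # fd 34, as in the variables above
transport.cycle.add(35, busy, "BRPOP")
transport.on_readable(34)
transport.on_readable(35)
print(ready.calls, busy.calls)  # ['BRPOP'] []
```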
    

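    2.4 Decoding the message -- Channel in kombu

    Execution now reaches Channel. The code is in kombu/transport/redis.py.

    This part of Channel uses the Redis client to read the reply and decode it, extracting the queue name (the dest variable in the snippet) so that it knows which consumer, namely the one bound to that queue, should handle the message. It then dispatches the message with self.connection._deliver.

    The _brpop_read code is as follows: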

    def _brpop_read(self, **options):
        try:
            try:
                dest__item = self.client.parse_response(self.client.connection,
                                                        'BRPOP',
                                                        **options)
            except self.connection_errors:
                # if there's a ConnectionError, disconnect so the next
                # iteration will reconnect automatically.
                self.client.connection.disconnect()
                raise
            if dest__item:
                dest, item = dest__item
                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                self._queue_cycle.rotate(dest)
                self.connection._deliver(loads(bytes_to_str(item)), dest)  # dispatch the message
                return True
            else:
                raise Empty()
        finally:
            self._in_poll = None
    

    The variables at this point:

    dest = {str} 'celery'
    
    dest__item = {tuple: 2} 
     0 = {bytes: 6} b'celery'
     1 = {bytes: 861} b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-
    
    item = b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-
    
    self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
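A worked example of what _brpop_read and the later decoding steps extract. BRPOP returns a (key, value) pair: the key is the queue name, possibly with a priority suffix, and the value is the JSON envelope shown above. The SEP value below is an assumption about kombu's Redis channel separator, used only to illustrate the rsplit; decode_body assumes protocol v2 with base64 body encoding, as in this message's properties. The body string is copied from the message dump in this article.

```python
import base64
import json

# Assumption: the separator kombu's Redis channel uses before a priority suffix.
SEP = "\x06\x16"


def parse_brpop_reply(dest_item, sep=SEP):
    """Split a BRPOP reply into (queue name, envelope dict), like _brpop_read."""
    dest, item = dest_item
    queue = dest.decode("utf-8").rsplit(sep, 1)[0]  # drop any priority suffix
    return queue, json.loads(item.decode("utf-8"))


def decode_body(envelope):
    """Decode a protocol-v2 task body: base64 -> JSON -> (args, kwargs, embed)."""
    raw = base64.b64decode(envelope["body"])
    args, kwargs, embed = json.loads(raw)
    return args, kwargs, embed


BODY = "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ=="
envelope = {"body": BODY, "content-type": "application/json",
            "headers": {"task": "myTest.add"},
            "properties": {"body_encoding": "base64"}}
reply = (b"celery", json.dumps(envelope).encode("utf-8"))

queue, message = parse_brpop_reply(reply)
args, kwargs, embed = decode_body(message)
print(queue, message["headers"]["task"], args, kwargs)  # celery myTest.add [2, 17] {}
print(parse_brpop_reply((b"celery\x06\x163", b"{}")))   # ('celery', {})
```

The decoded body is 82 bytes, matching body_length in the Message details, and its args [2, 17] match the argsrepr '(2, 17)' in the headers.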
    

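    2.5 Invoking the callback -- Transport in kombu

    Execution returns to Transport.

    Here the code calls the callback stored in self._callbacks for the queue; this dict records one callback per queue.

    _callback is: <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>.

    We can also see the concrete format and content of the task message; for example, its delivery_info {'exchange': '', 'routing_key': 'celery'} tells us which queue it belongs to.

    The code is in kombu/transport/virtual/base.py.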

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            logger.warning(W_NO_CONSUMERS, queue)
            self._reject_inbound_message(message)
        else:
            callback(message)
    

    In the variables below, Celery's three callbacks correspond to three different functions:

    • celeryev.c755f81c-415e-478f-bb51-def341a96c0c handles events;
    • celery@.celery.pidbox handles remote control (pidbox);
    • celery handles normal task consumption.

    self._callbacks = {dict: 3} 
     'celeryev.c755f81c-415e-478f-bb51-def341a96c0c' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef23048>
     'celery@.celery.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56488>
     'celery' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>
    
    message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
    
    queue = {str} 'celery'
        
    self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
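The per-queue dispatch in _deliver is a dictionary lookup keyed by queue name. The sketch below is a hypothetical reduction (MiniTransport, the rejected list, the warning text and the queue names other than celery are invented for illustration). It shows how each registered queue reaches its own callback, and how a queue without a consumer is logged and rejected.

```python
import logging

logger = logging.getLogger("mini_transport")


class MiniTransport:
    """Hypothetical reduction of kombu's virtual Transport._deliver."""

    def __init__(self):
        self._callbacks = {}  # queue name -> callback, filled in by basic_consume
        self.rejected = []    # stands in for _reject_inbound_message

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            # Nobody consumes from this queue: warn and reject the message.
            logger.warning("no consumer for queue %r", queue)
            self.rejected.append(message)
        else:
            callback(message)


seen = []
transport = MiniTransport()
transport._callbacks["celery"] = lambda m: seen.append(("task", m["headers"]["task"]))
transport._callbacks["celery@host.celery.pidbox"] = lambda m: seen.append(("control", m))
transport._callbacks["celeryev.example"] = lambda m: seen.append(("event", m))

transport._deliver({"headers": {"task": "myTest.add"}}, "celery")
transport._deliver({"method": "ping"}, "celery@host.celery.pidbox")
transport._deliver({"headers": {}}, "no-such-queue")
print(seen)  # [('task', 'myTest.add'), ('control', {'method': 'ping'})]
```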
    

    The logic now looks like this:

    +-------------+        +-------------------+                                +-------------------------+
    | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
    |             | fileno |                   |      cycle.on_readable(fileno) |                         |
    |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
    |             |        |                   |                                |                         |
    |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+------------+
    +-------------+        |  _callbacks[queue]|                 |              |                         |        |
                           |                   |                 |              +-------------------------+        |
                           |                   |                 |                                                 |
                           +-------------------+                 |                                                 |
                                                                 |                                                 |
                                                                 |              +-----------------+                |
                                                                 |              | Channel         |        3       |
                                                                 |              |                 |   _brpop_read  |
                                                                 |              |                 |                |
                                                                 +----------------+ connection    | <--------------+
                                                        _deliver(message, queue)|                 |
                                                                    4           |                 |
                                                                                +-----------------+
    
    


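    2.6 Invoking the callback -- Channel in kombu

    The callback leads back into kombu/transport/virtual/base.py.

    The callback registered for the queue is the _callback closure created in Channel.basic_consume. Since the Channel now holds the Redis message for that queue, it calls this queue's callback: _callback wraps the raw dict in a Message, records it with QoS unless no_ack is set, and then calls the Kombu Consumer's callback.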

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
    
        def _callback(raw_message):
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)
    
        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)
    
        self._reset_cycle()
    

    The variables at this point:

    callback = {method} <bound method Consumer._receive_callback of <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>>
        
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'd
    
    raw_message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
                                                                        
    self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
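The key idea in basic_consume is a closure: the Channel builds _callback around the consumer's callback and stores it in connection._callbacks[queue], which is exactly what _deliver looks up later. A hypothetical minimal version (Message, MiniChannel and MiniConnection are illustrative stand-ins, and a plain dict replaces kombu's QoS bookkeeping):

```python
class Message:
    """Hypothetical minimal message wrapper; kombu's Message carries much more."""

    def __init__(self, raw, channel):
        self.raw = raw
        self.channel = channel
        self.delivery_tag = raw["properties"]["delivery_tag"]


class MiniConnection:
    """Hypothetical transport side: only the queue -> callback table."""

    def __init__(self):
        self._callbacks = {}


class MiniChannel:
    """Hypothetical channel showing the basic_consume closure pattern."""

    def __init__(self, connection):
        self.connection = connection
        self.unacked = {}  # delivery_tag -> Message; stands in for qos.append
        self._tag_to_queue = {}

    def basic_consume(self, queue, no_ack, callback, consumer_tag):
        self._tag_to_queue[consumer_tag] = queue

        def _callback(raw_message):
            # Wrap the raw dict, track it for a later ack unless no_ack, then hand it on.
            message = Message(raw_message, channel=self)
            if not no_ack:
                self.unacked[message.delivery_tag] = message
            return callback(message)

        # This closure is what Transport._deliver later finds in _callbacks[queue].
        self.connection._callbacks[queue] = _callback


received = []
conn = MiniConnection()
chan = MiniChannel(conn)
chan.basic_consume("celery", no_ack=False, callback=received.append, consumer_tag="tag-1")

raw = {"body": "b64-body", "properties": {"delivery_tag": "cfa3a261"}}
conn._callbacks["celery"](raw)  # what _deliver does once it has looked up the queue
print(conn._callbacks["celery"].__qualname__)  # MiniChannel.basic_consume.<locals>._callback
```

The printed qualified name has the same shape as the `Channel.basic_consume.<locals>._callback` entries in the variable dumps, which is how those dumps reveal that the stored callbacks are closures.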
    

    The logic now looks like this:

    +-------------+        +-------------------+                                +-------------------------+
    | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
    |             | fileno |                   |      cycle.on_readable(fileno) |                         |
    |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
    |             |        |                   |                                |                         |
    |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
    +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                           |        +          |                 |              +-------------------------+           |
                           |        |          |                 |                                                    |
                           +-------------------+                 |                                                    |
                                    |                            |                                                    |
                                    |                            |              +-----------------------+             |
                                    |                            |              | Channel               |      3      |
                                    |                            |              |                       | _brpop_read |
                                    |                            |              |                       |             |
                                    |                            +----------------+ connection          +<------------+
                                    |                   _deliver(message, queue)|                       |
                                    |        5                      4           |                       |
                                    |     callback(message)                     |                       |
                                    +----------------------------------------------> callback(message)+--------------->
                                                                                +-----------------------+
    
    


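    2.7 Calling the callback -- Consumer in kombu

    The Kombu Consumer callback code is in kombu/messaging.py.

    It calls the callback that the user registered on the Kombu Consumer. Note that the user of the Kombu Consumer is Celery, so this immediately calls the callback that Celery registered earlier.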

    def _receive_callback(self, message):
        accept = self.accept
        on_m, channel, decoded = self.on_message, self.channel, None
        try:
            m2p = getattr(channel, 'message_to_python', None)
            if m2p:
                message = m2p(message)
            if accept is not None:
                message.accept = accept
            if message.errors:
                return message._reraise_error(self.on_decode_error)
            decoded = None if on_m else message.decode()
        except Exception as exc:
            if not self.on_decode_error:
                raise
            self.on_decode_error(message, exc)
        else:
            return on_m(message) if on_m else self.receive(decoded, message)
    

    The variables:

    on_m = {function} <function Consumer.create_task_handler.<locals>.on_task_received at 0x7fcbaef562f0>
    
    accept = {set: 1} {'application/json'}
    
    channel = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
    
    m2p = {method} <bound method Channel.message_to_python of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>
    
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
    
    self = {Consumer} <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
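The important branch in _receive_callback is on_m. Because Celery sets consumer.on_message (in asynloop), Kombu passes the undecoded Message straight to Celery and never calls message.decode(). Only without on_message would it decode the body and call the classic receive(body, message) callbacks. A hypothetical sketch of just that branch (FakeMessage and MiniConsumer are illustrative; error handling, accept checks and message_to_python are omitted):

```python
class FakeMessage:
    """Hypothetical message that counts how often its body is decoded."""

    def __init__(self, body):
        self.body = body
        self.decode_calls = 0

    def decode(self):
        self.decode_calls += 1
        return self.body


class MiniConsumer:
    """Hypothetical reduction of kombu Consumer._receive_callback's main branch."""

    def __init__(self, on_message=None, callbacks=None):
        self.on_message = on_message      # Celery's asynloop sets this
        self.callbacks = callbacks or []  # classic (body, message) callbacks

    def receive(self, body, message):
        for cb in self.callbacks:
            cb(body, message)

    def _receive_callback(self, message):
        on_m = self.on_message
        # With on_message set, the raw Message is passed on undecoded.
        decoded = None if on_m else message.decode()
        return on_m(message) if on_m else self.receive(decoded, message)


celery_seen, classic_seen = [], []
m1, m2 = FakeMessage({"x": 1}), FakeMessage({"x": 2})
MiniConsumer(on_message=celery_seen.append)._receive_callback(m1)
MiniConsumer(callbacks=[lambda body, msg: classic_seen.append(body)])._receive_callback(m2)
print(m1.decode_calls, m2.decode_calls)  # 0 1
```

Skipping the decode here fits the comment in Celery's on_task_received: with protocol v2, deserializing the body is deferred to the pool.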
    

    The logic is as follows:

     +-------------+        +-------------------+                                +-------------------------+
     | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
     |             | fileno |                   |      cycle.on_readable(fileno) |                         |
     |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
     |             |        |                   |                                |                         |
     |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
     +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                            |        +          |                 |              +-------------------------+           |
                            |        |          |                 |                                                    |
                            +-------------------+                 |                                                    |
                                     |                            |                                                    |
                                     |                            |              +-----------------------+             |
                                     |                            |              | Channel               |      3      |
                                     |                            |              |                       | _brpop_read |
                                     |                            |              |                       |             |
                                     |                            +----------------+ connection          +<------------+
                                     |                   _deliver(message, queue)|                       |
                                     |        5                      4           |                       |
                                     |     callback(message)                     |                       |
                                     +----------------------------------------------> callback(message)+---------------+
                                                                                 +-----------------------+             |
                                                                                                                       |
                                                                                 +----------------------+              |
                                                                                 | Consumer             |              |
                                                                 on_m(message)   |                      |              |
                                                          +---------------------------+  on_message     | <------------+
                                                          |                      |                      |  _receive_callback
                                                          |                      +----------------------+          6
                                                          |
    +-----------------------------------------------------------------------------------------------------------------------+
                                                          |
                                                          v
    
    


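    2.8 Entering Celery -- Consumer in Celery

    Since the callback Celery registered earlier has now been called, we have entered Celery's domain.

    2.8.1 Configuring the callback

    Let's recall when Celery configures this callback.

    celery/worker/loops.py contains the following code, which sets the consumer's on_message so that Kombu calls back into Celery: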

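    def asynloop(obj, connection, consumer, blueprint, hub, qos,
                 heartbeat, clock, hbrate=2.0):
        """Non-blocking event loop."""
        # ... (setup elided)
        # obj is Celery's Consumer; create_task_handler() returns on_task_received
        on_task_received = obj.create_task_handler()
        # ... (heartbeat setup elided)
        consumer.on_message = on_task_received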
    

    2.8.2 The callback function

    The callback is in celery/worker/consumer/consumer.py.

    As we can see, create_task_handler returns on_task_received, which is the callback.

    def create_task_handler(self, promise=promise):
        strategies = self.strategies
        on_unknown_message = self.on_unknown_message
        on_unknown_task = self.on_unknown_task
        on_invalid_task = self.on_invalid_task
        callbacks = self.on_task_message
        call_soon = self.call_soon
    
        def on_task_received(message):
            # payload will only be set for v1 protocol, since v2
            # will defer deserializing the message body to the pool.
            payload = None
            try:
                type_ = message.headers['task']                # protocol v2
            except TypeError:
                return on_unknown_message(None, message)
            except KeyError:
                try:
                    payload = message.decode()
                except Exception as exc:  # pylint: disable=broad-except
                    return self.on_decode_error(message, exc)
                try:
                    type_, payload = payload['task'], payload  # protocol v1
                except (TypeError, KeyError):
                    return on_unknown_message(payload, message)
            try:
                strategy = strategies[type_]
            except KeyError as exc:
                return on_unknown_task(None, message, exc)
            else:
                try:
                    strategy(
                        message, payload,
                        promise(call_soon, (message.ack_log_error,)),
                        promise(call_soon, (message.reject_log_error,)),
                        callbacks,
                    )
                except (InvalidTaskError, ContentDisallowed) as exc:
                    return on_invalid_task(payload, message, exc)
                except DecodeError as exc:
                    return self.on_decode_error(message, exc)
    
        return on_task_received
    

    The variables at this point:

    call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>
    
    callbacks = {set: 0} set()
    
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
    
    on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>
    
    on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>
    
    on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>
    
    self = {Consumer} <Consumer: celery@ demini (running)>
    
    strategies = {dict: 10} {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
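Stripped of the ack/reject promises, callbacks and error handlers, on_task_received is a name-to-strategy lookup with a protocol v1 fallback. The sketch below is a hypothetical reduction: FakeMessage and the two-argument strategy signature are simplifications, and Celery's real strategies also receive the ack and reject promises and the callbacks set.

```python
class FakeMessage:
    """Hypothetical message: headers for protocol v2, body for protocol v1."""

    def __init__(self, headers, body=None):
        self.headers = headers
        self._body = body

    def decode(self):
        return self._body


def create_task_handler(strategies, on_unknown_message, on_unknown_task):
    """Hypothetical reduction of Consumer.create_task_handler."""

    def on_task_received(message):
        payload = None
        try:
            type_ = message.headers["task"]  # protocol v2: name in headers
        except TypeError:
            return on_unknown_message(None, message)
        except KeyError:
            payload = message.decode()  # protocol v1: name in body
            try:
                type_ = payload["task"]
            except (TypeError, KeyError):
                return on_unknown_message(payload, message)
        try:
            strategy = strategies[type_]
        except KeyError as exc:
            return on_unknown_task(None, message, exc)
        return strategy(message, payload)  # real strategies get more arguments

    return on_task_received


calls = []
handler = create_task_handler(
    {"myTest.add": lambda message, payload: calls.append(("add", payload))},
    on_unknown_message=lambda payload, message: calls.append(("unknown message", payload)),
    on_unknown_task=lambda payload, message, exc: calls.append(("unknown task", exc.args[0])),
)
handler(FakeMessage({"task": "myTest.add"}))           # v2
handler(FakeMessage({}, body={"task": "myTest.add"}))  # v1 fallback
handler(FakeMessage({"task": "no.such.task"}))
handler(FakeMessage(None))                             # headers missing entirely
print(calls)
```

The dict lookup is why the strategies variable above has one entry per registered task name, including 'myTest.add'.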
    

    The logic at this point:

     +-------------+        +-------------------+                                +-------------------------+
     | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
     |             | fileno |                   |      cycle.on_readable(fileno) |                         |
     |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
     |             |        |                   |                                |                         |
     |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
     +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                            |        +          |                 |              +-------------------------+           |
                            |        |          |                 |                                                    |
                            +-------------------+                 |                                                    |
                                     |                            |                                                    |
                                     |                            |              +-----------------------+             |
                                     |                            |              | Channel               |      3      |
                                     |                            |              |                       | _brpop_read |
                                     |                            |              |                       |             |
                                     |                            +----------------+ connection          +<------------+
                                     |                   _deliver(message, queue)|                       |
                                     |        5                      4           |                       |
                                     |     callback(message)                     |                       |
                                     +----------------------------------------------> callback(message)+---------------+
                                                                                 +-----------------------+             |
                                                                                                                       |
                                                                                 +----------------------+              |
                                                                                 | Consumer             |              |
                                                                 on_m(message)   |                      |              |
                                                          +---------------------------+  on_message     | <------------+
                                                          |                      |                      |  _receive_callback
        kombu                                             |                      +----------------------+          6
                                                          |
    +-----------------------------------------------------------------------------------------------------------------------+
                                                          |
        Celery                                            |
                                             +---------------------------+
                                             | Consumer   |              |
                                             |            |              |
                                             |            v              |
                                             |      on_task_received     |
                                             |                           |
                                             |                           |
                                             +---------------------------+
    
    


    0x03 Logic in Celery

    From this point on, we are working inside Celery.

    3.1 Logical entry point --- consumer in Celery

    We first arrive at Celery's Consumer component, which is, conceptually, the logical entry point for consumption.

    The Celery Consumer code lives in celery/worker/consumer/consumer.py. Its job here is to:

    • parse the message and read the task name from its headers, e.g. 'myTest.add';

    • look up the strategy registered for that task name;

    • call the strategy.

    The code is:

    def create_task_handler(self, promise=promise):
        strategies = self.strategies
        on_unknown_message = self.on_unknown_message
        on_unknown_task = self.on_unknown_task
        on_invalid_task = self.on_invalid_task
        callbacks = self.on_task_message
        call_soon = self.call_soon
    
        def on_task_received(message):
            # payload will only be set for v1 protocol, since v2
            # will defer deserializing the message body to the pool.
            payload = None
            try:
                type_ = message.headers['task']                # protocol v2
            except TypeError:
                return on_unknown_message(None, message)
            except KeyError:
                try:
                    payload = message.decode()
                except Exception as exc:  # pylint: disable=broad-except
                    return self.on_decode_error(message, exc)
                try:
                    type_, payload = payload['task'], payload  # protocol v1
                except (TypeError, KeyError):
                    return on_unknown_message(payload, message)
            try:
                strategy = strategies[type_]
            except KeyError as exc:
                return on_unknown_task(None, message, exc)
            else:
                try:
                    strategy(
                        message, payload,
                        promise(call_soon, (message.ack_log_error,)),
                        promise(call_soon, (message.reject_log_error,)),
                        callbacks,
                    )
                except (InvalidTaskError, ContentDisallowed) as exc:
                    return on_invalid_task(payload, message, exc)
                except DecodeError as exc:
                    return self.on_decode_error(message, exc)
    
        return on_task_received
    

    The variables are:

    self.app.tasks = {TaskRegistry: 10} {'celery.chunks': <@task: celery.chunks of myTest at 0x7fcbade229e8>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of myTest at 0x7fcbade229e8>, 'celery.chord_unlock': <@task: celery.chord_unlock of myTest at 0x7fcbade229e8>, 'celery.group': <@
    
    call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>
    callbacks = {set: 0} set()
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
    on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>
    on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>
    on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>
    self = {Consumer} <Consumer: celery@ demini (running)>
    strategies = {dict: 10}
    {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
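The routing done by on_task_received fits in a few lines of plain Python. This is a simplified sketch, not Celery's code. The message is reduced to a headers dict plus a raw body, and protocol v1 bodies are assumed to be JSON. Strategies are ordinary callables. A single hypothetical on_unknown callback replaces the separate on_unknown_message, on_unknown_task and decode-error handlers.

```python
import json


def on_task_received(headers, body, strategies, on_unknown):
    """Route one message to its per-task strategy (protocol v2 first, then v1).

    Simplified sketch: headers is a dict (or None), body is raw bytes/str,
    and on_unknown is a hypothetical catch-all for every failure path.
    """
    if headers is None:
        return on_unknown(None)                  # no headers at all
    payload = None
    if 'task' in headers:
        task_name = headers['task']              # protocol v2: name in headers
    else:
        try:
            payload = json.loads(body)           # protocol v1: decode the body
        except ValueError:
            return on_unknown(body)
        if not isinstance(payload, dict) or 'task' not in payload:
            return on_unknown(payload)
        task_name = payload['task']
    try:
        strategy = strategies[task_name]
    except KeyError:
        return on_unknown(task_name)             # task not registered here
    # with v2, payload stays None: decoding is left to a later stage
    return strategy(body, payload)
```

With protocol v2 the payload stays None. This matches the comment in the original code that deserializing the body is deferred to the pool.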
    

    3.1.1 Parsing the message

    The following line determines which task the message is for. Here it is 'myTest.add'.

    type_ = message.headers['task']
    

    message.headers is shown below. It lists everything that goes into defining a message:

    message.headers = {dict: 15} 
     'lang' = {str} 'py'
     'task' = {str} 'myTest.add'
     'id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     'shadow' = {NoneType} None
     'eta' = {NoneType} None
     'expires' = {NoneType} None
     'group' = {NoneType} None
     'group_index' = {NoneType} None
     'retries' = {int} 0
     'timelimit' = {list: 2} [None, None]
     'root_id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     'parent_id' = {NoneType} None
     'argsrepr' = {str} '(2, 17)'
     'kwargsrepr' = {str} '{}'
     'origin' = {str} 'gen19806@ demini'
     __len__ = {int} 15
    

    3.1.2 Getting the strategy

    The task name ('myTest.add' here) is used to look up the matching callback function in strategies. That callback is what starts processing the task message.

    strategies = {dict: 10} 
     'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef230d0>
     'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23620>
     'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef238c8>
     'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23b70>
     'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23e18>
     'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48158>
     'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48400>
     'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef486a8>
     'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48950>
     'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>
     __len__ = {int} 10
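Each entry prints as default.<locals>.task_message_handler. This shows that the handlers are closures produced by a factory function named default, one per registered task. The sketch below reproduces only that shape. build_strategies and the handle callback are hypothetical, and the real closure captures much more state (app, consumer, task and so on).

```python
def default(task_name, handle):
    """Hypothetical strategy factory: returns a handler bound to one task."""
    def task_message_handler(message):
        # task_name and handle are captured from the enclosing call
        return handle(task_name, message)
    return task_message_handler


def build_strategies(task_names, handle):
    """One closure per registered task, keyed by task name (hypothetical)."""
    return {name: default(name, handle) for name in task_names}
```

Because every handler is created by the same factory, they all share the qualified name seen in the debugger but are distinct function objects.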
    

    3.1.3 Calling the strategy

    Once the strategy has been obtained, for example:

    <function default.<locals>.task_message_handler at 0x7fcbaef48950>

    this function is called as follows:

    strategy(
        message, payload,
        promise(call_soon, (message.ack_log_error,)),
        promise(call_soon, (message.reject_log_error,)),
        callbacks,
    )
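Note that ack and reject are not called directly. promise(call_soon, (message.ack_log_error,)) wraps call_soon, so firing the promise only schedules message.ack_log_error on the event loop. The model below shows that indirection using hypothetical Promise and Loop classes. In Celery, the promise type comes from the vine library and call_soon is the Consumer method shown in the variable dump above.

```python
from collections import deque


class Promise:
    """Minimal stand-in for a promise: calling it runs fun(*args, *extra)."""

    def __init__(self, fun, args=()):
        self.fun, self.args = fun, tuple(args)

    def __call__(self, *extra):
        return self.fun(*self.args, *extra)


class Loop:
    """Hypothetical event loop that only supports call_soon."""

    def __init__(self):
        self.ready = deque()

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))   # schedule only, do not run yet

    def run_once(self):
        while self.ready:
            callback, args = self.ready.popleft()
            callback(*args)
```

Calling the ack promise returns immediately. The actual acknowledgement happens on the next loop iteration, inside the event loop's own context.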
    

    3.2 The strategy layer --- strategy

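    The strategy is an intermediate layer between the Consumer and the Worker. It handles each message differently depending on conditions, which is what "strategy" means here.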

    3.2.1 Logic in strategy

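    The code is in celery/worker/strategy.py.

    Specifically, it:

    • parses the message further;
    • builds the internal Req (Request) object from the message;
    • sends a 'task-received' event if events are enabled;
    • handles the ETA (delayed execution);
    • handles QoS and rate limits;
    • hands the Req to the Worker by calling handle(req).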

    The code is as follows:

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None and 'args' not in message.payload:
            body, headers, decoded, utc = (
                message.body, message.headers, False, app.uses_utc_timezone(),
            )
        else:
            if 'args' in message.payload:
                body, headers, decoded, utc = hybrid_to_proto2(message,
                                                               message.payload)
            else:
                body, headers, decoded, utc = proto1_to_proto2(message, body)
    
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )
      
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return
    
        signals.task_received.send(sender=consumer, request=req)
    
        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )
    
        bucket = None
        eta = None
        if req.eta:
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, app.timezone)
            except (OverflowError, ValueError) as exc:
                error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                      req.eta, exc, req.info(safe=True), exc_info=True)
                req.reject(requeue=False)
        if rate_limits_enabled:
            bucket = get_bucket(task.name)
    
        if eta and bucket:
            consumer.qos.increment_eventually()
            return call_at(eta, limit_post_eta, (req, bucket, 1),
                           priority=6)
        if eta:
            consumer.qos.increment_eventually()
            call_at(eta, apply_eta_task, (req,), priority=6)
            return task_message_handler
        if bucket:
            return limit_task(req, bucket, 1)
    
        task_reserved(req)
        if callbacks:
            [callback(req) for callback in callbacks]
            
        handle(req)  # this is where the request is handed to the Worker
        
    return task_message_handler
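Without signals, events and QoS bookkeeping, the end of task_message_handler is a four-way decision. The sketch below omits consumer.qos.increment_eventually. It reproduces that decision with a hypothetical heap-based Timer standing in for call_at. Ordering equal ETAs by priority is an assumption of the sketch, not a claim about Celery's timer.

```python
import heapq
import itertools


class Timer:
    """Hypothetical timer: entries ordered by (eta, priority, insertion order)."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()   # tie-breaker so callables are never compared

    def call_at(self, eta, fun, args=(), priority=0):
        heapq.heappush(self._heap, (eta, priority, next(self._seq), fun, args))

    def run_due(self, now):
        while self._heap and self._heap[0][0] <= now:
            _, _, _, fun, args = heapq.heappop(self._heap)
            fun(*args)


def dispatch(req, eta, bucket, timer, handle, limit_task, limit_post_eta,
             apply_eta_task):
    """Mirror of the four exits at the end of task_message_handler."""
    if eta and bucket:                  # delayed and rate limited
        return timer.call_at(eta, limit_post_eta, (req, bucket), priority=6)
    if eta:                             # delayed only
        return timer.call_at(eta, apply_eta_task, (req,), priority=6)
    if bucket:                          # rate limited only
        return limit_task(req, bucket)
    return handle(req)                  # straight to the Worker
```

Tasks with an ETA wait in the timer and rate-limited tasks go to the bucket logic. Only the remaining ones reach handle(req) immediately.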
    

    Now let's look at the details.

    3.2.2 Getting the instance

    In the strategy, the following line builds a Request class from the task instance. This class ties together the broker message, the consumer and the multiprocessing pool.

    In particular, Request.execute_using_pool is where multiprocessing comes in, because it is linked to the consumer's process pool.

    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
    

    The task instance is:

    myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    

    The code that creates the Request class is:

    def create_request_cls(base, task, pool, hostname, eventer,
                           ref=ref, revoked_tasks=revoked_tasks,
                           task_ready=task_ready, trace=trace_task_ret):
        default_time_limit = task.time_limit
        default_soft_time_limit = task.soft_time_limit
        apply_async = pool.apply_async
        acks_late = task.acks_late
        events = eventer and eventer.enabled
    
        class Request(base):
    
            def execute_using_pool(self, pool, **kwargs):
                task_id = self.task_id
                if (self.expires or task_id in revoked_tasks) and self.revoked():
                    raise TaskRevokedError(task_id)
    
                time_limit, soft_time_limit = self.time_limits
                result = apply_async(
                    trace,
                    args=(self.type, task_id, self.request_dict, self.body,
                          self.content_type, self.content_encoding),
                    accept_callback=self.on_accepted,
                    timeout_callback=self.on_timeout,
                    callback=self.on_success,
                    error_callback=self.on_failure,
                    soft_timeout=soft_time_limit or default_soft_time_limit,
                    timeout=time_limit or default_time_limit,
                    correlation_id=task_id,
                )
                # cannot create weakref to None
                # pylint: disable=attribute-defined-outside-init
                self._apply_result = maybe(ref, result)
                return result
    
            def on_success(self, failed__retval__runtime, **kwargs):
                failed, retval, runtime = failed__retval__runtime
                if failed:
                    if isinstance(retval.exception, (
                            SystemExit, KeyboardInterrupt)):
                        raise retval.exception
                    return self.on_failure(retval, return_ok=True)
                task_ready(self)
    
                if acks_late:
                    self.acknowledge()
    
                if events:
                    self.send_event(
                        'task-succeeded', result=retval, runtime=runtime,
                    )
    
        return Request
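create_request_cls is a class factory. Per-task settings such as default_time_limit are read once and captured by the generated subclass, so handling each message only costs an instance creation. Below is a trimmed analogue in which a hypothetical submit callable takes the place of pool.apply_async.

```python
def create_request_cls(base, task_name, default_time_limit, submit):
    """Trimmed analogue: bake per-task defaults into a Request subclass."""

    class Request(base):
        def __init__(self, task_id, time_limit=None):
            super().__init__()
            self.task_id = task_id
            self.time_limit = time_limit

        def execute_using_pool(self):
            # a per-request limit wins; otherwise the task-level default applies
            timeout = self.time_limit or default_time_limit
            return submit(task_name, self.task_id, timeout)

    return Request
```

The qualified name create_request_cls.<locals>.Request is also what shows up for on_success in the debugger output later in this article.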
    

    The logic at this point is:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
    +---------------------------------------------------------------------------------------+
                             |
     strategy                |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +--------------------------+                       +---------------------+
    
    

    3.2.3 Invoking the instance

    task_message_handler finally calls handle(req), which starts processing the Request instance.

    The handle function is in fact WorkController._process_task_sem.

    The code is as follows:

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        # ... (body parsing, events, ETA and rate-limit handling omitted)
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )

        task_reserved(req)

        if callbacks:
            [callback(req) for callback in callbacks]

        handle(req)

    return task_message_handler
    
    

    The Request is:

    req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
     acknowledged = {bool} False
     app = {Celery} <Celery myTest at 0x7fcbade229e8>
     args = {list: 2} [2, 17]
     argsrepr = {str} '(2, 17)'
     body = {bytes: 82} b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
     chord = {NoneType} None
     connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
     content_encoding = {str} 'utf-8'
     content_type = {str} 'application/json'
     correlation_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     delivery_info = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
     errbacks = {NoneType} None
     eta = {NoneType} None
     eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x7fcbaeef31d0>
     expires = {NoneType} None
     group = {NoneType} None
     group_index = {NoneType} None
     hostname = {str} 'celery@ demini'
     id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     kwargs = {dict: 0} {}
     kwargsrepr = {str} '{}'
     message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
     name = {str} 'myTest.add'
     on_ack = {promise} <promise@0x7fcbaeecc210 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
     on_reject = {promise} <promise@0x7fcbaeeccf20 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
     parent_id = {NoneType} None
     reply_to = {str} 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93'
     request_dict = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
     root_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     store_errors = {bool} True
     task = {add} <@task: myTest.add of myTest at 0x7fcbade229e8>
     task_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     task_name = {str} 'myTest.add'
     time_limits = {list: 2} [None, None]
     time_start = {NoneType} None
     type = {str} 'myTest.add'
     tzlocal = {NoneType} None
     utc = {bool} True
     worker_pid = {NoneType} None
    

    handle (and headers) are:

    handle = {method} <bound method WorkController._process_task_sem of <Worker: celery@ demini (running)>>
    headers = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
    

    The logic at this point is:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
     +------------------------------------------------------------------------------------+
     strategy                |
                             |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +------------+-------------+                       +---------------------+
                             | _process_task_sem
                             | 
    +--------------------------------------------------------------------------------------+
     Worker                  | req[{Request} myTest.add]
                             v
                    +--------+-------+
                    | WorkController |
                    +----------------+
    
    


    3.3 The laborer -- Worker in Celery

    Execution now reaches the Worker in Celery. The Worker is where tasks are actually executed.

    The code is in celery/worker/worker.py.

    As we can see:

    • _process_task_sem calls _process_task;
    • _process_task calls req.execute_using_pool(self.pool).

    The code is as follows:

    class WorkController:
        """Unmanaged worker instance."""
    
        def register_with_event_loop(self, hub):
            self.blueprint.send_all(
                self, 'register_with_event_loop', args=(hub,),
                description='hub.register',
            )
    
        def _process_task_sem(self, req):
            return self._quick_acquire(self._process_task, req)
    
        def _process_task(self, req):
            """Process task by sending it to the pool of workers."""
            try:
                req.execute_using_pool(self.pool)
            except TaskRevokedError:
                try:
                    self._quick_release()   # Issue 877
                except AttributeError:
                    pass
    

    The variables are:

    req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    
    self = {Worker} celery
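_process_task_sem passes the request through self._quick_acquire, so a request only reaches the pool once a concurrency slot is free. As far as I can tell, Celery binds this to a "lax" bounded semaphore from kombu whose acquire takes a callback. The class below is a simplified, hypothetical re-implementation of that idea, not kombu's code.

```python
from collections import deque


class LaxSemaphore:
    """Simplified 'lax' bounded semaphore (hypothetical name).

    acquire(callback, *args) runs callback immediately if a slot is free,
    otherwise parks it until release() frees a slot.
    """

    def __init__(self, value):
        self.initial_value = self.value = value
        self._waiting = deque()

    def acquire(self, callback, *args):
        if self.value <= 0:
            self._waiting.append((callback, args))   # no slot: defer
            return False
        self.value -= 1
        callback(*args)
        return True

    def release(self):
        try:
            callback, args = self._waiting.popleft()
        except IndexError:
            # nobody waiting: return the slot, never exceeding the bound
            self.value = min(self.value + 1, self.initial_value)
        else:
            callback(*args)   # hand the freed slot straight to a waiter
```

A callback-based semaphore never blocks the event loop. A request that cannot run yet is simply queued, and it runs when a finished task releases its slot.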
    

    3.3.1 Request in Celery

    Execution now reaches the Request. The code is in celery/worker/request.py.

    Because create_request_cls binds:

    apply_async = pool.apply_async
    

    the call ends up in pool.apply_async.

    The variables are:

    apply_async = {method} <bound method BasePool.apply_async of <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>>
    
    pool = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>
    
    revoked_tasks = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
        
    self = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    

    The code is:

    class Request(base):
    
        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id  # get the task id
            if (self.expires or task_id in revoked_tasks) and self.revoked():  # skip expired or revoked tasks
                raise TaskRevokedError(task_id)
    
            time_limit, soft_time_limit = self.time_limits  # per-request time limits
            result = apply_async(  # hand trace() to the pool; returns an async result
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result
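The contract of execute_using_pool is to hand trace plus the request data to the pool and let the pool report back through callback or error_callback. The standard library's multiprocessing.pool.ThreadPool shows the same contract, because its apply_async also accepts callback and error_callback. The trace function and Request class below are hypothetical stand-ins. A thread pool is used only so the sketch needs no pickling.

```python
from multiprocessing.pool import ThreadPool


def trace(task_name, task_id, args):
    """Hypothetical stand-in for Celery's trace function: run the task body."""
    if task_name != 'myTest.add':
        raise KeyError(task_name)
    return sum(args)


class Request:
    """Hypothetical request holding the pool callbacks."""

    def __init__(self, task_name, task_id, args):
        self.task_name, self.task_id, self.args = task_name, task_id, args
        self.outcome = None

    def on_success(self, retval):
        self.outcome = ('success', retval)

    def on_failure(self, exc):
        self.outcome = ('failure', repr(exc))

    def execute_using_pool(self, pool):
        # the pool runs trace() and reports back through one of the callbacks
        return pool.apply_async(
            trace, args=(self.task_name, self.task_id, self.args),
            callback=self.on_success, error_callback=self.on_failure)


if __name__ == '__main__':
    with ThreadPool(2) as pool:
        req = Request('myTest.add', 'id-1', [2, 17])
        req.execute_using_pool(pool).wait()
    print(req.outcome)
```

In the standard library pool, the callback runs before the result is marked ready, so req.outcome is already set when wait() returns.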
    

    The logic at this point is:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
     +------------------------------------------------------------------------------------+
     strategy                |
                             |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +------------+-------------+                       +---------------------+
                             | _process_task_sem
                             |
    +--------------------------------------------------------------------------------------+
     Worker                  | req[{Request} myTest.add]
                             v
                    +--------+-----------+
                    | WorkController     |
                    |                    |
                    |            pool +-------------------------+
                    +--------+-----------+                      |
                             |                                  |
                             |               apply_async        v
                 +-----------+----------+                   +---+-------+
                 |{Request} myTest.add  | +---------------> | TaskPool  |
                 +----------------------+                   +-----------+
                                            myTest.add
    
    


    3.3.2 BasePool in Celery

    apply_async now takes us into Celery's Pool. Note that this is not yet the concrete multiprocessing implementation, only its entry point.

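    At this point the task information is passed to the Pool. This dump comes from a different run, hence the different task id and hostname: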

    args = {tuple: 6} 
     0 = {str} 'myTest.add'
     1 = {str} 'af6ed084-efc6-4608-a13a-d3065f457cd5'
     2 = {dict: 21} {'lang': 'py', 'task': 'myTest.add', 'id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen1100@DESKTOP-0GO3RPO', 'reply_to': 'afb85541-d08c-3191-b89d-918e15f9e0bf', 'correlation_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}
     3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
     4 = {str} 'application/json'
     5 = {str} 'utf-8'
    

    The code is in celery/concurrency/base.py:

    class BasePool:
        """Task pool."""
    
        def apply_async(self, target, args=None, kwargs=None, **options):
            """Equivalent of the :func:`apply` built-in function.
    
            Callbacks should optimally return as soon as possible since
            otherwise the thread which handles the result will get blocked.
            """
            kwargs = {} if not kwargs else kwargs
            args = [] if not args else args
    
            return self.on_apply(target, args, kwargs,
                                 waitforslot=self.putlocks,
                                 callbacks_propagate=self.callbacks_propagate,
                                 **options)
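BasePool.apply_async only normalizes args and kwargs and then forwards them to on_apply, which each concrete pool supplies (a template-method split). The sketch below uses a hypothetical InlinePool that runs the target in the calling thread.

```python
class BasePool:
    """Trimmed analogue of the BasePool.apply_async -> on_apply hand-off."""

    putlocks = True

    def apply_async(self, target, args=None, kwargs=None, **options):
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args
        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks, **options)

    def on_apply(self, target, args, kwargs, **options):
        raise NotImplementedError   # each concrete pool decides how to run it


class InlinePool(BasePool):
    """Hypothetical pool that runs the target synchronously."""

    def on_apply(self, target, args, kwargs, callback=None, **options):
        retval = target(*args, **kwargs)
        if callback is not None:
            callback(retval)
        return retval
```

Swapping the concurrency model (prefork, threads, inline) then only means supplying a different on_apply. The caller-facing apply_async stays the same.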
    

    The variables at this point are:

    options = {dict: 7} {'accept_callback': <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>, 'timeout_callback': <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>
    
    self = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>
    

    3.3.3 AsynPool in Celery

    The apply_async code is in billiard/pool.py. billiard is Celery's fork of the multiprocessing package.

    In __init__, self._initargs = initargs holds (<Celery myTest at 0x2663db3fe48>, 'celery@DESKTOP-0GO3RPO'). This is how the Celery application is passed into the pool.

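    Depending on the operating system, the task message is sent to the multiprocessing pool either through self._taskqueue.put (when self.threads is set) or through self._quick_put (otherwise).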

    def apply_async(self, func, args=(), kwds={},
                    callback=None, error_callback=None, accept_callback=None,
                    timeout_callback=None, waitforslot=None,
                    soft_timeout=None, timeout=None, lost_worker_timeout=None,
                    callbacks_propagate=(),
                    correlation_id=None):
        '''
        Asynchronous equivalent of `apply()` method.
    
        Callback is called when the functions return value is ready.
        The accept callback is called when the job is accepted to be executed.
    
        Simplified the flow is like this:
    
            >>> def apply_async(func, args, kwds, callback, accept_callback):
            ...     if accept_callback:
            ...         accept_callback()
            ...     retval = func(*args, **kwds)
            ...     if callback:
            ...         callback(retval)
    
        '''
    
        if self._state == RUN:
            waitforslot = self.putlocks if waitforslot is None else waitforslot
            if waitforslot and self._putlock is not None:
                self._putlock.acquire()
            result = ApplyResult(
                self._cache, callback, accept_callback, timeout_callback,
                error_callback, soft_timeout, timeout, lost_worker_timeout,
                on_timeout_set=self.on_timeout_set,
                on_timeout_cancel=self.on_timeout_cancel,
                callbacks_propagate=callbacks_propagate,
                send_ack=self.send_ack if self.synack else None,
                correlation_id=correlation_id,
            )
            if timeout or soft_timeout:
                # start the timeout handler thread when required.
                self._start_timeout_handler()
            if self.threads:
                self._taskqueue.put(([(TASK, (result._job, None,
                                    func, args, kwds))], None))
            else:
                self._quick_put((TASK, (result._job, None, func, args, kwds)))
            return result
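At its core, this apply_async creates an ApplyResult that registers itself in self._cache under a job id. It then pushes a (TASK, (job, None, func, args, kwds)) tuple towards the workers, and replies are later matched back to results through that job id. The single-process model below uses queue.Queue. MiniPool, ApplyResult and the TASK value here are hypothetical stand-ins.

```python
import itertools
import queue

TASK = 0  # hypothetical tag value standing in for the TASK name used above


class ApplyResult:
    """Trimmed analogue: registers itself in the pool cache under a job id."""

    _counter = itertools.count()

    def __init__(self, cache, callback):
        self._job = next(self._counter)
        self._callback = callback
        cache[self._job] = self

    def _set(self, value):
        self._callback(value)


class MiniPool:
    def __init__(self):
        self._cache = {}
        self._inqueue = queue.Queue()
        self._quick_put = self._inqueue.put   # bound once, reused per task

    def apply_async(self, func, args=(), kwds=None, callback=None):
        result = ApplyResult(self._cache, callback or (lambda value: None))
        # same tuple shape as above: (TASK, (job, index, func, args, kwds))
        self._quick_put((TASK, (result._job, None, func, args, kwds or {})))
        return result

    def worker_step(self):
        """What a child would do: take one task, run it, report by job id."""
        tag, (job, _index, func, args, kwds) = self._inqueue.get_nowait()
        if tag != TASK:
            raise ValueError(f'unexpected message tag {tag!r}')
        self._cache.pop(job)._set(func(*args, **kwds))
```

The job id is the only link between a queued task and its ApplyResult. This lets results come back in any order.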
    

    The variables are:

    accept_callback = {method} <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    args = {tuple: 6} ('myTest.add', '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8')
    
    callback = {method} <bound method create_request_cls.<locals>.Request.on_success of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    
    error_callback = {method} <bound method Request.on_failure of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    
    self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fcbaee2ea20>
    
    timeout_callback = {method} <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    waitforslot = {bool} False
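
    The args tuple above holds the raw message: task name, task id, the headers dict, the serialized body, the content type and the content encoding. As a hedged illustration, assuming Celery's task message protocol v2, in which the JSON body is a three-element list [args, kwargs, embed], the body from this dump can be unpacked as below. decode_body is a hypothetical helper, not a Celery API.

```python
import json


def decode_body(body: bytes, content_encoding: str = "utf-8"):
    """Split a protocol-v2 style JSON body into (args, kwargs, embed).

    Hypothetical helper for illustration only; it is not Celery's own
    decoding path.
    """
    args, kwargs, embed = json.loads(body.decode(content_encoding))
    return tuple(args), kwargs, embed


# The body bytes copied from the debugger dump above.
body = b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
args, kwargs, embed = decode_body(body)
print(args, kwargs, embed["callbacks"])  # (2, 17) {} None
```

    The embed dict is where canvas information (callbacks, errbacks, chain, chord) travels; for a plain add.delay(2, 17) call it is all None, as the dump shows.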
    
    3.3.3.1 Some variables set up beforehand

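    Let us first look at some variables that were set up earlier.

    For example, in the code below:

    inq, outq, synq = self.get_process_queues() and self._process_register_queues(w, (inq, outq, synq)) are what set up the pipes between the parent process and the child process.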

    def _create_worker_process(self, i):
        sentinel = self._ctx.Event() if self.allow_restart else None
        
        inq, outq, synq = self.get_process_queues()
        on_ready_counter = self._ctx.Value('i')
        
        w = self.WorkerProcess(self.Worker(
            inq, outq, synq, self._initializer, self._initargs,
            self._maxtasksperchild, sentinel, self._on_process_exit,
            # Need to handle all signals if using the ipc semaphore,
            # to make sure the semaphore is released.
            sigprotection=self.threads,
            wrap_exception=self._wrap_exception,
            max_memory_per_child=self._max_memory_per_child,
            on_ready_counter=on_ready_counter,
        ))
        self._pool.append(w)
        self._process_register_queues(w, (inq, outq, synq))
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.index = i
        w.start()
        self._poolctrl[w.pid] = sentinel
        self._on_ready_counters[w.pid] = on_ready_counter
        if self.on_process_up:
            self.on_process_up(w)
        return w
    

    The pipes themselves are created like this:

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv
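
    In other words, _quick_put and _quick_get are just the bound send and recv methods of the two ends of a one-way pipe. A minimal standalone sketch of the same idea, using multiprocessing.Pipe(duplex=False) directly instead of billiard's SimpleQueue internals; the tuple mirrors the (TASK, (job, None, func, args, kwds)) shape used in apply_async, and TASK here is a placeholder string, not billiard's real constant.

```python
import operator
from multiprocessing import Pipe

# One-way pipe: the parent writes on `writer`, a child would read on `reader`.
reader, writer = Pipe(duplex=False)

# Bind the methods once, the way _setup_queues binds _quick_put/_quick_get.
quick_put = writer.send
quick_get = reader.recv

TASK = "TASK"  # placeholder tag; billiard defines its own message constants

job_id = 1
# send() pickles the tuple; operator.add is picklable by reference.
quick_put((TASK, (job_id, None, operator.add, (2, 17), {})))

# In the real pool this recv() happens in the child process.
tag, (job, index, func, args, kwds) = quick_get()
print(tag, job, func(*args, **kwds))  # TASK 1 19
```

    Binding the methods once avoids repeated attribute lookups on a hot path, which is presumably why the pool stores them as attributes.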
    

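    And the handlers that write data into these pipes, together with the fileno mappings they rely on, are set up here (only the beginning of the function is shown):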

    def _create_write_handlers(self, hub,
                               pack=pack, dumps=_pickle.dumps,
                               protocol=HIGHEST_PROTOCOL):
        """Create handlers used to write data to child processes."""
        fileno_to_inq = self._fileno_to_inq
        fileno_to_synq = self._fileno_to_synq
        outbound = self.outbound_buffer
        pop_message = outbound.popleft
        put_message = outbound.append
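
    The outbound buffer decouples "a job is ready" from "a child's pipe can accept bytes": put_message only appends to a deque, and the actual write happens later, when the event loop reports a child's inqueue fd as writable. A hedged sketch of that pattern, assuming a raw os.pipe as the child's inqueue and a hypothetical on_writable callback; Celery's real write handlers are more involved than this.

```python
import os
from collections import deque

# Outbound buffer: queued jobs wait here until a child's inqueue is writable.
outbound = deque()
put_message = outbound.append
pop_message = outbound.popleft

# Hypothetical stand-in for one child's inqueue: a raw OS pipe.
read_fd, write_fd = os.pipe()
# fileno -> process; only a name here, where the pool keeps the process object.
fileno_to_inq = {write_fd: "ForkPoolWorker-1"}


def on_writable(fd):
    """Event-loop callback (sketch): flush one pending job to child `fd`."""
    if outbound and fd in fileno_to_inq:
        payload = pop_message()
        written = os.write(fd, payload)
        if written < len(payload):
            # Keep the unsent tail at the front for the next writable event.
            outbound.appendleft(payload[written:])


put_message(b"job-1")
put_message(b"job-2")
on_writable(write_fd)  # pretend the hub reported write_fd as writable
print(len(outbound))   # 1: one job still waits for the next event
```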
    

    So the pre-set variables end up as follows:

    self._taskqueue = {Queue} <queue.Queue object at 0x7fcbaee57b00>
    self._quick_put = {function} <function AsynPool._create_write_handlers.<locals>.send_job at 0x7fcbaef569d8>
    self._outqueue = {NoneType} None
    self._inqueue = {NoneType} None
    self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
    self._quick_get = {NoneType} None
    self._fileno_to_inq = {dict: 0} {}
    self.outbound_buffer = {deque: 1} deque([<%s: 0 ack:False ready:False>])
    
    
    self = {Pool} <billiard.pool.Pool object at 0x000002663FD6E948>
     ResultHandler = {type} <class 'billiard.pool.ResultHandler'>
     SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
     Supervisor = {type} <class 'billiard.pool.Supervisor'>
     TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
     TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
     Worker = {type} <class 'billiard.pool.Worker'>
    
    3.3.3.2 Sending to the child process

    On Windows, the job goes through the task queue:

    if self.threads:
        self._taskqueue.put(([(TASK, (result._job, None,
                                      func, args, kwds))], None))
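
    In this threaded branch a TaskHandler thread drains _taskqueue and forwards each task to a child. A simplified sketch of that drain loop, modeled on the handler loop in CPython's multiprocessing.pool (from which billiard is forked) and assuming the (taskseq, set_length) item shape seen above plus a None sentinel for shutdown. MiniTaskHandler is a hypothetical stand-in, not billiard's class.

```python
import queue
import threading


class MiniTaskHandler(threading.Thread):
    """Hypothetical, simplified stand-in for billiard's TaskHandler."""

    def __init__(self, taskqueue, put):
        super().__init__(daemon=True)
        self.taskqueue = taskqueue
        self.put = put  # in a real pool: something like _quick_put

    def run(self):
        # Each item is (taskseq, set_length); a bare None means "stop".
        for taskseq, _set_length in iter(self.taskqueue.get, None):
            for task in taskseq:
                self.put(task)


sent = []
taskqueue = queue.Queue()
handler = MiniTaskHandler(taskqueue, sent.append)
handler.start()

# Same item shape as the Windows branch of apply_async.
taskqueue.put(([("TASK", (1, None, "myTest.add", (2, 17), {}))], None))
taskqueue.put(None)  # sentinel: let the handler thread exit
handler.join()
print(sent)
```

    Wrapping tasks in a list (taskseq) lets one queue item carry a batch, which is how map-style calls can enqueue many tasks at once; apply_async simply sends a batch of one.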
    

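    On *nix, send_job below builds the job and sends it: put_message(job) appends the job to the outbound buffer, and the job is written to a child process's pipe once that process's inqueue becomes writable.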

    def send_job(tup):
        # Schedule writing job request for when one of the process
        # inqueues are writable.
        body = dumps(tup, protocol=protocol)
        body_size = len(body)
        header = pack('>I', body_size)
        # index 1,0 is the job ID.
        job = get_job(tup[1][0])
        job._payload = buf_t(header), buf_t(body), body_size
        put_message(job)
        
    self._quick_put = send_job
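
    send_job frames every job as a 4-byte big-endian length header (pack('>I', body_size)) followed by the pickled tuple, so the reading side can recover message boundaries from a plain byte stream. A minimal sketch of both directions; frame and read_frame are hypothetical helper names, and io.BytesIO stands in for the pipe.

```python
import io
import pickle
from struct import pack, unpack


def frame(tup):
    """Encode a job the way send_job does: '>I' length header + pickled body."""
    body = pickle.dumps(tup, protocol=pickle.HIGHEST_PROTOCOL)
    return pack(">I", len(body)) + body


def read_frame(stream):
    """Read one framed job back: the header first, then exactly body_size bytes."""
    (body_size,) = unpack(">I", stream.read(4))
    return pickle.loads(stream.read(body_size))


# As in send_job, index [1][0] of the tuple is the job ID.
job = ("TASK", (1, None, "myTest.add", (2, 17), {}))
stream = io.BytesIO(frame(job) + frame(("TASK", (2, None, "myTest.add", (3, 4), {}))))
print(read_frame(stream)[1][3])  # (2, 17)
print(read_frame(stream)[1][3])  # (3, 4)
```

    A fixed-size header is the simplest way to delimit variable-length messages on a stream: the reader never has to scan for a terminator, and pickled data may contain any byte value.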
    

    The logic at this point is:

                               +
        Consumer               |
                       message |
                               v         strategy  +------------------------------------+
                  +------------+------+            | strategies                         |
                  | on_task_received  | <--------+ |                                    |
                  |                   |            |[myTest.add : task_message_handler] |
                  +------------+------+            +------------------------------------+
                               |
                               |
       +------------------------------------------------------------------------------------+
       strategy                |
                               |
                               |
                               v                Request [myTest.add]
                  +------------+-------------+                       +---------------------+
                  | task_message_handler     | <-------------------+ | create_request_cls  |
                  |                          |                       |                     |
                  +------------+-------------+                       +---------------------+
                               | _process_task_sem
                               |
      +------------------------------------------------------------------------------------+
       Worker                  | req[{Request} myTest.add]
                               v
                      +--------+-----------+
                      | WorkController     |
                      |                    |
                      |            pool +-------------------------+
                      +--------+-----------+                      |
                               |                                  |
                               |               apply_async        v
                   +-----------+----------+                   +---+-------------------+
                   |{Request} myTest.add  | +---------------> | TaskPool              |
                   +----------------------+                   +----+------------------+
                                              myTest.add           |
                                                                   |
    +--------------------------------------------------------------------------------------+
                                                                   |
                                                                   v
                                                              +----+------------------+
                                                              | billiard.pool.Pool    |
                                                              +-------+---------------+
                                                                      |
                                                                      |
     Pool              +---------------------------+                  |
                       | TaskHandler               |                  |
                       |                           |                  |  self._taskqueue.put
                       |              _taskqueue   |  <---------------+
                       |                           |
                       +------------+--------------+
                                    |
                                    |  put(task)
                                    |
    +--------------------------------------------------------------------------------------+
                                    |
     Sub process                    |
                                    v
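
    To tie the diagram together, here is a deliberately tiny, hypothetical model of its main hand-offs: a strategies dict keyed by task name, a handler that builds a request object, and a pool whose apply_async only queues the call. None of these classes are Celery's; they only mirror the arrows Consumer, strategy, Request and pool in the figure.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Request:
    """Hypothetical stand-in for the Request built by create_request_cls."""
    name: str
    args: tuple
    kwargs: dict


@dataclass
class ToyPool:
    """Stand-in for TaskPool / billiard Pool: apply_async only enqueues."""
    taskqueue: deque = field(default_factory=deque)

    def apply_async(self, func, args=(), kwds=None):
        self.taskqueue.append((func, args, kwds or {}))


TASKS = {"myTest.add": lambda x, y: x + y}  # registered task functions
pool = ToyPool()


def task_message_handler(name, args, kwargs):
    # Strategy step: turn the message into a Request and hand it to the pool.
    req = Request(name, tuple(args), kwargs)
    pool.apply_async(TASKS[req.name], req.args, req.kwargs)


strategies = {"myTest.add": task_message_handler}


def on_task_received(message):
    # Consumer entry point: dispatch on the task name carried in the message.
    strategies[message["task"]](message["task"], message["args"], message["kwargs"])


on_task_received({"task": "myTest.add", "args": [2, 17], "kwargs": {}})
func, args, kwds = pool.taskqueue.popleft()  # the "child process" side
print(func(*args, **kwds))  # 19
```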
                                                       
    
    


    Starting with the next article, we will look at how the multiple processes actually handle the message.

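    0xEE Articles in this series

    The articles in this series so far are:

    [Source Code Analysis] Message Queue Kombu: mailbox

    [Source Code Analysis] Message Queue Kombu: Hub

    [Source Code Analysis] Message Queue Kombu: Consumer

    [Source Code Analysis] Message Queue Kombu: Producer

    [Source Code Analysis] Message Queue Kombu: Startup Process

    [Source Code Analysis] Message Queue Kombu: Basic Architecture

    [Source Code Analysis] Parallel Distributed Framework Celery: Architecture (1)

    [Source Code Analysis] Parallel Distributed Framework Celery: Architecture (2)

    [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (1)

    [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (2)

    [Source Code Analysis] Distributed Task Queue Celery: Starting the Consumer

    [Source Code Analysis] Parallel Distributed Task Queue Celery: What Is a Task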

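    0xFF References

    1: Overview of the Worker Startup Process

    2: The Worker's Execution Engine

    3: Implementation of the Task Object

    4: Implementation of Scheduled Tasks

    5: Remote Control Management

    6: Implementation of Events

    7: Interaction Between Workers

    Celery Source Code Analysis 1: Overview of the Worker Startup Process

    Celery Source Code Analysis 2: The Worker's Execution Engine

    Celery Source Code Analysis 3: Implementation of the Task Object

    Celery Source Code Analysis 4: Implementation of Scheduled Tasks

    Celery Source Code Analysis 5: Remote Control Management

    Celery Source Code Analysis 6: Implementation of Events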

  • Original article: https://www.cnblogs.com/rossiXYZ/p/14639556.html