
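    [Source Code Analysis] Parallel Distributed Task Queue Celery: The Dynamic Consumption Flow

    0x00 Abstract

    Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.

    In the previous articles (linked at the end), we covered how Celery starts up and what a Task is. In this article we look at the consumption flow inside Celery (including Kombu) after a task message is received, up to the point where multiprocessing takes over.

    The goal is an interim summary: to organize our current understanding and prepare for the next stage, the analysis of multiprocessing.

    Because this is a walkthrough of a concrete flow, it contains quite a lot of stack traces and runtime variables. We hope readers will bear with this.

    0x01 Background

    When analyzing how the Celery worker starts, we saw that Celery finally enters a loop that waits for tasks to consume, and that the callback defined at startup is on_task_received. An abridged stack trace is shown below.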

    on_task_received, consumer.py:542
    _receive_callback, messaging.py:620
    _callback, base.py:630
    _deliver, base.py:980
    _brpop_read, redis.py:748
    on_readable, redis.py:358
    create_loop, hub.py:361
    asynloop, loops.py:81
    start, consumer.py:592
    start, bootsteps.py:116
    start, consumer.py:311
    start, bootsteps.py:365
    start, worker.py:204
    worker, worker.py:327
    caller, base.py:132
    new_func, decorators.py:21
    invoke, core.py:610
    main, core.py:782
    start, base.py:358
    worker_main, base.py:374
    

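    From it we can roughly see the logical flow:

    • In the Kombu domain:
      • the message loop (hub) receives the message;
      • the broker abstraction (Transport) and the execution engine (MultiChannelPoller) process it and decode the message;
      • Celery's callback is invoked;
    • In the Celery domain:
      • starting from the callback as the logical entry point, a Strategy is called first, which handles the message differently depending on conditions;
      • the work is then delegated to the Worker, i.e. the Worker executes the user's task;
      • for efficiency, tasks are executed concurrently by several workers (with the default prefork pool, these are child processes);

    The stack alone does not give an overall picture, so in this article we look at how Celery consumes a message.

    We start from poll: a new task message arrives in Redis, and the FD used by Celery's BRPOP receives a poll response.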

    0x02 Logic in Kombu

    We start with Kombu.

    First, here is an overall logic diagram of the Kombu part, so that you have an intuitive overview:

     +-------------+        +-------------------+                                +-------------------------+
     | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
     |             | fileno |                   |      cycle.on_readable(fileno) |                         |
     |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
     |             |        |                   |                                |                         |
     |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
     +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                            |        +          |                 |              +-------------------------+           |
                            |        |          |                 |                                                    |
                            +-------------------+                 |                                                    |
                                     |                            |                                                    |
                                     |                            |              +-----------------------+             |
                                     |                            |              | Channel               |      3      |
                                     |                            |              |                       | _brpop_read |
                                     |                            |              |                       |             |
                                     |                            +----------------+ connection          +<------------+
                                     |                   _deliver(message, queue)|                       |
                                     |        5                      4           |                       |
                                     |     callback(message)                     |                       |
                                     +----------------------------------------------> callback(message)+---------------+
                                                                                 +-----------------------+             |
                                                                                                                       |
                                                                                 +----------------------+              |
                                                                                 | Consumer             |              |
                                                                 on_m(message)   |                      |              |
                                                          +---------------------------+  on_message     | <------------+
                                                          |                      |                      |  _receive_callback
        kombu                                             |                      +----------------------+          6
                                                          |
    +-----------------------------------------------------------------------------------------------------------------------+
                                                          |
        Celery                                            |
                                             +---------------------------+
                                             | Consumer   |              |
                                             |            |              |
                                             |            v              |
                                             |      on_task_received     |
                                             |                           |
                                             |                           |
                                             +---------------------------+
    
    


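    The diagram shows two logical domains, Kombu and Celery: the message starts in Kombu and then reaches Celery.

    2.1 Message loop -- hub in Kombu

    We begin with the message loop, the hub.

    The relevant code is in kombu/asynchronous/hub.py. When poll reports an event, the loop calls the cb registered in readers[fd]; here fd is the file descriptor of the Redis socket.

    A simplified version of the code is: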

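    def create_loop(self,
                    generator=generator, sleep=sleep, min=min, next=next,
                    Empty=Empty, StopIteration=StopIteration,
                    KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
        # Abridged: timers, writers and most error handling are omitted.
        readers, writers = self.readers, self.writers
        poll = self.poller.poll

        while 1:
            # ... poll_timeout is computed from the timer schedule (elided)
            if readers or writers:
                to_consolidate = []
                try:
                    # block until a registered fd (e.g. the Redis socket) is ready
                    events = poll(poll_timeout)
                except ValueError:  # Issue celery/#882
                    return

                for fd, event in events or ():
                    if event & READ:
                        # readers[fd] holds the (callback, args) added via add_reader
                        cb, cbargs = readers[fd]
                        try:
                            cb(*cbargs)
                        except Empty:
                            # the reader found nothing to consume after all
                            pass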
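    To make the hub's role concrete, here is a minimal sketch of the same pattern built on Python's standard selectors module rather than Kombu's Hub. The names readers, add_reader, run_once and on_readable are hypothetical stand-ins: the loop only learns that an fd is readable, and the callback registered for that fd does the actual reading, just as readers[fd] does above.

```python
import selectors
import socket

# Hypothetical stand-ins for hub.readers / hub.add_reader (not Kombu's API).
readers = {}  # fd -> (callback, args)
sel = selectors.DefaultSelector()
received = []


def add_reader(sock, callback, *args):
    """Register a read callback for a socket's fd."""
    readers[sock.fileno()] = (callback, args)
    sel.register(sock, selectors.EVENT_READ)


def run_once(timeout=1.0):
    """One loop iteration: poll, then call readers[fd] for each readable fd."""
    for key, events in sel.select(timeout):
        if events & selectors.EVENT_READ:
            cb, cbargs = readers[key.fd]
            cb(*cbargs)


def on_readable(sock):
    """Plays the role of Transport.on_readable: the callback does the read."""
    received.append(sock.recv(1024))


if __name__ == "__main__":
    producer, consumer_sock = socket.socketpair()
    add_reader(consumer_sock, on_readable, consumer_sock)
    producer.sendall(b"new task")
    run_once()
    print(received)  # [b'new task']
```

    The design point is the separation of duties: the poller never interprets data, so the same loop can serve Redis sockets, timers and other readers.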
    

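    2.2 Broker abstraction -- Transport in Kombu

    The callback registered in readers[fd] is the on_readable method of the Transport class, so execution moves to Transport.

    Its job is to hand the event over to MultiChannelPoller.

    The code is in kombu/transport/redis.py. Here self is the Transport, and self.cycle is the MultiChannelPoller it owns.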

    def on_readable(self, fileno):
        """Handle AIO event for one of our file descriptors."""
        self.cycle.on_readable(fileno)
    

    The variables at this point are:

    fileno = {int} 34
    
    self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
    

    As shown below, the flow has now reached Transport:

    +-------------+         +---------------+
    |   hub       |         |   Transport   |
    |             | fileno  |               |
    |       cb +--------------> on_readable |
    |             |         |               |
    |      poll   |         |               |
    +-------------+         +---------------+
    

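    2.3 Execution engine --- MultiChannelPoller in Kombu

    Now the code reaches MultiChannelPoller. From the earlier articles in this series we know that MultiChannelPoller connects Channels with the poller. Here its job is to find the Channel that owns the polled fd and let it continue the processing.

    The code shows that each fd maps to one Channel: poll only reports that some fd has data, and how to actually read the message is up to that Channel.

    Because Celery tasks are fetched with the Redis BRPOP command, the handler obtained here is the BRPOP callback, _brpop_read.

    The code is in kombu/transport/redis.py.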

    def on_readable(self, fileno):
        chan, type = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[type]()
    

    The variables are as follows, and each logical piece can be seen in them:

    chan.handlers[type] = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>
    
    chan = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
    
    fileno = {int} 34
    
    self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fcbaddfd048>
    
    type = {str} 'BRPOP'
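    The two on_readable hops (Transport forwards to its poller, the poller looks up _fd_to_chan and calls chan.handlers[type]) can be modeled with a few plain classes. This is a sketch under simplifying assumptions: MiniTransport, MiniPoller, FakeChannel and FakeQoS are hypothetical, and FakeQoS.can_consume only approximates the prefetch check that the real QoS performs.

```python
class FakeQoS:
    """Hypothetical QoS: allow reading only below a prefetch limit."""

    def __init__(self, prefetch_count=1):
        self.prefetch_count = prefetch_count
        self.unacked = 0

    def can_consume(self):
        return self.unacked < self.prefetch_count


class FakeChannel:
    def __init__(self):
        self.qos = FakeQoS()
        self.calls = []
        # Handler table keyed by the command type registered for the fd.
        self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive}

    def _brpop_read(self):
        self.calls.append("BRPOP")

    def _receive(self):
        self.calls.append("LISTEN")


class MiniPoller:
    """Plays the role of MultiChannelPoller."""

    def __init__(self):
        self._fd_to_chan = {}  # fd -> (channel, command type)

    def register(self, fileno, chan, type_):
        self._fd_to_chan[fileno] = (chan, type_)

    def on_readable(self, fileno):
        chan, type_ = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[type_]()


class MiniTransport:
    """Plays the role of Transport: self.cycle is the poller."""

    def __init__(self, cycle):
        self.cycle = cycle

    def on_readable(self, fileno):
        self.cycle.on_readable(fileno)
```

    In this sketch, a readable event is simply ignored while the prefetch limit is reached, which is how the QoS gate in on_readable throttles consumption.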
    

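    2.4 Decoding the message -- Channel in Kombu

    Now the code reaches Channel, in kombu/transport/redis.py.

    This part of Channel reads the message through the Redis client and decodes it, extracting the queue name (the dest variable in the snippet) so that it knows which consumer, i.e. the one subscribed to that queue, should handle the message. It then dispatches the message with self.connection._deliver.

    The _brpop_read code is: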

    def _brpop_read(self, **options):
        try:
            try:
                dest__item = self.client.parse_response(self.client.connection,
                                                        'BRPOP',
                                                        **options)
            except self.connection_errors:
                # if there's a ConnectionError, disconnect so the next
                # iteration will reconnect automatically.
                self.client.connection.disconnect()
                raise
            if dest__item:
                dest, item = dest__item
                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                self._queue_cycle.rotate(dest)
                self.connection._deliver(loads(bytes_to_str(item)), dest)  # dispatch the message
                return True
            else:
                raise Empty()
        finally:
            self._in_poll = None
    

    The variables at this point are:

    dest = {str} 'celery'
    
    dest__item = {tuple: 2} 
     0 = {bytes: 6} b'celery'
     1 = {bytes: 861} b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-
    
    item = b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-
    
    self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
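    The BRPOP reply is a (key, payload) pair. The sketch below reproduces the decoding visible in the variables: recovering the queue name from the key and turning the JSON payload into a dict. It also shows what the base64 body contains, which Kombu only decodes later, when message.decode() is called. parse_brpop_reply and decode_body are hypothetical helpers, and the SEP value is our assumption about the separator Kombu's Redis transport uses for priority suffixes in queue keys.

```python
import base64
import json

# Assumption: separator placed before a priority suffix in Redis queue keys,
# e.g. "celery\x06\x163" for priority 3 of queue "celery".
SEP = "\x06\x16"


def parse_brpop_reply(dest_item, sep=SEP):
    """Split a BRPOP reply (key, payload) into (queue name, message dict)."""
    dest, item = dest_item
    queue = dest.decode("utf-8").rsplit(sep, 1)[0]  # drop any priority suffix
    message = json.loads(item.decode("utf-8"))
    return queue, message


def decode_body(message):
    """Simplified decode of a base64-encoded JSON body."""
    body = message["body"]
    if message.get("properties", {}).get("body_encoding") == "base64":
        body = base64.b64decode(body)
    return json.loads(body)
```

    Applied to the body captured above, decode_body yields [[2, 17], {}, {"callbacks": None, "errbacks": None, "chain": None, "chord": None}]: the (args, kwargs, embed) triple of task protocol v2, consistent with argsrepr '(2, 17)' in the headers.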
    

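    2.5 Starting the callback -- Transport in Kombu

    The code returns to Transport.

    Here it calls the callback stored in self._callbacks for the queue; this dict records one callback per queue.

    The _callback here is: <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>.

    The concrete format and content of the task message are also visible, e.g. {'exchange': '', 'routing_key': 'celery'}, from which we know which queue the message belongs to.

    The code is in kombu/transport/virtual/base.py.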

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            logger.warning(W_NO_CONSUMERS, queue)
            self._reject_inbound_message(message)
        else:
            callback(message)
    

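    The variables are below. Celery has three callbacks at this point, corresponding to three different functions:

    • celeryev.c755f81c-415e-478f-bb51-def341a96c0c handles events;
    • celery@.celery.pidbox handles remote control;
    • celery handles normal task consumption;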
    self._callbacks = {dict: 3} 
     'celeryev.c755f81c-415e-478f-bb51-def341a96c0c' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef23048>
     'celery@.celery.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56488>
     'celery' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>
    
    message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
    
    queue = {str} 'celery'
        
    self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
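    _deliver is essentially a dictionary lookup from queue name to consumer callback, with a fallback for queues nobody consumes. A minimal sketch of that dispatch, assuming a hypothetical MiniConnection class in place of the virtual Transport (the real _reject_inbound_message does more than record the message):

```python
import logging

logger = logging.getLogger(__name__)


class MiniConnection:
    """Hypothetical stand-in for the virtual Transport's delivery table."""

    def __init__(self):
        self._callbacks = {}  # queue name -> callback(message)
        self.rejected = []

    def _reject_inbound_message(self, message):
        # Simplification: only record the message that had no consumer.
        self.rejected.append(message)

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            logger.warning("Message for queue %r has no consumers", queue)
            self._reject_inbound_message(message)
        else:
            callback(message)
```

    One callback per queue name is what lets the three queues above (events, pidbox control and celery) share a single connection while reaching different handlers.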
    

    The logic diagram at this point:

    +-------------+        +-------------------+                                +-------------------------+
    | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
    |             | fileno |                   |      cycle.on_readable(fileno) |                         |
    |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
    |             |        |                   |                                |                         |
    |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+------------+
    +-------------+        |  _callbacks[queue]|                 |              |                         |        |
                           |                   |                 |              +-------------------------+        |
                           |                   |                 |                                                 |
                           +-------------------+                 |                                                 |
                                                                 |                                                 |
                                                                 |              +-----------------+                |
                                                                 |              | Channel         |        3       |
                                                                 |              |                 |   _brpop_read  |
                                                                 |              |                 |                |
                                                                 +----------------+ connection    | <--------------+
                                                        _deliver(message, queue)|                 |
                                                                    4           |                 |
                                                                                +-----------------+
    
    


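    2.6 Starting the callback -- Channel in Kombu

    The callback continues in kombu/transport/virtual/base.py.

    This is the per-queue callback created in basic_consume. Since the channel now has the Redis message for the queue, it must invoke that queue's callback, which in turn calls the Kombu Consumer's callback.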

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
    
        def _callback(raw_message):
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)
    
        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)
    
        self._reset_cycle()
    

    The variables at this point are:

    callback = {method} <bound method Consumer._receive_callback of <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>>
        
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'd
    
    raw_message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
                                                                        
    self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
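    basic_consume itself delivers nothing: it builds a closure and stores it under the queue name, and that closure later wraps the raw dict into a Message and records it for acknowledgement unless no_ack is set. A sketch of that closure pattern, where MiniChannel and MiniMessage are hypothetical and a plain dict stands in for the QoS object:

```python
class MiniMessage:
    """Hypothetical wrapper, analogous to Channel.Message(raw, channel=self)."""

    def __init__(self, raw_message):
        self.raw = raw_message
        self.delivery_tag = raw_message["properties"]["delivery_tag"]


class MiniChannel:
    def __init__(self):
        self.callbacks = {}  # stands in for connection._callbacks
        self.unacked = {}    # stands in for qos: delivery_tag -> message

    def basic_consume(self, queue, no_ack, callback):
        def _callback(raw_message):
            message = MiniMessage(raw_message)
            if not no_ack:
                # Remember the message until it is acknowledged.
                self.unacked[message.delivery_tag] = message
            return callback(message)

        # Registration only: nothing is read or delivered here.
        self.callbacks[queue] = _callback
```

    The closure captures queue-specific settings (no_ack and the consumer callback), so _deliver can stay a plain lookup.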
    

    The logic diagram at this point:

    +-------------+        +-------------------+                                +-------------------------+
    | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
    |             | fileno |                   |      cycle.on_readable(fileno) |                         |
    |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
    |             |        |                   |                                |                         |
    |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
    +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                           |        +          |                 |              +-------------------------+           |
                           |        |          |                 |                                                    |
                           +-------------------+                 |                                                    |
                                    |                            |                                                    |
                                    |                            |              +-----------------------+             |
                                    |                            |              | Channel               |      3      |
                                    |                            |              |                       | _brpop_read |
                                    |                            |              |                       |             |
                                    |                            +----------------+ connection          +<------------+
                                    |                   _deliver(message, queue)|                       |
                                    |        5                      4           |                       |
                                    |     callback(message)                     |                       |
                                    +----------------------------------------------> callback(message)+--------------->
                                                                                +-----------------------+
    
    


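    2.7 Invoking the callback -- Consumer in Kombu

    The Kombu Consumer callback code is in kombu/messaging.py.

    It calls the callback that the user registered on the Kombu Consumer. Note that the user of the Kombu Consumer is Celery, so this immediately invokes the callback Celery registered earlier.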

    def _receive_callback(self, message):
        accept = self.accept
        on_m, channel, decoded = self.on_message, self.channel, None
        try:
            m2p = getattr(channel, 'message_to_python', None)
            if m2p:
                message = m2p(message)
            if accept is not None:
                message.accept = accept
            if message.errors:
                return message._reraise_error(self.on_decode_error)
            decoded = None if on_m else message.decode()
        except Exception as exc:
            if not self.on_decode_error:
                raise
            self.on_decode_error(message, exc)
        else:
            return on_m(message) if on_m else self.receive(decoded, message)
    

    The variables are:

    on_m = {function} <function Consumer.create_task_handler.<locals>.on_task_received at 0x7fcbaef562f0>
    
    accept = {set: 1} {'application/json'}
    
    channel = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
    
    m2p = {method} <bound method Channel.message_to_python of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>
    
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
    
    self = {Consumer} <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
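    The key branch in _receive_callback is on_m: when on_message is set, as Celery does, the undecoded Message is handed over directly; only otherwise is the body decoded and passed to the registered callbacks through receive(). A reduced model of that branch, with a hypothetical MiniConsumer class and all error handling omitted:

```python
class MiniConsumer:
    """Hypothetical model of the on_message-vs-callbacks choice."""

    def __init__(self, on_message=None, callbacks=None):
        self.on_message = on_message
        self.callbacks = callbacks or []

    def receive(self, body, message):
        # Each registered callback gets (decoded body, message).
        for callback in self.callbacks:
            callback(body, message)

    def _receive_callback(self, message):
        on_m = self.on_message
        # With on_message set, decoding is skipped entirely.
        decoded = None if on_m else message.decode()
        return on_m(message) if on_m else self.receive(decoded, message)
```

    Skipping decode here matters for Celery: with protocol v2 the task body can stay undecoded in the consumer and be deserialized later.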
    

    The logic is as follows:

     +-------------+        +-------------------+                                +-------------------------+
     | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
     |             | fileno |                   |      cycle.on_readable(fileno) |                         |
     |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
     |             |        |                   |                                |                         |
     |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
     +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                            |        +          |                 |              +-------------------------+           |
                            |        |          |                 |                                                    |
                            +-------------------+                 |                                                    |
                                     |                            |                                                    |
                                     |                            |              +-----------------------+             |
                                     |                            |              | Channel               |      3      |
                                     |                            |              |                       | _brpop_read |
                                     |                            |              |                       |             |
                                     |                            +----------------+ connection          +<------------+
                                     |                   _deliver(message, queue)|                       |
                                     |        5                      4           |                       |
                                     |     callback(message)                     |                       |
                                     +----------------------------------------------> callback(message)+---------------+
                                                                                 +-----------------------+             |
                                                                                                                       |
                                                                                 +----------------------+              |
                                                                                 | Consumer             |              |
                                                                 on_m(message)   |                      |              |
                                                          +---------------------------+  on_message     | <------------+
                                                          |                      |                      |  _receive_callback
                                                          |                      +----------------------+          6
                                                          |
    +-----------------------------------------------------------------------------------------------------------------------+
                                                          |
                                                          v
    
    


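    2.8 Entering Celery -- Consumer in Celery

    Since the callback Celery registered earlier has been invoked, we are now in Celery's territory.

    2.8.1 Configuring the callback

    Let us recall when Celery configures this callback.

    celery/worker/loops.py contains the following code, which makes the consumer call back into Celery: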

    def asynloop(obj, connection, consumer, blueprint, hub, qos,
                 heartbeat, clock, hbrate=2.0):
        """Non-blocking event loop."""
        # ... (abridged)
        on_task_received = obj.create_task_handler()
        # ... (abridged)
        consumer.on_message = on_task_received
    

    2.8.2 The callback function

    The callback is in celery/worker/consumer/consumer.py.

    As we can see, create_task_handler returns on_task_received, and that is the callback.

    def create_task_handler(self, promise=promise):
        strategies = self.strategies
        on_unknown_message = self.on_unknown_message
        on_unknown_task = self.on_unknown_task
        on_invalid_task = self.on_invalid_task
        callbacks = self.on_task_message
        call_soon = self.call_soon
    
        def on_task_received(message):
            # payload will only be set for v1 protocol, since v2
            # will defer deserializing the message body to the pool.
            payload = None
            try:
                type_ = message.headers['task']                # protocol v2
            except TypeError:
                return on_unknown_message(None, message)
            except KeyError:
                try:
                    payload = message.decode()
                except Exception as exc:  # pylint: disable=broad-except
                    return self.on_decode_error(message, exc)
                try:
                    type_, payload = payload['task'], payload  # protocol v1
                except (TypeError, KeyError):
                    return on_unknown_message(payload, message)
            try:
                strategy = strategies[type_]
            except KeyError as exc:
                return on_unknown_task(None, message, exc)
            else:
                try:
                    strategy(
                        message, payload,
                        promise(call_soon, (message.ack_log_error,)),
                        promise(call_soon, (message.reject_log_error,)),
                        callbacks,
                    )
                except (InvalidTaskError, ContentDisallowed) as exc:
                    return on_invalid_task(payload, message, exc)
                except DecodeError as exc:
                    return self.on_decode_error(message, exc)
    
        return on_task_received
    

    The variables at this point are:

    call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>
    
    callbacks = {set: 0} set()
    
    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
    
    on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>
    
    on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>
    
    on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>
    
    self = {Consumer} <Consumer: celery@ demini (running)>
    
    strategies = {dict: 10} {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
    

    The logic at this point:

     +-------------+        +-------------------+                                +-------------------------+
     | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
     |             | fileno |                   |      cycle.on_readable(fileno) |                         |
     |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
     |             |        |                   |                                |                         |
     |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
     +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                            |        +          |                 |              +-------------------------+           |
                            |        |          |                 |                                                    |
                            +-------------------+                 |                                                    |
                                     |                            |                                                    |
                                     |                            |              +-----------------------+             |
                                     |                            |              | Channel               |      3      |
                                     |                            |              |                       | _brpop_read |
                                     |                            |              |                       |             |
                                     |                            +----------------+ connection          +<------------+
                                     |                   _deliver(message, queue)|                       |
                                     |        5                      4           |                       |
                                     |     callback(message)                     |                       |
                                     +----------------------------------------------> callback(message)+---------------+
                                                                                 +-----------------------+             |
                                                                                                                       |
                                                                                 +----------------------+              |
                                                                                 | Consumer             |              |
                                                                 on_m(message)   |                      |              |
                                                          +---------------------------+  on_message     | <------------+
                                                          |                      |                      |  _receive_callback
        kombu                                             |                      +----------------------+          6
                                                          |
    +-----------------------------------------------------------------------------------------------------------------------+
                                                          |
        Celery                                            |
                                             +---------------------------+
                                             | Consumer   |              |
                                             |            |              |
                                             |            v              |
                                             |      on_task_received     |
                                             |                           |
                                             |                           |
                                             +---------------------------+
    
    


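    0x03 Logic in Celery

    From this point on, we are inside Celery.

    3.1 Logical entry point --- consumer in Celery

    The first stop is Celery's Consumer component, which is conceptually the logical entry point for consumption.

    The Celery Consumer code lives in celery/worker/consumer/consumer.py. Here it does three things:

    • parses the message and reads the task name from its headers, e.g. 'myTest.add';

    • looks up the strategy for that task name;

    • calls the strategy.

    The code is: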

    def create_task_handler(self, promise=promise):
        strategies = self.strategies
        on_unknown_message = self.on_unknown_message
        on_unknown_task = self.on_unknown_task
        on_invalid_task = self.on_invalid_task
        callbacks = self.on_task_message
        call_soon = self.call_soon
    
        def on_task_received(message):
            # payload will only be set for v1 protocol, since v2
            # will defer deserializing the message body to the pool.
            payload = None
            try:
                type_ = message.headers['task']                # protocol v2
            except TypeError:
                return on_unknown_message(None, message)
            except KeyError:
                try:
                    payload = message.decode()
                except Exception as exc:  # pylint: disable=broad-except
                    return self.on_decode_error(message, exc)
                try:
                    type_, payload = payload['task'], payload  # protocol v1
                except (TypeError, KeyError):
                    return on_unknown_message(payload, message)
            try:
                strategy = strategies[type_]
            except KeyError as exc:
                return on_unknown_task(None, message, exc)
            else:
                try:
                    strategy(
                        message, payload,
                        promise(call_soon, (message.ack_log_error,)),
                        promise(call_soon, (message.reject_log_error,)),
                        callbacks,
                    )
                except (InvalidTaskError, ContentDisallowed) as exc:
                    return on_invalid_task(payload, message, exc)
                except DecodeError as exc:
                    return self.on_decode_error(message, exc)
    
        return on_task_received
    

    The runtime variables are:

    self.app.tasks = {TaskRegistry: 10} {'celery.chunks': <@task: celery.chunks of myTest at 0x7fcbade229e8>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of myTest at 0x7fcbade229e8>, 'celery.chord_unlock': <@task: celery.chord_unlock of myTest at 0x7fcbade229e8>, 'celery.group': <@
    
    call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>

    callbacks = {set: 0} set()

    message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>

    on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>

    on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>

    on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>

    self = {Consumer} <Consumer: celery@ demini (running)>

    strategies = {dict: 10} 
    {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
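
    To make the dispatch idea concrete, here is a minimal, self-contained sketch of the pattern on_task_received follows. It reads the task name from the message headers, looks up a handler in a name-to-handler dict, and falls back to an error path for unknown names. FakeMessage and make_task_handler are hypothetical stand-ins, not Celery or Kombu APIs.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a kombu message; only headers are modelled."""
    headers: dict = field(default_factory=dict)


def make_task_handler(strategies, on_unknown_task):
    """Build a callback that routes each message to the strategy for its task name."""
    def on_task_received(message):
        task_name = message.headers['task']
        try:
            strategy = strategies[task_name]
        except KeyError:
            # Mirrors the on_unknown_task branch: no strategy registered for this name.
            return on_unknown_task(task_name, message)
        return strategy(message)
    return on_task_received


handled = []
strategies = {'myTest.add': lambda msg: handled.append(msg.headers['id'])}
on_task_received = make_task_handler(
    strategies, on_unknown_task=lambda name, msg: f'unknown task: {name}')

on_task_received(FakeMessage(headers={'task': 'myTest.add', 'id': 'abc'}))
print(handled)                                                   # ['abc']
print(on_task_received(FakeMessage(headers={'task': 'nope'})))   # unknown task: nope
```

    The dict lookup keeps dispatch O(1) per message. The strategies dict is built once at startup, one entry per registered task.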
    

    3.1.1 Parsing the message

    The following line determines which task the message is for; here it is 'myTest.add'.

    type_ = message.headers['task']
    

    message.headers looks like this. It shows what goes into defining a message:

    message.headers = {dict: 15} 
     'lang' = {str} 'py'
     'task' = {str} 'myTest.add'
     'id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     'shadow' = {NoneType} None
     'eta' = {NoneType} None
     'expires' = {NoneType} None
     'group' = {NoneType} None
     'group_index' = {NoneType} None
     'retries' = {int} 0
     'timelimit' = {list: 2} [None, None]
     'root_id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     'parent_id' = {NoneType} None
     'argsrepr' = {str} '(2, 17)'
     'kwargsrepr' = {str} '{}'
     'origin' = {str} 'gen19806@ demini'
     __len__ = {int} 15
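
    The try/except in on_task_received tells message protocol versions apart. Protocol v2 carries the task name in headers['task'] and leaves the body undecoded. A v1 message has no such header, so its body is decoded and the name is read from there. Below is a hedged sketch of just that detection step, using plain dicts and a JSON body instead of kombu messages. detect_task_name is a hypothetical helper, and it omits the TypeError branch that the real code uses when headers is None.

```python
import json


def detect_task_name(headers, raw_body):
    """Return (task_name, payload, protocol) for a message.

    Protocol v2 keeps the name in the headers and leaves the body undecoded
    (payload stays None); protocol v1 needs the body decoded first.
    """
    try:
        return headers['task'], None, 2
    except KeyError:
        payload = json.loads(raw_body)
        return payload['task'], payload, 1


v2 = detect_task_name({'task': 'myTest.add', 'id': '863c'}, b'[[2, 17], {}, {}]')
v1 = detect_task_name({}, b'{"task": "myTest.add", "args": [2, 17]}')
print(v2)  # ('myTest.add', None, 2)
print(v1)  # ('myTest.add', {'task': 'myTest.add', 'args': [2, 17]}, 1)
```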
    

    3.1.2 Getting the strategy

    Using the task name ('myTest.add' here), the matching callback function is looked up in strategies. That callback is what starts processing the task message.

    strategies = {dict: 10} 
     'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef230d0>
     'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23620>
     'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef238c8>
     'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23b70>
     'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23e18>
     'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48158>
     'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48400>
     'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef486a8>
     'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48950>
     'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>
     __len__ = {int} 10
    

    3.1.3 Calling the strategy

    With the strategy in hand, for example:

    <function default.<locals>.task_message_handler at 0x7fcbaef48950>

    it is called as follows:

    strategy(
        message, payload,
        promise(call_soon, (message.ack_log_error,)),
        promise(call_soon, (message.reject_log_error,)),
        callbacks,
    )
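
    The ack and reject arguments are not called directly. promise(call_soon, (message.ack_log_error,)) wraps them, so when the strategy later invokes the promise, call_soon schedules message.ack_log_error on the consumer's loop instead of running it inline. The stdlib-only sketch below mimics that deferral. functools.partial plays the role of vine's promise, and call_soon and run_pending are hypothetical stand-ins for the consumer's event loop.

```python
from collections import deque
from functools import partial

pending = deque()


def call_soon(fn, *args):
    """Hypothetical stand-in for Consumer.call_soon: queue fn for the next loop tick."""
    pending.append((fn, args))


def run_pending():
    """Drain the queue, as one event-loop iteration would."""
    while pending:
        fn, args = pending.popleft()
        fn(*args)


log = []


def ack_log_error():
    log.append('acked')


# Like promise(call_soon, (message.ack_log_error,)): calling it only schedules the ack.
on_ack = partial(call_soon, ack_log_error)
on_ack()
print(log)      # []
run_pending()
print(log)      # ['acked']
```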
    

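    3.2 The strategy layer --- strategy

    Strategy is a middle layer between the Consumer and the Worker. It handles each message differently depending on its conditions (revocation, ETA, rate limits), which is what "strategy" means here.

    3.2.1 Logic in strategy

    The code is in celery/worker/strategy.py.

    Specifically, it:

    • parses the message further;
    • builds the internal Req (Request) from the message;
    • sends a 'task-received' event if events are enabled;
    • handles the ETA;
    • handles QoS and rate limits;
    • passes the Req on via handle(req), which brings us to the Worker.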
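
    The branching at the end of task_message_handler decides where a request goes next. Below is a simplified, hypothetical sketch of that decision order: revoked, then ETA plus rate limit, then ETA only, then rate limit only, and otherwise immediate handling. route_request returns a label instead of scheduling anything, and the labels are illustrative only.

```python
def route_request(revoked, eta, bucket):
    """Return which path task_message_handler would take for a request.

    revoked: the request expired or was revoked; eta: a POSIX timestamp or None;
    bucket: a rate-limit bucket or None.
    """
    if revoked:
        return 'drop'
    if eta and bucket:
        return 'call_at(eta) -> limit_post_eta'
    if eta:
        return 'call_at(eta) -> apply_eta_task'
    if bucket:
        return 'limit_task'
    return 'handle'   # task_reserved(req), callbacks, then handle(req)


print(route_request(False, None, None))     # handle
print(route_request(False, 1700000000.0, None))  # call_at(eta) -> apply_eta_task
```

    In the real code, both ETA branches first call consumer.qos.increment_eventually(). A message held back for its ETA still occupies a prefetch slot, so the prefetch count is raised to keep fetching new messages.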

    The code:

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None and 'args' not in message.payload:
            body, headers, decoded, utc = (
                message.body, message.headers, False, app.uses_utc_timezone(),
            )
        else:
            if 'args' in message.payload:
                body, headers, decoded, utc = hybrid_to_proto2(message,
                                                               message.payload)
            else:
                body, headers, decoded, utc = proto1_to_proto2(message, body)
    
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )
      
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return
    
        signals.task_received.send(sender=consumer, request=req)
    
        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )
    
        bucket = None
        eta = None
        if req.eta:
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, app.timezone)
            except (OverflowError, ValueError) as exc:
                error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                      req.eta, exc, req.info(safe=True), exc_info=True)
                req.reject(requeue=False)
        if rate_limits_enabled:
            bucket = get_bucket(task.name)
    
        if eta and bucket:
            consumer.qos.increment_eventually()
            return call_at(eta, limit_post_eta, (req, bucket, 1),
                           priority=6)
        if eta:
            consumer.qos.increment_eventually()
            call_at(eta, apply_eta_task, (req,), priority=6)
            return task_message_handler
        if bucket:
            return limit_task(req, bucket, 1)
    
        task_reserved(req)
        if callbacks:
            [callback(req) for callback in callbacks]
            
        handle(req)  # <- here: the request is handed to the Worker
        
    return task_message_handler
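
    The ETA branch turns req.eta into a timestamp for the timer. For UTC requests the timezone-aware ETA is converted via the system timezone; otherwise the naive ETA is interpreted in app.timezone. Below is a rough stdlib equivalent. eta_to_timestamp is a hypothetical helper, not Celery's to_timestamp.

```python
from datetime import datetime, timedelta, timezone


def eta_to_timestamp(eta, utc, app_tz):
    """Convert an ETA datetime to a POSIX timestamp.

    utc=True: eta is timezone-aware, so its absolute instant is already known.
    utc=False: eta is naive and is interpreted in the app's timezone (app_tz).
    """
    if utc:
        return eta.timestamp()
    return eta.replace(tzinfo=app_tz).timestamp()


utc_plus_8 = timezone(timedelta(hours=8))
print(eta_to_timestamp(datetime(2021, 1, 1, tzinfo=timezone.utc), True, utc_plus_8))  # 1609459200.0
print(eta_to_timestamp(datetime(2021, 1, 1, 8, 0), False, utc_plus_8))               # 1609459200.0
```

    Both calls describe the same instant, so they yield the same timestamp. call_at then only needs that number.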
    

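    Now let's look at the details.

    3.2.2 Getting the instance

    In the strategy, the following line builds a Request class for the task instance. This ties together the broker message, the consumer, and the multiprocessing pool.

    In particular, Request.execute_using_pool is where the link to multiprocessing begins: it connects the request to the consumer's pool of worker processes.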

    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
    

    The task instance is:

    myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    

    The code that creates the Request class is:

    def create_request_cls(base, task, pool, hostname, eventer,
                           ref=ref, revoked_tasks=revoked_tasks,
                           task_ready=task_ready, trace=trace_task_ret):
        default_time_limit = task.time_limit
        default_soft_time_limit = task.soft_time_limit
        apply_async = pool.apply_async
        acks_late = task.acks_late
        events = eventer and eventer.enabled
    
        class Request(base):
    
            def execute_using_pool(self, pool, **kwargs):
                task_id = self.task_id
                if (self.expires or task_id in revoked_tasks) and self.revoked():
                    raise TaskRevokedError(task_id)
    
                time_limit, soft_time_limit = self.time_limits
                result = apply_async(
                    trace,
                    args=(self.type, task_id, self.request_dict, self.body,
                          self.content_type, self.content_encoding),
                    accept_callback=self.on_accepted,
                    timeout_callback=self.on_timeout,
                    callback=self.on_success,
                    error_callback=self.on_failure,
                    soft_timeout=soft_time_limit or default_soft_time_limit,
                    timeout=time_limit or default_time_limit,
                    correlation_id=task_id,
                )
                # cannot create weakref to None
                # pylint: disable=attribute-defined-outside-init
                self._apply_result = maybe(ref, result)
                return result
    
            def on_success(self, failed__retval__runtime, **kwargs):
                failed, retval, runtime = failed__retval__runtime
                if failed:
                    if isinstance(retval.exception, (
                            SystemExit, KeyboardInterrupt)):
                        raise retval.exception
                    return self.on_failure(retval, return_ok=True)
                task_ready(self)
    
                if acks_late:
                    self.acknowledge()
    
                if events:
                    self.send_event(
                        'task-succeeded', result=retval, runtime=runtime,
                    )
    
        return Request
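
    create_request_cls is a class factory. It reads per-task settings (time limits, acks_late, the pool's apply_async) once and closes over them, so every Request subclass uses them as cheap local lookups instead of attribute chains. Below is a hypothetical, stripped-down version of that pattern. make_request_cls and BaseRequest are illustrative names, not Celery's.

```python
def make_request_cls(base, task_time_limit, apply_async):
    """Build a Request subclass that captures per-task defaults in a closure."""
    default_time_limit = task_time_limit

    class Request(base):
        def execute_using_pool(self):
            # A per-request limit wins; otherwise fall back to the captured default.
            timeout = self.time_limit or default_time_limit
            return apply_async(self.name, timeout=timeout)

    return Request


class BaseRequest:
    def __init__(self, name, time_limit=None):
        self.name = name
        self.time_limit = time_limit


calls = []
Req = make_request_cls(BaseRequest, 30, lambda name, timeout: calls.append((name, timeout)))
Req('myTest.add').execute_using_pool()
Req('myTest.add', time_limit=5).execute_using_pool()
print(calls)  # [('myTest.add', 30), ('myTest.add', 5)]
```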
    

    The logic at this point:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
    +---------------------------------------------------------------------------------------+
                             |
     strategy                |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +--------------------------+                       +---------------------+
    
    

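    3.2.3 Invoking the instance

    task_message_handler ends by calling handle(req), which is where the request starts being executed.

    The handle function is in fact WorkController._process_task_sem.

    The code (abridged) is: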

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        # ... (message parsing, events, ETA and rate-limit handling omitted)
        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )

        task_reserved(req)

        if callbacks:
            [callback(req) for callback in callbacks]

        handle(req)

    return task_message_handler
    
    

    The Request is:

    req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
     acknowledged = {bool} False
     app = {Celery} <Celery myTest at 0x7fcbade229e8>
     args = {list: 2} [2, 17]
     argsrepr = {str} '(2, 17)'
     body = {bytes: 82} b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
     chord = {NoneType} None
     connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
     content_encoding = {str} 'utf-8'
     content_type = {str} 'application/json'
     correlation_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     delivery_info = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
     errbacks = {NoneType} None
     eta = {NoneType} None
     eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x7fcbaeef31d0>
     expires = {NoneType} None
     group = {NoneType} None
     group_index = {NoneType} None
     hostname = {str} 'celery@ demini'
     id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     kwargs = {dict: 0} {}
     kwargsrepr = {str} '{}'
     message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
     name = {str} 'myTest.add'
     on_ack = {promise} <promise@0x7fcbaeecc210 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
     on_reject = {promise} <promise@0x7fcbaeeccf20 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
     parent_id = {NoneType} None
     reply_to = {str} 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93'
     request_dict = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
     root_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     store_errors = {bool} True
     task = {add} <@task: myTest.add of myTest at 0x7fcbade229e8>
     task_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
     task_name = {str} 'myTest.add'
     time_limits = {list: 2} [None, None]
     time_start = {NoneType} None
     type = {str} 'myTest.add'
     tzlocal = {NoneType} None
     utc = {bool} True
     worker_pid = {NoneType} None
    

    handle is:

    handle = {method} <bound method WorkController._process_task_sem of <Worker: celery@ demini (running)>>
    headers = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
    

    The logic at this point:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
     +------------------------------------------------------------------------------------+
     strategy                |
                             |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +------------+-------------+                       +---------------------+
                             | _process_task_sem
                             | 
    +--------------------------------------------------------------------------------------+
     Worker                  | req[{Request} myTest.add]
                             v
                    +--------+-------+
                    | WorkController |
                    +----------------+
    
    


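    3.3 The laborer -- Worker in Celery

    Execution now reaches the Worker in Celery. The Worker (WorkController) is where the task is handed off for actual execution.

    The code is in celery/worker/worker.py.

    As the code shows:

    • _process_task_sem calls _process_task (via self._quick_acquire);
    • _process_task calls req.execute_using_pool(self.pool).

    Specifically: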

    class WorkController:
        """Unmanaged worker instance."""
    
        def register_with_event_loop(self, hub):
            self.blueprint.send_all(
                self, 'register_with_event_loop', args=(hub,),
                description='hub.register',
            )
    
        def _process_task_sem(self, req):
            return self._quick_acquire(self._process_task, req)
    
        def _process_task(self, req):
            """Process task by sending it to the pool of workers."""
            try:
                req.execute_using_pool(self.pool)
            except TaskRevokedError:
                try:
                    self._quick_release()   # Issue 877
                except AttributeError:
                    pass
    

    The variables are:

    req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    
    self = {Worker} celery
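
    _process_task_sem does not call _process_task directly; it goes through _quick_acquire. As far as I understand, when pool put-locks are enabled this is the acquire method of kombu's LaxBoundedSemaphore. That semaphore runs the callback immediately if a slot is free and otherwise queues it until a slot is released, so the event loop never blocks. Below is a self-contained sketch of that non-blocking semaphore idea. The LaxSemaphore class is hypothetical.

```python
from collections import deque


class LaxSemaphore:
    """Non-blocking semaphore: acquire() runs the callback now or queues it."""

    def __init__(self, value):
        self.value = value
        self._waiting = deque()

    def acquire(self, callback, *args):
        if self.value <= 0:
            self._waiting.append((callback, args))
            return False
        self.value -= 1
        callback(*args)
        return True

    def release(self):
        if self._waiting:
            # Hand the freed slot directly to the next queued callback.
            callback, args = self._waiting.popleft()
            callback(*args)
        else:
            self.value += 1


started = []
sem = LaxSemaphore(1)
sem.acquire(started.append, 'task-1')   # runs immediately
sem.acquire(started.append, 'task-2')   # no free slot: queued
print(started)  # ['task-1']
sem.release()   # task-1 finished: task-2 takes over the slot
print(started)  # ['task-1', 'task-2']
```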
    

    3.3.1 Request in Celery

    Next comes the Request. Its code is in celery/worker/request.py.

    Because create_request_cls captured the pool's method:

    apply_async = pool.apply_async
    

    the call goes to pool.apply_async.

    The variables are:

    apply_async = {method} <bound method BasePool.apply_async of <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>>
    
    pool = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>
    
    revoked_tasks = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
        
    self = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
    

    The code is:

    class Request(base):
    
        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id  # the task id
            # skip the task if it has expired or been revoked
            if (self.expires or task_id in revoked_tasks) and self.revoked():
                raise TaskRevokedError(task_id)

            time_limit, soft_time_limit = self.time_limits  # per-request time limits
            result = apply_async(  # hand the traced task to the pool; returns an async result
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result
    

    The logic at this point:

                             +
      Consumer               |
                     message |
                             v         strategy  +------------------------------------+
                +------------+------+            | strategies                         |
                | on_task_received  | <--------+ |                                    |
                |                   |            |[myTest.add : task_message_handler] |
                +------------+------+            +------------------------------------+
                             |
                             |
     +------------------------------------------------------------------------------------+
     strategy                |
                             |
                             |
                             v                Request [myTest.add]
                +------------+-------------+                       +---------------------+
                | task_message_handler     | <-------------------+ | create_request_cls  |
                |                          |                       |                     |
                +------------+-------------+                       +---------------------+
                             | _process_task_sem
                             |
    +--------------------------------------------------------------------------------------+
     Worker                  | req[{Request} myTest.add]
                             v
                    +--------+-----------+
                    | WorkController     |
                    |                    |
                    |            pool +-------------------------+
                    +--------+-----------+                      |
                             |                                  |
                             |               apply_async        v
                 +-----------+----------+                   +---+-------+
                 |{Request} myTest.add  | +---------------> | TaskPool  |
                 +----------------------+                   +-----------+
                                            myTest.add
    
    


    3.3.2 BasePool in Celery

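    apply_async now reaches Celery's Pool. Note that this is not yet the concrete multiprocessing implementation, only its entry point.

    At this point the task information is passed to the Pool. For example (this dump comes from a different run, hence the different task id and hostname):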

    args = {tuple: 6} 
     0 = {str} 'myTest.add'
     1 = {str} 'af6ed084-efc6-4608-a13a-d3065f457cd5'
     2 = {dict: 21} {'lang': 'py', 'task': 'myTest.add', 'id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen1100@DESKTOP-0GO3RPO', 'reply_to': 'afb85541-d08c-3191-b89d-918e15f9e0bf', 'correlation_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}
     3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
     4 = {str} 'application/json'
     5 = {str} 'utf-8'
    

    The file is celery/concurrency/base.py:

    class BasePool:
        """Task pool."""
    
        def apply_async(self, target, args=None, kwargs=None, **options):
            """Equivalent of the :func:`apply` built-in function.
    
            Callbacks should optimally return as soon as possible since
            otherwise the thread which handles the result will get blocked.
            """
            kwargs = {} if not kwargs else kwargs
            args = [] if not args else args
    
            return self.on_apply(target, args, kwargs,
                                 waitforslot=self.putlocks,
                                 callbacks_propagate=self.callbacks_propagate,
                                 **options)
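
    BasePool.apply_async only normalizes its arguments and delegates to on_apply, which each concrete pool implements. Below is a hypothetical miniature of that template-method split, with a synchronous subclass that simply runs the target in the calling thread and fires the callback. It is roughly in the spirit of a "solo" pool, but MiniBasePool and InlinePool are illustrative names, not Celery classes.

```python
class MiniBasePool:
    """Hypothetical base pool: apply_async normalizes args, on_apply does the work."""

    def apply_async(self, target, args=None, kwargs=None, **options):
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args
        return self.on_apply(target, args, kwargs, **options)

    def on_apply(self, target, args, kwargs, **options):
        raise NotImplementedError


class InlinePool(MiniBasePool):
    """Runs the task in the calling thread."""

    def on_apply(self, target, args, kwargs, callback=None, **options):
        result = target(*args, **kwargs)
        if callback is not None:
            callback(result)
        return result


results = []
pool = InlinePool()
pool.apply_async(lambda a, b: a + b, args=(2, 17), callback=results.append)
print(results)  # [19]
```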
    

    The variables at this point:

    options = {dict: 7} {'accept_callback': <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>, 'timeout_callback': <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>
    
    self = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>
    

    3.3.3 AsynPool in Celery

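    apply_async is defined in billiard/pool.py. billiard is Celery's fork of multiprocessing, and AsynPool inherits this method from billiard's Pool.

    In __init__, self._initargs = initargs holds (<Celery myTest at 0x2663db3fe48>, 'celery@DESKTOP-0GO3RPO'); this is how the Celery application is passed into the pool.

    Depending on the operating system, the task message is sent to the multiprocessing pool through either self._taskqueue.put or self._quick_put. More precisely, the choice is made by self.threads: the pool's helper threads are only used when the worker cannot run its event loop, e.g. on Windows.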

    def apply_async(self, func, args=(), kwds={},
                    callback=None, error_callback=None, accept_callback=None,
                    timeout_callback=None, waitforslot=None,
                    soft_timeout=None, timeout=None, lost_worker_timeout=None,
                    callbacks_propagate=(),
                    correlation_id=None):
        '''
        Asynchronous equivalent of `apply()` method.
    
        Callback is called when the functions return value is ready.
        The accept callback is called when the job is accepted to be executed.
    
        Simplified the flow is like this:
    
            >>> def apply_async(func, args, kwds, callback, accept_callback):
            ...     if accept_callback:
            ...         accept_callback()
            ...     retval = func(*args, **kwds)
            ...     if callback:
            ...         callback(retval)
    
        '''
    
        if self._state == RUN:
            waitforslot = self.putlocks if waitforslot is None else waitforslot
            if waitforslot and self._putlock is not None:
                self._putlock.acquire()
            result = ApplyResult(
                self._cache, callback, accept_callback, timeout_callback,
                error_callback, soft_timeout, timeout, lost_worker_timeout,
                on_timeout_set=self.on_timeout_set,
                on_timeout_cancel=self.on_timeout_cancel,
                callbacks_propagate=callbacks_propagate,
                send_ack=self.send_ack if self.synack else None,
                correlation_id=correlation_id,
            )
            if timeout or soft_timeout:
                # start the timeout handler thread when required.
                self._start_timeout_handler()
            if self.threads:
                self._taskqueue.put(([(TASK, (result._job, None,
                                    func, args, kwds))], None))
            else:
                self._quick_put((TASK, (result._job, None, func, args, kwds)))
            return result
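
    result._job is a job id, and the ApplyResult registers itself in self._cache under that id. When a child process later reports back, the parent can find the right callbacks by job id. Below is a hypothetical sketch of that bookkeeping. MiniApplyResult is illustrative and simplifies billiard's ApplyResult considerably.

```python
import itertools

job_counter = itertools.count()


class MiniApplyResult:
    """Registers itself in a shared cache under a fresh job id."""

    def __init__(self, cache, callback):
        self._job = next(job_counter)
        self._cache = cache
        self._callback = callback
        cache[self._job] = self

    def _set(self, value):
        # Called when the result for this job arrives; unregister, then fire the callback.
        del self._cache[self._job]
        self._callback(value)


cache = {}
received = []
result = MiniApplyResult(cache, received.append)
# Shape of what gets put on the queue (the TASK constant is replaced by a string here).
task_message = ('TASK', (result._job, None, 'func', (2, 17), {}))

# Later, a child reports (job_id, value); the parent looks the job up in the cache.
job_id, value = result._job, 19
cache[job_id]._set(value)
print(received, cache)  # [19] {}
```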
    

    The variables are:

    accept_callback = {method} <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    args = {tuple: 6} ('myTest.add', '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8')
    
    callback = {method} <bound method create_request_cls.<locals>.Request.on_success of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    
    error_callback = {method} <bound method Request.on_failure of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    
    self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fcbaee2ea20>
    
    timeout_callback = {method} <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
    waitforslot = {bool} False
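    The dump above shows that apply_async receives separate success (callback), failure (error_callback) and acceptance (accept_callback) hooks. Later, send_job looks the job up by its ID through get_job(tup[1][0]). Below is a minimal sketch of that "register by job ID, route the result back" bookkeeping. It assumes a plain dict as the job cache and runs the task in-process. ToyJob, ToyPool and on_result are hypothetical names, not billiard's API.

```python
import itertools
from typing import Any, Callable, Dict, List, Tuple


class ToyJob:
    """Hypothetical stand-in for a pool job: remembers its ID and callbacks."""

    def __init__(self, job_id: int, callback: Callable[[Any], None],
                 error_callback: Callable[[BaseException], None]) -> None:
        self._job = job_id
        self._callback = callback
        self._error_callback = error_callback


class ToyPool:
    """Hypothetical pool: registers jobs by ID, then routes results back to them."""

    def __init__(self) -> None:
        self._cache: Dict[int, ToyJob] = {}
        self._ids = itertools.count()
        self.sent: List[Tuple[str, tuple]] = []  # stands in for the pipe to a child

    def apply_async(self, func, args, callback, error_callback) -> ToyJob:
        job = ToyJob(next(self._ids), callback, error_callback)
        self._cache[job._job] = job  # so that a result can find its job later
        # Same tuple shape as the source: (TASK, (job_id, None, func, args, kwds))
        self.sent.append(("TASK", (job._job, None, func, args, {})))
        return job

    def on_result(self, job_id: int, ok: bool, value: Any) -> None:
        job = self._cache.pop(job_id)  # look the job up by ID, then forget it
        if ok:
            job._callback(value)
        else:
            job._error_callback(value)


# Usage: "run" the task in-process and route the result back by job ID.
results: List[Any] = []
pool = ToyPool()
job = pool.apply_async(lambda a, b: a + b, (2, 17), results.append, results.append)
_tag, (job_id, _, func, args, kwds) = pool.sent[0]
pool.on_result(job_id, True, func(*args, **kwds))
print(results)  # [19]
```

    The real pool keeps many more states (acked, ready, timeouts). The sketch only shows why every job carries an ID: the result travels back through a different channel than the task did.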
    
    3.3.3.1 Some variables set up beforehand

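    First, let's look at some of the variables that were set up earlier.

    For example, in the code below,

    inq, outq, synq = self.get_process_queues() and self._process_register_queues(w, (inq, outq, synq)) set up the pipes between the parent process and each child process.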

    def _create_worker_process(self, i):
        sentinel = self._ctx.Event() if self.allow_restart else None
        
        inq, outq, synq = self.get_process_queues()
        on_ready_counter = self._ctx.Value('i')
        
        w = self.WorkerProcess(self.Worker(
            inq, outq, synq, self._initializer, self._initargs,
            self._maxtasksperchild, sentinel, self._on_process_exit,
            # Need to handle all signals if using the ipc semaphore,
            # to make sure the semaphore is released.
            sigprotection=self.threads,
            wrap_exception=self._wrap_exception,
            max_memory_per_child=self._max_memory_per_child,
            on_ready_counter=on_ready_counter,
        ))
        self._pool.append(w)
        self._process_register_queues(w, (inq, outq, synq))
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.index = i
        w.start()
        self._poolctrl[w.pid] = sentinel
        self._on_ready_counters[w.pid] = on_ready_counter
        if self.on_process_up:
            self.on_process_up(w)
        return w
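    Each worker process gets its own inq/outq/synq. The pool later finds the right child from a bare file descriptor, which is what _fileno_to_inq and _fileno_to_synq in the dump below are for. Below is a minimal sketch of that per-process bookkeeping. It assumes one-way multiprocessing.Pipe pairs in place of billiard's queues and starts no real child processes. The dict names and the loop are hypothetical.

```python
from multiprocessing import Pipe
from typing import Dict

# Hypothetical bookkeeping, modeled on _fileno_to_inq / _fileno_to_outq.
fileno_to_inq: Dict[int, int] = {}   # parent's write-end fd -> worker index
fileno_to_outq: Dict[int, int] = {}  # parent's read-end fd -> worker index
workers = {}

for index in range(3):
    # Pipe(duplex=False) returns (reader, writer): one pipe per direction.
    inq = Pipe(duplex=False)   # parent writes, the child would read
    outq = Pipe(duplex=False)  # the child would write, parent reads
    workers[index] = (inq, outq)
    fileno_to_inq[inq[1].fileno()] = index
    fileno_to_outq[outq[0].fileno()] = index

# When the event loop reports "fd X is writable", map X back to a worker...
writable_fd = workers[1][0][1].fileno()
target = fileno_to_inq[writable_fd]
# ...and write the task into that worker's inq.
workers[target][0][1].send(("TASK", (0, None, "myTest.add", (2, 17), {})))
print(target, workers[target][0][0].recv())
```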
    

    For example, this is where the pipes are created.

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv
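    _setup_queues binds the send/recv methods of the queues' underlying pipe ends once, so the hot path can call them directly instead of going through put()/get(). Below is a minimal sketch of the same shortcut. It assumes, as in CPython's multiprocessing, that SimpleQueue is a thin wrapper over Pipe(duplex=False), and it builds that pipe directly. quick_put and quick_get are local illustrative names.

```python
from multiprocessing import Pipe

# CPython's SimpleQueue wraps a one-way pipe; build that pipe directly here.
reader, writer = Pipe(duplex=False)

# Bind the methods once, like _quick_put / _quick_get in _setup_queues.
quick_put = writer.send  # pickles the object and writes it to the pipe
quick_get = reader.recv  # reads one message and unpickles it

task = ("TASK", (0, None, "myTest.add", (2, 17), {}))
quick_put(task)
received = quick_get()
print(received == task)  # True
```

    Binding the method to a name presumably saves attribute lookups per message and skips the queue's own wrapper logic. It is only safe when the caller controls who else touches the pipe.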
    

    And this is where the handlers that write to the pipes' file descriptors are set up.

    def _create_write_handlers(self, hub,
                               pack=pack, dumps=_pickle.dumps,
                               protocol=HIGHEST_PROTOCOL):
        """Create handlers used to write data to child processes."""
        fileno_to_inq = self._fileno_to_inq
        fileno_to_synq = self._fileno_to_synq
        outbound = self.outbound_buffer
        pop_message = outbound.popleft
        put_message = outbound.append
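    The excerpt shows the core idea: put_message only appends to outbound_buffer, and the actual write happens later, when a child's inqueue fd becomes writable. Below is a minimal sketch of that "buffer now, write when writable" pattern. It assumes a single socket.socketpair in place of a child's pipe and uses the stdlib selectors module in place of Kombu's hub. The setup is hypothetical; the real code picks the target child through fileno_to_inq.

```python
import selectors
import socket
from collections import deque

# Hypothetical setup: one socketpair stands in for a single child's inqueue pipe.
parent_end, child_end = socket.socketpair()
parent_end.setblocking(False)

outbound = deque()              # like outbound_buffer
put_message = outbound.append   # producer side: only enqueue, never block
pop_message = outbound.popleft  # writer side: take the oldest pending payload

for payload in (b"job-0;", b"job-1;", b"job-2;"):
    put_message(payload)

sel = selectors.DefaultSelector()
sel.register(parent_end, selectors.EVENT_WRITE)
while outbound:
    for key, _mask in sel.select(timeout=1.0):
        # The fd is writable, so write one pending payload.
        data = pop_message()
        sent = key.fileobj.send(data)
        if sent < len(data):
            outbound.appendleft(data[sent:])  # keep the unsent tail for next time
sel.unregister(parent_end)
sel.close()
parent_end.close()  # signals EOF to the reading side

chunks = []
while True:
    chunk = child_end.recv(1024)
    if not chunk:
        break
    chunks.append(chunk)
child_end.close()
received = b"".join(chunks)
print(received)  # b'job-0;job-1;job-2;'
```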
    

    So the pre-set variables end up as follows:

    self._taskqueue = {Queue} <queue.Queue object at 0x7fcbaee57b00>
    self._quick_put = {function} <function AsynPool._create_write_handlers.<locals>.send_job at 0x7fcbaef569d8>
    self._outqueue = {NoneType} None
    self._inqueue = {NoneType} None
    self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
    self._quick_get = {NoneType} None
    self._fileno_to_inq = {dict: 0} {}
    self.outbound_buffer = {deque: 1} deque([<%s: 0 ack:False ready:False>])
    
    
    self = {Pool} <billiard.pool.Pool object at 0x000002663FD6E948>
     ResultHandler = {type} <class 'billiard.pool.ResultHandler'>
     SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
     Supervisor = {type} <class 'billiard.pool.Supervisor'>
     TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
     TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
     Worker = {type} <class 'billiard.pool.Worker'>
    
    3.3.3.2 Sending to the child process

    On Windows, it is:

    if self.threads:
        self._taskqueue.put(([(TASK, (result._job, None,
                                      func, args, kwds))], None))
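    In this branch the task is not written to a pipe directly. A ([task], None) batch goes onto _taskqueue, and per the diagram further down, the TaskHandler thread forwards it to the child processes. Below is a minimal sketch of such a forwarding thread, modeled loosely on the task-handler loop in CPython's multiprocessing.pool (billiard is a fork of multiprocessing, but its exact TaskHandler body is not shown here). task_handler, forwarded and the string TASK constant are hypothetical.

```python
import queue
import threading

TASK = "TASK"  # hypothetical stand-in for billiard's TASK constant

taskqueue = queue.Queue()
forwarded = []  # stands in for the pipe to the child processes


def task_handler(taskqueue, put):
    """Drain (task list, set_length) batches until a None sentinel arrives."""
    for taskseq, _set_length in iter(taskqueue.get, None):
        for task in taskseq:
            put(task)


handler = threading.Thread(target=task_handler,
                           args=(taskqueue, forwarded.append), daemon=True)
handler.start()

# Same shape as the branch above: a one-element task list, plus None.
taskqueue.put(([(TASK, (0, None, "myTest.add", (2, 17), {}))], None))
taskqueue.put(None)  # sentinel: tell the handler thread to stop
handler.join(timeout=5)
print(forwarded)
```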
    

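    On *nix, the job is built and sent here. send_job serializes the task, attaches the length-prefixed payload to the job, and calls put_message(job), which queues the job in the outbound buffer so that it is written to a child process's pipe once that pipe becomes writable.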

    def send_job(tup):
        # Schedule writing job request for when one of the process
        # inqueues are writable.
        body = dumps(tup, protocol=protocol)
        body_size = len(body)
        header = pack('>I', body_size)
        # index 1,0 is the job ID.
        job = get_job(tup[1][0])
        job._payload = buf_t(header), buf_t(body), body_size
        put_message(job)
        
    self._quick_put = send_job
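    send_job frames each message as a 4-byte big-endian length header (pack('>I', body_size)) followed by the pickled body. That framing lets the reader know where one message ends on a byte stream. Below is a minimal sketch of the framing and of a matching reader. frame mirrors the serialization lines of send_job. unframe is a hypothetical reader written for illustration, not billiard's actual child-side code.

```python
import pickle
import struct
from typing import Any, List, Tuple


def frame(tup: Any, protocol: int = pickle.HIGHEST_PROTOCOL) -> Tuple[bytes, bytes, int]:
    """Serialize like send_job: pickled body plus a 4-byte big-endian length header."""
    body = pickle.dumps(tup, protocol=protocol)
    body_size = len(body)
    header = struct.pack('>I', body_size)
    return header, body, body_size


def unframe(stream: bytes) -> List[Any]:
    """Hypothetical reader side: split a byte stream back into complete messages."""
    messages = []
    offset = 0
    while len(stream) - offset >= 4:
        (size,) = struct.unpack_from('>I', stream, offset)
        start = offset + 4
        if len(stream) - start < size:
            break  # body not fully arrived yet; a real reader would wait for more
        messages.append(pickle.loads(stream[start:start + size]))
        offset = start + size
    return messages


first = ("TASK", (7, None, "myTest.add", (2, 17), {}))
second = ("TASK", (8, None, "myTest.add", (1, 1), {}))
header, body, body_size = frame(first)
print(first[1][0])  # 7, the job ID that send_job passes to get_job()
stream = header + body + b"".join(frame(second)[:2])
print(unframe(stream) == [first, second])  # True
```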
    

    The logic at this point is:

                               +
        Consumer               |
                       message |
                               v         strategy  +------------------------------------+
                  +------------+------+            | strategies                         |
                  | on_task_received  | <--------+ |                                    |
                  |                   |            |[myTest.add : task_message_handler] |
                  +------------+------+            +------------------------------------+
                               |
                               |
       +------------------------------------------------------------------------------------+
       strategy                |
                               |
                               |
                               v                Request [myTest.add]
                  +------------+-------------+                       +---------------------+
                  | task_message_handler     | <-------------------+ | create_request_cls  |
                  |                          |                       |                     |
                  +------------+-------------+                       +---------------------+
                               | _process_task_sem
                               |
      +------------------------------------------------------------------------------------+
       Worker                  | req[{Request} myTest.add]
                               v
                      +--------+-----------+
                      | WorkController     |
                      |                    |
                      |            pool +-------------------------+
                      +--------+-----------+                      |
                               |                                  |
                               |               apply_async        v
                   +-----------+----------+                   +---+-------------------+
                   |{Request} myTest.add  | +---------------> | TaskPool              |
                   +----------------------+                   +----+------------------+
                                              myTest.add           |
                                                                   |
    +--------------------------------------------------------------------------------------+
                                                                   |
                                                                   v
                                                              +----+------------------+
                                                              | billiard.pool.Pool    |
                                                              +-------+---------------+
                                                                      |
                                                                      |
     Pool              +---------------------------+                  |
                       | TaskHandler               |                  |
                       |                           |                  |  self._taskqueue.put
                       |              _taskqueue   |  <---------------+
                       |                           |
                       +------------+--------------+
                                    |
                                    |  put(task)
                                    |
    +--------------------------------------------------------------------------------------+
                                    |
     Sub process                    |
                                    v
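    The diagram compresses several hops: the consumer's on_task_received picks a per-task strategy by task name, the strategy builds a Request, and the worker hands that request to the pool. Below is a toy model of just that dispatch chain. It assumes the message is a plain dict and skips acks, the task semaphore and all process machinery. The functions and classes reuse names from the diagram but are hypothetical simplifications, not Celery's real signatures.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class Request:
    """Hypothetical stand-in for the class produced by create_request_cls."""
    name: str
    task_id: str
    args: Tuple[Any, ...]


@dataclass
class ToyPool:
    """Hypothetical stand-in for TaskPool / billiard Pool: records submissions."""
    submitted: List[Request] = field(default_factory=list)

    def apply_async(self, req: Request) -> None:
        self.submitted.append(req)


pool = ToyPool()


def task_message_handler(message: Dict[str, Any]) -> None:
    """Strategy: build a Request from the message and hand it to the pool."""
    req = Request(message["task"], message["id"], tuple(message["args"]))
    pool.apply_async(req)


# One strategy per registered task name, as in the diagram's strategies box.
strategies: Dict[str, Callable[[Dict[str, Any]], None]] = {
    "myTest.add": task_message_handler,
}


def on_task_received(message: Dict[str, Any]) -> None:
    """Consumer entry point: look up the strategy by task name and call it."""
    strategies[message["task"]](message)


on_task_received({"task": "myTest.add", "id": "863cf9b2", "args": [2, 17]})
print(pool.submitted)
```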
                                                       
    
    


    Starting with the next article, we will look at how the multiple processes actually handle the message.

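    0xEE Articles in this series

    The articles in this series so far are:

    [Source Code Analysis] Message Queue Kombu: Mailbox

    [Source Code Analysis] Message Queue Kombu: Hub

    [Source Code Analysis] Message Queue Kombu: Consumer

    [Source Code Analysis] Message Queue Kombu: Producer

    [Source Code Analysis] Message Queue Kombu: Startup Process

    [Source Code Analysis] Message Queue Kombu: Basic Architecture

    [Source Code Analysis] Parallel Distributed Framework Celery: Architecture (1)

    [Source Code Analysis] Parallel Distributed Framework Celery: Architecture (2)

    [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (1)

    [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (2)

    [Source Code Analysis] Distributed Task Queue Celery: Starting the Consumer

    [Source Code Analysis] Parallel Distributed Task Queue Celery: What Is a Task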


  • Original article: https://www.cnblogs.com/rossiXYZ/p/14639556.html