zoukankan      html  css  js  c++  java
  • [源码分析] 消息队列 Kombu 之 Consumer

    [源码分析] 消息队列 Kombu 之 Consumer

    0x00 摘要

    本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Consumer 概念。

    0x01 综述功能

    Consumer 的作用主要如下:

    • Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
    • Queue:对应的队列抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
    • Consumers : 是接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。就是说,从用户角度,知道了一个 exchange,就可以从中读取消息,具体这个消息就是从 queue 中读取的。

    在具体的实现中,Consumer 把 queue 与 channel 联系起来。queue 里面有一个 channel,用来访问redis。Queue 也有 Exchange,知道访问具体 redis 哪个key(就是queue对应的那个key)。即 Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。

    所以服务端的逻辑大致为:

    1. 建立连接;
    2. 创建Exchange ;
    3. 创建Queue,并将Exchange与Queue绑定,Queue的名称为routing_key ;
    4. 创建Consumer对Queue监听;

    0x02 示例代码

    下面使用如下代码来进行说明。

    本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。

    def main(arguments):
        hub = Hub()
        exchange = Exchange('asynt_exchange')
        queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
    
        def send_message(conn):
            producer = Producer(conn)
            producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
            print('message sent')
    
        def on_message(message):
            print('received: {0!r}'.format(message.body))
            message.ack()
            # hub.stop()  # <-- exit after one message
    
        conn = Connection('redis://localhost:6379')
        conn.register_with_event_loop(hub)
    
        def p_message():
            print(' kombu ')
    
        with Consumer(conn, [queue], on_message=on_message):
            send_message(conn)
            hub.timer.call_repeatedly(3, p_message)
            hub.run_forever()
    
    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
    

    前文已经完成了构建部分,下面来到了Consumer部分,即如下代码:

    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        hub.timer.call_repeatedly(
            3, p_message
        )
        hub.run_forever()
    

    0x03 定义

    3.1 定义

    Consumer主要成员变量如下:

    • channel:存在 (kombu.Connection, Channel) 这两种可能,一个 Connection 就对应一个 MQ 的连接,Channel可以理解成共享一个Connection的多个轻量化连接。
    • queues:(Sequence[kombu.Queue])类型。对应 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息
    • on_message:消息响应方法;

    这也是调用时传入的变量。

    class Consumer:
        """Message consumer.
    
        Arguments:
            channel (kombu.Connection, ChannelT): see :attr:`channel`.
            queues (Sequence[kombu.Queue]): see :attr:`queues`.
            no_ack (bool): see :attr:`no_ack`.
            auto_declare (bool): see :attr:`auto_declare`
            callbacks (Sequence[Callable]): see :attr:`callbacks`.
            on_message (Callable): See :attr:`on_message`
            on_decode_error (Callable): see :attr:`on_decode_error`.
            prefetch_count (int): see :attr:`prefetch_count`.
        """
        
        #: The connection/channel to use for this consumer.
        channel = None
    
        #: A single :class:`~kombu.Queue`, or a list of queues to
        #: consume from.
        queues = None
    
        #: Flag for automatic message acknowledgment.
        no_ack = None
    
        #: List of callbacks called in order when a message is received.
        callbacks = None
    
        #: Optional function called whenever a message is received.
        on_message = None
    
        #: List of accepted content-types.
        accept = None
    
        #: Initial prefetch count
        prefetch_count = None
    
        #: Mapping of queues we consume from.
        _queues = None
    
        _tags = count(1)   # global
    

    3.2 Queue

    我们也给出 Queue 的定义,其中主要成员变量如下:

    • exchange (Exchange): 就是 queue 绑定的 Exchange;
    • routing_key (str): 就是 queue 对应的 key;
    • channel :queue 绑定的 信道;

    具体定义如下:

    class Queue(MaybeChannelBound):
        """A Queue declaration.
    
            channel (ChannelT): The channel the Queue is bound to (if bound).
    
        """
    
        ContentDisallowed = ContentDisallowed
    
        name = ''
        exchange = Exchange('')
        routing_key = ''
    
        durable = True
        exclusive = False
        auto_delete = False
        no_ack = False
    
        attrs = (
            ('name', None),
            ('exchange', None),
            ('routing_key', None),
            ('queue_arguments', None),
            ('binding_arguments', None),
            ('consumer_arguments', None),
            ('durable', bool),
            ('exclusive', bool),
            ('auto_delete', bool),
            ('no_ack', None),
            ('alias', None),
            ('bindings', list),
            ('no_declare', bool),
            ('expires', float),
            ('message_ttl', float),
            ('max_length', int),
            ('max_length_bytes', int),
            ('max_priority', int)
        )
    

    0x04 Init

    在此方法中,先处理调用,随之建立联系。

    def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
                 callbacks=None, on_decode_error=None, on_message=None,
                 accept=None, prefetch_count=None, tag_prefix=None):
        self.channel = channel
        self.queues = maybe_list(queues or [])
        self.no_ack = self.no_ack if no_ack is None else no_ack
        self.callbacks = (self.callbacks or [] if callbacks is None
                          else callbacks)
        self.on_message = on_message
        self.tag_prefix = tag_prefix
        self._active_tags = {}
    
        self.accept = prepare_accept_content(accept)
        self.prefetch_count = prefetch_count
    
        if self.channel:
            self.revive(self.channel)
    

    4.1 处理调用

    4.1.1 queues

    传入的参数queues被作为成员变量保存起来。

    self.queues = maybe_list(queues or [])
    

    4.1.2 channel

    传入的参数Connection被作为成员变量保存起来。

    self.channel = channel
    

    4.1.3 on_message

    传入的参数on_message 作为消息响应方法保存起来。

    self.on_message = on_message
    

    4.2 建立联系

    用如下方法把 Exchange,Queue 与 Connection 联系起来

    def revive(self, channel):
        
        """Revive consumer after connection loss."""
        self._active_tags.clear()
        channel = self.channel = maybe_channel(channel)
        
        # modify dict size while iterating over it is not allowed
        for qname, queue in list(self._queues.items()):
            # name may have changed after declare
            self._queues.pop(qname, None)
            queue = self._queues[queue.name] = queue(self.channel)
            queue.revive(channel)
    
        if self.auto_declare:
            self.declare()
    
        if self.prefetch_count is not None:
            self.qos(prefetch_count=self.prefetch_count)
    

    进一步调用:

    when_bound, entity.py:598
    maybe_bind, abstract.py:76
    bind, abstract.py:70
    bind, entity.py:590
    __call__, abstract.py:66
    revive, messaging.py:400
    __init__, messaging.py:382
    main, testUb.py:46
    <module>, testUb.py:55
    

    由此进入到了Queue类。

    4.2.1 channel与queue

    这里用如下方法把queue与channel联系起来。queue 里面有一个 channel,用来访问redis,Queue 也有 Exchange,知道访问具体 redis 哪里

    每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!

    Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel

    channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
    
    self = {Queue} <Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>
    

    这样,conneciton就是queue的成员变量。

    def revive(self, channel):
        """Revive channel after the connection has been re-established.
        """
        if self.is_bound:
            self._channel = channel
            self.when_bound()
    

    4.2.2 channel与exchange

    之前我们知道,Queue是包括了exchange成员变量,目前channel也是exchange的成员变量

    Exchange:交换机,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。

    于是经由如下方法,准备把channel与exchange联系起来。

    def when_bound(self):
        if self.exchange:
            self.exchange = self.exchange(self.channel)
    

    此时变量如下:

    channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
    
    self = {Exchange} Exchange asynt(direct)
    

    进而直接在Exchange基类,使用方法maybe_bind把channel与exchange联系起来。

    class MaybeChannelBound(Object):
        """Mixin for classes that can be bound to an AMQP channel."""
    
        _channel = None
    
        def __call__(self, channel):
            """`self(channel) -> self.bind(channel)`."""
            return self.bind(channel)
    
        def bind(self, channel):
            """Create copy of the instance that is bound to a channel."""
            return copy(self).maybe_bind(channel)
    
        def maybe_bind(self, channel):
            """Bind instance to channel if not already bound."""
            if not self.is_bound and channel:
                self._channel = maybe_channel(channel)
                self.when_bound()
                self._is_bound = True
            return self
    

    4.2.3 Exchange & Binding

    这里会把 Exchange 和 queue 联系。就是把 Exchange 和 routing_key 联系起来,然后把这些联系规则放到redis 之中

    堆栈如下:

    _queue_bind, redis.py:814
    queue_bind, base.py:568
    bind_to, entity.py:674
    queue_bind, entity.py:662
    _create_queue, entity.py:617
    declare, entity.py:606
    declare, messaging.py:417
    revive, messaging.py:404
    __init__, messaging.py:382
    

    具体为

    class Queue(MaybeChannelBound):
    
        def __init__(self, name='', exchange=None, routing_key='',
                     channel=None, bindings=None, on_declared=None,
                     **kwargs):
            super().__init__(**kwargs)
            self.name = name or self.name
            
            if isinstance(exchange, str):
                self.exchange = Exchange(exchange)
            elif isinstance(exchange, Exchange):
                self.exchange = exchange
                
            self.routing_key = routing_key or self.routing_key
            self.bindings = set(bindings or [])
            self.on_declared = on_declared
    
            # allows Queue('name', [binding(...), binding(...), ...])
            if isinstance(exchange, (list, tuple, set)):
                self.bindings |= set(exchange)
            if self.bindings:
                self.exchange = None
    
            # exclusive implies auto-delete.
            if self.exclusive:
                self.auto_delete = True
            self.maybe_bind(channel)
    
        def queue_bind(self, nowait=False, channel=None):
            """Create the queue binding on the server."""
            return self.bind_to(self.exchange, self.routing_key,
                                self.binding_arguments,
                                channel=channel, nowait=nowait)
    
        def bind_to(self, exchange='', routing_key='',
                    arguments=None, nowait=False, channel=None):
            if isinstance(exchange, Exchange):
                exchange = exchange.name
    
            return (channel or self.channel).queue_bind(
                queue=self.name,
                exchange=exchange,
                routing_key=routing_key,
                arguments=arguments,
                nowait=nowait,
            )
    
    4.2.3.1 Channel binding

    具体调用到Channel,代码位于 kombu/transport/redis.py。

    def _queue_bind(self, exchange, routing_key, pattern, queue):
        if self.typeof(exchange).type == 'fanout':
            # Mark exchange as fanout.
            self._fanout_queues[queue] = (
                exchange, routing_key.replace('#', '*'),
            )
        with self.conn_or_acquire() as client:
            client.sadd(self.keyprefix_queue % (exchange,),
                        self.sep.join([routing_key or '',
                                       pattern or '',
                                       queue or '']))
    

    代码然后调用到redis client。

    # SET COMMANDS
    def sadd(self, name, *values):
        "Add ``value(s)`` to set ``name``"
        return self.execute_command('SADD', name, *values)
    

    具体变量如下,我们代码中,exchange内容为_kombu.binding.asynt_exchange。routing_key的是asynt_routing_key。

    name = {str} '_kombu.binding.asynt_exchange'
    self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
    values = {tuple: 1} asynt_routing_keysynt_queue
    

    我们看看Redis内容,发现新建内容如下:

    127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
    1) "asynt_routing_keyx06x16x06x16asynt_queue"
    

    集合名字为:self.keyprefix_queue % (exchange,), 对于我们就为:_kombu.binding.asynt_exchange
    集合每个item为:routing_key + sep + pattern + sep + queue。我们这里sep = 'x06x16'。

    4.2.3.2 使用

    当发消息时候,Exchange的作用是将发送的 routing_key 转化为 queue 的名字。这样发送就知道发到哪个 queue 。这里的 exchange 内容为 _kombu.binding.asynt_exchange。

    def get_table(self, exchange):
        key = self.keyprefix_queue % exchange
        with self.conn_or_acquire() as client:
            values = client.smembers(key)
            if not values:
                raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
            return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
    

    得到的集合内容为:

    {b'asynt_routing_keyx06x16x06x16asynt_queue'}

    即从 exchange 得到 routing_key ---> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。

    逻辑如下:

                                      +---------------------------------+
                                      |         exchange                |
                                      |                                 |
                     1 routing_key x  |                                 |
    +----------+                      |                                 |      +------------+
    | Producer |  +-----------------> |   routing_key x --->  queue x   |      |  Consumer  |
    +--------+-+                      |                                 |      +------------+
             |                        |   routing_key y --->  queue y   |
             |                        |                                 |           ^
             |                        |   routing_key z --->  queue z   |           |
             |                        |                                 |           |
             |                        +---------------------------------+           |
             |                                                                      |
             |                                                                      |
             |                                                                      |
             |                                                                      |
             |                                                                      |
             |                                                                      |
             |                                                                      |
             |                                                                      |
             |                                  +-----------+                       |
             |        2 message                 |           |        3 message      |
             +------------------------------->  |  queue X  |  +--------------------+
                                                |           |
                                                +-----------+
    
    

    因此,此时总体逻辑如下图:

    +----------------------+               +-------------------+
    | Consumer             |               | Channel           |
    |                      |               |                   |        +-----------------------------------------------------------+
    |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
    |      channel  +--------------------> |                   |        +-----------------------------------------------------------+
    |                      |               |    pool           |
    |                      |   +---------> |                   | <------------------------------------------------------------+
    |      queues          |   |           |                   |                                                              |
    |                      |   |    +----> |    connection +---------------+                                                  |
    |        |             |   |    |      |                   |           |                                                  |
    +----------------------+   |    |      +-------------------+           |                                                  |
             |                 |    |                                      v                                                  |
             |                 |    |      +-------------------+       +---+-----------------+       +--------------------+   |
             |                 |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
             |                 |    |      |                   |       |                     |       |                    |   |
             |                 |    |      |                   |       |                     |       |     _channels +--------+
             |                 |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
             |                 |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
             |       +-------->+    |      |                   |       |                     |    +------+ poller         |
             |       |              |      +-------------------+       +---------------------+    |  |     after_read     |
             |       |              |                                                             |  |                    |
             |       |              |                                                             |  +--------------------+
             |       |              |      +------------------+                   +---------------+
             |       |              |      | Hub              |                   |
             |       |              |      |                  |                   v
             |       |              |      |                  |            +------+------+
             |       |              |      |      poller +---------------> | _poll       |
             |       |              |      |                  |            |             |         +-------+
             |       |              |      |                  |            |    _poller+---------> |  poll |
             v       |              |      +------------------+            |             |         +-------+
                     |              |                                      +-------------+
        +-------------------+       |      +----------------+
        | Queue      |      |       |      | Exchange       |
        |      _chann+l     |       +----+ |                |
        |                   |              |                |
        |      exchange +----------------> |     channel    |
        |                   |              |                |
        |                   |              |                |
        +-------------------+              +----------------+
    
    

    手机如下:

    现在我们知道:

    • Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。
    • Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
    • Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
    • Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连;

    于是逻辑链已经形成,大约是这样的,后文完善:

    • Producer发送消息到Exchange;
    • Exchange中有成员变量Channel,也有成员变量Queues。
    • 于是Exchange负责通过Channel将消息分发至Queue,Exchange的作用只是将发送的 routing_key 转化为 queue 的名字。
    • Consumer去Queue取消息;

    逻辑大致通了,但是缺少动态操作完成此逻辑,我们将在后续完善动态逻辑。

    0x05 完善联系

    在init之后,第二步会完善联系。

    python的上下文管理。在python中实现了__enter__和__exit__方法,即支持上下文管理器协议。上下文管理器就是支持上下文管理器协议的对象,它是为了with而生。当with语句在开始运行时,会在上下文管理器对象上调用 enter 方法。with语句运行结束后,会在上下文管理器对象上调用 exit 方法。

    所以这里是调用__enter__,即 consumer 函数,其目的如下:

    • 调用Channel继续处理,ChannelConsumer标签,Consumer要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。
    • 另外,还通过Transport将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。
    class Consumer:
        """Message consumer.
    
        Arguments:
            channel (kombu.Connection, ChannelT): see :attr:`channel`.
            queues (Sequence[kombu.Queue]): see :attr:`queues`.
            no_ack (bool): see :attr:`no_ack`.
            auto_declare (bool): see :attr:`auto_declare`
            callbacks (Sequence[Callable]): see :attr:`callbacks`.
            on_message (Callable): See :attr:`on_message`
            on_decode_error (Callable): see :attr:`on_decode_error`.
            prefetch_count (int): see :attr:`prefetch_count`.
        """
    
        def __enter__(self):
            self.consume()
            return self
    

    5.1 遍历Queue

    使用_basic_consume方法处理Consumer相关的队列列表中的每一项,其中处理最后一个Queue时设置标志nowait=False

    def consume(self, no_ack=None):
        """Start consuming messages.
    
        Can be called multiple times, but note that while it
        will consume from new queues added since the last call,
        it will not cancel consuming from removed queues (
        use :meth:`cancel_by_queue`).
    
        Arguments:
            no_ack (bool): See :attr:`no_ack`.
        """
        queues = list(self._queues.values())
        if queues:
            no_ack = self.no_ack if no_ack is None else no_ack
    
            H, T = queues[:-1], queues[-1]
            for queue in H:
                self._basic_consume(queue, no_ack=no_ack, nowait=True)
            self._basic_consume(T, no_ack=no_ack, nowait=False)
    

    _basic_consume方法代码如下:

    是将消费者标签以及回调函数传给Queueconsume方法。

    def _basic_consume(self, queue, consumer_tag=None,
                       no_ack=no_ack, nowait=True):
        tag = self._active_tags.get(queue.name)
        if tag is None:
            tag = self._add_tag(queue, consumer_tag)
            queue.consume(tag, self._receive_callback,
                          no_ack=no_ack, nowait=nowait)
        return tag
    

    5.2 consume in Queue

    对于每一个 queue,都会调用其 consume 函数。

    Queueconsume方法代码:

    class Queue(MaybeChannelBound):
    
      def consume(self, consumer_tag='', callback=None,
                  no_ack=None, nowait=False):
          """Start a queue consumer.
    
          Consumers last as long as the channel they were created on, or
          until the client cancels them.
    
          Arguments:
              consumer_tag (str): Unique identifier for the consumer.
                  The consumer tag is local to a connection, so two clients
                  can use the same consumer tags. If this field is empty
                  the server will generate a unique tag.
    
              no_ack (bool): If enabled the broker will automatically
                  ack messages.
    
              nowait (bool): Do not wait for a reply.
    
              callback (Callable): callback called for each delivered message.
          """
          if no_ack is None:
              no_ack = self.no_ack
          return self.channel.basic_consume(
              queue=self.name,
              no_ack=no_ack,
              consumer_tag=consumer_tag or '',
              callback=callback,
              nowait=nowait,
              arguments=self.consumer_arguments)
    

    前面提到,queue与channel已经联系了起来。

    每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!

    Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。

    5.3 consume in Channel

    因此又回到了Channel,就是Channelbasic_consume代码:

    调用到基类basic_consume方法。

    class Channel(virtual.Channel):
    
        def basic_consume(self, queue, *args, **kwargs):
            
            if queue in self._fanout_queues:
                exchange, _ = self._fanout_queues[queue]
                self.active_fanout_queues.add(queue)
                self._fanout_to_queue[exchange] = queue
                
            ret = super().basic_consume(queue, *args, **kwargs)
    
            # Update fair cycle between queues.
            #
            # We cycle between queues fairly to make sure that
            # each queue is equally likely to be consumed from,
            # so that a very busy queue will not block others.
            #
            # This works by using Redis's `BRPOP` command and
            # by rotating the most recently used queue to the
            # and of the list.  See Kombu github issue #166 for
            # more discussion of this method.
            self._update_queue_cycle()
            return ret
    

    基类是 virtual.Channel,其作用是:

    ChannelConsumer标签,Consumer要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。另外,还通过Transport将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。

    变量是:

    • _tag_to_queue:标签与队列的映射关系;
    • _active_queues:Consumer要消费的队列;
    • _consumers:Consumer标签;
    • connection:Transport
    • connection._callbacks:队列与回调函数列表的映射关系;

    数值如下:

    self._tag_to_queue = {dict: 1} {'None1': 'asynt'}
    self._active_queues = {list: 1} ['asynt']
    self._consumers = {set: 1} {'None1'}
    self.connection = {Transport} <kombu.transport.redis.Transport object at 0x7fb3ee0155f8>
    self.connection._callbacks = {dict: 1} {'asynt': <function Channel.basic_consume.<locals>._callback at 0x7fb3ecd4a2f0>}
    

    代码如下:

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
    
        def _callback(raw_message):
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)
    
        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)
    
        self._reset_cycle()
    

    _reset_cycle 代码如下,看起来就是调用了 FairCycle,实际上没有用到,因为cycle已经有预设。cycle是一个MultiChannelPoller实例。

    def _reset_cycle(self):
        self._cycle = FairCycle(
            self._get_and_deliver, self._active_queues, Empty)
    

    具体如下图:

    +----------+    +-------+     +---------+
    | Consumer |    | Queue |     | Channel |
    +----+-----+    +---+---+     +-----+---+
         |              |               |
         |              |               |
    __enter__           |               |
         |              |               |
         |              |               |
     consume            |               |
         |              |               |
         |              |               |
    _basic_consume      |               |
         |              |               |
         |              |               |
         |   consume    |               |
         +------------> |               |
         |              | basic_consume |
         |              |               |
         |              | +-----------> |
         |              |               |
         |              |               |
         |              |          _reset_cycle
         |              |               |
         |              |               |
         |              |               |
         |              |               |
         |              |               |
         v              v               v
    
    

    0x06 消费消息

    为了更好的分析,我们暂时注销hub,使用drain_events消费消息,这样更直观。

    就是说,Consumer 已经和 Channel 联系起来,知道读取redis 中的哪个key。但是现在缺少一个读取消息的引擎。这个引擎可以驱动消息读取,每次有消息,就调用 consumer 中的回调函数来处理消息

    在没有引擎的情况下,drain_events 就可以起到引擎的作用

    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        # hub.timer.call_repeatedly(3, p_message)
        # hub.run_forever()
        conn.drain_events(timeout=1)
    

    6.1 drain_events in Connection

    drain_events 调用 Connection 的方法来进行消费。

    def drain_events(self, **kwargs):
        """Wait for a single event from the server.
    
        Arguments:
            timeout (float): Timeout in seconds before we give up.
        """
        return self.transport.drain_events(self.connection, **kwargs)
    

    6.2 drain_events in Transport

    在 Transport中的drain_events ,是在无限执行get(self._deliver, timeout=timeout)

    getself.cycle的一个方法,cycle是一个MultiChannelPoller实例:

    所以get<bound method MultiChannelPoller.get of <kombu.transport.redis.MultiChannelPoller object at 0x7feab312b358>>

    def drain_events(self, connection, timeout=None):
        time_start = monotonic()
        get = self.cycle.get
        polling_interval = self.polling_interval
        if timeout and polling_interval and polling_interval > timeout:
            polling_interval = timeout
        while 1:
            try:
                get(self._deliver, timeout=timeout)
            except Empty:
                if timeout is not None and monotonic() - time_start >= timeout:
                    raise socket.timeout()
                if polling_interval is not None:
                    sleep(polling_interval)
            else:
                break
    

    6.3 get in MultiChannelPoller

    Transport相关联的每一个channel都要执行drain_events。具体分两步:

    • 对于每一个channel都注册;

    • 进行poll;

    代码如下:

    def get(self, callback, timeout=None):
        self._in_protected_read = True
        try:
            for channel in self._channels:
                if channel.active_queues:           # BRPOP mode?
                    if channel.qos.can_consume():
                        self._register_BRPOP(channel)
                if channel.active_fanout_queues:    # LISTEN mode?
                    self._register_LISTEN(channel)
    
            events = self.poller.poll(timeout)
            if events:
                for fileno, event in events:
                    ret = self.handle_event(fileno, event)
                    if ret:
                        return
            # - no new data, so try to restore messages.
            # - reset active redis commands.
            self.maybe_restore_messages()
            raise Empty()
        finally:
            self._in_protected_read = False
            while self.after_read:
                try:
                    fun = self.after_read.pop()
                except KeyError:
                    break
                else:
                    fun()
    

    6.3.1 _register_BRPOP in MultiChannelPoller

    具体注册如下,我们先来看看 _register_BRPOP,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了。

    def _register_BRPOP(self, channel):
        """Enable BRPOP mode for channel."""
        ident = channel, channel.client, 'BRPOP'
        if not self._client_registered(channel, channel.client, 'BRPOP'):
            channel._in_poll = False
            self._register(*ident)
        if not channel._in_poll:  # send BRPOP
            channel._brpop_start()
    

    6.3.2 register in _poll

    最终进行Poll注册,这样当redis的socket对应的fd有消息,就会进行处理。

    变量如下:<kombu.utils.eventio._poll object at 0x7feab2d7d780>

    def register(self, fd, events):
        fd = fileno(fd)
        poll_flags = 0
        if events & ERR:
            poll_flags |= POLLERR
        if events & WRITE:
            poll_flags |= POLLOUT
        if events & READ:
            poll_flags |= POLLIN
        self._quick_register(fd, poll_flags)
        return fd
    

    6.3.3 poll(timeout) in MultiChannelPoller

    当poll有消息,则相应处理。

    events = self.poller.poll(timeout)
    if events:
        for fileno, event in events:
            ret = self.handle_event(fileno, event)
            if ret:
                return
    

    6.3.4 注册到redis驱动,负载均衡

    但是,这个 connection 还没有对 epoll 起效果,所以发送一个 _brpop_start

    这里可以看到,是对 asynt_queue 发起了监听请求,也就是说队列有消息过来,会被响应到。

    变量如下:

    keys = {list: 5} ['asynt_queue', 'asynt_queuex06x163', 'asynt_queuex06x166', 'asynt_queuex06x169', 1]
    queues = {list: 1} ['asynt_queue']
    

    代码如下:

    def _brpop_start(self, timeout=1):
        queues = self._queue_cycle.consume(len(self.active_queues))
        if not queues:
            return
        keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
                for queue in queues] + [timeout or 0]
        self._in_poll = self.client.connection
        self.client.connection.send_command('BRPOP', *keys)
    

    此处有一个负载均衡需要说明:

    _queue_cycle属于均衡策略,就是选择下一次哪个queue的策略,items就是具体queue列表。比如:

    class round_robin_cycle:
        """Iterator that cycles between items in round-robin."""
    
        def __init__(self, it=None):
            self.items = it if it is not None else []
    
        def update(self, it):
            """Update items from iterable."""
            self.items[:] = it
    
        def consume(self, n):
            """Consume n items."""
            return self.items[:n]
    

    _brpop_start就是启动了下一次读取,选择哪一个queue

    consume, scheduling.py:79
    _brpop_start, redis.py:725
    _register_BRPOP, redis.py:314
    on_poll_start, redis.py:328
    on_poll_start, redis.py:1072
    create_loop, hub.py:294
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:49
    <module>, testUb.py:53
    

    6.3.4 handle_event in MultiChannelPoller

    因为已经把 file 和 poll 联系起来,所以对调用对应的响应方法,而响应方法会进行read消息。

    def handle_event(self, fileno, event):
        if event & READ:
            return self.on_readable(fileno), self
        elif event & ERR:
            chan, type = self._fd_to_chan[fileno]
            chan._poll_error(type)
    

    6.3.5 on_readable in MultiChannelPoller

    这里听说 Redis 已经准备好了,所以就来获取拿到的结果,然后就解析起来了,解析成功之后,自然要处理这个消息呀,于是乎又回到了这里 redis.py

    提取fd对应的channel的响应方法如下:

    def on_readable(self, fileno):
        chan, type = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[type]()
    

    6.3.6 _brpop_read in Channel

    前面对chan.handlers已经进行了注册。

    handlers = {dict: 2} 
     'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
     'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fbad4170f28>>
    

    因此调用_brpop_read。

    def _brpop_read(self, **options):
        try:
            try:
                dest__item = self.client.parse_response(self.client.connection,
                                                        'BRPOP',
                                                        **options)
            except self.connection_errors:
                # if there's a ConnectionError, disconnect so the next
                # iteration will reconnect automatically.
                self.client.connection.disconnect()
                raise
            if dest__item:
                dest, item = dest__item
                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                self._queue_cycle.rotate(dest)
                self.connection._deliver(loads(bytes_to_str(item)), dest)
                return True
            else:
                raise Empty()
        finally:
            self._in_poll = None
    

    6.3.7 从redis读取

    这里会从redis驱动读取,文件/redis/connection.py,从SocketBuffer读取。

    代码为:

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        while not data.endswith(SYM_CRLF):
            # there's more data in the socket that we need
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()
    
        self.bytes_read += len(data)
    
        # purge the buffer when we've consumed it all so it doesn't
        # grow forever
        if self.bytes_read == self.bytes_written:
            self.purge()
    
        return data[:-2]
    

    当读到 response 之后,调用 Redis驱动中对应命令的 回调方法来处理。此处命令为BRPOP。回调方法为:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)

    代码为:

    def parse_response(self, connection, command_name, **options):
        "Parses a response from the Redis server"
        try:
            response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise
        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response
    

    变量为:

    command_name = {str} 'BRPOP'
    connection = {Connection} Connection<host=localhost,port=6379,db=0>
    options = {dict: 0} {}
    self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
     connection = {Connection} Connection<host=localhost,port=6379,db=0>
     connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
     response_callbacks = {CaseInsensitiveDict: 179} {.
      'LPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
      'RPUSH' = {function} <function Redis.<lambda> at 0x7fbad4276ea0>
      'SORT' = {function} <function sort_return_tuples at 0x7fbad4275f28>
      'ZSCORE' = {function} <function float_or_none at 0x7fbad4276598>
      'ZINCRBY' = {function} <function float_or_none at 0x7fbad4276598>
      'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
      'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
       ....
    

    这些代码堆栈如下:

    readline, connection.py:251
    read_response, connection.py:324
    read_response, connection.py:739
    parse_response, client.py:915
    _brpop_read, redis.py:738
    on_readable, redis.py:358
    handle_event, redis.py:362
    get, redis.py:380
    drain_events, base.py:960
    drain_events, connection.py:318
    main, testUb.py:50
    <module>, testUb.py:53
    

    6.3.8 回到_brpop_read

    从Redis驱动获得消息后,回到了 _brpop_read,信息如下:

    dest__item = {tuple: 2} 
     0 = {bytes: 11} b'asynt_queue'
     1 = {bytes: 321} b'{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", "content-type": "text/plain", "headers": {}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "asynt_exchange", "routing_key": "asynt_routing_key"}, "priority": 0, "body_encoding":
    

    6.3.9 _deliver in Transport

    当获得消息之后,会取出对应queue的callback,进行调用。

    变量如下:<kombu.transport.redis.Transport object at 0x7feab30f25c0>

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            logger.warning(W_NO_CONSUMERS, queue)
            self._reject_inbound_message(message)
        else:
            callback(message)
    

    6.3.10 basic_consume in Channel

    代码继续走到 basic_consume

    <kombu.transport.redis.Channel object at 0x7feab3235f28>

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
    
        def _callback(raw_message):
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)
    
        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)
    
        self._reset_cycle()
    

    6.3.11 _receive_callback in Consumer

    上文的 _callback 就是 _receive_callback in Consumer,所以这时候就调用过去。

    <Consumer: [<Queue asynt -> <Exchange asynt(direct) bound to chan:1> -> asynt bound to chan:1>]>

    def _receive_callback(self, message):
        accept = self.accept
        on_m, channel, decoded = self.on_message, self.channel, None
        try:
            m2p = getattr(channel, 'message_to_python', None)
            if m2p:
                message = m2p(message)
            if accept is not None:
                message.accept = accept
            if message.errors:
                return message._reraise_error(self.on_decode_error)
            decoded = None if on_m else message.decode()
        except Exception as exc:
            if not self.on_decode_error:
                raise
            self.on_decode_error(message, exc)
        else:
            return on_m(message) if on_m else self.receive(decoded, message)
    

    最终调用用户方法。

    on_message, testUb.py:36
    _receive_callback, messaging.py:620
    _callback, base.py:630
    _deliver, base.py:980
    _brpop_read, redis.py:748
    on_readable, redis.py:358
    handle_event, redis.py:362
    get, redis.py:380
    drain_events, base.py:960
    drain_events, connection.py:318
    main, testUb.py:50
    <module>, testUb.py:53
    

    具体如下:

    +----------+ +---------+ +------------------+    +------+ +---------+    +-----+  +---------+
    |Connection| |Transport| |MultiChannelPoller|    |_poll | | Channel |    |redis|  |Consumer |
    +----+-----+ +------+--+ +------------+-----+    +----+-+ +-----+---+    +--+--+  +---+-----+
         |              |                 |               |         |           |         |
         +              |                 |               |         |           |         |
      drain_events      |                 |               |         |           |         |
         +              +                 |               |         |           |         |
         +------->  drain_events          |               |         |           |         |
         |              +                 +               |         |           |         |
         |              | +------------> get              |         |           |         |
         |              |                 +               |         |           |         |
         |              |                 +               |         |           |         |
         |              |              _register_BRPOP    |         |           |         |
         |              |                 +               +         |           |         |
         |              |                 | +-----------> register  |           |         |
         |              |                 |               +         |           |         |
         |              |                 +               |         |           |         |
         |              |               poll              |         |           |         |
         |              |                 +               |         |           |         |
         |              |                 +               |         |           |         |
         |              |             handle_event        |         |           |         |
         |              |                 +               |         |           |         |
         |              |                 +               |         |           |         |
         |              |            on_readable          |         |           |         |
         |              |                 +               |         +           |         |
         |              |                 |  +----------------->_brpop_read     |         |
         |              |                 |               |         +           |         |
         |              +                 |               |         +---------> |         |
         |          _deliver  <-------------------------------------+           |         |
         |              +                 |               |         |           |         |
         |              |                 |               |         |           |         |
         |              |                 |               |         |           |         |
         |              | +----------------------------------> basic|consume    |         |
         |              |                 |               |         |           |         |
         |              |                 |               |         +---------> |         |
         |              |                 |               |         |           |         |
         |              |                 |               |         |           |         |
         |              |                 |               |         |           v         |
         |              |                 |               |         |                     |
         |              |                 |               |         |          _receive_ca|lback
         |              |                 |               |         |                     |
         v              v                 v               v         v                     |
                                                                                          v
    
    

    从上图可以看出模块的用途。

    手机上如图

    至此,我们已经完成了 Consumer 的分析,下文我们进行 Producer 的分析。

    0xFF 参考

    celery 7 优秀开源项目kombu源码分析之registry和entrypoint

    (二)放弃pika,选择kombu

    kombu消息框架<二>

    AMQP中的概念

    AMQP的基本概念

    深入理解AMQP协议

    kombu和消息队列总结

    关于epoll版服务器的理解(Python实现)

    celery源码解读

    Kombu源码分析(一)概述

  • 相关阅读:
    [JSOI2012][bzoj4332] 分零食 [FFT]
    [MUTC2013][bzoj3513] idiots [FFT]
    [bzoj4259][bzoj4503] 残缺的字符串 [FFT]
    [bzoj3160] 万径人踪灭 [FFT+manacher]
    [AHOI2017/HNOI2017][bzoj4827] 礼物 [FFT]
    [ZJOI2014][bzoj3527]力 [FFT]
    [CQOI2012][bzoj2668] 交换棋子 [费用流]
    [CQOI2014][bzoj3504] 危桥 [最大流]
    [ZJOI2011][bzoj2229] 最小割 [最小割树]
    移动游戏ui设计(一)
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14455093.html
Copyright © 2011-2022 走看看