    [Source Code Analysis] Message Queue Kombu: mailbox

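    0x00 Abstract

    This series introduces the message queue library Kombu. Kombu positions itself as a message queue abstraction compatible with the AMQP protocol. This article explains the mailbox concept in Kombu and, along the way, reviews the material covered in the previous articles.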

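    0x01 Example Code

    The example code in this article comes from https://liqiang.io/post/celery-source-analysis-remote-manager-control, with many thanks.

    The example code consists of two parts:

    Node can be understood as the broadcast Consumer. Client can be regarded as the broadcast initiator.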

    1.1 Node

    import sys
    import kombu
    from kombu import pidbox
    
    hostname = "localhost"
    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox", type="fanout")
    node = mailbox.Node(hostname, state={"a": "b"})
    node.channel = connection.channel()
    
    def callback(body, message):
        print(body)
        print(message)
    
    def main(arguments):
        consumer = node.listen(callback=callback)
        try:
            while True:
                print('Consumer Waiting')
                connection.drain_events()
        finally:
            consumer.cancel()
    
    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
    

    1.2 client

    import sys
    import kombu
    from kombu import pidbox
    
    def callback():
        print("callback")
    
    def main(arguments):
        connection = kombu.Connection('redis://localhost:6379')
        mailbox = pidbox.Mailbox("testMailbox", type="fanout")
        bound = mailbox(connection)
        bound._broadcast("print_msg", {'msg': 'Message for you'})
    
    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
    

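    0x02 Core Idea

    Broadcasting is implemented on top of the Redis PubSub mechanism.

    2.1 Redis PubSub

    To support message multicast, Redis provides a dedicated module for it: PubSub.

    Redis acts as the server between publishers and subscribers and serves as a bridge. Redis has the concept of a channel: a publisher publishes to a named channel, and any subscriber that has subscribed to that channel receives the message.

    Multiple consumers can be started, and every consumer subscribed at the time a message is published receives it, so they all see the same message sequence.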
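
    The fan-out behaviour described above can be illustrated with a minimal in-memory sketch. ToyPubSub and its methods are hypothetical names invented for this illustration; this is not Redis or redis-py, it only mimics the rule that every subscriber of a channel receives every message published to it.

```python
from collections import defaultdict


class ToyPubSub:
    """In-memory stand-in for a pub/sub server (hypothetical, not Redis)."""

    def __init__(self):
        # channel name -> list of subscriber inboxes
        self._subscribers = defaultdict(list)

    def subscribe(self, channel):
        """Register a new subscriber and return its private inbox."""
        inbox = []
        self._subscribers[channel].append(inbox)
        return inbox

    def publish(self, channel, message):
        """Copy the message to every current subscriber of the channel."""
        inboxes = self._subscribers[channel]
        for inbox in inboxes:
            inbox.append(message)
        # Like a pub/sub server, report how many subscribers were reached.
        return len(inboxes)


server = ToyPubSub()
first = server.subscribe("news")
second = server.subscribe("news")

server.publish("news", "hello")
server.publish("news", "world")
```

    Both inboxes end up with the same two messages in the same order, while a message published to a channel without subscribers reaches nobody.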

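    2.2 Overview

    The mailbox implementation in Kombu consists of two parts: the Consumer and the Producer.

    On the Consumer side, in the Channel class of Kombu, registering a listener actually uses the PubSub feature of the Redis driver to subscribe the consumer to a key. In this way the queue and the callback function of the Consumer are tied to Redis through the Channel, so messages can later be read from Redis. The call stack of the subscription is: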

    psubscribe, client.py:3542
    _subscribe, redis.py:664
    _register_LISTEN, redis.py:322
    get, redis.py:375
    drain_events, base.py:960
    drain_events, connection.py:318
    main, node.py:24
    <module>, node.py:29
    

    0x03 Consumer

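    Below we follow the example code and analyze step by step how broadcasting is carried out.

    3.1 Creating the Connection

    After the following code runs, the Connection has been created.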

    connection = kombu.Connection('redis://localhost:6379')
    

    As shown in the figure below, we divide the problem domain into a user scope and a Kombu scope to make it easier to follow:

    user scope                 +            kombu scope
                               |
                               |
    +------------+             |            +--------------------------------------+
    | connection | +----------------------> | Connection: redis://localhost:6379// |
    +------------+             |            +--------------------------------------+
                               |
                               |
                               |
                               |
                               |
                               +
    

    3.2 Creating the mailbox

    After the following code runs, both the Connection and the mailbox have been created.

    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox", type="fanout")
    

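    At this point, however, the two are not yet linked, and some member variables of the mailbox do not hold meaningful values yet.

    An example of the mailbox variable: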

    mailbox = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fea4b81df28>
     accept = {list: 1} ['json']
     clock = {LamportClock} 0
     connection = {NoneType} None
     exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
     exchange_fmt = {str} '%s.pidbox'
     namespace = {str} 'testMailbox'
     node_cls = {type} <class 'kombu.pidbox.Node'>
     oid = {str} '9386a23b-ae96-3c6c-b036-ae7646455ebb'
     producer_pool = {NoneType} None
     queue_expires = {NoneType} None
     queue_ttl = {NoneType} None
     reply_exchange = {Exchange} Exchange reply.testMailbox.pidbox(direct)
     reply_exchange_fmt = {str} 'reply.%s.pidbox'
     reply_queue = {Queue} <unbound Queue 9386a23b-ae96-3c6c-b036-ae7646455ebb.reply.testMailbox.pidbox -> <unbound Exchange reply.testMailbox.pidbox(direct)> -> 9386a23b-ae96-3c6c-b036-ae7646455ebb>
     reply_queue_expires = {float} 10.0
     reply_queue_ttl = {NoneType} None
     serializer = {NoneType} None
     type = {str} 'fanout'
     unclaimed = {defaultdict: 0} defaultdict(<class 'collections.deque'>, {})
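
    The names in the dump above follow directly from the two format strings it shows (exchange_fmt and reply_exchange_fmt). The sketch below reproduces them; mailbox_names is a hypothetical helper, and the rule used for the reply queue name (oid, a dot, then the reply exchange name) is an assumption inferred from the dump rather than taken from the Kombu source.

```python
EXCHANGE_FMT = "%s.pidbox"              # exchange_fmt in the dump
REPLY_EXCHANGE_FMT = "reply.%s.pidbox"  # reply_exchange_fmt in the dump


def mailbox_names(namespace, oid):
    """Derive the exchange and queue names a mailbox would use (illustrative)."""
    exchange = EXCHANGE_FMT % namespace
    reply_exchange = REPLY_EXCHANGE_FMT % namespace
    # Assumption inferred from the dump: "<oid>.<reply exchange name>".
    reply_queue = "%s.%s" % (oid, reply_exchange)
    return {
        "exchange": exchange,
        "reply_exchange": reply_exchange,
        "reply_queue": reply_queue,
    }


names = mailbox_names("testMailbox", "9386a23b-ae96-3c6c-b036-ae7646455ebb")
for role, name in names.items():
    print(role, "->", name)
```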
    

    The logic at this point is as follows:

    user scope        +      kombu scope
                      |
                      |
    +------------+    |       +--------------------------------------+
    | Connection|-----------> | Connection: redis://localhost:6379// |
    +------------+    |       +--------------------------------------+
                      |
                      |
                      |                                           +----------------------------+
                      |                                           | Exchange                   |
                      |       +--------------------------+   +--> |                            |
    +---------+       |       | Mailbox                  |   |    | testMailbox.pidbox(fanout) |
    | mailbox|--------------> |                          |   |    +----------------------------+
    +---------+       |       |                          |   |
                      |       |        exchange  +-----------+    +---------------------------------+
                      |       |                          |        | Exchange                        |
                      |       |        reply_exchange +-------->  |                                 |
                      |       |                          |        | reply.testMailbox.pidbox(direct)|
                      |       |        reply_queue +---------+    +-------------------+-------------+
                      |       |                          |   |                        ^
                      |       |                          |   |    +--------+          |
                      |       +--------------------------+   +--> | Queue  +----------+
                      |                                           +--------+
                      |
                      +
    
    

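    3.3 Creating the Node

    After the following code runs, the Connection, the mailbox and the node have been created.

    Node is a concept within the mailbox; it can be understood as a concrete mailbox.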

    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox", type="fanout")
    node = mailbox.Node(hostname, state={"a": "b"})
    
    

    An example of the node variable:

    node = {Node} <kombu.pidbox.Node object at 0x7fea4b8bffd0>
     channel = {NoneType} None
     handlers = {dict: 0} {}
     hostname = {str} 'localhost'
     mailbox = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fea4b81df28>
     state = {dict: 1} {'a': 'b'}
    
    

    The logic is shown in the figure below:

    user scope        +
                      |
                      |
    +------------+    |       +--------------------------------------+
    | Connection|-----------> | Connection: redis://localhost:6379// |
    +------------+    |       +--------------------------------------+
                      |
                      |
                      |                                               +----------------------------+
                      |                                               | Exchange                   |
                      |       +------------------------------+   +--> |                            |
    +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |
    | mailbox|--------------> |                              |   |    +----------------------------+
    +---------+       |       |                              |   |
                      |       |        exchange  +---------------+    +---------------------------------+
                      |       |                              |        | Exchange                        |
                      |       |        reply_exchange +------------>  |                                 |
                      |       |                              |        | reply.testMailbox.pidbox(direct)|
                      |       |        reply_queue +-------------+    +-------------------+-------------+
                      |       |                              |   |                        ^
                      |       |                              |   |    +--------+          |
                      |       +-------------------------+----+   +--> | Queue  +----------+
                      |                                 ^             +--------+
                      |                                 |
                      |       +---------------------+   |
    +-----+           |       |                     |   |
    |node | +---------------->+ Node      channel   |   |
    +-----+           |       |                     |   |
                      |       |           mailbox +-----+
                      |       |                     |
                      +       +---------------------+
    
    

    3.4 Creating the channel

    Only after the following code runs is the channel created.

    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox", type="fanout")
    node = mailbox.Node(hostname, state={"a": "b"})
    node.channel = connection.channel()
    

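    3.4.1 Relationships

    The relationships among Channel, Connection and Transport are as follows:

    • Connection: an abstraction of the MQ connection; one Connection corresponds to one connection to the MQ. Connection is the wrapper around an AMQP connection;
    • Channel: similar to the AMQP concept; it can be understood as one of several lightweight connections sharing a single Connection. Channel is the wrapper around AMQP operations on the MQ;
    • Transport: Kombu lets different message brokers be plugged in flexibly and uses the term transport for a concrete broker; it can be regarded as an abstraction of the broker:
      • Operations on the MQ inevitably need a connection, but Kombu does not let the Channel use the Connection directly to send and receive requests. Instead it introduces a new abstraction, Transport, which is responsible for the concrete MQ operations; in other words, Channel operations end up being executed on the Transport. Introducing this abstraction makes it very easy to add non-AMQP transports later;
      • The Transport is the real MQ connection, that is, the instance that actually connects to the MQ (redis/rabbitmq), and it distinguishes between the underlying message queue implementations;
      • Kombu currently has built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro;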

    3.4.2 Channel

    The Transport holds two lists of channels:

    self._avail_channels
    self.channels
    

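    If _avail_channels is not empty, a channel is taken from it directly; otherwise a new Channel is created.

    When the connection is actually established, establish_connection is called and puts a channel into self._avail_channels.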

    def establish_connection(self):
        # creates channel to verify connection.
        # this channel is then used as the next requested channel.
        # (returned by ``create_channel``).
        self._avail_channels.append(self.create_channel(self))
        return self     # for drain events
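
    The interplay between the two lists can be sketched as a toy model. ToyTransport and ToyChannel are hypothetical stand-ins written for this illustration, and create_channel only follows the behaviour described in the text (take from _avail_channels if possible, otherwise create a new channel); it is not a copy of the Kombu implementation.

```python
class ToyChannel:
    """Hypothetical placeholder for a real channel."""

    def __init__(self, connection):
        self.connection = connection


class ToyTransport:
    def __init__(self):
        self.channels = []          # every channel created so far
        self._avail_channels = []   # created but not yet handed out

    def create_channel(self, connection):
        # Reuse a pre-created channel if one is waiting, else make a new one.
        try:
            return self._avail_channels.pop()
        except IndexError:
            channel = ToyChannel(connection)
            self.channels.append(channel)
            return channel

    def establish_connection(self):
        # The channel created here to verify the connection is parked in
        # _avail_channels and becomes the next channel handed out.
        self._avail_channels.append(self.create_channel(self))
        return self


transport = ToyTransport().establish_connection()
first = transport.create_channel(transport)   # the parked channel
second = transport.create_channel(transport)  # a brand-new channel
```

    In this model the first request returns the channel created during establish_connection, and only the second request creates another one.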
    

    The key initialization code of Channel is as follows:

    class Channel(virtual.Channel):
        """Redis Channel."""
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
    
            self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
            self.Client = self._get_client()
    
            self.active_fanout_queues = set()
            self.auto_delete_queues = set()
            self._fanout_to_queue = {}
            self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
    
            self.connection.cycle.add(self)  # add to channel poller, i.e. join the message loop
    
            if register_after_fork is not None:
                register_after_fork(self, _after_fork_cleanup_channel)
    

    3.4.3 MultiChannelPoller

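    MultiChannelPoller is defined as follows. It can be understood as the execution engine, and its main jobs are to:

    • collect channels;
    • build the fd-to-channel mapping;
    • build the channel-to-socket mapping;
    • use poll;

    class MultiChannelPoller: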
    
        def __init__(self):
            # active channels
            self._channels = set()
            # file descriptor -> channel map.
            self._fd_to_chan = {}
            # channel -> socket map
            self._chan_to_sock = {}
            # poll implementation (epoll/kqueue/select)
            self.poller = poll()
            # one-shot callbacks called after reading from socket.
            self.after_read = set()
    
        def add(self, channel):
            self._channels.add(channel)
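
    The idea of mapping a file descriptor back to a channel and then dispatching to one of its handlers can be sketched with the standard library alone. ToyPoller and ToyChannel are hypothetical names for this illustration; the sketch uses the selectors module and a local socket pair in place of a real Redis connection, so it shows the pattern rather than the Kombu code path.

```python
import selectors
import socket


class ToyChannel:
    """Hypothetical channel with a handler table like the one shown earlier."""

    def __init__(self):
        self.handlers = {"LISTEN": self._receive}

    def _receive(self, sock):
        return sock.recv(1024)


class ToyPoller:
    def __init__(self):
        # file descriptor -> (channel, handler name)
        self._fd_to_chan = {}
        # readiness notification (epoll/kqueue/select, chosen by the OS)
        self.poller = selectors.DefaultSelector()

    def register(self, channel, sock, handler_name):
        self._fd_to_chan[sock.fileno()] = (channel, handler_name)
        self.poller.register(sock, selectors.EVENT_READ)

    def poll_once(self, timeout=1.0):
        """Wait for readable sockets and dispatch each to its channel handler."""
        results = []
        for key, _events in self.poller.select(timeout):
            channel, handler_name = self._fd_to_chan[key.fd]
            results.append(channel.handlers[handler_name](key.fileobj))
        return results


reader, writer = socket.socketpair()
poller = ToyPoller()
poller.register(ToyChannel(), reader, "LISTEN")

writer.sendall(b"print_msg")      # plays the role of the broker sending data
received = poller.poll_once()

poller.poller.close()
reader.close()
writer.close()
```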
    

    Finally, an example of the Channel variable:

    self = {Channel} <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>
     Client = {type} <class 'redis.client.Redis'>
     Message = {type} <class 'kombu.transport.virtual.base.Message'>
     QoS = {type} <class 'kombu.transport.redis.QoS'>
     active_fanout_queues = {set: 0} set()
     active_queues = {set: 0} set()
     async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
     auto_delete_queues = {set: 0} set()
     body_encoding = {str} 'base64'
     channel_id = {int} 1
     client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
     connection = {Transport} <kombu.transport.redis.Transport object at 0x7ffc6d9c5f60>
     cycle = {FairCycle} <FairCycle: 0/0 []>
     exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7ffc6d9c5f98>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7ffc6d9c5d68>, 'fanout': <kombu.transport.virtual.exchange.FanoutExchange object at 0x7ffc6d9d2b70>}
     handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>>, 'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>>}
     health_check_interval = {int} 25
     keyprefix_fanout = {str} '/0.'
     keyprefix_queue = {str} '_kombu.binding.%s'
     pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
     priority_steps = {list: 4} [0, 3, 6, 9]
     qos = {QoS} <kombu.transport.redis.QoS object at 0x7ffc6d9fbc88>
     queue_order_strategy = {str} 'round_robin'
     state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7ffc6d969e10>
     subclient = {PubSub} <redis.client.PubSub object at 0x7ffc6d9fbd68>
    
    

    And an example of the Transport variable:

    connection = {Transport} <kombu.transport.redis.Transport object at 0x7ffc6d9c5f60>
     Channel = {type} <class 'kombu.transport.redis.Channel'>
     Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
     Management = {type} <class 'kombu.transport.virtual.base.Management'>
     channels = {list: 1} [<kombu.transport.redis.Channel object at 0x7ffc6da8c748>]
     client = {Connection} <Connection: redis://localhost:6379// at 0x7ffc6d5f0e80>
     connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
     cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7ffc6d9d2198>
      after_read = {set: 0} set()
      eventflags = {int} 25
      fds = {dict: 0} {}
      poller = {_poll} <kombu.utils.eventio._poll object at 0x7ffc6d9d21d0>
     driver_name = {str} 'redis'
     driver_type = {str} 'redis'
     implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
     manager = {Management} <kombu.transport.virtual.base.Management object at 0x7ffc6da8c6d8>
     state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7ffc6d969e10>
    
    

    The logic at this point is as follows:

                                                                          +-----------------------+    +-----------------------+
                                                                          | Transport             |    | MultiChannelPoller    |
    user scope        +                                                   |                       |    |                       |
                      |       +--------------------------------------+    |            cycle +-------> |           _channels +----+
                      |       |                                      |    |                       |    +-----------------------+  |
    +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        |
    | Connection|-----------> |                                      |    |                       |      |                        |
    +------------+    |       |                                      |    |     _avail_channels+---------+                        |
                      |       |                       connection+-------> |                       |      |                        |
                      |       |                                      |    +-----------------------+      |                        v
                      |       +--------------------------------------+                                   |      +-----------------+---+
                      |                                                                                  +----->+ Channel             |     +-----------+
                      |                                               +----------------------------+            |            cycle +------> | FairCycle |
                      |                                               | Exchange                   |            |                     |     |           |
                      |       +------------------------------+   +--> |                            |            |                     |     +-----------+
    +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |            |           handlers+-----+
    | mailbox|--------------> |                              |   |    +----------------------------+            +----+----------------+   |
    +---------+       |       |                              |   |                                                   ^                    |
                      |       |        exchange  +---------------+    +---------------------------------+            |                    v
                      |       |                              |        | Exchange                        |            |    +---------------+---------------+
                      |       |        reply_exchange +------------>  |                                 |            |    |  'BRPOP': Channel._brpop_read |
                      |       |                              |        | reply.testMailbox.pidbox(direct)|            |    |                               |
                      |       |        reply_queue +-------------+    +-------------------+-------------+            |    |  'LISTEN': Channel._receive   |
                      |       |                              |   |                        ^                          |    |                               |
                      |       |                              |   |    +--------+          |                          |    +-------------------------------+
                      |       +-------------------------+----+   +--> | Queue  +----------+                          |
                      |                                 ^             +--------+                                     |
                      |                                 |                                                            |
                      |       +---------------------+   |                                                            |
    +-----+           |       |                     |   |                                                            |
    |node | +---------------->+ Node      channel+-------------------------------------------------------------------+
    +-----+           |       |                     |   |
                      |       |           mailbox +-----+
                      |       |                     |
                      +       +---------------------+
    
    
    

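    3.5 Creating the Consumer

    The following code creates a Consumer and also the corresponding Queue. In other words, broadcasting still relies on the Consumer, or rather is built on top of the Consumer functionality.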

    def main(arguments):
        consumer = node.listen(callback=callback)
    
    

    The code of listen is as follows:

    def listen(self, channel=None, callback=None):
        consumer = self.Consumer(channel=channel,
                                 callbacks=[callback or self.handle_message],
                                 on_decode_error=self.on_decode_error)
        consumer.consume()
        return consumer
    
    

    The corresponding Queue variable at this point:

    queue = {Queue} <unbound Queue localhost.testMailbox.pidbox -> <unbound Exchange testMailbox.pidbox(fanout)> -> >
     ContentDisallowed = {type} <class 'kombu.exceptions.ContentDisallowed'>
     alias = {NoneType} None
     auto_delete = {bool} True
     binding_arguments = {NoneType} None
     bindings = {set: 0} set()
     can_cache_declaration = {bool} False
     channel = {str} <not evaluated: the debugger raised an error in _getPyDictionary while reading this attribute>
     consumer_arguments = {NoneType} None
     durable = {bool} False
     exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
    
    

    The logic is as follows:

                                                                           +-----------------------+    +-----------------------+
                                                                           | Transport             |    | MultiChannelPoller    |
     user scope        +                                                   |                       |    |                       |
                       |       +--------------------------------------+    |            cycle +-------> |           _channels +----+
                       |       |                                      |    |                       |    +-----------------------+  |
     +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        |
     | Connection|-----------> |                                      |    |                       |      |                        |
     +------------+    |       |                                      |    |     _avail_channels+---------+                        |
                       |       |                       connection+-------> |                       |      |                        |
                       |       |                                      |    +-----------------------+      |                        v
                       |       +--------------------------------------+                                   |      +-----------------+---+
                       |                                                                                  +----->+ Channel             |     +-----------+
                       |                                               +----------------------------+            |            cycle +------> | FairCycle |
                       |                                               | Exchange                   |            |                     |     |           |
                       |       +------------------------------+   +--> |                            |  <-----+   |                     |     +-----------+
     +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |           handlers+-----+
     | mailbox|--------------> |                              |   |    +----------------------------+        |   +-+--+----------------+   |
     +---------+       |       |                              |   |                                          |     ^  ^                    |
                       |       |        exchange  +---------------+    +---------------------------------+   |     |  |                    v
                       |       |                              |        | Exchange                        |   |     |  |    +---------------+---------------+
                       |       |        reply_exchange +------------>  |                                 |   |     |  |    |  'BRPOP': Channel._brpop_read |
                       |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |    |                               |
                       |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |    |  'LISTEN': Channel._receive   |
                       |       |                              |   |                        ^                 |     |  |    |                               |
                       |       |                              |   |    +--------+          |                 |     |  |    +-------------------------------+
                       |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |
                       |                                 ^             +--------+                            |     |  |
                       |                                 |                                                   |     |  |
                       |       +---------------------+   |                                                   |     |  |
     +-----+           |       |                     |   |                                                   |     |  |
     |node | +---------------->+ Node      channel+-------------------------------------------------------------------+
     +-----+           |       |                     |   |                                                   |     |
                       |       |           mailbox +-----+                                                   |     |
                       |       |                     |        +----------------------------------------------------+
                       |       +---------------------+        |                                              |
                       |                                      |                                              |
                       |                                      |                                              |
                       |       +------------------------+     |         +-----------------------------------------------------------------------------+
    +----------+       |       |                        |     |         | Queue                              |                                        |
    | consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |
    +----------+       |       |                        |               |                                 exchange                                    |
                       |       |             queues  +--------------->  |                                                                             |
                       |       |                        |               |                                                                             |
    +-----------+      |       |             callbacks  |               |     <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)>   |
    | callback  |      |       |                  +     |               |                                                                             |
    +------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+
           ^           |                          |
           |           |                          |
           +--------------------------------------+
                       |
                       +
    
    
    

    3.5.1 Writing the binding to Redis

    At this point the binding is written to Redis, so that it can later be used for routing.
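
    What gets stored can be sketched as follows. ToyRedis, queue_bind and lookup are hypothetical helpers for this illustration, with a dict of sets standing in for Redis; the key prefix is the keyprefix_queue value from the Channel dump, while the "\x06\x16" separator and the (routing key, pattern, queue) layout are assumptions about how the binding string is assembled.

```python
from collections import defaultdict

KEYPREFIX_QUEUE = "_kombu.binding.%s"  # keyprefix_queue from the Channel dump
SEP = "\x06\x16"                       # assumed field separator


class ToyRedis:
    """Hypothetical stand-in that only supports SADD on in-memory sets."""

    def __init__(self):
        self.sets = defaultdict(set)

    def sadd(self, key, member):
        self.sets[key].add(member)


def queue_bind(client, exchange, routing_key, pattern, queue):
    """Store one binding as a set member under the exchange's binding key."""
    member = SEP.join([routing_key or "", pattern or "", queue or ""])
    client.sadd(KEYPREFIX_QUEUE % exchange, member)
    return member


def lookup(client, exchange):
    """Read the bindings back as (routing_key, pattern, queue) tuples."""
    members = sorted(client.sets[KEYPREFIX_QUEUE % exchange])
    return [tuple(member.split(SEP)) for member in members]


client = ToyRedis()
queue_bind(client, "testMailbox.pidbox", "", "", "localhost.testMailbox.pidbox")
bindings = lookup(client, "testMailbox.pidbox")
```

    For a fanout exchange the routing key and pattern are empty here, so the stored member is just two separators followed by the queue name.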

    The call stack is as follows:

    sadd, client.py:2243
    _queue_bind, redis.py:817
    queue_bind, base.py:568
    bind_to, entity.py:674
    queue_bind, entity.py:662
    _create_queue, entity.py:617
    declare, entity.py:606
    declare, messaging.py:417
    revive, messaging.py:404
    __init__, messaging.py:382
    Consumer, pidbox.py:78
    listen, pidbox.py:91
    main, node.py:20
    <module>, node.py:29
    
    

    The logic is shown in the figure; Redis now appears in the picture.

     user scope        +      Kombu                                        +-----------------------+    +-----------------------+                              +          redis
                       |                                                   | Transport             |    | MultiChannelPoller    |                              |
                       |                                                   |                       |    |                       |                              |
                       |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                           |
                       |       |                                      |    |                       |    +-----------------------+  |                           |
     +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        |                           |
     | Connection|-----------> |                                      |    |                       |      |                        |                           |
     +------------+    |       |                                      |    |     _avail_channels+---------+                        |                           |
                       |       |                       connection+-------> |                       |      |                        |                           |
                       |       |                                      |    +-----------------------+      |                        v                           |
                       |       +--------------------------------------+                                   |      +-----------------+---+                       |
                       |                                                                                  +----->+ Channel             |     +-----------+     |
                       |                                               +----------------------------+            |            cycle +------> | FairCycle |     |
                       |                                               | Exchange                   |            |                     |     |           |     |
                       |       +------------------------------+   +--> |                            |  <-----+   |                     |     +-----------+     |
     +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |           handlers+-----+                   |
     | mailbox|--------------> |                              |   |    +----------------------------+        |   +-+--+----------------+   |                   |
     +---------+       |       |                              |   |                                          |     ^  ^                    |                   |
                       |       |        exchange  +---------------+    +---------------------------------+   |     |  |                    v                   |
                       |       |                              |        | Exchange                        |   |     |  |    +---------------+---------------+   |
                       |       |        reply_exchange +------------>  |                                 |   |     |  |    |  'BRPOP': Channel._brpop_read |   |
                       |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |    |                               |   |
                       |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |    |  'LISTEN': Channel._receive   |   |
                       |       |                              |   |                        ^                 |     |  |    |                               |   |
                       |       |                              |   |    +--------+          |                 |     |  |    +-------------------------------+   |
                       |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |                                        |
                       |                                 ^             +--------+                            |     |  |                                        |
                       |                                 |                                                   |     |  |                                        |  +----------------------------------------------------+
                       |       +---------------------+   |                                                   |     |  |                                        |  |      _kombu.binding.testMailbox.pidbox             |
     +-----+           |       |                     |   |                                                   |     |  |                                        |  |                                                    |
     |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |  |                                                    |
     +-----+           |       |                     |   |                                                   |     |                                           |  | "\x06\x16\x06\x16localhost.testMailbox.pidbox" |
                       |       |           mailbox +-----+                                                   |     |                                           |  |                                                    |
                       |       |                     |        +----------------------------------------------------+                                           |  +---------+------------------------------------------+
                       |       +---------------------+        |                                              |                                                 |            ^
                       |                                      |                                              |                                                 |            |
                       |                                      |                                              |                                                 |            |
                       |       +------------------------+     |         +-----------------------------------------------------------------------------+        |            |
    +----------+       |       |                        |     |         | Queue                              |                                        |        |            |
    | consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |        |            |
    +----------+       |       |                        |               |                                 exchange                                    |        |            |
                       |       |             queues  +--------------->  |                                                                             |        |            |
                       |       |                        |               |                                                                             | +-------------------+
    +-----------+      |       |             callbacks  |               |     <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)>   |        |
    | callback  |      |       |                  +     |               |                                                                             |        |
    +------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+        |
           ^           |                          |                                                                                                            |
           |           |                          |                                                                                                            |
           +--------------------------------------+                                                                                                            +
                       |
                       +
    
    


    3.5.2 Configuration

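    Execution now reaches kombu/transport/virtual/base.py, which does the following:

    • adds the consumer's queue to the Channel's list of active queues;
    • wraps the callback so that it receives a Message object, and stores it under the queue name in the _callbacks map of the transport (self.connection);
    • records the consumer tag and rebuilds the cycle so that the new queue takes part in polling.

    In this way the Consumer's queue and callback are tied together through the Channel.

    The code is: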

        def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
            """Consume from `queue`."""
            self._tag_to_queue[consumer_tag] = queue
            self._active_queues.append(queue)
    
            def _callback(raw_message):
                message = self.Message(raw_message, channel=self)
                if not no_ack:
                    self.qos.append(message, message.delivery_tag)
                return callback(message)
    
            self.connection._callbacks[queue] = _callback
            self._consumers.add(consumer_tag)
    
            self._reset_cycle()
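
    The bookkeeping done by basic_consume can be tried out in isolation. The following is only a minimal sketch under stated assumptions: ToyTransport, ToyChannel, _storage and deliver_one are hypothetical names invented for illustration (they are not kombu APIs), and an in-memory list stands in for Redis.

```python
from queue import Empty


class ToyTransport:
    """Stands in for the transport: owns the queue -> callback registry."""

    def __init__(self):
        self._callbacks = {}


class ToyChannel:
    """Hypothetical, simplified channel; not kombu's real class."""

    def __init__(self, transport):
        self.connection = transport   # kombu's Channel also calls its transport `connection`
        self._tag_to_queue = {}
        self._active_queues = []
        self._consumers = set()
        self._storage = {}            # toy backend: queue name -> pending messages

    def basic_consume(self, queue, callback, consumer_tag):
        # Same three pieces of state that the real method touches.
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
        self.connection._callbacks[queue] = callback
        self._consumers.add(consumer_tag)

    def deliver_one(self):
        """Pop one pending message and route it through the registry."""
        for queue in self._active_queues:
            pending = self._storage.get(queue)
            if pending:
                self.connection._callbacks[queue](pending.pop(0))
                return queue
        raise Empty()


received = []
channel = ToyChannel(ToyTransport())
channel.basic_consume('localhost.testMailbox.pidbox', received.append, 'tag-1')
channel._storage['localhost.testMailbox.pidbox'] = [{'method': 'print_msg'}]
served = channel.deliver_one()
```

    Because the registry is keyed by queue name, whoever later reads a message only needs to know which queue it came from in order to find the right callback.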
    
    

    The call stack is:

    basic_consume, base.py:635
    basic_consume, redis.py:598
    consume, entity.py:738
    _basic_consume, messaging.py:594
    consume, messaging.py:473
    listen, pidbox.py:92
    main, node.py:20
    <module>, node.py:29
    

    At this point we are still inside the Channel, whose state is:

    self = {Channel} <kombu.transport.redis.Channel object at 0x7fc252239908>
     Client = {type} <class 'redis.client.Redis'>
     Message = {type} <class 'kombu.transport.virtual.base.Message'>
     active_fanout_queues = {set: 1} {'localhost.testMailbox.pidbox'}
     active_queues = {set: 0} set()
     async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
     auto_delete_queues = {set: 1} {'localhost.testMailbox.pidbox'}
     body_encoding = {str} 'base64'
     channel_id = {int} 1
     client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
     closed = {bool} False
     codecs = {dict: 1} {'base64': <kombu.transport.virtual.base.Base64 object at 0x7fc25218f5c0>}
     connection = {Transport} <kombu.transport.redis.Transport object at 0x7fc2522295f8>
     cycle = {FairCycle} <FairCycle: 0/1 ['localhost.testMailbox.pidbox']>
     deadletter_queue = {NoneType} None
     default_priority = {int} 0
     do_restore = {bool} True
     exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7fc252239fd0>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7fc252239f60>, 'fanout': <kombu.transport.virtual.exchange.FanoutExchange object at 0x7fc252239f28>}
     handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fc252239908>>, 'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fc252239908>>}
     keyprefix_fanout = {str} '/0.'
     keyprefix_queue = {str} '_kombu.binding.%s'
     pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
     priority_steps = {list: 4} [0, 3, 6, 9]
     qos = {QoS} <kombu.transport.redis.QoS object at 0x7fc252264320>
     queue_order_strategy = {str} 'round_robin'
     state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7fc25218f6a0>
     subclient = {PubSub} <redis.client.PubSub object at 0x7fc252264400>
    

    The cycle itself is built as follows:

    def _reset_cycle(self):
        self._cycle = FairCycle(
            self._get_and_deliver, self._active_queues, Empty)
    

    FairCycle is defined as follows:

    class FairCycle:
        """Cycle between resources.
    
        Consume from a set of resources, where each resource gets
        an equal chance to be consumed from.
    
        Arguments:
            fun (Callable): Callback to call.
            resources (Sequence[Any]): List of resources.
            predicate (type): Exception predicate.
        """
    
        def __init__(self, fun, resources, predicate=Exception):
            self.fun = fun
            self.resources = resources
            self.predicate = predicate
            self.pos = 0
    
        def _next(self):
            while 1:
                try:
                    resource = self.resources[self.pos]
                    self.pos += 1
                    return resource
                except IndexError:
                    self.pos = 0
                    if not self.resources:
                        raise self.predicate()
    
        def get(self, callback, **kwargs):
            """Get from next resource."""
            for tried in count(0):  # for infinity
                resource = self._next()
                try:
                    return self.fun(resource, callback, **kwargs)
                except self.predicate:
                    # reraise when retries exhausted.
                    if tried >= len(self.resources) - 1:
                        raise
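
    To see the fairness in action, the class can be exercised on its own. The sketch below repeats the FairCycle code from above so that it is self-contained; the store dictionary and the get_and_deliver function are hypothetical stand-ins for the real queues and for Channel._get_and_deliver.

```python
from itertools import count
from queue import Empty


class FairCycle:
    """Copy of the class shown above, repeated to keep the example runnable."""

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        for tried in count(0):
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                if tried >= len(self.resources) - 1:
                    raise


# Hypothetical in-memory queues: 'a' is empty, 'b' and 'c' hold one message each.
store = {'a': [], 'b': ['b1'], 'c': ['c1']}


def get_and_deliver(resource, callback):
    if not store[resource]:
        raise Empty()
    callback(store[resource].pop(0), resource)


delivered = []
cycle = FairCycle(get_and_deliver, ['a', 'b', 'c'], Empty)
cycle.get(lambda msg, queue: delivered.append((msg, queue)))  # skips 'a', serves 'b'
cycle.get(lambda msg, queue: delivered.append((msg, queue)))  # continues at 'c'
try:
    cycle.get(lambda msg, queue: delivered.append((msg, queue)))
    exhausted = False
except Empty:
    exhausted = True  # every queue was tried once and all were empty
```

    The second call resumes at 'c' instead of starting over at 'a', which is what gives each resource an equal chance.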
    

    The function and resources held by the cycle are:

    fun = {method} <bound method AbstractChannel._get_and_deliver of <kombu.transport.redis.Channel object at 0x7fc252239908>>
    resources = {list: 1} ['localhost.testMailbox.pidbox']
    
    

    The resulting structure is:

     user scope        +      Kombu                                        +-----------------------+    +-----------------------+                                   +          redis
                       |                                                   | Transport             |    | MultiChannelPoller    |                                   |
                       |                                                   |                       |    |                       |                                   |
                       |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                                |
                       |       |                                      |    |                       |    +-----------------------+  |                                |
     +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        v                                |
     | Connection|-----------> |                                      |    |                       |      |      +-----------------+---+                            |
     +------------+    |       |                                      |    |     _avail_channels+---------+      | Channel             |  <------------+            |
                       |       |                       connection+-------> |                       |      |      |                     |               |            |
                       |       |                                      |    +-----------------------+      |      |     _active_queues +------------------------+    |
                       |       +--------------------------------------+                                   |      |                     |               |       |    |
                       |                                                                                  +----->+                     |     +---------+-+     |    |
                       |                                               +----------------------------+            |            cycle +------> | FairCycle |     |    |
                       |                                               | Exchange                   |            |                     |     |           |     |    |
                       |       +------------------------------+   +--> |                            |  <-----+   |                     |     +-----------+     |    |
     +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |           handlers+-----+                   |    |
     | mailbox|--------------> |                              |   |    +----------------------------+        |   +-+--+----------------+   |                   |    |
     +---------+       |       |                              |   |                                          |     ^  ^                    |                   |    |
                       |       |        exchange  +---------------+    +---------------------------------+   |     |  |                    v                   |    |
                       |       |                              |        | Exchange                        |   |     |  |    +---------------+---------------+   |    |
                       |       |        reply_exchange +------------>  |                                 |   |     |  |    |  'BRPOP': Channel._brpop_read |   |    |
                       |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |    |                               |   |    |
                       |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |    |  'LISTEN': Channel._receive   |   |    |
                       |       |                              |   |                        ^                 |     |  |    |                               |   |    |
                       |       |                              |   |    +--------+          |                 |     |  |    +-------------------------------+   |    |
                       |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |                                        |    |
                       |                                 ^             +--------+                            |     |  |                                        |    |
                       |                                 |                                                   |     |  |                                        |    |  +----------------------------------------------------+
                       |       +---------------------+   |                                                   |     |  |                                        |    |  |      _kombu.binding.testMailbox.pidbox             |
     +-----+           |       |                     |   |                                                   |     |  |                                        |    |  |                                                    |
     |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |    |  |                                                    |
     +-----+           |       |                     |   |                                                   |     |                                           |    |  | "\x06\x16\x06\x16localhost.testMailbox.pidbox" |
                       |       |           mailbox +-----+                                                   |     |                                           |    |  |                                                    |
                       |       |                     |        +----------------------------------------------------+                                           |    |  +---------+------------------------------------------+
                       |       +---------------------+        |                                              |                                                 |    |            ^
                       |                                      |                                              |                                                 |    |            |
                       |                                      |                                              |                                                 |    |            |
                       |       +------------------------+     |         +-----------------------------------------------------------------------------+        |    |            |
    +----------+       |       |                        |     |         | Queue                              |                                        |        |    |            |
    | consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |  <-----+    |            |
    +----------+       |       |                        |               |                                 exchange                                    |             |            |
                       |       |             queues  +--------------->  |                                                                             |             |            |
                       |       |                        |               |                                                                             | +------------------------+
    +-----------+      |       |             callbacks  |               |     <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)>   |             |
    | callback  |      |       |                  +     |               |                                                                             |             |
    +------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+             |
           ^           |                          |                                                                                                                 |
           |           |                          |                                                                                                                 |
           +--------------------------------------+                                                                                                                 +
                       |
    
    


    3.5.3 Configuring load balancing

    Back in the redis Channel class, the last step configures load balancing, that is, it decides which queue's messages will be consumed next.

    def basic_consume(self, queue, *args, **kwargs):
        if queue in self._fanout_queues:
            exchange, _ = self._fanout_queues[queue]
            self.active_fanout_queues.add(queue)
            self._fanout_to_queue[exchange] = queue
        ret = super().basic_consume(queue, *args, **kwargs)
    
        # Update fair cycle between queues.
        #
        # We cycle between queues fairly to make sure that
        # each queue is equally likely to be consumed from,
        # so that a very busy queue will not block others.
        #
        # This works by using Redis's `BRPOP` command and
        # by rotating the most recently used queue to the
        # end of the list.  See Kombu github issue #166 for
        # more discussion of this method.
        self._update_queue_cycle()
        return ret
      
      
    def _update_queue_cycle(self):
        self._queue_cycle.update(self.active_queues)
    

    The call stack is:

    update, scheduling.py:75
    _update_queue_cycle, redis.py:1018
    basic_consume, redis.py:610
    consume, entity.py:738
    _basic_consume, messaging.py:594
    consume, messaging.py:473
    listen, pidbox.py:92
    main, node.py:20
    <module>, node.py:29
    

    The strategy in use is round-robin:

    class round_robin_cycle:
        """Iterator that cycles between items in round-robin."""
    
        def __init__(self, it=None):
            self.items = it if it is not None else []
    
        def update(self, it):
            """Update items from iterable."""
            self.items[:] = it
    
        def consume(self, n):
            """Consume n items."""
            return self.items[:n]
    
        def rotate(self, last_used):
            """Move most recently used item to end of list."""
            items = self.items
            try:
                items.append(items.pop(items.index(last_used)))
            except ValueError:
                pass
            return last_used
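
    The effect of rotate is easiest to see on a small list. The sketch below repeats the round_robin_cycle class from above so that it runs on its own; the queue names q1, q2 and q3 are made up for illustration.

```python
class round_robin_cycle:
    """Copy of the class shown above, repeated to keep the example runnable."""

    def __init__(self, it=None):
        self.items = it if it is not None else []

    def update(self, it):
        self.items[:] = it

    def consume(self, n):
        return self.items[:n]

    def rotate(self, last_used):
        items = self.items
        try:
            items.append(items.pop(items.index(last_used)))
        except ValueError:
            pass
        return last_used


cycle = round_robin_cycle()
cycle.update(['q1', 'q2', 'q3'])

before = cycle.consume(3)       # order in which the queues would be polled now
cycle.rotate('q1')              # 'q1' was just served, so it moves to the back
after = cycle.consume(3)

cycle.rotate('unknown-queue')   # unknown names are ignored silently
unchanged = cycle.consume(3)
```

    After the rotation 'q1' is polled last, so a busy queue cannot starve the others.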
    

    The resulting structure is:

     user scope        +      Kombu                                        +-----------------------+    +-----------------------+                                   +          redis
                       |                                                   | Transport             |    | MultiChannelPoller    |                                   |
                       |                                                   |                       |    |                       |                                   |
                       |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                                |
                       |       |                                      |    |                       |    +-----------------------+  |                                |
     +------------+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        v                                |
     | Connection|-----------> |                                      |    |                       |      |      +-----------------+---+                            |
     +------------+    |       |                                      |    |     _avail_channels+---------+      | Channel             |  <------------+            |
                       |       |                       connection+-------> |                       |      |      |                     |               |            |
                       |       |                                      |    +-----------------------+      |      |     _active_queues +------------------------+    |
                       |       +--------------------------------------+                                   |      |                     |               |       |    |
                       |                                                                                  +----->+            cycle +------>  +--------+--+    |    |
                       |                                               +----------------------------+            |                     |      | FairCycle |    |    |
                       |                                               | Exchange                   |            |                     |      +-----------+    |    |
                       |       +------------------------------+   +--> |                            |  <-----+   |       _queue_cycle+-----------+             |    |
     +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |                     |         |             |    |
     | mailbox|--------------> |                              |   |    +----------------------------+        |   |           handlers  |         v             |    |
     +---------+       |       |                              |   |                                          |   |               +     |      round_robin_cycle|    |
                       |       |        exchange  +---------------+    +---------------------------------+   |   +-+--+----------------+                       |    |
                       |       |                              |        | Exchange                        |   |     ^  ^          |                             |    |
                       |       |        reply_exchange +------------>  |                                 |   |     |  |          |                             |    |
                       |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |          |                             |    |
                       |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |          v                             |    |
                       |       |                              |   |                        ^                 |     |  |    +-----+-------------------------+   |    |
                       |       |                              |   |    +--------+          |                 |     |  |    |  'BRPOP': Channel._brpop_read |   |    |
                       |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |    |                               |   |    |
                       |                                 ^             +--------+                            |     |  |    |  'LISTEN': Channel._receive   |   |    |
                       |                                 |                                                   |     |  |    |                               |   |    |  +----------------------------------------------------+
                       |       +---------------------+   |                                                   |     |  |    +-------------------------------+   |    |  |      _kombu.binding.testMailbox.pidbox             |
     +-----+           |       |                     |   |                                                   |     |  |                                        |    |  |                                                    |
     |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |    |  |                                                    |
     +-----+           |       |                     |   |                                                   |     |                                           |    |  | "\x06\x16\x06\x16localhost.testMailbox.pidbox" |
                       |       |           mailbox +-----+                                                   |     |                                           |    |  |                                                    |
                       |       |                     |        +----------------------------------------------------+                                           |    |  +---------+------------------------------------------+
                       |       +---------------------+        |                                              |                                                 |    |            ^
                       |                                      |                                              |                                                 |    |            |
                       |                                      |                                              |                                                 |    |            |
                       |       +------------------------+     |         +-----------------------------------------------------------------------------+        |    |            |
    +----------+       |       |                        |     |         | Queue                              |                                        |        |    |            |
    | consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |  <-----+    |            |
    +----------+       |       |                        |               |                                 exchange                                    |             |            |
                       |       |             queues  +--------------->  |                                                                             |             |            |
                       |       |                        |               |                                                                             | +------------------------+
    +-----------+      |       |             callbacks  |               |     <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)>   |             |
    | callback  |      |       |                  +     |               |                                                                             |             |
    +------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+             |
           ^           |                          |                                                                                                                 |
           |           |                          |                                                                                                                 |
           +--------------------------------------+                                                                                                                 +
                       |
    
    


    3.6 Consuming

    3.6.1 The consume loop

    The following code in the node script performs the consumption.

    def main(arguments):
        consumer = node.listen(callback=callback)
        try:
            while True:
                print('Consumer Waiting')
                connection.drain_events()
        finally:
            consumer.cancel()
    
    

    The messages are actually read inside drain_events. Its transport-level implementation is:

    def drain_events(self, connection, timeout=None):
        time_start = monotonic()
        get = self.cycle.get
        polling_interval = self.polling_interval
        if timeout and polling_interval and polling_interval > timeout:
            polling_interval = timeout
        while 1:
            try:
                get(self._deliver, timeout=timeout)
            except Empty:
                if timeout is not None and monotonic() - time_start >= timeout:
                    raise socket.timeout()
                if polling_interval is not None:
                    sleep(polling_interval)
            else:
                break
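
    The retry-until-timeout structure of this loop can be reproduced without a broker. The following is a minimal sketch, not kombu's code: drain_once and FlakySource are hypothetical names, and the source simply reports Empty a fixed number of times before delivering a payload.

```python
import socket
from queue import Empty
from time import monotonic, sleep


def drain_once(get, deliver, timeout=None, polling_interval=0.01):
    """Keep calling `get` until it delivers one message or `timeout` elapses."""
    start = monotonic()
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while True:
        try:
            get(deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break


class FlakySource:
    """Hypothetical message source that is empty for the first few polls."""

    def __init__(self, empty_polls, payload):
        self.empty_polls = empty_polls
        self.payload = payload
        self.calls = 0

    def get(self, callback, timeout=None):
        self.calls += 1
        if self.calls <= self.empty_polls:
            raise Empty()
        callback(self.payload)


seen = []
source = FlakySource(empty_polls=2, payload='hello')
drain_once(source.get, seen.append, timeout=1.0)  # two empty polls, then one delivery
```

    Note that one call returns after a single successful delivery, which is why the node script wraps connection.drain_events() in a while True loop.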
    
    

    3.6.2 Business logic

    3.6.2.1 Registration

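    The get method does the following (note that get is called once for every consume attempt, so the register, poll and consume sequence is repeated each time):

    • registers the read mode for every channel;
    • polls, which is the same generic step whether the mode is BRPOP or LISTEN;
    • calls handle_event to read from Redis and actually consume the message.
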
    def get(self, callback, timeout=None):
        self._in_protected_read = True
        try:
            for channel in self._channels:
                if channel.active_queues:           # BRPOP mode?
                    if channel.qos.can_consume():
                        self._register_BRPOP(channel)
                if channel.active_fanout_queues:    # LISTEN mode?
                    self._register_LISTEN(channel)
    
            events = self.poller.poll(timeout)
            if events:
                for fileno, event in events:
                    ret = self.handle_event(fileno, event)  # read from Redis and consume
                    if ret:
                        return
            # - no new data, so try to restore messages.
            # - reset active redis commands.
            self.maybe_restore_messages()
            raise Empty()
        finally:
            self._in_protected_read = False
            while self.after_read:
                try:
                    fun = self.after_read.pop()
                except KeyError:
                    break
                else:
                    fun()
    
    

    Because the mailbox relies on pub/sub, _register_LISTEN ends up calling channel._subscribe to register the subscription:

    def _register_LISTEN(self, channel):
        """Enable LISTEN mode for channel."""
        if not self._client_registered(channel, channel.subclient, 'LISTEN'):
            channel._in_listen = False
            self._register(channel, channel.subclient, 'LISTEN')
        if not channel._in_listen:
            channel._subscribe()  # send SUBSCRIBE
    
    

    The object involved is:

    self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fc2522297f0>
    
    

    _register ties a channel to the file descriptor of its socket, so that when poll reports an event on that descriptor the matching channel is invoked.

    def _register(self, channel, client, type):
        if (channel, client, type) in self._chan_to_sock:
            self._unregister(channel, client, type)
        if client.connection._sock is None:   # not connected yet.
            client.connection.connect()
        sock = client.connection._sock
        self._fd_to_chan[sock.fileno()] = (channel, type)
        self._chan_to_sock[(channel, client, type)] = sock
        self.poller.register(sock, self.eventflags)
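
    The bookkeeping done by _register boils down to two dictionaries that are kept in sync. The sketch below is a simplified stand-in, assuming plain strings for the channel and client; FdRegistry and demo are hypothetical names and no real poller is involved.

```python
import socket


class FdRegistry:
    """Keeps the two lookup tables that _register maintains."""

    def __init__(self):
        self.fd_to_chan = {}     # fileno -> (channel, command type)
        self.chan_to_sock = {}   # (channel, client, command type) -> socket

    def register(self, channel, client, cmd_type, sock):
        key = (channel, client, cmd_type)
        if key in self.chan_to_sock:
            # Drop the stale entry first so a reconnect does not leak an fd.
            self.unregister(channel, client, cmd_type)
        self.fd_to_chan[sock.fileno()] = (channel, cmd_type)
        self.chan_to_sock[key] = sock

    def unregister(self, channel, client, cmd_type):
        sock = self.chan_to_sock.pop((channel, client, cmd_type))
        self.fd_to_chan.pop(sock.fileno(), None)

    def lookup(self, fileno):
        """Return the (channel, command type) that owns a ready fd."""
        return self.fd_to_chan[fileno]


def demo():
    old_sock, new_sock = socket.socketpair()
    registry = FdRegistry()
    try:
        registry.register("chan-1", "subclient", "LISTEN", old_sock)
        # Re-registering the same triple with a new socket replaces the old one.
        registry.register("chan-1", "subclient", "LISTEN", new_sock)
        owner = registry.lookup(new_sock.fileno())
        return owner, len(registry.fd_to_chan)
    finally:
        old_sock.close()
        new_sock.close()
```

    When poll later reports a file descriptor, lookup is all that is needed to find out which channel should read and whether it is in BRPOP or LISTEN mode.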
    

    _subscribe is the part that talks to redis and registers the subscription.

    With this in place the consumer is connected to both redis and the poller, so consumption can start.

    def _subscribe(self):
        keys = [self._get_subscribe_topic(queue)
                for queue in self.active_fanout_queues]
        if not keys:
            return
        c = self.subclient
        if c.connection._sock is None:
            c.connection.connect()
        self._in_listen = c.connection
        c.psubscribe(keys)
    

    The call stack is:

    _subscribe, redis.py:663
    _register_LISTEN, redis.py:322
    get, redis.py:375
    drain_events, base.py:960
    drain_events, connection.py:318
    main, node.py:24
    <module>, node.py:29
    

    The relevant variables are shown below; here the client c is the PubSub object of the redis driver:

    c = {PubSub} <redis.client.PubSub object at 0x7fc252264400>
    
    keys = {list: 1} ['/0.testMailbox.pidbox']
    
    self = {Channel} <kombu.transport.redis.Channel object at 0x7fc252239908>
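
    The key '/0.testMailbox.pidbox' is the database number followed by the exchange name. A minimal sketch of that naming is given below. fanout_topic and pattern_matches are hypothetical helpers, the handling of a non-empty routing key is an assumption, and fnmatchcase is only a rough approximation of the glob matching that PSUBSCRIBE performs inside redis.

```python
from fnmatch import fnmatchcase


def fanout_topic(db, exchange, routing_key=""):
    """Build a pub/sub channel name in the '/<db>.<exchange>' form seen above.

    Assumption: a non-empty routing key is appended after a '/'.
    """
    topic = f"/{db}.{exchange}"
    if routing_key:
        topic = f"{topic}/{routing_key}"
    return topic


def pattern_matches(pattern, channel_name):
    # PSUBSCRIBE takes glob-style patterns; fnmatchcase is only a rough
    # stand-in for the matcher inside redis.
    return fnmatchcase(channel_name, pattern)


if __name__ == "__main__":
    topic = fanout_topic(0, "testMailbox.pidbox")
    print(topic, pattern_matches(topic, topic))
```

    Because the database number is part of the name, mailboxes with the same namespace in different redis databases do not see each other's broadcasts.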
    

    The logic at this point is:

                                                                                                                                                                    +
    user scope         +      Kombu                                                                                                                                 |          redis
                       |                                                                               psubscribe                                                   |
                       |                                                                                                                                            |           +----------------------------+
            +--------------------->   drain_events   +--------------------------------------------------------------------------------------------------------------------->    |  '/0.testMailbox.pidbox'   |
            |          |                                                                                                                                            |           +----------------------------+
            |          |                                                                                                                                            |
            |          |                                                   +-----------------------+    +-----------------------+                                   |
            |          |                                                   | Transport             |    | MultiChannelPoller    |                                   |
            |          |                                                   |                       |    |                       |                                   |
            |          |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                                |
            |          |       |                                      |    |                       |    +-----------------------+  |                                |
     +------+-----+    |       | Connection: redis://localhost:6379// |    |            channels +--------+                        v                                |
     | Connection|-----------> |                                      |    |                       |      |      +-----------------+---+                            |
     +------------+    |       |                                      |    |     _avail_channels+---------+      | Channel             |  <------------+            |
                       |       |                       connection+-------> |                       |      |      |                     |               |            |
                       |       |                                      |    +-----------------------+      |      |     _active_queues +------------------------+    |
                       |       +--------------------------------------+                                   |      |                     |               |       |    |
                       |                                                                                  +----->+            cycle +------>  +--------+--+    |    |
                       |                                               +----------------------------+            |                     |      | FairCycle |    |    |
                       |                                               | Exchange                   |            |                     |      +-----------+    |    |
                       |       +------------------------------+   +--> |                            |  <-----+   |       _queue_cycle+-----------+             |    |
     +---------+       |       | Mailbox                      |   |    | testMailbox.pidbox(fanout) |        |   |                     |         |             |    |
     | mailbox|--------------> |                              |   |    +----------------------------+        |   |           handlers  |         v             |    |
     +---------+       |       |                              |   |                                          |   |               +     |      round_robin_cycle|    |
                       |       |        exchange  +---------------+    +---------------------------------+   |   +-+--+----------------+                       |    |
                       |       |                              |        | Exchange                        |   |     ^  ^          |                             |    |
                       |       |        reply_exchange +------------>  |                                 |   |     |  |          |                             |    |
                       |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |          |                             |    |
                       |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |          v                             |    |
                       |       |                              |   |                        ^                 |     |  |    +-----+-------------------------+   |    |
                       |       |                              |   |    +--------+          |                 |     |  |    |  'BRPOP': Channel._brpop_read |   |    |
                       |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |    |                               |   |    |
                       |                                 ^             +--------+                            |     |  |    |  'LISTEN': Channel._receive   |   |    |
                       |                                 |                                                   |     |  |    |                               |   |    |  +----------------------------------------------------+
                       |       +---------------------+   |                                                   |     |  |    +-------------------------------+   |    |  |      _kombu.binding.testMailbox.pidbox             |
     +-----+           |       |                     |   |                                                   |     |  |                                        |    |  |                                                    |
     |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |    |  |                                                    |
     +-----+           |       |                     |   |                                                   |     |                                           |    |  |   "x06x16x06x16localhost.testMailbox.pidbox"   |
                       |       |           mailbox +-----+                                                   |     |                                           |    |  |                                                    |
                       |       |                     |        +----------------------------------------------------+                                           |    |  +---------+------------------------------------------+
                       |       +---------------------+        |                                              |                                                 |    |            ^
                       |                                      |                                              |                                                 |    |            |
                       |                                      |                                              |                                                 |    |            |
                       |       +------------------------+     |         +-----------------------------------------------------------------------------+        |    |            |
    +----------+       |       |                        |     |         | Queue                              |                                        |        |    |            |
    | consumer |       |       | Consumer    channel  +-------+         |                                    +                                        |  <-----+    |            |
    +----------+       |       |                        |               |                                 exchange                                    |             |            |
                       |       |             queues  +--------------->  |                                                                             |             |            |
                       |       |                        |               |                                                                             | +------------------------+
    +-----------+      |       |             callbacks  |               |     <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)>   |             |
    | callback  |      |       |                  +     |               |                                                                             |             |
    +------+----+      |       +------------------------+               +-----------------------------------------------------------------------------+             |
           ^           |                          |                                                                                                                 |
           |           |                          |                                                                                                                 |
           +--------------------------------------+                                                                                                                 +
                       |
    
    


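    3.2.2.2 Consumption

    As mentioned in the previous subsection, handle_event is where redis is actually read and the message consumed.

    Once a message has been received, the following method is called: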

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            self._reject_inbound_message(message)
        else:
            callback(message)
    

    The call stack is:

    _deliver, base.py:975
    _receive_one, redis.py:721
    _receive, redis.py:692
    on_readable, redis.py:358
    handle_event, redis.py:362
    get, redis.py:380
    drain_events, base.py:960
    drain_events, connection.py:318
    main, node.py:24
    <module>, node.py:29
    

    The variables at this point are shown below; the callback is the _callback closure defined inside basic_consume:

    self._callbacks = {dict: 1} 
     'localhost.testMailbox.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fc2522c1840>
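
    The lookup in _deliver is a plain dictionary access keyed by queue name, with rejection as the fallback. A minimal sketch under simplified assumptions: TinyChannel and demo are hypothetical names, and rejected messages are simply collected in a list instead of being handed back to the transport.

```python
class TinyChannel:
    """Queue-name -> callback table with a reject fallback."""

    def __init__(self):
        self._callbacks = {}
        self.rejected = []

    def basic_consume(self, queue, callback):
        # Registering a consumer stores its callback under the queue name.
        self._callbacks[queue] = callback

    def deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            # Nobody consumes this queue: keep the message aside.
            self.rejected.append(message)
        else:
            callback(message)


def demo():
    received = []
    channel = TinyChannel()
    channel.basic_consume("localhost.testMailbox.pidbox", received.append)
    channel.deliver({"method": "print_msg"}, "localhost.testMailbox.pidbox")
    channel.deliver({"method": "ping"}, "unknown.queue")
    return received, channel.rejected


if __name__ == "__main__":
    print(demo())
```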
    

    Execution continues into the Consumer, which processes the message:

    def receive(self, body, message):
        """Method called when a message is received.
    
        This dispatches to the registered :attr:`callbacks`.
    
        Arguments:
            body (Any): The decoded message body.
            message (~kombu.Message): The message instance.
    
        Raises:
            NotImplementedError: If no consumer callbacks have been
                registered.
        """
        callbacks = self.callbacks
        if not callbacks:
            raise NotImplementedError('Consumer does not have any callbacks')
        [callback(body, message) for callback in callbacks]
    

    The call stack is:

    receive, messaging.py:583
    _receive_callback, messaging.py:620
    _callback, base.py:630
    _deliver, base.py:980
    _receive_one, redis.py:721
    _receive, redis.py:692
    on_readable, redis.py:358
    handle_event, redis.py:362
    get, redis.py:380
    drain_events, base.py:960
    drain_events, connection.py:318
    main, node.py:24
    <module>, node.py:29
    

    The variables are:

    body = {dict: 5} {'method': 'print_msg', 'arguments': {'msg': 'Message for you'}, 'destination': None, 'pattern': None, 'matcher': None}
    
    message = {Message} <Message object at 0x7fc2522e20d8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '7dd6ad01-4162-42c3-b8db-bb40dc7dfda0', 'body_length': 119, 'properties': {}, 'delivery_info': {'exchange': 'testMailbox.pidbox', 'rout
    
    self = {Consumer} <Consumer: [<Queue localhost.testMailbox.pidbox -> <Exchange testMailbox.pidbox(fanout) bound to chan:1> ->  bound to chan:1>]>
    

    Finally the user's callback is invoked:

    callback, node.py:15
    <listcomp>, messaging.py:586
    receive, messaging.py:586
    _receive_callback, messaging.py:620
    _callback, base.py:630
    _deliver, base.py:980
    _receive_one, redis.py:721
    _receive, redis.py:692
    on_readable, redis.py:358
    handle_event, redis.py:362
    get, redis.py:380
    drain_events, base.py:960
    drain_events, connection.py:318
    main, node.py:24
    <module>, node.py:29
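
    The last three frames of this chain (decode the message, call receive, run the user callbacks) can be reproduced in a few lines. This is a simplified sketch rather than Kombu's implementation: TinyMessage and TinyConsumer are hypothetical classes, and the body is assumed to be plain JSON with no extra encoding.

```python
import json
from dataclasses import dataclass, field


@dataclass
class TinyMessage:
    """Hypothetical stand-in for a transport message with a JSON body."""
    raw_body: str
    state: str = "RECEIVED"

    def decode(self):
        return json.loads(self.raw_body)


@dataclass
class TinyConsumer:
    callbacks: list = field(default_factory=list)

    def _receive_callback(self, message):
        # Decode once, then hand both forms to the user callbacks.
        self.receive(message.decode(), message)

    def receive(self, body, message):
        if not self.callbacks:
            raise NotImplementedError("Consumer does not have any callbacks")
        for callback in self.callbacks:
            callback(body, message)


def demo():
    seen = []
    consumer = TinyConsumer(
        callbacks=[lambda body, msg: seen.append(body["method"])])
    consumer._receive_callback(TinyMessage('{"method": "print_msg"}'))
    return seen


if __name__ == "__main__":
    print(demo())
```

    Every callback receives both the decoded body and the message object, which is why the callback in node.py takes two arguments.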
    

    This completes the analysis of the consumer side of the mailbox.

    0x04 Producer

    The Producer is what sends the mail; the logic here is much simpler.

    The code is:

    def main(arguments):
        connection = kombu.Connection('redis://localhost:6379')
        mailbox = pidbox.Mailbox("testMailbox", type="fanout")
        bound = mailbox(connection)
        bound._broadcast("print_msg", {'msg': 'Message for you'})
    

    4.1 Mailbox

    Execution is now inside Mailbox, where _broadcast simply calls _publish.

    def _broadcast(self, command, arguments=None, destination=None,
                   reply=False, timeout=1, limit=None,
                   callback=None, channel=None, serializer=None,
                   pattern=None, matcher=None):
    
        arguments = arguments or {}
        reply_ticket = reply and uuid() or None
        chan = channel or self.connection.default_channel
    
        # Set reply limit to number of destinations (if specified)
        if limit is None and destination:
            limit = destination and len(destination) or None
    
        serializer = serializer or self.serializer
        self._publish(command, arguments, destination=destination,
                      reply_ticket=reply_ticket,
                      channel=chan,
                      timeout=timeout,
                      serializer=serializer,
                      pattern=pattern,
                      matcher=matcher)
    
        if reply_ticket:
            return self._collect(reply_ticket, limit=limit,
                                 timeout=timeout,
                                 callback=callback,
                                 channel=chan)
    

    The variables are:

    arguments = {dict: 1} {'msg': 'Message for you'}
    self = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fccf19514e0>
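
    The two defaults computed at the top of _broadcast can be isolated in a small function. This is only a sketch: plan_broadcast is a hypothetical name, and str(uuid4()) is assumed as a stand-in for the uuid() helper used in the source.

```python
from uuid import uuid4


def plan_broadcast(reply=False, destination=None, limit=None):
    """Return (reply_ticket, limit) following the defaults in _broadcast."""
    # A ticket is only generated when the caller wants replies.
    reply_ticket = str(uuid4()) if reply else None
    # With explicit destinations, expect at most one reply per destination.
    if limit is None and destination:
        limit = len(destination)
    return reply_ticket, limit


if __name__ == "__main__":
    print(plan_broadcast())
    print(plan_broadcast(reply=True, destination=["node1", "node2"]))
```

    In the example from this article reply is False, so no ticket is created and _collect is never called.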
    

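    Next comes _publish. If a reply is expected it first sets up the reply queue and the reply_to fields; in either case it then uses a producer to send the message.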

    def _publish(self, type, arguments, destination=None,
                 reply_ticket=None, channel=None, timeout=None,
                 serializer=None, producer=None, pattern=None, matcher=None):
        message = {'method': type,
                   'arguments': arguments,
                   'destination': destination,
                   'pattern': pattern,
                   'matcher': matcher}
        chan = channel or self.connection.default_channel
        exchange = self.exchange
        if reply_ticket:
            maybe_declare(self.reply_queue(channel))
            message.update(ticket=reply_ticket,
                           reply_to={'exchange': self.reply_exchange.name,
                                     'routing_key': self.oid})
        serializer = serializer or self.serializer
        with self.producer_or_acquire(producer, chan) as producer:
            producer.publish(
                message, exchange=exchange.name, declare=[exchange],
                headers={'clock': self.clock.forward(),
                         'expires': time() + timeout if timeout else 0},
                serializer=serializer, retry=True,
            )
    

    The variables at this point are:

    exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
    message = {dict: 5} {'method': 'print_msg', 'arguments': {'msg': 'Message for you'}, 'destination': None, 'pattern': None, 'matcher': None}
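
    The message body and headers built by _publish can be reproduced without a broker. The sketch below rests on assumptions: LamportClock here is a minimal logical clock written for the example (the real one lives in Kombu and is not shown in this article), and build_pidbox_message is a hypothetical helper.

```python
from time import time


class LamportClock:
    """Minimal logical clock: forward() increments and returns the value."""

    def __init__(self, initial=0):
        self.value = initial

    def forward(self):
        self.value += 1
        return self.value


def build_pidbox_message(method, arguments, clock, timeout=None,
                         destination=None, reply_ticket=None, reply_to=None):
    """Return (body, headers) shaped like the data passed to publish."""
    body = {"method": method,
            "arguments": arguments,
            "destination": destination,
            "pattern": None,
            "matcher": None}
    if reply_ticket:
        # Replies need a ticket plus the address to send them to.
        body.update(ticket=reply_ticket, reply_to=reply_to)
    headers = {"clock": clock.forward(),
               "expires": time() + timeout if timeout else 0}
    return body, headers


if __name__ == "__main__":
    print(build_pidbox_message("print_msg", {"msg": "Message for you"},
                               LamportClock(), timeout=1))
```

    The expires header is an absolute timestamp, so a receiver can compare it with its own clock to discard broadcasts that arrive too late.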
    

    4.2 producer

    A producer has now been acquired, so the producer takes over.

    def _publish(self, body, priority, content_type, content_encoding,
                 headers, properties, routing_key, mandatory,
                 immediate, exchange, declare):
        channel = self.channel
        message = channel.prepare_message(
            body, priority, content_type,
            content_encoding, headers, properties,
        )
    
        # handle autogenerated queue names for reply_to
        reply_to = properties.get('reply_to')
        if isinstance(reply_to, Queue):
            properties['reply_to'] = reply_to.name
        return channel.basic_publish(
            message,
            exchange=exchange, routing_key=routing_key,
            mandatory=mandatory, immediate=immediate,
        )
    

    4.3 Channel

    Execution continues into Channel, which is where redis gets involved.

    def basic_publish(self, message, exchange, routing_key, **kwargs):
        """Publish message."""
        self._inplace_augment_message(message, exchange, routing_key)
        if exchange:
            return self.typeof(exchange).deliver(  # this branch is taken here
                message, exchange, routing_key, **kwargs
            )
        # anon exchange: routing_key is the destination queue
        return self._put(routing_key, message, **kwargs)
    

    4.4 FanoutExchange

    The message is sent directly through the exchange type.

    class FanoutExchange(ExchangeType):
        """Fanout exchange.
    
        The `fanout` exchange implements broadcast messaging by delivering
        copies of all messages to all queues bound to the exchange.
    
        To support fanout the virtual channel needs to store the table
        as shared state.  This requires that the `Channel.supports_fanout`
        attribute is set to true, and the `Channel._queue_bind` and
        `Channel.get_table` methods are implemented.
        """
    
        type = 'fanout'
    
        def lookup(self, table, exchange, routing_key, default):
            return {queue for _, _, queue in table}
    
        def deliver(self, message, exchange, routing_key, **kwargs):
            if self.channel.supports_fanout:
                self.channel._put_fanout(
                    exchange, message, routing_key, **kwargs)
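
    To see what the lookup method means, compare it with a direct-style lookup over the same binding table. This is a minimal sketch: BINDINGS is a made-up table of (routing_key, pattern, queue) triples, and the two functions are standalone versions written for illustration.

```python
def fanout_lookup(table, routing_key):
    """Fanout ignores the routing key: every bound queue gets a copy."""
    return {queue for _rkey, _pattern, queue in table}


def direct_lookup(table, routing_key):
    """Direct delivers only to queues bound with the exact routing key."""
    return {queue for rkey, _pattern, queue in table if rkey == routing_key}


# Hypothetical binding table: (routing_key, pattern, queue) triples.
BINDINGS = [
    ("", None, "node1.testMailbox.pidbox"),
    ("", None, "node2.testMailbox.pidbox"),
    ("only-node3", None, "node3.testMailbox.pidbox"),
]

if __name__ == "__main__":
    print(sorted(fanout_lookup(BINDINGS, "anything")))
    print(sorted(direct_lookup(BINDINGS, "only-node3")))
```

    Note that deliver above does not go through lookup at all when the channel supports fanout; it hands the message to _put_fanout and lets redis do the broadcasting.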
    

    4.5 Channel

    The flow enters Channel, which now calls the redis driver to send the message.

    def _put_fanout(self, exchange, message, routing_key, **kwargs):
        """Deliver fanout message."""
        with self.conn_or_acquire() as client:
            client.publish(
                self._get_publish_topic(exchange, routing_key),
                dumps(message),
            )
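
    What PUBLISH does with that topic can be imitated in memory. The sketch below is a toy model, not the redis implementation: TinyPubSub and demo are hypothetical names, and delivery is a direct function call. It keeps two properties of redis pub/sub: every current subscriber gets a copy, and the publisher gets back the number of receivers.

```python
from collections import defaultdict


class TinyPubSub:
    """In-memory stand-in for a pub/sub server."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Every current subscriber gets its own copy; nothing is stored.
        receivers = self._subscribers.get(topic, [])
        for callback in receivers:
            callback(payload)
        return len(receivers)


def demo():
    broker = TinyPubSub()
    inbox_a, inbox_b = [], []
    topic = "/0.testMailbox.pidbox"
    missed = broker.publish(topic, "too early")   # no subscribers yet
    broker.subscribe(topic, inbox_a.append)
    broker.subscribe(topic, inbox_b.append)
    delivered = broker.publish(topic, "print_msg")
    return missed, delivered, inbox_a, inbox_b


if __name__ == "__main__":
    print(demo())
```

    The first publish is lost because nobody was subscribed yet, which is why the Node has to be listening before the client broadcasts.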
    

    4.6 redis driver

    Finally, the redis driver sends the message.

    def publish(self, channel, message):
        """
        Publish ``message`` on ``channel``.
        Returns the number of subscribers the message was delivered to.
        """
        return self.execute_command('PUBLISH', channel, message)
    

    The key variables are:

    channel = {str} '/0.testMailbox.pidbox'
    message = {str} '{"body": "eyJtZXRob2QiOiAicHJpbnRfbXNnIiwgImFyZ3VtZW50cyI6IHsibXNnIjogIk1lc3NhZ2UgZm9yIHlvdSJ9LCAiZGVzdGluYXRpb24iOiBudWxsLCAicGF0dGVybiI6IG51bGwsICJtYXRjaGVyIjogbnVsbH0=", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"cloc
    self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
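
    The "body" field of this payload is the JSON message encoded in base64, which can be checked with the standard library. In the sketch below CAPTURED_BODY is copied from the message value above (split over three lines only for readability), and decode_body is a hypothetical helper.

```python
import base64
import json

# Body string copied from the captured message above.
CAPTURED_BODY = (
    "eyJtZXRob2QiOiAicHJpbnRfbXNnIiwgImFyZ3VtZW50cyI6IHsibXNnIjogIk1l"
    "c3NhZ2UgZm9yIHlvdSJ9LCAiZGVzdGluYXRpb24iOiBudWxsLCAicGF0dGVybiI6"
    "IG51bGwsICJtYXRjaGVyIjogbnVsbH0="
)


def decode_body(encoded):
    """Base64-decode the envelope body, then parse the JSON inside it."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


if __name__ == "__main__":
    print(decode_body(CAPTURED_BODY))
```

    The decoded dictionary is the same one that _publish built in section 4.1, and its 119 bytes match the body_length reported on the consumer side.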
    


  • Original article: https://www.cnblogs.com/rossiXYZ/p/14455431.html