
    [Source Code Analysis] Message Queue Kombu: Hub

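    0x00 Abstract

    This series introduces the message queue library Kombu, which positions itself as a messaging abstraction compatible with the AMQP protocol. This article explains the Hub concept in Kombu.

    0x01 Sample code

    The following code is used for the explanation.

    This example comes from the series at https://liqiang.io/post/kombu-source-code-analysis-part-5, with sincere thanks to its author.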

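    import sys

    from kombu import Connection, Consumer, Exchange, Producer, Queue
    from kombu.asynchronous import Hub


    def main(arguments):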
        hub = Hub()
        exchange = Exchange('asynt_exchange')
        queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
    
        def send_message(conn):
            producer = Producer(conn)
            producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
            print('message sent')
    
        def on_message(message):
            print('received: {0!r}'.format(message.body))
            message.ack()
            # hub.stop()  # <-- exit after one message
    
        conn = Connection('redis://localhost:6379')
        conn.register_with_event_loop(hub)
    
        def p_message():
            print(' kombu ')
    
        with Consumer(conn, [queue], on_message=on_message):
            send_message(conn)
            hub.timer.call_repeatedly(3, p_message)
            hub.run_forever()
    
    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
    

    0x02 Motivation

    The previous article, on Consumer, left one line of code unanalyzed:

    hub.run_forever()
    

    At this point the Hub and the Connection are already linked, as the following diagram shows:

    +----------------------+               +-------------------+
    | Consumer             |               | Channel           |
    |                      |               |                   |        +-----------------------------------------------------------+
    |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
    |      channel  +--------------------> |                   |        +-----------------------------------------------------------+
    |                      |               |    pool           |
    |                      |   +---------> |                   | <------------------------------------------------------------+
    |      queues          |   |           |                   |                                                              |
    |                      |   |    +----> |    connection +---------------+                                                  |
    |        |             |   |    |      |                   |           |                                                  |
    +----------------------+   |    |      +-------------------+           |                                                  |
             |                 |    |                                      v                                                  |
             |                 |    |      +-------------------+       +---+-----------------+       +--------------------+   |
             |                 |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
             |                 |    |      |                   |       |                     |       |                    |   |
             |                 |    |      |                   |       |                     |       |     _channels +--------+
             |                 |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
             |                 |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
             |       +-------->+    |      |                   |       |                     |    +------+ poller         |
             |       |              |      +-------------------+       +---------------------+    |  |     after_read     |
             |       |              |                                                             |  |                    |
             |       |              |                                                             |  +--------------------+
             |       |              |      +------------------+                   +---------------+
             |       |              |      | Hub              |                   |
             |       |              |      |                  |                   v
             |       |              |      |                  |            +------+------+
             |       |              |      |      poller +---------------> | _poll       |
             |       |              |      |                  |            |             |         +-------+
             |       |              |      |                  |            |    _poller+---------> |  poll |
             v       |              |      +------------------+            |             |         +-------+
                     |              |                                      +-------------+
        +-------------------+       |      +----------------+
        | Queue      |      |       |      | Exchange       |
        |      _chann+l     |       +----+ |                |
        |                   |              |                |
        |      exchange +----------------> |     channel    |
        |                   |              |                |
        |                   |              |                |
        +-------------------+              +----------------+
    
    


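    So far we know:

    • Consumer: the abstraction for receiving messages. A consumer declares a queue, binds it to a given exchange, and then receives messages from that queue.
    • Exchange: the MQ router. Producers send messages to an Exchange, and the Exchange distributes them to queues.
    • Queue: the queue abstraction. It stores messages waiting to be consumed by the application; the Exchange routes messages into the Queue and consumers receive messages from it.
    • Channel: similar to the AMQP concept. It can be understood as one of several lightweight connections sharing a single Connection, and it is the abstraction through which operations are performed.

    However, we only know roughly what poll is for; we do not yet know how the Consumer and poll actually interact with the Hub. This article picks up the analysis from there.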

    0x03 General steps for using poll

    On Linux, using poll (here, epoll) generally involves the following steps:

    1. Create an epoll object;
    2. Tell the epoll object to monitor specific events on specific sockets;
    3. Ask the epoll object which sockets may have had the specified event since the last query;
    4. Perform some action on those sockets;
    5. Tell the epoll object to modify the list of sockets and/or events to monitor;
    6. Repeat steps 3 through 5 until finished;
    7. Destroy the epoll object.
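
    The seven steps can be sketched with Python's standard selectors module. This is only an illustration, not Kombu code: DefaultSelector picks epoll on Linux when it is available, and a socket pair stands in for a real network connection.

```python
import selectors
import socket

# Step 1: create the selector (backed by epoll on Linux when available).
sel = selectors.DefaultSelector()

# A connected socket pair stands in for a real client/server connection.
left, right = socket.socketpair()

# Step 2: watch `right` for readability.
sel.register(right, selectors.EVENT_READ)

received = []
left.sendall(b"ping")

# Steps 3-6: query, act on the ready sockets, adjust the watch list, repeat.
while not received:
    for key, events in sel.select(timeout=1):          # step 3
        if events & selectors.EVENT_READ:
            received.append(key.fileobj.recv(1024))    # step 4
            sel.unregister(key.fileobj)                # step 5

# Step 7: destroy the selector, then close the sockets.
sel.close()
left.close()
right.close()
print(received)
```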

    So we need to look at the Hub code to see how Kombu uses poll.

    0x04 Creating the Hub

    Creating the Hub also creates the Hub's internal poller. The call stack is:

    _get_poller, eventio.py:312
    poll, eventio.py:328
    _create_poller, hub.py:113
    __init__, hub.py:96
    main, hub_receive.py:23
    <module>, hub_receive.py:46
    

    The code is:

    def _get_poller():
        if detect_environment() != 'default':
            # greenlet
            return _select
        elif epoll:
            # Py2.6+ Linux
            return _epoll
        elif kqueue and 'netbsd' in sys.platform:
            return _kqueue
        elif xpoll:
            return _poll
        else:
            return _select
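
    The same idea, choosing the best readiness API the platform offers, can be sketched with the standard select module. This is a simplified illustration: it ignores the greenlet detection and the platform check on kqueue that _get_poller performs, and pick_backend is a hypothetical name.

```python
import select


def pick_backend():
    """Return the name of the first readiness API found on this platform."""
    # Checked in a fixed order of preference; select() exists everywhere.
    for name in ("epoll", "kqueue", "poll"):
        if hasattr(select, name):
            return name
    return "select"


print(pick_backend())
```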
    

    In this way a poller is created inside the Hub.

    class Hub:
        """Event loop object.
    
        Arguments:
            timer (kombu.asynchronous.Timer): Specify custom timer instance.
        """
        def __init__(self, timer=None):
            self.timer = timer if timer is not None else Timer()
    
            self.readers = {}
            self.writers = {}
            self.on_tick = set()
            self.on_close = set()
            self._ready = set()
    
            self._running = False
            self._loop = None
    
            self._create_poller()
    
        @property
        def poller(self):
            if not self._poller:
                self._create_poller()
            return self._poller
    
        @poller.setter
        def poller(self, value):
            self._poller = value
    
        def _create_poller(self):
            self._poller = poll()
            self._register_fd = self._poller.register
            self._unregister_fd = self._poller.unregister
    

    Note the following:

    MultiChannelPoller also creates a poller of its own, but at registration time the Transport uses the Hub's poller rather than the one inside MultiChannelPoller. The call stack is:

    on_poll_init, redis.py:333
    register_with_event_loop, redis.py:1061
    register_with_event_loop, connection.py:266
    main, hub_receive.py:38
    <module>, hub_receive.py:46
    

    The code in kombu.transport.redis.Transport is:

    def register_with_event_loop(self, connection, loop):
        cycle = self.cycle
        cycle.on_poll_init(loop.poller)  # the hub's poller is handed over here
        cycle_poll_start = cycle.on_poll_start
        add_reader = loop.add_reader
        on_readable = self.on_readable   
    

    Going one level deeper, we see the actual assignment:

    def on_poll_init(self, poller):
        self.poller = poller  # assigned here
        for channel in self._channels:
            return channel.qos.restore_visible(
                num=channel.unacked_restore_limit,
            )
    

    0x05 Forever in Hub

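    hub.run_forever() mainly does the following:

    • creates the loop;
    • links the Channels to poll: because the Hub has both the Channels and poll, it now ties them together, including each socket and the socket's file descriptor;
    • polls, and handles messages as they arrive.

    For example, the following variables are maintained: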

    self._fd_to_chan[sock.fileno()] = (channel, type)
    self._chan_to_sock[(channel, client, type)] = sock
    self.poller.register(sock, self.eventflags)
    

    run_forever itself is as follows:

    def run_forever(self):
        self._running = True
        try:
            while 1:
                try:
                    self.run_once()
                except Stop:
                    break
        finally:
            self._running = False
    

    This in turn calls run_once, which is where we enter the loop:

    def run_once(self):
        try:
            next(self.loop)
        except StopIteration:
            self._loop = None
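
    The interplay between run_forever, run_once and the generator can be reproduced with a minimal model. MiniHub and its tick counter are hypothetical; only the control flow (one next() per iteration, Stop ending the loop) mirrors the code above.

```python
class Stop(Exception):
    """Raised inside the loop to end it (stand-in for the hub's Stop)."""


class MiniHub:
    def __init__(self):
        self._loop = None
        self.ticks = 0

    @property
    def loop(self):
        # Create the generator lazily; recreate it if it was exhausted.
        if self._loop is None:
            self._loop = self.create_loop()
        return self._loop

    def create_loop(self):
        while True:
            self.ticks += 1
            if self.ticks == 3:
                raise Stop()
            yield  # hand control back to run_once after each iteration

    def run_once(self):
        try:
            next(self.loop)
        except StopIteration:
            self._loop = None

    def run_forever(self):
        while True:
            try:
                self.run_once()
            except Stop:
                break


hub = MiniHub()
hub.run_forever()
print(hub.ticks)  # → 3
```

    Each call to next() runs exactly one iteration of the loop body, which is what lets an outer caller drive the loop step by step.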
    

    5.1 Creating the loop

    next(self.loop) goes on to create the loop on first use. Driving this loop is the Hub's job.

    The call stack is:

    create_loop, hub.py:279
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:51
    <module>, testUb.py:55
    

    A simplified version of the code:

    def create_loop(self, ...):

        while 1:
            todo = self._ready
            self._ready = set()

            for tick_callback in on_tick:
                tick_callback()  # invoke the registered tick callbacks

            for item in todo:
                if item:
                    item()

            poll_timeout = fire_timers(propagate=propagate) if scheduled else 1

            if readers or writers:
                to_consolidate = []
                events = poll(poll_timeout)  # wait for events

                for fd, event in events or ():
                    if fd in consolidate and writers.get(fd) is None:
                        to_consolidate.append(fd)
                        continue
                    cb = cbargs = None

                    if event & READ:
                        cb, cbargs = readers[fd]  # callback that reads from Redis
                    elif event & WRITE:
                        cb, cbargs = writers[fd]  # callback that handles Redis writes

                    if isinstance(cb, generator):
                        try:
                            next(cb)
                        except StopIteration:
                            pass
                    else:
                        cb(*cbargs)  # invoke the registered callback
                if to_consolidate:
                    consolidate_callback(to_consolidate)
            else:
                # no sockets yet, startup is probably not done.
                sleep(min(poll_timeout, 0.1))
            yield
    

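    Let's analyze it step by step.

    0x06 Starting the poll

    The loop begins by starting the poll, which is the job of tick_callback: it sets up the mechanism by which we are notified when Redis has a message.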

    while 1:
        todo = self._ready
        self._ready = set()
    
        for tick_callback in on_tick:
            tick_callback()
    

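    At this point tick_callback is <function Transport.register_with_event_loop.<locals>.on_poll_start>, so calling tick_callback invokes Transport.register_with_event_loop.<locals>.on_poll_start.

    6.1 Recap: how the callback was registered

    To see how the Transport's function got registered, recall that this earlier line registers the callback: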

    conn.register_with_event_loop(hub)
    

    The registration itself is:

    def register_with_event_loop(self, connection, loop):
        cycle = self.cycle
        cycle_poll_start = cycle.on_poll_start
        add_reader = loop.add_reader
        on_readable = self.on_readable
    
        def _on_disconnect(connection):
            if connection._sock:
                loop.remove(connection._sock)
        cycle._on_connection_disconnect = _on_disconnect
    
        def on_poll_start():
            cycle_poll_start()
            [add_reader(fd, on_readable, fd) for fd in cycle.fds]
            
        loop.on_tick.add(on_poll_start)
    

    This is where on_poll_start is registered: it is added to the hub's on_tick callbacks.

    loop.on_tick.add(on_poll_start)
    

    So the following code, seen earlier, ends up calling on_poll_start.

    for tick_callback in on_tick:
        tick_callback()
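
    The registration pattern, a closure added to a set that the loop walks on every iteration, can be shown in isolation. TickHub and the log list are hypothetical stand-ins; only the on_tick mechanics follow the code above.

```python
class TickHub:
    """Hypothetical stand-in for Hub that only models on_tick."""

    def __init__(self):
        self.on_tick = set()

    def run_once(self):
        # Every iteration starts by invoking all registered tick callbacks.
        for tick_callback in self.on_tick:
            tick_callback()


def register_with_event_loop(loop, log):
    # The closure captures `log`, much like on_poll_start captures `cycle`.
    def on_poll_start():
        log.append("poll_start")

    loop.on_tick.add(on_poll_start)


calls = []
hub = TickHub()
register_with_event_loop(hub, calls)
hub.run_once()
hub.run_once()
print(calls)  # → ['poll_start', 'poll_start']
```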
    

    6.2 Starting the Transport

    So, back to on_poll_start.

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]
    

    It has two parts:

    • poll_start: links each Channel's socket to poll. On Linux a socket is a file, so it can be polled.
    • add_reader: registers the fds tracked by MultiChannelPoller as readers on the Hub. This completes the path Redis queue ----> Channel ---> socket ---> poll ---> fd ---> read from Redis: as soon as Redis has data, poll reports the fd and the data is read from Redis.

    Let's look at each in turn.

    6.3 poll_start in MultiChannelPoller

    This step links each Channel's socket to poll. On Linux a socket is a file, so it can be polled.

    Execution now enters MultiChannelPoller, whose state is:

    self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f84e7928940>
     after_read = {set: 0} set()
     eventflags = {int} 25
     fds = {dict: 0} {}
     poller = {_poll} <kombu.utils.eventio._poll object at 0x7f84e75f4d68>
    

    As the code shows, registration is done per channel: every channel is registered with poll.

    def on_poll_start(self):
        for channel in self._channels:
            if channel.active_queues:           # BRPOP mode?
                if channel.qos.can_consume():
                    self._register_BRPOP(channel)
            if channel.active_fanout_queues:    # LISTEN mode?
                self._register_LISTEN(channel)
    

    Kombu uses Redis in two ways, BRPOP mode and LISTEN mode, which correspond to Redis lists and pub/sub (SUBSCRIBE) respectively.

    6.3.1 _register_BRPOP

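    Let's look at _register_BRPOP first. It makes two checks. The first is whether the current channel has already been put into epoll; if not, it is registered. The second covers a channel that was not previously in epoll: it has now been registered, but its connection has nothing pending for epoll to report yet, so a BRPOP is sent via _brpop_start.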

    def _register_BRPOP(self, channel):
        """Enable BRPOP mode for channel."""
        ident = channel, channel.client, 'BRPOP'
        if not self._client_registered(channel, channel.client, 'BRPOP'):
            channel._in_poll = False
            self._register(*ident)
        if not channel._in_poll:  # send BRPOP
            channel._brpop_start()
    
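    6.3.1.1 Registering with MultiChannelPoller

    One Connection corresponds to one Hub, and the pivot between them is MultiChannelPoller. It is responsible for finding out which Channel is ready, and all of these Channels come from the same Connection. The registration code is: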

    def _register(self, channel, client, type):
        if (channel, client, type) in self._chan_to_sock:
            self._unregister(channel, client, type)
        if client.connection._sock is None:   # not connected yet.
            client.connection.connect()
            
        sock = client.connection._sock
        self._fd_to_chan[sock.fileno()] = (channel, type)
        self._chan_to_sock[(channel, client, type)] = sock
        self.poller.register(sock, self.eventflags)
    

    Here client is Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>.

    Note that client.connection._sock is a socket:

    client.connection._sock = {socket} <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 52353), raddr=('127.0.0.1', 6379)>
     family = {AddressFamily} AddressFamily.AF_INET
     proto = {int} 6
     timeout = {NoneType} None
     type = {SocketKind} SocketKind.SOCK_STREAM
    

    After this stage, _fd_to_chan is populated: each key is the fd of the Redis socket that belongs to a channel.


    This ties each channel to its socket, and also to the socket's file descriptor.

    The variables are now:

    self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
     after_read = {set: 0} set()
     eventflags = {int} 25
     fds = {dict: 1} 
      8 = {tuple: 2} (<kombu.transport.redis.Channel object at 0x7f9056a57278>, 'BRPOP')
      __len__ = {int} 1
     poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
    

    This way, the channel can be found from the socket fd, and the socket fd can be found from the channel.

    As shown below:

    +----------------------------------------------------------------------------------+
    |                                                                                  |
    |   MultiChannelPoller                                                             |
    |                                                                                  |
    |                                       +---------------------------------------+  |
    |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
    |           fds   +------------------>  |                                       |  |
    |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
    |                                       |                                       |  |
    |                                       |             ......                    |  |
    |                                       |                                       |  |
    |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
    |                                       +---------------------------------------+  |
    |                                                                                  |
    |                                                                                  |
    |                                                                                  |
    +----------------------------------------------------------------------------------+
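
    The two lookup tables can be modelled on their own. MiniPoller is hypothetical and leaves out the actual poll registration; strings stand in for the channel and client objects, and a socket pair stands in for the Redis connection.

```python
import socket


class MiniPoller:
    """Hypothetical model of the two lookup tables only."""

    def __init__(self):
        self._fd_to_chan = {}     # fd -> (channel, type)
        self._chan_to_sock = {}   # (channel, client, type) -> socket

    def register(self, channel, client, type, sock):
        self._fd_to_chan[sock.fileno()] = (channel, type)
        self._chan_to_sock[(channel, client, type)] = sock

    def unregister(self, channel, client, type):
        sock = self._chan_to_sock.pop((channel, client, type))
        self._fd_to_chan.pop(sock.fileno(), None)


a, b = socket.socketpair()
poller = MiniPoller()
poller.register("channel-1", "client-1", "BRPOP", a)

# fd -> channel, and channel -> socket (hence -> fd).
found = poller._fd_to_chan[a.fileno()]
same = poller._chan_to_sock[("channel-1", "client-1", "BRPOP")] is a
print(found, same)

poller.unregister("channel-1", "client-1", "BRPOP")
a.close()
b.close()
```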
    
    6.3.1.2 Registering with poll

    Registration continues by registering the socket with poll:

    class _poll:
    
        def __init__(self):
            self._poller = xpoll()
            self._quick_poll = self._poller.poll
            self._quick_register = self._poller.register
            self._quick_unregister = self._poller.unregister
    
        def register(self, fd, events):
            fd = fileno(fd)
            poll_flags = 0
            if events & ERR:
                poll_flags |= POLLERR
            if events & WRITE:
                poll_flags |= POLLOUT
            if events & READ:
                poll_flags |= POLLIN
            self._quick_register(fd, poll_flags)
            return fd
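
    The flag translation in register can be tried standalone. The READ, WRITE and ERR values below are assumptions about Kombu's portable constants; they are at least consistent with the eventflags value of 25 in the debugger dumps above, since READ | ERR gives 25. The select.POLL* constants are only available on platforms that provide poll().

```python
import select

# Assumed values of the portable flags (not copied from the library).
READ = 0x001
WRITE = 0x004
ERR = 0x008 | 0x010


def to_poll_flags(events):
    """Translate portable flags into poll(2) flags, like _poll.register."""
    poll_flags = 0
    if events & ERR:
        poll_flags |= select.POLLERR
    if events & WRITE:
        poll_flags |= select.POLLOUT
    if events & READ:
        poll_flags |= select.POLLIN
    return poll_flags


eventflags = READ | ERR
print(eventflags)  # → 25
flags = to_poll_flags(eventflags)
print(flags == (select.POLLIN | select.POLLERR))  # → True
```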
    

    The state is now as follows, taking only fd 3 as an example.

    This is the Channel ---> socket ---> poll ---> fd path.

    +----------------------------------------------------------------------------------+
    |                                                                                  |
    |   MultiChannelPoller                                                             |
    |                                                                                  |
    |                                       +---------------------------------------+  |
    |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
    |           fds   +------------------>  |                                       |  |
    |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
    |                                       |                                       |  |
    |                                       |             ......                    |  |
    |                                       |                                       |  |
    |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
    |                                       |      +                                |  |
    |                                       |      |                                |  |
    |                                       +---------------------------------------+  |
    |                                              |                                   |
    +----------------------------------------------------------------------------------+
                                                   |
                                                   |
                                                   v
    
                                                poll with OS
    
    6.3.1.3 _brpop_start

    If the connection has nothing pending for epoll yet, _brpop_start is called. It selects the queues to read next and sends the BRPOP command.

    _brpop_start is as follows:

    def _brpop_start(self, timeout=1):
        queues = self._queue_cycle.consume(len(self.active_queues))
        if not queues:
            return
        keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
                for queue in queues] + [timeout or 0]
        self._in_poll = self.client.connection
        self.client.connection.send_command('BRPOP', *keys)
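
    To see what the argument list for BRPOP looks like, the key construction can be sketched outside the class. The separator and priority steps below are assumed values for illustration and should be checked against the Kombu version in use; q_for_pri and brpop_args are hypothetical free-function versions of the logic above.

```python
# Assumed values for illustration only.
SEP = "\x06\x16"
PRIORITY_STEPS = [0, 3, 6, 9]


def q_for_pri(queue, pri):
    """Name of the Redis list holding `queue` messages at priority `pri`."""
    return f"{queue}{SEP}{pri}" if pri else queue


def brpop_args(queues, timeout=1):
    # Same ordering as the comprehension in _brpop_start: iterate priorities
    # first, then queues, and append the timeout as the last argument.
    keys = [q_for_pri(queue, pri) for pri in PRIORITY_STEPS for queue in queues]
    return keys + [timeout or 0]


args = brpop_args(["asynt_queue"])
print(args)
```

    Under these assumptions, a single queue expands to one key per priority step, followed by the timeout.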
    

    The stack at this point is:

    _register, redis.py:296
    _register_BRPOP, redis.py:312
    on_poll_start, redis.py:328
    on_poll_start, redis.py:1072
    create_loop, hub.py:294
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:51
    <module>, testUb.py:55
    

    The state is now as follows. We have two paths:

    • the Channel ---> socket ---> poll ---> fd path;
    • the MultiChannelPoller ---> read from Redis path.

    Because the queue to read next has now been chosen, the full Redis queue ----> Channel ---> socket ---> poll ---> fd path is connected.

    +----------------------------------------------------------------------------------+
    |                                                                                  |
    |   MultiChannelPoller                                                             |
    |                                                                                  |
    |                                       +---------------------------------------+  |
    |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
    |           fds   +------------------>  |                                       |  |
    |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
    |                                       |                                       |  |
    |                                       |             ......                    |  |
    |                                       |                                       |  |
    |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
    |          connection                   |      +                                |  |
    |              +                        |      |                                |  |
    |              |                        +---------------------------------------+  |
    |              |                               |                                   |
    +----------------------------------------------------------------------------------+
                   |                               |
                   |                               |
                   v                               v
    
            Redis Queue   +----------------->   poll with OS
    
    

    6.3.2 _register_LISTEN

    This article's example does not exercise this part; it is called when fanout queues are active. Celery events use this mechanism.

    def _register_LISTEN(self, channel):
        """Enable LISTEN mode for channel."""
        if not self._client_registered(channel, channel.subclient, 'LISTEN'):
            channel._in_listen = False
            self._register(channel, channel.subclient, 'LISTEN')
        if not channel._in_listen:
            channel._subscribe()  # send SUBSCRIBE
    

    The registration call stack is:

    _subscribe, redis.py:656
    _register_LISTEN, redis.py:322
    on_poll_start, redis.py:330
    on_poll_start, redis.py:1072
    create_loop, hub.py:294
    asynloop, loops.py:81
    start, consumer.py:592
    start, bootsteps.py:116
    start, consumer.py:311
    start, bootsteps.py:365
    start, bootsteps.py:116
    start, worker.py:204
    worker, worker.py:327
    

    The variables at this point are:

    c = {PubSub} <redis.client.PubSub object at 0x7fb09e750400>
    keys = {list: 1}
     0 = {str} '/0.celery.pidbox'
         
    self = {Channel} <kombu.transport.redis.Channel object at 0x7fb09e6c8c88>
    

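    6.4 Registering the reader in MultiChannelPoller

    As seen above, every channel is registered with poll and a listen request is issued for every queue, so a message arriving on any queue will trigger a response. But who receives that response? For that we need to look at what add_reader does.

    In other words, the earlier registrations with poll did not register any handler; that has to be done now.

    As a reminder, add_reader is called in on_poll_start.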

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]
    

    cycle.fds returns all the fds:

    @property
    def fds(self):
        return self._fd_to_chan
    

    The actual adding happens in the Hub class.

    • It registers the fd with the poller again.
    • It then associates the fd with the callback.

    class Hub:
        def add_reader(self, fds, callback, *args):
            return self.add(fds, callback, READ | ERR, args)
    
        def add(self, fd, callback, flags, args=(), consolidate=False):
            fd = fileno(fd)
            try:
                self.poller.register(fd, flags)
            except ValueError:
                self._remove_from_loop(fd)
                raise
            else:
                dest = self.readers if flags & READ else self.writers
                if consolidate:
                    self.consolidate.add(fd)
                    dest[fd] = None
                else:
                    dest[fd] = callback, args
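
    How a readers table drives dispatch can be shown end to end with a small model. MiniHub is hypothetical: it uses the standard selectors module instead of Kombu's poller, and a socket pair stands in for the Redis connection.

```python
import selectors
import socket


class MiniHub:
    """Hypothetical hub: a selector plus an fd -> (callback, args) table."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.readers = {}

    def add_reader(self, sock, callback, *args):
        fd = sock.fileno()
        self.selector.register(fd, selectors.EVENT_READ)
        self.readers[fd] = callback, args

    def run_once(self, timeout=1):
        # Look up the stored callback for every fd reported as ready.
        for key, _events in self.selector.select(timeout):
            cb, cbargs = self.readers[key.fd]
            cb(*cbargs)

    def close(self):
        self.selector.close()


left, right = socket.socketpair()
right_fd = right.fileno()
seen = []


def on_readable(fd):
    seen.append((fd, right.recv(1024)))


hub = MiniHub()
hub.add_reader(right, on_readable, right_fd)
left.sendall(b"hello world")
hub.run_once()

hub.close()
left.close()
right.close()
print(seen)
```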
    
    

    Note that what gets set here is the hub's member variable self.readers. It is used later when poll reports an event: the stored callback, Transport.on_readable, is invoked.

    readers = {dict: 1} 
     8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>, (8,))
      0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7faee4128f98>>
      1 = {tuple: 1} 8
    

    The stack is:

    register, eventio.py:187
    add, hub.py:164
    add_reader, hub.py:213
    <listcomp>, redis.py:1073
    on_poll_start, redis.py:1073
    create_loop, hub.py:294
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:51
    <module>, testUb.py:55
    

    So the state is now as follows. We still do not know who ultimately handles the message:

    +----------------------------------------------------------------------------------+
    |                                                                                  |
    |   MultiChannelPoller                                                             |
    |                                                                                  |
    |                                       +---------------------------------------+  |
    |                                       |  socket fd 1 : [ Channel 1, 'BRPOP']  |  |
    |           fds   +------------------>  |                                       |  |
    |                                       |  socket fd 2 : [ Channel 2, 'BRPOP']  |  |
    |                                       |                                       |  |
    |                                       |             ......                    |  |
    |                                       |                                       |  |
    |                                       |  socket fd 3 : [ Channel 3, 'BRPOP']  |  |
    |          connection                   |      +                                |  |
    |              +                        |      |                                |  |
    |              |                        +---------------------------------------+  |
    |              |                               |                                   |
    +----------------------------------------------------------------------------------+
                   |                               |
                   |                               |
                   v                               v
    
            Redis Queue   +------------------>  poll with OS
    
    
    
     +---------------------------------------------------------------------------------+
     |                                                                                 |
     |    Hub                                                                          |
     |                                     +--------------------------------------+    |
     |                                     |fd 3 : [ Transport.on_readable, fd 3] |    |
     |                                     |                                      |    |
     |       readers  +------------------> |       ......                         |    |
     |                                     |                                      |    |
     |                                     |fd 1 : [ Transport.on_readable, fd 1] |    |
     |                                     +--------------------------------------+    |
     |                                                                                 |
     +---------------------------------------------------------------------------------+
    
    

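    Because this flow is quite complicated, here is a spoiler to simplify things: when consumption is set up in basic_consume, the Transport's _callbacks[queue] is set to a callback function, which connects the part where MultiChannelPoller reads from the queue.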

        def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
            """Consume from `queue`."""
            self._tag_to_queue[consumer_tag] = queue
            self._active_queues.append(queue)
    
            def _callback(raw_message):
                message = self.Message(raw_message, channel=self)
                if not no_ack:
                    self.qos.append(message, message.delivery_tag)
                return callback(message)
    
            self.connection._callbacks[queue] = _callback  # set here
            
            self._consumers.add(consumer_tag)
    
            self._reset_cycle()
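
    The wrapping done by basic_consume can be reduced to a few lines. MiniChannel is hypothetical, a plain dict stands in for the Message class, and the shared callbacks dict plays the role of the Transport's _callbacks table.

```python
class MiniChannel:
    """Hypothetical channel: only the callback wiring is modelled."""

    def __init__(self, callbacks):
        self.callbacks = callbacks   # shared table: queue name -> callback
        self.unacked = {}            # delivery_tag -> message

    def basic_consume(self, queue, no_ack, callback):
        def _callback(raw_message):
            message = dict(raw_message, channel=self)
            if not no_ack:
                # Remember the message until the consumer acknowledges it.
                self.unacked[message["delivery_tag"]] = message
            return callback(message)

        self.callbacks[queue] = _callback


callbacks = {}
channel = MiniChannel(callbacks)
bodies = []
channel.basic_consume("asynt_queue", False, lambda m: bodies.append(m["body"]))

# Simulate the transport delivering a raw message read from the queue.
callbacks["asynt_queue"]({"body": "hello world", "delivery_tag": "tag-1"})
print(bodies)  # → ['hello world']
```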
    

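    6.5 Starting the timer

    Next, the timers are fired so that periodic work runs; the value returned by fire_timers is then used as the poll timeout.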

    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
    #  print('[[[HUB]]]: %s' % (self.repr_active(),))
    if readers or writers:
        to_consolidate = []
        try:
            events = poll(poll_timeout)
            #  print('[EVENTS]: %s' % (self.repr_events(events),))
        except ValueError:  # Issue celery/#882
            return
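
    The relationship between timers and the poll timeout can be sketched with a small heap-based timer. MiniTimer is hypothetical and is not Kombu's Timer; the current time is passed in explicitly to keep the example deterministic.

```python
import heapq
import itertools


class MiniTimer:
    """Hypothetical timer queue; not kombu's Timer implementation."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()  # tie-breaker for equal due times

    def call_repeatedly(self, interval, fun, now=0.0):
        entry = (now + interval, next(self._order), interval, fun)
        heapq.heappush(self._heap, entry)

    def fire_timers(self, now, max_delay=1.0):
        # Run everything that is due, rescheduling each repeating entry.
        while self._heap and self._heap[0][0] <= now:
            due, _, interval, fun = heapq.heappop(self._heap)
            fun()
            entry = (due + interval, next(self._order), interval, fun)
            heapq.heappush(self._heap, entry)
        if not self._heap:
            return max_delay
        # Never block in poll() past the moment the next timer is due.
        return min(self._heap[0][0] - now, max_delay)


fired = []
timer = MiniTimer()
timer.call_repeatedly(3, lambda: fired.append("kombu"))

t1 = timer.fire_timers(now=2.5)   # nothing due yet; next timer in 0.5 s
t2 = timer.fire_timers(now=3.0)   # timer fires and is rescheduled for 6.0
print(t1, t2, fired)  # → 0.5 1.0 ['kombu']
```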
    

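    6.6 poll

    Next comes the poll itself: if a registered file has data, it is handled (the content is read from Redis), and then the next poll begins.

    For our example, the simplified code below performs the poll: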

    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
    
    if readers or writers:
        to_consolidate = []
        try:
            events = poll(poll_timeout)
        except ValueError:  # Issue celery/#882
            return
    
        for fd, event in events or ():
            cb = cbargs = None
    
            if event & READ:
                cb, cbargs = readers[fd]
            elif event & WRITE:
                cb, cbargs = writers[fd]
    
            if isinstance(cb, generator):
                next(cb)
    
            else:
                try:
                    cb(*cbargs)
                except Empty:
                    pass
    else:
        # no sockets yet, startup is probably not done.
        sleep(min(poll_timeout, 0.1))
    yield
    

    6.6.1 The poll method

    The poll method is shown below; it calls the operating system's poll:

    def poll(self, timeout, round=math.ceil,
             POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
             READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
        timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
        event_list = self._quick_poll(timeout)
    
        ready = []
        for fd, event in event_list:
            events = 0
            if event & POLLIN:
                events |= READ
            if event & POLLOUT:
                events |= WRITE
            if event & POLLERR or event & POLLNVAL or event & POLLHUP:
                events |= ERR
            assert events
            if not isinstance(fd, Integral):
                fd = fd.fileno()
            ready.append((fd, events))
        return ready
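
    The two small transformations in poll (seconds to whole milliseconds, and OS flags to hub flags) can be sketched on their own. The numeric flag values below are hypothetical, picked only for this example; the real constants come from the select module and from Kombu.

```python
import math

# Hypothetical flag values, chosen only for this sketch.
POLLIN, POLLOUT, POLLERR = 0x1, 0x4, 0x8   # "OS-side" flags
READ, WRITE, ERR = 0x1, 0x2, 0x4           # "hub-side" flags


def to_millis(timeout):
    """Seconds -> whole milliseconds, rounded up; negative or None gives 0."""
    if timeout and timeout < 0:
        return 0
    return math.ceil((timeout or 0) * 1e3)


def translate(os_event):
    """Map an OS event bitmask onto the hub-side bitmask."""
    events = 0
    if os_event & POLLIN:
        events |= READ
    if os_event & POLLOUT:
        events |= WRITE
    if os_event & POLLERR:
        events |= ERR
    return events


timeouts = [to_millis(t) for t in (2.5, 0.0004, -1, None)]
flags = translate(POLLIN | POLLOUT)
```

    Rounding up rather than down means a tiny positive timeout still waits at least one millisecond instead of turning into a busy loop.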
    

    6.6.2 callback

    In the create_loop code we can see:

    def create_loop(self,
                    generator=generator, sleep=sleep, min=min, next=next,
                    Empty=Empty, StopIteration=StopIteration,
                    KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
        readers, writers = self.readers, self.writers

        # ... (elided) for each (fd, event) reported by poll():
        cb, cbargs = readers[fd]
        cb(*cbargs)
        
    

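    That is, when poll reports an event, the callback registered in readers for that fd is invoked to handle it.

    readers was populated earlier, in section 6.4.

    Its content shows that the callback registered for fd 8 is Transport.on_readable: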

    readers = {dict: 1} 
     8 = {tuple: 2} (<bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>, (8,))
      0 = {method} <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>>
      1 = {tuple: 1} 8
      __len__ = {int} 2
    

    So the call lands on <kombu.transport.redis.Transport object at 0x7ffe7482ddd8>:

    def on_readable(self, fileno):
        """Handle AIO event for one of our file descriptors."""
        self.cycle.on_readable(fileno)
    

    which in turn calls into

    <kombu.transport.redis.MultiChannelPoller object at 0x7faee4166d68>

    def on_readable(self, fileno):
        chan, type = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[type]()
    

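    A socket fd can be mapped to its channel, and a channel can be mapped back to its socket. From the channel we then reach the channel's handlers.

    self._fd_to_chan[fileno] yields the channel and the command type for that socket fd, and the matching handler is invoked. The handlers here are: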

    handlers = {dict: 2}
     'BRPOP' = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
     'LISTEN' = {method} <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7faee418dfd0>>
    

    So Channel._brpop_read or Channel._receive is called to read the message from redis.

    The call stack is as follows:
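
    The fd-to-channel-to-handler lookup can be sketched with stand-in classes. FakeQoS, FakeChannel and TinyPoller are hypothetical names used only here; they mimic the shape of the attributes shown above (_fd_to_chan, qos.can_consume, handlers) and nothing more.

```python
class FakeQoS:
    def __init__(self, allowed=True):
        self.allowed = allowed

    def can_consume(self):
        return self.allowed


class FakeChannel:
    def __init__(self, name, log, allowed=True):
        self.qos = FakeQoS(allowed)
        # command type -> handler, like the handlers dict in the dump above
        self.handlers = {
            "BRPOP": lambda: log.append((name, "BRPOP")),
            "LISTEN": lambda: log.append((name, "LISTEN")),
        }


class TinyPoller:
    def __init__(self):
        self._fd_to_chan = {}

    def register(self, fd, chan, cmd):
        self._fd_to_chan[fd] = (chan, cmd)

    def on_readable(self, fileno):
        chan, cmd = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[cmd]()


log = []
poller = TinyPoller()
poller.register(8, FakeChannel("chan-a", log), "BRPOP")
poller.register(9, FakeChannel("chan-b", log, allowed=False), "LISTEN")

poller.on_readable(8)   # handled: chan-a may consume
poller.on_readable(9)   # skipped: chan-b's QoS refuses for now
```

    Keeping the command type next to the channel is what lets one fd trigger the BRPOP reader while another triggers the LISTEN reader.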

    _brpop_read, redis.py:734
    on_readable, redis.py:358
    on_readable, redis.py:1087
    create_loop, hub.py:361
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:51
    <module>, testUb.py:55
    

    The logic is as follows:

    +--------------+    socket
    |     redis    | <------------> port +-->  fd +--->+                    +--->  channel +--> handlers  'BRPOP' = Channel._brpop_read
    |              |                                   |                    |                             'LISTEN' = Channel._receive
    |              |    socket                         |                    |
    |              | <------------> port +-->  fd +--->---> _fd_to_chan +------->  channel +--> handlers  'BRPOP' = Channel._brpop_read
    |  port=6379   |                                   |                    |                             'LISTEN' = Channel._receive
    |              |    socket                         |                    |
    |              | <------------> port +-->  fd +--->+                    +--->  channel +--> handlers  'BRPOP' = Channel._brpop_read
    +--------------+                                                                                      'LISTEN' = Channel._receive
    
    

    With poll added, the picture becomes:

                +---------------------------------------------------------------------------------------------------------------------------------------+
                |                                     +--------------+                                   6                       parse_response         |
                |                                +--> | Linux Kernel | +---+                                                                            |
                |                                |    +--------------+     |                                                                            |
                |                                |                         |                                                                            |
                |                                |                         |  event                                                                     |
                |                                |  1                      |                                                                            |
                |                                |                         |  2                                                                         |
                |                                |                         |                                                                            |
        +-------+---+    socket                  +                         |                                                                            |
        |   redis   | <------------> port +-->  fd +--->+                  v                                                                            |
        |           |                                   |           +------+--------+                                                                   |
        |           |    socket                         |           |  Hub          |                                                                   |
        |           | <------------> port +-->  fd +--->----------> |               |                                                                   |
        | port=6379 |                                   |           |               |                                                                   |
        |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |
        |           | <------------> port +-->  fd +--->+           |               |                     +                                             |
        +-----------+                                               +---------------+                     |                                             |
                                                                                                          |                                             |
                                                            3                                             |                                             |
                 +----------------------------------------------------------------------------------------+                                             |
                 |                                                                                                                                      v
                 |                                                                                                                                                  _receive_callback
                 |                                                                                                                            5    +-------------+                      +-----------+
    +------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  |
    |       Transport   |                     |  MultiChannelPoller     |      +--->  channel +--> handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+
    |                   |                     |                         |      |                                                                                                            |
    |                   | on_readable(fileno) |                         |      |                                                                         ^                                  |
    |           cycle +---------------------> |          _fd_to_chan +------------->  channel +--> handlers  'BRPOP' = Channel._brpop_read               |                                  |
    |                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  |
    |  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |
    |          +        |                     +-------------------------+      +--->  channel +--> handlers  'BRPOP' = Channel._brpop_read               |                                  |
    +-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |
               |                                                                                                                                         |                                  v
               |                                                7           _callback                                                                    |
               +-----------------------------------------------------------------------------------------------------------------------------------------+                            User Function
    
    

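    0x07 Receiving messages

    The message now sits in the redis queue. How does it get consumed?

    As the previous section showed, when poll reports that data is available, Channel._brpop_read or Channel._receive reads the message from redis.

    The call stack is as follows: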

    _brpop_read, redis.py:734
    on_readable, redis.py:358
    on_readable, redis.py:1087
    create_loop, hub.py:361
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:51
    <module>, testUb.py:55
    

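    In other words: inside the hub loop, the redis driver code pulls the message off the redis queue, then the Transport's _deliver method is called (the channel reaches the Transport as self.connection), which finally calls the user function.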

    def _brpop_read(self, **options):
        try:
            try:
                dest__item = self.client.parse_response(self.client.connection,
                                                        'BRPOP',
                                                        **options)
            except self.connection_errors:
                # if there's a ConnectionError, disconnect so the next
                # iteration will reconnect automatically.
                self.client.connection.disconnect()
                raise
            if dest__item:
                dest, item = dest__item
                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                self._queue_cycle.rotate(dest)
                self.connection._deliver(loads(bytes_to_str(item)), dest)  # eventually calls the user function
                return True
            else:
                raise Empty()
        finally:
            self._in_poll = None
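
    What _brpop_read does with the reply can be sketched as a pure function. Two assumptions are labeled here: the separator value SEP between queue name and priority suffix is assumed for illustration, and the standard-library json module stands in for the loads helper used above.

```python
import json

SEP = "\x06\x16"  # assumed separator between queue name and priority suffix


def split_reply(reply, sep=SEP):
    """Turn a (dest, item) BRPOP reply into (queue_name, decoded_payload)."""
    dest, item = reply
    # Drop anything after the last separator; a key without one is unchanged.
    queue = dest.decode().rsplit(sep, 1)[0]
    return queue, json.loads(item.decode())


plain = split_reply((b"asynt_queue", b'{"body": "hello world"}'))
with_priority = split_reply(
    (b"asynt_queue" + SEP.encode() + b"3", b'{"body": "hi"}')
)
```

    Both replies resolve to the same queue name, which is what lets a single callback entry serve every key that belongs to that queue.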
    

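    7.1 Reading from the driver

    7.1.1 Reading from redis

    The read goes through the redis driver, in the file redis/connection.py. Concretely, the SocketBuffer class reads from the socket connected to redis. The code is: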

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        while not data.endswith(SYM_CRLF):
            # there's more data in the socket that we need
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()
    
        self.bytes_read += len(data)
    
        # purge the buffer when we've consumed it all so it doesn't
        # grow forever
        if self.bytes_read == self.bytes_written:
            self.purge()
    
        return data[:-2]
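
    The buffering behaviour of readline is easier to see with a fake socket. In this sketch TinyBuffer is a hypothetical class: the chunks list plays the role of data arriving from the socket, and the purge step is left out for brevity.

```python
import io

SYM_CRLF = b"\r\n"


class TinyBuffer:
    def __init__(self, chunks):
        self._chunks = list(chunks)   # pretend these arrive from the socket
        self._buffer = io.BytesIO()
        self.bytes_written = 0
        self.bytes_read = 0

    def _read_from_socket(self):
        # Append the next chunk at the end of the in-memory buffer.
        data = self._chunks.pop(0)
        self._buffer.seek(self.bytes_written)
        self._buffer.write(data)
        self.bytes_written += len(data)

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        while not data.endswith(SYM_CRLF):
            # The line is incomplete: fetch more, then retry from the same offset.
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()
        self.bytes_read += len(data)
        return data[:-2]  # strip the trailing CRLF


buf = TinyBuffer([b"$5\r\nhel", b"lo\r\n"])
first = buf.readline()
second = buf.readline()
```

    The second line is split across two chunks, so the loop has to read again before it can return a complete line.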
    

    Once the response has been read, the redis driver invokes the callback registered for that command. The command here is BRPOP, and its callback is: string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)

    The code is:

    def parse_response(self, connection, command_name, **options):
        "Parses a response from the Redis server"
        try:
            response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise
        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response
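
    The per-command callback table can be sketched in a few lines. The string_keys_to_dict below is a local stand-in written for this example, and parse is a hypothetical simplification of parse_response that skips the connection and error handling.

```python
def string_keys_to_dict(key_string, callback):
    # Local stand-in: map every space-separated command name to one callback.
    return dict.fromkeys(key_string.split(), callback)


response_callbacks = {}
response_callbacks.update(
    string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None)
)


def parse(command_name, response):
    """Post-process a raw response if the command has a registered callback."""
    if command_name in response_callbacks:
        return response_callbacks[command_name](response)
    return response


hit = parse("BRPOP", [b"asynt_queue", b"payload"])  # list becomes a tuple
miss = parse("BRPOP", None)                          # empty reply stays None
other = parse("PING", b"PONG")                       # no callback: unchanged
```

    This is why _brpop_read can unpack its result directly as dest, item when the reply is non-empty.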
    

    The relevant variables in this context are:

    command_name = {str} 'BRPOP'
    connection = {Connection} Connection<host=localhost,port=6379,db=0>
    options = {dict: 0} {}
    self = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
     connection = {Connection} Connection<host=localhost,port=6379,db=0>
     connection_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
     response_callbacks = {CaseInsensitiveDict: 179} {.
      'AUTH' = {type} <class 'bool'>
      'EXPIRE' = {type} <class 'bool'>
    	.....
      'LLEN' = {type} <class 'int'>
      'LPUSHX' = {type} <class 'int'>
      'PFADD' = {type} <class 'int'>
      'PFCOUNT' = {type} <class 'int'>
    		......
      'SWAPDB' = {function} <function bool_ok at 0x7fbad4276620>
      'WATCH' = {function} <function bool_ok at 0x7fbad4276620>
      'UNWATCH' = {function} <function bool_ok at 0x7fbad4276620>
      'BLPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
      'BRPOP' = {function} <function Redis.<lambda> at 0x7fbad4276f28>
       ....
    

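    The call stack for this code is as follows (note that this trace was captured on the drain_events path rather than the hub loop):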

    readline, connection.py:251
    read_response, connection.py:324
    read_response, connection.py:739
    parse_response, client.py:915
    _brpop_read, redis.py:738
    on_readable, redis.py:358
    handle_event, redis.py:362
    get, redis.py:380
    drain_events, base.py:960
    drain_events, connection.py:318
    main, testUb.py:50
    <module>, testUb.py:53
    

    7.2 Delivering the message

    After the loop obtains the message from the driver, it delivers it:

    self.connection._deliver(loads(bytes_to_str(item)), dest)
    

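    _deliver looks up the callback registered for the queue and invokes it with the message. If no callback is registered for that queue, it logs a warning and rejects the message.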

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            logger.warning(W_NO_CONSUMERS, queue)
            self._reject_inbound_message(message)
        else:
            callback(message)
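
    A minimal sketch of that lookup-or-reject behaviour follows. TinyTransport and its rejected list are hypothetical; the list merely stands in for the warning plus _reject_inbound_message in the code above.

```python
class TinyTransport:
    def __init__(self):
        self._callbacks = {}   # queue name -> callback
        self.rejected = []     # stand-in for rejecting the inbound message

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            # Nobody consumes from this queue: do not drop silently.
            self.rejected.append((queue, message))
        else:
            callback(message)


seen = []
transport = TinyTransport()
transport._callbacks["asynt_queue"] = seen.append

transport._deliver({"body": "hello world"}, "asynt_queue")
transport._deliver({"body": "orphan"}, "unknown_queue")
```

    Using try / except / else keeps an exception raised by the callback itself from being mistaken for a missing consumer.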
    

    7.2.1 Finding the callback

    At this point self is

    <kombu.transport.redis.Transport object at 0x7faee4128f98>

    and the callbacks are:

    self._callbacks = {dict: 1} 
     'asynt_queue' = {function} <function Channel.basic_consume.<locals>._callback at 0x7faee244a2f0>
    

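    This means the callback for the queue asynt_queue is the _callback closure defined inside Channel.basic_consume.

    7.2.2 When the callback is set

    The callback being invoked is defined in Channel. basic_consume wraps the callback argument it receives in the _callback closure, and that argument is in fact Consumer._receive_callback.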

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
    
        def _callback(raw_message):
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)
    
        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)
    
        self._reset_cycle()
    

    The registration happens in this line of the function above:

        self.connection._callbacks[queue] = _callback
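
    The closure pattern used by basic_consume can be sketched as follows. TinyChannel is a hypothetical class, the dict built inside _callback stands in for self.Message(...), and the unacked list stands in for the QoS bookkeeping.

```python
class TinyChannel:
    """Toy channel; `callbacks` plays the role of the transport's _callbacks."""

    def __init__(self, callbacks):
        self._callbacks = callbacks
        self.unacked = []  # stand-in for the QoS bookkeeping

    def basic_consume(self, queue, no_ack, callback):
        def _callback(raw_message):
            message = {"body": raw_message}   # stand-in for self.Message(...)
            if not no_ack:
                self.unacked.append(message)  # track until acknowledged
            return callback(message)

        # What gets registered is the wrapper, not the caller's callback.
        self._callbacks[queue] = _callback


callbacks = {}
channel = TinyChannel(callbacks)
channel.basic_consume("asynt_queue", no_ack=False,
                      callback=lambda m: "got " + m["body"])

result = callbacks["asynt_queue"]("hello world")
```

    This explains the debugger output above: the registered function reports itself as basic_consume.<locals>._callback because it is the wrapper.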
    

    The stack is as follows:

    basic_consume, base.py:632
    basic_consume, redis.py:598
    consume, entity.py:738
    _basic_consume, messaging.py:594
    consume, messaging.py:473
    __enter__, messaging.py:430
    main, testUb.py:46
    <module>, testUb.py:55
    

    7.2.3 Reaching the user function

    The Consumer method is defined as follows:

    def _receive_callback(self, message):
        accept = self.accept
        on_m, channel, decoded = self.on_message, self.channel, None
        try:
            m2p = getattr(channel, 'message_to_python', None)
            if m2p:
                message = m2p(message)
            if accept is not None:
                message.accept = accept
            if message.errors:
                return message._reraise_error(self.on_decode_error)
            decoded = None if on_m else message.decode()
        except Exception as exc:
            if not self.on_decode_error:
                raise
            self.on_decode_error(message, exc)
        else:
            return on_m(message) if on_m else self.receive(decoded, message)
    

    self.on_message is the user function, so the call finally reaches user code:

    on_message, testUb.py:36
    _receive_callback, messaging.py:620
    _callback, base.py:630
    _deliver, base.py:980
    _brpop_read, redis.py:748
    on_readable, redis.py:358
    on_readable, redis.py:1087
    create_loop, hub.py:361
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:51
    <module>, testUb.py:55
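
    The last hop, choosing between on_message and the decoded-body callbacks, can be sketched like this. TinyConsumer is a hypothetical simplification: it skips decoding and error handling and treats the message as a plain dict.

```python
class TinyConsumer:
    def __init__(self, on_message=None, callbacks=()):
        self.on_message = on_message
        self.callbacks = list(callbacks)

    def receive(self, body, message):
        # Fallback path: every callback gets (decoded body, message).
        for callback in self.callbacks:
            callback(body, message)

    def _receive_callback(self, message):
        if self.on_message:
            # on_message takes precedence and receives the message as-is.
            return self.on_message(message)
        return self.receive(message["body"], message)


calls = []
msg = {"body": "hello world"}

TinyConsumer(
    on_message=lambda m: calls.append(("on_message", m["body"]))
)._receive_callback(msg)

TinyConsumer(
    callbacks=[lambda body, m: calls.append(("callback", body))]
)._receive_callback(msg)
```

    The example program in this article passes on_message, so it follows the first branch.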
    

    The overall picture at this point:

    +----------------------+               +-------------------+
    | Producer             |               | Channel           |
    |                      |               |                   |        +-----------------------------------------------------------+
    |                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
    |      channel   +------------------>  |                   |        +-----------------------------------------------------------+
    |                      |               |    pool           |
    |      exchange        |   +---------> |                   | <------------------------------------------------------------+
    |                      |   |           |                   |                                                              |
    |      connection      |   |    +----> |    connection +---------------+                                                  |
    |             +        |   |    |      |                   |           |                                                  |
    |             |        |   |    |      +-------------------+   +----------------------------------------------------------------+
    +--+-------------------+   |    |                              |       v                                                  |     |
       |          |            |    |      +-------------------+   |   +---+-----------------+       +--------------------+   |     |
       |          |            |    |      | Connection        |   |   | redis.Transport     |       | MultiChannelPoller |   |     |
       |          +----------------------> |                   |   |   |                     |       |                    |   |     |
       |                       |    |      |     _sock  <----------+   |                     |       |     _channels +--------+     |
       |                       |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |         |
       |                       |    |      |     transport +---------> |                     |       |     _chan_to_sock+-----------+
       |             +-------->+    |      |                   |       |                     |    +------+ poller         |
       |             |              |      +-------------------+       +---------------------+    |  |     after_read     |
       |             |              |                                                             |  |                    |
       |             |              |                                                             |  +--------------------+
       |             |              |      +------------------+                   +---------------+
       |             |              |      | Hub              |                   |
       |             |              |      |                  |                   v
       |             |              |      |                  |            +------+------+
       |             |              |      |      poller +---------------> | _poll       |
       | publish     |              |      |                  |            |             |         +-------+
       +--------------------------------+  |                  |            |    _poller+---------> |  poll |
                     |              |   |  +------------------+            |             |         +-------+
                     |              |   |                                  +-------------+
        +-------------------+       |   +-----> +----------------+
        | Queue      |      |       |           | Exchange       |
        |      _channel     |       +---------+ |                |
        |                   |                   |                |
        |      exchange +-------------------->  |     channel    |
        |                   |                   |                |
        |                   |                   |                |
        +-------------------+                   +----------------+
    
    

    The dynamic flow is as follows:

     +-----+           +-----------+     +--------------------+ +---------+ +-------+ +--------+
     | Hub |           | Transport |     | MultiChannelPoller | |  _poll  | |Channel| |Consumer|
     +--+--+           +----+------+     +------------+-------+ +----+----+ +---+---+ +------+-+
        |                   |                         |              |          |            |
        v                   |                         |              |          |            |
    create_loop             |                         |              |          |            |
        +                   |                         |              |          |            |
        |   on_poll_start   |                         |              |          |            |
        |                   |                         |              |          |            |
        | +---------------> |     on_poll_start       |              |          |            |
        |                   |                         |              |          |            |
        |                   | +-------------------->  |              |          |            |
        |                   |                         |              |          |            |
        |                   |                  _register_BRPOP       |          |            |
        |                   |                         |              |          |            |
        |               add_reader                    |   register   |          |            |
        |                   +                         | +----------> |          |            |
        |                   |         register        |              |          |            |
     fire_timers            | +------------------------------------> |          |            |
        |                   |                         |              |          |            |
        |    poll           |                         |              |          |            |
        | +--------------------------------------------------------> |          |            |
        |                   |                         |              |          |            |
        |                   |                         |              |          |            |
        +                   |                         |              |          |            |
    for fd, event in events |                         |              |          |            |
        |                   |                         |              |          |            |
        |                   |                         |              |          |            |
    cb, cbargs = readers[fd]|                         |              |          |            |
        +                   |                         |              |          |            |
        |                   |                         |              |          |            |
        |                   |                         |              |          |            |
     cb(*cbargs)            |                         |              |          |            |
        +                   |                         |              |          |            |
        |   on_readable     |                         |              |          |            |
        |                   |                         |              |          |            |
        | +-------------->  |    on_readable          |              |          |            |
        |                   |                         |              |          |            |
        |                   +-----------------------> |              |          |            |
        |                   |                         |              |          |            |
        |                   |                         |              |          |            |
        |                   |        chan, type = _fd_to_chan[fileno]|          |            |
        |                   |                         |              |          |            |
        |                   |                         |  _brpop_read |          |            |
        |                   |                         |              |          |            |
        |                   |                         | +---------------------> |            |
        |                   |     _deliver            |              |          |            |
        |                   |                         |              |          |            |
        |                   |  <----------------------------------------------+ |            |
        |                   |                         |              |          |            |
        |                   |                         |              |          |            |
        |                   |     _callback           |              |          |            |
        |                   |                         |              |          |            |
        |                   |  +----------------------------------------------> |            |
        |                   |                         |              |          +            +
        |                   |                         |              |         _receive_callback
        |                   |                         |              |          |            +
        |                   |                         |              |          | +--------->+
        |                   |                         |              |          |            |
        v                   v                         v              v          v            v
    
    

  • Original article: https://www.cnblogs.com/rossiXYZ/p/14455294.html