[源码分析] 消息队列 Kombu 之 Producer
0x00 摘要
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Producer 概念。
0x01 示例代码
下面使用如下代码来进行说明。
本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
0x02 来由
前文已经完成了构建部分,Consumer部分,下面来到了Producer部分,即如下代码:
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('message sent')
我们知道,Transport需要把Channel与文件信息联系起来,但是此时Transport信息如下,文件信息依然没有,这是我们以后需要留意的:
transport = {Transport} <kombu.transport.redis.Transport object at 0x7f9056a26f98>
Channel = {type} <class 'kombu.transport.redis.Channel'>
Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
Management = {type} <class 'kombu.transport.virtual.base.Management'>
channel_max = {int} 65535
channels = {list: 2} [<kombu.transport.redis.Channel object at 0x7f9056a57278>, <kombu.transport.redis.Channel object at 0x7f9056b79cc0>]
client = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 0} {}
poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}
default_port = {int} 6379
driver_name = {str} 'redis'
driver_type = {str} 'redis'
implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}
manager = {Management} <kombu.transport.virtual.base.Management object at 0x7f9056b79be0>
polling_interval = {NoneType} None
state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7f9056a9ec50>
0x03 建立
3.1 定义
Producer中,主要变量是:
- _channel :就是channel;
- exchange :exchange;
但是本文示例没有传入exchange,这就有些奇怪,我们需要继续看看。
class Producer:
"""Message Producer.
Arguments:
channel (kombu.Connection, ChannelT): Connection or channel.
exchange (kombu.entity.Exchange, str): Optional default exchange.
routing_key (str): Optional default routing key.
"""
#: Default exchange
exchange = None
#: Default routing key.
routing_key = ''
#: Default serializer to use. Default is JSON.
serializer = None
#: Default compression method. Disabled by default.
compression = None
#: By default, if a defualt exchange is set,
#: that exchange will be declare when publishing a message.
auto_declare = True
#: Basic return callback.
on_return = None
#: Set if channel argument was a Connection instance (using
#: default_channel).
__connection__ = None
3.2 init
init代码如下。
def __init__(self, channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None, compression=None,
on_return=None):
self._channel = channel
self.exchange = exchange
self.routing_key = routing_key or self.routing_key
self.serializer = serializer or self.serializer
self.compression = compression or self.compression
self.on_return = on_return or self.on_return
self._channel_promise = None
if self.exchange is None:
self.exchange = Exchange('')
if auto_declare is not None:
self.auto_declare = auto_declare
if self._channel:
self.revive(self._channel)
3.2.1 转换channel
这里有个重要转换。
- 最开始是把输入参数 Connection 赋值到 self._channel。
- 然后 revive 方法做了转换为 channel,即 self._channel 最终是 channel 类型。
但是 exchange 依然没有意义,是 direct 类型。
代码如下:
def revive(self, channel):
"""Revive the producer after connection loss."""
if is_connection(channel):
connection = channel
self.__connection__ = connection
channel = ChannelPromise(lambda: connection.default_channel)
if isinstance(channel, ChannelPromise):
self._channel = channel
self.exchange = self.exchange(channel)
else:
# Channel already concrete
self._channel = channel
if self.on_return:
self._channel.events['basic_return'].add(self.on_return)
self.exchange = self.exchange(channel)
此时变量为:
producer = {Producer}
auto_declare = {bool} True
channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
compression = {NoneType} None
connection = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
exchange = {Exchange} Exchange ''(direct)
on_return = {NoneType} None
routing_key = {str} ''
serializer = {NoneType} None
逻辑如图:
+----------------------+ +-------------------+
| Producer | | Channel |
| | | | +-----------------------------------------------------------+
| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
| channel +------------------> | | +-----------------------------------------------------------+
| | | pool |
| exchange | +---------> | | <------------------------------------------------------------+
| | | | | |
| connection | | +----> | connection +---------------+ |
| + | | | | | | |
+--+-------------------+ | | +-------------------+ | |
| | | | v |
| | | | +-------------------+ +---+-----------------+ +--------------------+ |
| | | | | Connection | | redis.Transport | | MultiChannelPoller | |
| +----------------------> | | | | | | |
| | | | | | | | _channels +--------+
| | | | | | cycle +------------> | _fd_to_chan |
| | | | transport +---------> | | | _chan_to_sock |
| +-------->+ | | | | | +------+ poller |
| | | +-------------------+ +---------------------+ | | after_read |
| | | | | |
| | | | +--------------------+
| | | +------------------+ +---------------+
| | | | Hub | |
| | | | | v
| | | | | +------+------+
| | | | poller +---------------> | _poll |
| publish | | | | | | +-------+
+--------------------------------+ | | | _poller+---------> | poll |
| | | +------------------+ | | +-------+
| | | +-------------+
+-------------------+ | +-----> +----------------+
| Queue | | | | Exchange |
| _channel | +---------+ | |
| | | |
| exchange +--------------------> | channel |
| | | |
| | | |
+-------------------+ +----------------+
手机如图:
0x04 发送
发送消息是通过producer.publish完成。
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('message sent')
此时传入exchange作为参数。原来如果没有 Exchange,是可以在这里进行补救。
producer.publish继续调用到如下,可以看到分为两步:
- 调用channel的组装消息函数
prepare_message
; - 调用channel的发送消息
basic_publish
;
因此,最终发送消息还是通过channel完成。
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare):
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
content_encoding, headers, properties,
)
if declare:
maybe_declare = self.maybe_declare
[maybe_declare(entity) for entity in declare]
# handle autogenerated queue names for reply_to
reply_to = properties.get('reply_to')
if isinstance(reply_to, Queue):
properties['reply_to'] = reply_to.name
return channel.basic_publish(
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
)
4.1 组装消息 in channel
channel 的组装消息函数prepare_message
完成组装功能,基本上是为消息添加各种属性。
def prepare_message(self, body, priority=None, content_type=None,
content_encoding=None, headers=None, properties=None):
"""Prepare message data."""
properties = properties or {}
properties.setdefault('delivery_info', {})
properties.setdefault('priority', priority or self.default_priority)
return {'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
'properties': properties or {}}
消息如下:
message = {dict: 5}
'body' = {str} 'aGVsbG8gd29ybGQ='
'content-encoding' = {str} 'utf-8'
'content-type' = {str} 'text/plain'
'headers' = {dict: 0} {}
__len__ = {int} 0
'properties' = {dict: 5}
'delivery_mode' = {int} 2
'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}
'priority' = {int} 0
'body_encoding' = {str} 'base64'
'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'
__len__ = {int} 5
4.2 发送消息 in channel
channel的发送消息basic_publish
完成发送功能。此时使用了传入的参数exchange。
发送消息basic_publish
方法是调用_put
方法:
def basic_publish(self, message, exchange, routing_key, **kwargs):
"""Publish message."""
self._inplace_augment_message(message, exchange, routing_key)
if exchange:
return self.typeof(exchange).deliver(
message, exchange, routing_key, **kwargs
)
# anon exchange: routing_key is the destination queue
return self._put(routing_key, message, **kwargs)
4.3 deliver in exchange
self.typeof(exchange).deliver
代码接着来到exchange。本文是DirectExchange。
注意,这里用到了self.channel._put。就是Exchange的成员变量channel。
class DirectExchange(ExchangeType):
"""Direct exchange.
The `direct` exchange routes based on exact routing keys.
"""
type = 'direct'
def lookup(self, table, exchange, routing_key, default):
return {
queue for rkey, _, queue in table
if rkey == routing_key
}
def deliver(self, message, exchange, routing_key, **kwargs):
_lookup = self.channel._lookup
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)
4.4 binding 转换
我们知道,Exchange的作用只是将发送的 routing_key
转化为 queue
的名字。这样发送就知道发到哪个 queue
。
因此依据_lookup方法得到对应的queue
。
def _lookup(self, exchange, routing_key, default=None):
"""Find all queues matching `routing_key` for the given `exchange`.
Returns:
str: queue name -- must return the string `default`
if no queues matched.
"""
if default is None:
default = self.deadletter_queue
if not exchange: # anon exchange
return [routing_key or default]
try:
R = self.typeof(exchange).lookup(
self.get_table(exchange),
exchange, routing_key, default,
)
except KeyError:
R = []
if not R and default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
exchange=exchange, routing_key=routing_key)),
)
self._new_queue(default)
R = [default]
return R
此处具体逻辑为:
第一,调用到channel的方法。这里的 exchange 名字为 asynt_exchange。
def get_table(self, exchange):
key = self.keyprefix_queue % exchange
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
我们看看Redis内容,发现集合内容如下:
127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
1) "asynt_routing_keyx06x16x06x16asynt_queue"
第二,因此得到对应binding为:
{b'asynt_routing_keyx06x16x06x16asynt_queue'}
即从 exchange 得到 routing_key ---> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。
逻辑如下:
+---------------------------------+
| exchange |
| |
1 routing_key x | |
+----------+ | | +------------+
| Producer | +-----------------> | routing_key x ---> queue x | | Consumer |
+--------+-+ | | +------------+
| | routing_key y ---> queue y |
| | | ^
| | routing_key z ---> queue z | |
| | | |
| +---------------------------------+ |
| |
| |
| |
| |
| |
| |
| |
| |
| +-----------+ |
| 2 message | | 3 message |
+-------------------------------> | queue X | +--------------------+
| |
+-----------+
4.5 _put in channel
channel的_put 方法被用来继续处理,可以看到其最终调用到了client.lpush。
client为:
Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
代码为:
def _put(self, queue, message, **kwargs):
"""Deliver message."""
pri = self._get_message_priority(message, reverse=False)
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))
redis怎么区别不同的queue?
实际是每个 queue 被赋予一个字符串 name,这个 name 就是 redis 对应的 list 的 key。知道应该向哪个 list 放消息,后续就是向此 list 中 lpush 消息。
如下方法完成转换功能。
def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
if pri:
return f"{queue}{self.sep}{pri}"
return queue
现在发消息之后,redis内容如下,我们可以看出来,消息作为list 的item,放入到之中。
127.0.0.1:6379> lrange asynt_queue 0 -1
1) "{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", "content-type": "text/plain", "headers": {}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "asynt_exchange", "routing_key": "asynt_routing_key"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "df7af424-e1ab-4c08-84b5-1cd5c97ed25d"}}"
127.0.0.1:6379>
0x05 总结
现在我们总结如下:
- Producers: 发送消息的抽象类;
- Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息;
- Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列;
- Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
- Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连,就是真实redis连接;
于是逻辑链已经形成,大约是这样的:
- Producer的publish方法接受参数Exchange,于是就发送消息到此Exchange;
- Producer调用channel的组装消息函数
prepare_message
为消息添加各种属性; - Producer调用channel的发送消息basic_publish发送消息,此时使用了传入的参数exchange。
- basic_publish方法调用exchange.deliver(exchange, routing_key)来发送消息;
- Exchange中有成员变量Channel,也有成员变量Queues,每个queue对应一个routing_key;
- deliver使用_lookup方法依据key得到对应的queue;
- deliver使用Exchange成员变量Channel的_put方法来向queue中投放消息;
- Channel拿到自己的redis连接池,即client为
Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
;于是可以基于此进行redis操作; - redis怎么区别不同的queue,实际是每个queue被赋予一个字符串name,这就是redis对应的list的key;
- 既然得到了名字为queue的list,则向此list中lpush消息。
- Consumer去Queue取消息;
动态逻辑如下:
+------------+ +------------+ +------------+ +-----------------------+
| producer | | channel | | exchange | | Redis<ConnectionPool> |
+---+--------+ +----+-------+ +-------+----+ +----------+------------+
| | | |
| | | |
publish('', exchange, routing_key) | | |
| | | |
| prepare_message | | |
| | | |
| +----------------------------------> | | |
| | | |
| basic_publish (exchange, routing_key)| | |
| | | |
| +----------------------------------> | | |
| | | |
| | deliver(exchange, routing_key)| |
| | | |
| +-----------------------------> | |
| | | |
| | | |
| | _lookup(exchange, routing_key) |
| | | |
| | | |
| | _put(queue, message) | |
| v | |
| | <---------------------------+ | |
| | | |
| _q_for_pri(queue, pri) | |
| + | |
v | | |
| | client.lpush | |
| | +--------------------------------------------------> |
| | | |
v v v v
手机如下: