一、什么是Exchange
RabbitMQ 是 AMQP(高级消息队列协议)的标准实现:
从 AMQP 协议可以看出,Queue、Exchange 和 Binding 构成了 AMQP 协议的核心
-
Producer:消息生产者,即投递消息的程序。
-
Broker:消息队列服务器实体。
-
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
-
Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
-
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
-
-
Consumer:消息消费者,即接受消息的程序。
二、Exchange的类型
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种
fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。
direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
direct Exchange是RabbitMQ Broker的默认Exchange
,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。
direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
topic
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
b)、binding key与routing key一样也是句点号“. ”分隔的字符串
c)、binding key中可以存在两种特殊字符"*"与“#”,用于做模糊匹配,其中" * "用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
示例代码
const amqp = require('amqplib'); async function producer() { try { // 1. 创建链接对象 const connection = await amqp.connect('amqp://localhost:5672'); // 2. 获取通道 const channel = await connection.createChannel(); // 3. 声明参数 const exchangeName = 'qosEx'; const routingKey = 'qos.test001.cool'; const msg = 'Producer'; // 4. 声明交换机 await channel.assertExchange(exchangeName, 'topic', { durable: true }); for (let i = 0; i < 5; i++) { // 5. 发送消息 console.log(i); await channel.publish( exchangeName, routingKey, Buffer.from(`${msg} 第${i}条消息`) ); } await channel.close(); } catch (error) { console.log(error); } } producer();
消费者
const amqp = require('amqplib'); async function consumer() { // 1. 创建链接对象 const connection = await amqp.connect('amqp://122.51.9.11:5672'); // 2. 获取通道 const channel = await connection.createChannel(); // 3. 声明参数 const exchangeName = 'qosEx'; const queueName = 'qosQueue'; const bindingKey = 'qos.#'; // const bindingKey = 'qos.*'; 无法匹配 // 4. 声明交换机、对列进行绑定 await channel.assertExchange(exchangeName, 'topic', { durable: true }); await channel.assertQueue(queueName); await channel.bindQueue(queueName, exchangeName, bindingKey); // 5. 限流参数设置 await channel.prefetch(1, false);// count:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费。global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false。
// 6. 限流,noAck参数必须设置为false await channel.consume( queueName, (msg) => { console.log('Consumer:', msg.content.toString()); channel.ack(msg); }, { noAck: false } ); } consumer();