1. RabbitMQ简介
RabbitMQ是由erlang语言开发,基于AMQP(高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com
1.1 消息队列
MQ全称为Message Queue,即消息队列。
“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。可以看出消息的生产和消费都是异步的,生产者和消费者只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
1.2 消息队列使用场景
- 任务异步处理:
高并发环境下,由于来不及同步处理,请求往往会发生堵塞。我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。
场景:
用户注册后,需要发送注册邮件和注册成功短信通知【这两个操作都不是必须的,只是一个通知,没必要让用户等待收到邮箱和短信后才算注册成功】。引入消息队列后,将发送邮件和短信的业务交给消息队列,实现异步处理。
- 应用程序解耦合:
MQ相当于一个中介,生产方通过MQ与消费方交互,不直接进行较胡,它将生产方与消费方进行解耦合,不至于当时功能里面的某一个操作因为宕机了导致后续操作无法进行。
场景:
双十一购物狂欢节,用户下单后,订单系统需要通知库存系统减少响应库存量,若库存系统出现故障,此笔订单就不能成功。
引入消息队列后,订单系统向消息队列发送用户下单的消息,并持久化数据到rabbitMQ【防止rabbitMQ宕机后消息丢失或库存系统宕机没消费消息】,库存系统监听消息队列的消息。
- 流量削峰:
场景:
秒杀活动,一般流量会很大,可能导致某个系统直接扛不住而挂掉。
引入消息队列(一般会在前端系统引入),用户发起请求时,先来到消息队列再去秒杀系统。在消息队列中对消息进行处理(比如请求达到某阈值时,直接抛弃那些请求或跳转错误页面),如此一来可缓解因高并发请求所导致秒杀系统扛不住挂掉的问题。
2. 为什么要学习RabbitMQ
先来看下面一个电商项目的场景:
商品的原始数据保存在数据库中,增删改查都在数据库中完成。
搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。
商品详情做了页面静态化处理,静态页面数据也不会随着数据库商品更新而变化。
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
我们可能会想到这么做:
方案1:每当后台对商品做增删改操作,同时修改索引库数据及更新静态页面。
方案2:搜索服务和商品页面静态化服务对外提供操作接口,后台在商品增删改后,调用接口。
这两种方案都有个严重的问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。并且是同步操作,耗费时间过长。
这时,我们就会采用另外一种解决办法,那就是消息队列:
商品服务对商品增删改以后,无需去操作索引库和静态页面,只需向MQ发送一条消息(比如包含商品id的消息),也不关心消息被谁接收。
搜索服务和静态页面服务监听MQ,接收消息,然后分别去处理索引库和静态页面(根据商品id去更新索引库和商品详情静态页面)。
3. 常见MQ产品
- ActiveMQ:基于JMS
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
- Kafka:分布式消息系统,高吞吐量
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案
4. RabbitMQ架构
4.1 简单架构图
4.2 完整架构图
5. RabbitMQ的通讯方式
6. 用Java方式操作RabbitMQ
6.1 RabbitMQ的连接
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
public static Connection getConnection() {
// 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.114.186.28");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/test");
// 创建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回
return conn;
}
@Test
public void getConnection() throws IOException {
Connection connection = RabbitMQClient.getConnection();
connection.close();
}
6.2 RabbitMQ的“Hello World”通讯方式
生产者
public class Publisher {
@Test
public void publish() throws Exception {
//1. 获取Connection对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建Channel通道
Channel channel = connection.createChannel();
//3. 发布消息到exchange,发布的同时需要指定路由的规则
// 参数1:指定exchange,如果使用""【表示使用默认exchange】。
// 参数2:指定路由规则【“Hello World”通讯方式 直接使用具体的队列名称即可】。
// 参数3:指定传递的消息所携带的properties【即:传递的消息的额外设置】,没有就写null。
// 参数4:指定发布的具体消息内容,byte[]类型
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。发布者是和交换机打交道的,所以这里不能帮助实现持久化本地
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
}
消费者
public class Consumer {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明[创建]队列-HelloWorld
//参数1:queue - 指定队列的名称 【若该队列不存在,则自动创建】
//参数2:durable - 当前队列是否需要持久化 【持久化:将队列接收到的消息持久化到硬盘,若队列中的消息还没被消费,就算RabbitMQ重启了该消息也不会丢失】
//参数3:exclusive - 是否独占 【当前队列只允许一个消费者连接可用,其他消费者再来连接时不能再用】【当连接对象Connection被close()之后,当前队列会自动删除】
//参数4:autoDelete - 是否自动删除 【如果这个队列没有消费者在消费,队列自动删除】
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("HelloWorld", true, false, false, null);
DefaultConsumer consume = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body, "UTF-8"));
}
};
//4. 开启监听队列
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true:接收到消息后,会立即自动告诉RabbitMQ消息已消费了)
//参数3:consumer - 指定消费时的消费回调(收到消息后,消费者要干点什么事)
channel.basicConsume("HelloWorld", true, consume); // 这个才是开启监听的方法
System.out.println("消费者开始监听队列!");
// 可以实现输入字符 , 用来将程序在这里等着,相当于debug
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
6.3 RabbitMQ的“Work”通讯方式
“Hello world”那种通讯方式存在一个弊端,若消费者消费消息的速度很慢,可能会导致生产者发布的消息形成堆积。
我们接下来介绍一种通讯方式,就是一个生产者发布消息,有多个消费者监听,谁收到消息,谁就消费【该通讯方式中,消息一旦被消费了,就消失。所以不会出现重复消费的情况】。这样就解决了上述所说的问题。
public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称 【若该队列不存在,则自动创建】
//参数2:durable - 当前队列是否需要持久化 【持久化:将队列接收到的消息持久化到硬盘,若队列中的消息还没被消费,就算RabbitMQ重启了该消息也不会丢失】
//参数3:exclusive - 是否独占 【当前队列只允许一个消费者连接可用,其他消费者再来连接时不能再用】【当连接对象Connection被close()之后,当前队列会自动删除】
//参数4:autoDelete - 是否自动删除 【如果这个队列没有消费者在消费,队列自动删除】
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work", true, false, false, null);
//3.1 默认情况下是平均消费,有10个消息,那么2个消费者各消费5个消息。 如果想指定当前消费者一次消费多少个消息,可通过basicQos设置。
//【【如果要想让某个消费能力强的消费者消费更多的消息,就可以指定该消费者的消费能力。PS: 此时必须改为手动ACK】】
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body, "UTF-8"));
// 【【手动ack 表示我已经消费完了】】
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 开启监听Queue
// 将自动消费改为false 表示手动消费
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true:接收到消息后,会立即自动告诉RabbitMQ消息已消费了)
//参数3:consumer - 指定消费时的消费回调(收到消息后,消费者要干点什么事)
channel.basicConsume("Work", false, consumer);
System.out.println("开始消费消息。。。。");
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
channel.queueDeclare("Work", true, false, false, null);
//3.1 指定当前消费者,一次消费多少个消息。
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body, "UTF-8"));
// 手动ack,消费完了告诉rabbitMQ我消费完了
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("Work", false, consumer);
System.out.println("开始消费消息。。。。");
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 获取Connection对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建Channel通道
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
for (int i = 0; i < 10; i++) {
// 参数1:指定exchange,如果使用""【表示使用默认exchange】。
// 参数2:指定路由规则【“Hello World”通讯方式 直接使用具体的队列名称即可】。
// 参数3:指定传递的消息所携带的properties【即:传递的消息的额外设置】,没有就写null。
// 参数4:指定发布的具体消息内容,byte[]类型
String msg = "Hello-World!" + i;
channel.basicPublish("", "Work", null, msg.getBytes());
}
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
}
6.4 RabbitMQ的“Publish/Subscribe”通讯方式 (广播通讯方式)
public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列要消费的队列:pubsub-queue1
//参数1:queue - 指定队列的名称 【若该队列不存在,则自动创建】
//参数2:durable - 当前队列是否需要持久化 【持久化:将队列接收到的消息持久化到硬盘,若队列中的消息还没被消费,就算RabbitMQ重启了该消息也不会丢失】
//参数3:exclusive - 是否独占 【当前队列只允许一个消费者连接可用,其他消费者再来连接时不能再用】【当连接对象Connection被close()之后,当前队列会自动删除】
//参数4:autoDelete - 是否自动删除 【如果这个队列没有消费者在消费,队列自动删除】
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("pubsub-queue1", true, false, false, null);
//3.5 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 开启监听Queue
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true:接收到消息后,会立即自动告诉RabbitMQ消息已消费了)
//参数3:consumer - 指定消费时的消费回调(收到消息后,消费者要干点什么事)
channel.basicConsume("pubsub-queue1", false, consumer);
System.out.println("开始消费消息。。。。");
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列要消费的队列:pubsub-queue1
channel.queueDeclare("pubsub-queue2", true, false, false, null);
//3.5 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("pubsub-queue2", false, consumer);
System.out.println("开始消费消息。。。。");
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 获取Connection对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 创建exchange
//参数1: exchange的名称 若该交换机不存在,则创建
//参数2: 指定exchange的类型 常用的有默认的,direct,fanout,topic。前面2种我们都是使用的默认的交换机。 FANOUT:Publish/Subscribe通讯方式 DIRECT:Routing通讯方式 TOPIC:Topic通讯方式
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
//3.1 exchange绑定某一个队列 该操作可以在生产者干 也 可以在消费者干
//参数1:指定队列
//参数2:指定交换机
//参数3:指定routingKey规则
channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
channel.queueBind("pubsub-queue2", "pubsub-exchange", "");
//4. 发布消息到exchange,同时指定路由的规则
for (int i = 0; i < 10; i++) {
String msg = "Hello-World!" + i;
// 注意这里,之前发布消息采用默认的交换机 我们使用的是 ""。 现在我们要指定交换机了。
// 参数1:指定exchange,如果使用""【表示使用默认exchange】。
// 参数2:指定路由规则【“Hello World”通讯方式 直接使用具体的队列名称即可】。
// 参数3:指定传递的消息所携带的properties【即:传递的消息的额外设置】,没有就写null。
// 参数4:指定发布的具体消息内容,byte[]类型
channel.basicPublish("pubsub-exchange", "", null, msg.getBytes()); // 表示绑定到pubsub-exchange交换机,该交换机有两个队列:pubsub-queue1和pubsub-queue2
}
System.out.println("生产者发布消息成功!");
//5. 释放资源
channel.close();
connection.close();
}
}
6.5 RabbitMQ的“Routing”通讯方式
public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称 【若该队列不存在,则自动创建】
//参数2:durable - 当前队列是否需要持久化 【持久化:将队列接收到的消息持久化到硬盘,若队列中的消息还没被消费,就算RabbitMQ重启了该消息也不会丢失】
//参数3:exclusive - 是否独占 【当前队列只允许一个消费者连接可用,其他消费者再来连接时不能再用】【当连接对象Connection被close()之后,当前队列会自动删除】
//参数4:autoDelete - 是否自动删除 【如果这个队列没有消费者在消费,队列自动删除】
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("routing-queue-error", true, false, false, null);
//3.5 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者ERROR接收到消息:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 开启监听Queue
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true:接收到消息后,会立即自动告诉RabbitMQ消息已消费了)
//参数3:consumer - 指定消费时的消费回调(收到消息后,消费者要干点什么事)
channel.basicConsume("routing-queue-error", false, consumer);
System.out.println("开始消费消息。。。。");
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
channel.queueDeclare("routing-queue-info", true, false, false, null);
//3.5 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者INFO接收到消息:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("routing-queue-info", false, consumer);
System.out.println("开始消费消息。。。。");
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQClient.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 创建exchange, 绑定队列:routing-queue-error 和 routing-queue-info
//参数1: exchange的名称 若该交换机不存在,则创建
//参数2: 指定exchange的类型 常用的有默认的,direct,fanout,topic。前面2种我们都是使用的默认的交换机。 FANOUT:Publish/Subscribe通讯方式 DIRECT:Routing通讯方式 TOPIC:Topic通讯方式
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
//3.1 exchange绑定某一个队列 该操作可以在生产者干 也 可以在消费者干
//参数1:指定队列
//参数2:指定交换机
//参数3:指定routingKey规则
channel.queueBind("routing-queue-error", "routing-exchange", "ERROR");
channel.queueBind("routing-queue-info", "routing-exchange", "INFO");
//4. 发布消息到exchange,同时指定路由的规则
// 参数1:指定exchange,如果使用""【表示使用默认exchange】。
// 参数2:指定路由规则【“Hello World”通讯方式 直接使用具体的队列名称即可】。
// 参数3:指定传递的消息所携带的properties【即:传递的消息的额外设置】,没有就写null。
// 参数4:指定发布的具体消息内容,byte[]类型
channel.basicPublish("routing-exchange", "ERROR", null, "ERROR".getBytes());// 发布到"routing-exchange",并且要求该交换机的队列的routingKey是 ERROR
channel.basicPublish("routing-exchange", "INFO", null, "INFO-1".getBytes());
channel.basicPublish("routing-exchange", "INFO", null, "INFO-2".getBytes());
channel.basicPublish("routing-exchange", "INFO", null, "INFO-3".getBytes());
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
}
6.6 RabbitMQ的“Topic”通讯方式
public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
channel.queueDeclare("topic-queue-1", true, false, false, null);
//3.5 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("对红色动物感兴趣接收到消息:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("topic-queue-1", false, consumer);
System.out.println("开始消费消息。。。。");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
channel.queueDeclare("topic-queue-2", true, false, false, null);
//3.5 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("对快的和兔子感兴趣接收到消息:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("topic-queue-2", false, consumer);
System.out.println("开始消费消息。。。。");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQClient.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
// 3. 创建exchange绑定队列 topic-queue-1 topic-queue-2
//参数1: exchange的名称 若该交换机不存在,则创建
//参数2: 指定exchange的类型 常用的有默认的,direct,fanout,topic。前面2种我们都是使用的默认的交换机。 FANOUT:Publish/Subscribe通讯方式 DIRECT:Routing通讯方式 TOPIC:Topic通讯方式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//3.1 exchange绑定某一个队列 该操作可以在生产者干 也 可以在消费者干
//参数1:指定队列
//参数2:指定交换机
//参数3:指定routingKey规则
// 该模式与routing模式的区别就在于 他们的routingKey有所不同
// 需求:我们要发布动物的信息,该动物有3个属性 <speed>.<color>.<what>
// *.red.* -> *占位符 即:一个*代表一个内容
// fast.# -> #通配符 即:一个#代表多个内容
// 如: *.*.rabbit 等同于 #.rabbit
channel.queueBind("topic-queue-1", "topic-exchange", "*.red.*"); // topic-queue-1队列只对红色动物感兴趣
channel.queueBind("topic-queue-2", "topic-exchange", "fast.#");
channel.queueBind("topic-queue-2", "topic-exchange", "*.*.rabbit"); // topic-queue-2队列对快的动物感兴趣 和 对兔子感兴趣。 注意: 一个队列可以被多次绑定
//4. 发布消息到exchange,同时指定路由的规则
// 参数1:指定exchange,如果使用""【表示使用默认exchange】。
// 参数2:指定路由规则【“Hello World”通讯方式 直接使用具体的队列名称即可】。
// 参数3:指定传递的消息所携带的properties【即:传递的消息的额外设置】,没有就写null。
// 参数4:指定发布的具体消息内容,byte[]类型
channel.basicPublish("topic-exchange", "fast.red.monkey", null, "红快猴子".getBytes());
channel.basicPublish("topic-exchange", "slow.black.dog", null, "黑漫狗".getBytes());
channel.basicPublish("topic-exchange", "fast.white.cat", null, "快白猫".getBytes());
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
}
![](https://img2020.cnblogs.com/blog/1471584/202104/1471584-20210421013420008-737808453.png)
7. SpringBoot整合RabbitMQ[基于配置类]
整合各种模式可以参考:https://juejin.cn/post/6976033887449251876
7.1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.2 编写配置文件
spring:
rabbitmq:
host: 47.114.216.28
port: 5672
username: admin
password: admin
virtual-host: /test
7.3 声明exchange,queue
@Configuration
public class RabbitMQConfig {
// @Bean即表示声明该方法返回的实例是受 Spring 管理的 Bean。 如果想自定义Bean名称,可以再@Bean()注解里面配置。
//1. 创建exchange - topic 以topic通讯方式为例,因为其最灵活
@Bean
public TopicExchange getTopicExchange(){
// 参数1:交换机名称 参数2:是否持久化数据 参数3:是否自动删除
return new TopicExchange("boot-topic-exchange",true,false);
}
//2. 创建queue
@Bean
public Queue getQueue(){
// 参数1:队列名称 参数2:是否持久化 参数3:是否独占 参数4:是否自动删除 参数5:指定当前队列的其他信息
//参数1:queue - 指定队列的名称 【若该队列不存在,则自动创建】
//参数2:durable - 当前队列是否需要持久化 【持久化:将队列接收到的消息持久化到硬盘,若队列中的消息还没被消费,就算RabbitMQ重启了该消息也不会丢失】
//参数3:exclusive - 是否独占 【当前队列只允许一个消费者连接可用,其他消费者再来连接时不能再用】【当连接对象Connection被close()之后,当前队列会自动删除】
//参数4:autoDelete - 是否自动删除 【如果这个队列没有消费者在消费,队列自动删除】
//参数5:arguments - 指定当前队列的其他信息
return new Queue("boot-queue",true,false,false,null);
}
//3. 绑定在一起【创建Binding】
@Bean
public Binding getBinding(TopicExchange topicExchange,Queue queue){
// .bind() 绑定哪个队列 .to() 到哪个交换机 .with() 指定routingKey
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
// 上述写法可以写成下面这种
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_NAME = "topicExchange";
public static final String QUEUE_NAME = "topicQueue";
// @Bean即表示声明该方法返回的实例是受 Spring 管理的 Bean。 如果想自定义Bean名称,可以再@Bean()注解里面配置。
//1. 创建exchange - topic 以topic通讯方式为例,因为其最灵活
@Bean("exchange")
public Exchange getTopicExchange(){
return ExchangeBuilder.topicExchange(TOPIC_NAME).durable(true).build(); // 它是这种构建者的方式构建来指定 是否持久化 是否自动删除 ...
}
//2. 创建queue
@Bean("queue")
public Queue getQueue(){
return QueueBuilder.durable(QUEUE_NAME).build(); // 他也是这种构建者
}
//3. 绑定在一起【创建Binding】
@Bean
public Binding getBinding(@Qualifier("exchange")Exchange exchange,@Qualifier("queue")Queue queue){ // 这个配置类可能又很多交换机和配置类,所以一般都会用 @Bean注解指定它的名称。而这里在绑定的时候,也要区分不同的交换机与队列.
// .bind() 绑定哪个队列 .to() 到哪个交换机 .with() 指定routingKey .noargs() 不需要指定参数-如果不写这个也代表不需要指定参数
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*").noargs();
}
}
7.4 发布者发布消息到rabbitMQ
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() throws IOException {
// 参数1:指定交换机 参数2:指定routingKey 参数3:指定发送的数据内容
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
}
7.5 消费者监听rabbitMQ接收消息
// 编写监听类
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue") // 指定要监听的队列
public void getMessage(Message message) throws IOException { // 将来来消息了,就会被这个Message对象接收
System.out.println("接收到消息:" + message);
}
}
8. 持久化
思考几个问题?
- 如果消息已经到达了RabbitMQ,还没发送给消费者时,RabbitMQ宕机了,消息会丢失吗?————————————————————不会,因为RabbitMQ的队列有持久化机制,若消息到了RabbitMQ已经到了队列那里了,就能持久化。当RabbitMQ重连的时候消息就能发送给消费者了。
- 消费者在消费消息的时候,还没消费完,此时消费者宕机了,消息会丢失吗?————————————————————不会,因为RabbitMQ提供了手动ACK。当成功消费完消息的时候再手动ACK告诉生产者我消费完了。
- 生产者在发布消息到RabbitMQ的交换机时,由于网络问题,导致没有真发送成功到交换机,消息会丢失吗?————————————————————会,因为生产者已经执行了发布消息的方法,就会认为已经发布过去了。可利用Confirm(确认)机制实现或利用其提供的事务操作机制【影响效率,这里不介绍它】。 PS:Confirm机制是保证了消息发送到Exchange上。而消费者监听的不是Exchange,而是队列。
- 生产者成功发布消息到交换机了,但是交换机分发消息到队列的时候出现了问题,导致没有真分发成功,消息会丢失吗?————————————————————会,因为消费者是与队列交互的,如果消息没有分发到队列,队列就没有消息。可利用Return机制实现。 PS:Return机制是保证Exchange的消息分发到队列。
持久化可以提高 RabbitMQ 的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。
持久化可分为以下几种情况:
- 交换机的持久化
- 队列的持久化
- 消息的持久化
8.1 交换机的持久化
交换器的持久化是在声明交换器的时候,将 durable 属性设置为 true。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器就会被删除。对于长期使用的交换器来说,建议将其置为持久化。
//原生Api
/**
* 参数1:交换机名称
* 参数2:交换机类型
* 参数3:是否持久化 默认 false
*/
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT,true);
// springboot方式
@Bean
public TopicExchange payTopicExchange(){
/**
* 参数1:交换机名称
* 参数2:是否持久化 true是, 默认为 true
* 参数3:是否自动删除 true是, 默认为 false
*/
return new TopicExchange(exchangeMame,true,false);
}
8.2 队列的持久化
队列的持久化也是在声明队列的时候,将durable参数设置为true。如果队列不设置持久化,那么 RabbitMQ服务重启之后,队列就会被删除,既然队列都不存在了,队列中的消息也会丢失。
// 原生Api
/**
* 参数1:String queue 队列名称 如果队列不存在会自动创建
* 参数2:boolean durable 队列是否持久化 true 持久化 false 不持久化 默认:false
* 参数3:boolean exclusive 是否独占队列 true 独占队列 false 不独占 默认:true
* 参数4:boolean autoDelete 是否在消费完成后自动删除 true 自动删除 默认:true
* 参数5:Map<String, Object> arguments 额外附加参数
*/
channel.queueDeclare("hello-1",true,false,false,null);
@Bean
public Queue dlQueue(){
/**
* 参数1:队列名称
* 参数2:是否持久化 默认:true
*/
return new Queue(dlQueue,true);
}
8.3 消息的持久化
队列的持久化能保证其本身不会因重启、关闭、宕机的情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。
在发送消息的时候,通过将BasicProperties中的属性deliveryMode(投递模式)设置为 2 即可实现消息的持久化。
// 原生Api
channel.basicPublish("exchangeName" , "routingKey",
new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build(),
"ddf".getBytes());
// springboot方式
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
//设置消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};
rabbitTemplate.convertAndSend("exchangeName","routingKey","消息内容",messagePostProcessor);
可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理,以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做权衡。
一般的系统也用不到对消息进行持久化。不过交换机和队列的持久化还是要支持的。
上述几种方式我们只是保证了消息发送到交换机、队列时不会由于RabbitMQ的重启、关闭、宕机的情况而丢失消息。但如果消费者在消费的时候出现问题了呢?
对于消费者来说,如果在订阅消息的时候,将autoAck设置为了true,消费者接收到相关消息后,还没有正式处理消息逻辑之前,就出现了异常挂掉了,但消息已经被自动确认了,这样也算数据丢失。
对此有如下几种方式解决:
1. 可以用手动Ack,消费者成功消费后告诉mq我成功消费了。
2. 将消息重试并设置死信队列
9. SpringBoot实现手动ACK
RabbitMQ的确认机制是自动确认的,消费者收到消息后立马确认。自动确认可能会出现消费者最后没有成功消费信息的可能,所以我们一般需要手动确认,在成功消费后再告诉MQ。
如果消费者在消费过程中,出现了异常,我们可以调用basicNack或basicReject拒绝消息,让MQ重新发送。
在上述操作基础上更改
8.1 更改配置文件
spring:
rabbitmq:
host: 47.174.116.28
port: 5672
username: admin
password: admin
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 默认是auto manual:表示手动ACK
8.2 更改消费者
如果此时不更改消费者,虽然消费者能拿到消息消费,但是在MQ中会显示该消息未被消费。
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Message message, Channel channel,String msg) throws IOException {
System.out.println("接收到消息:" + message);
// ======== 手动ACK 告诉rabbitMQ我消费成功 ===========
// void basicAck(long deliveryTag, boolean multiple) throws IOException; 用于确认当前消息
// deliveryTag :消息的编号 multiple:是否批量进行签收。设置为 false 时,则表示确认消费编号为 deliveryTag 的这一条消息,该参数为 true 时,则可以一次性确认消费小于等于 deliveryTag 值的所有消息。
// void basicNack(long deliveryTag, boolean multiple, boolean requeue); 用于否打当前消息
// deliveryTag:消息的编号
// multiple:设置为 false 时,则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack 方法和 basicReject 方法一样, multiple 参数设置为 true 则表示拒绝小于 deliveryTag 编号之前所有未被当前消费者确认的消息。
// 如:channel.BasicNack(3, true, false); 第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收
// requeue:参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 会立即把消息从队列中移除。
// void basicReject(long deliveryTag, boolean requeue); 用于拒绝当前消息
// deliveryTag: 消息的编号 requeue: 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 会立即把消息从队列中移除。
// 注:Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
10. 如何保证RabbitMQ的可靠性【confirm机制与return机制】
10.1 可靠性
这里的可靠性是指:保证生产者每次消息最终都能成功发送给消费者。
实际上消息队列可以说是没法百分之百保证可靠性的!RabbitMQ 提供的相关机制也只是在于缩小消息丢失的概率,或者说提供了消息丢失后的我们可以记录日志的功能。 在解决这些问题时有必要明白一点,其实小公司业务量不大,并发量不高的情况下这些问题是几乎不会发生的......即使偶尔出现,开发人员手动修复数据处理就好。所以可结合公司实际业务场景看有没有必要解决这些问题。
思考几个问题?
- 如果消息已经到达了RabbitMQ,还没发送给消费者时,RabbitMQ宕机了,消息会丢失吗?————————————————————不会,因为RabbitMQ的队列有持久化机制,若消息到了RabbitMQ已经到了队列那里了,就能持久化。当RabbitMQ重连的时候消息就能发送给消费者了。
- 消费者在消费消息的时候,还没消费完,此时消费者宕机了,消息会丢失吗?————————————————————不会,因为RabbitMQ提供了手动ACK。当成功消费完消息的时候再手动ACK告诉生产者我消费完了。
- 生产者在发布消息到RabbitMQ的交换机时,由于网络问题,导致没有真发送成功到交换机,消息会丢失吗?————————————————————会,因为生产者已经执行了发布消息的方法,就会认为已经发布过去了。可利用Confirm(确认)机制实现或利用其提供的事务操作机制【影响效率,这里不介绍它】。 PS:Confirm机制是保证了消息发送到Exchange上。而消费者监听的不是Exchange,而是队列。
- 生产者成功发布消息到交换机了,但是交换机分发消息到队列的时候出现了问题,导致没有真分发成功,消息会丢失吗?————————————————————会,因为消费者是与队列交互的,如果消息没有分发到队列,队列就没有消息。可利用Return机制实现。 PS:Return机制是保证Exchange的消息分发到队列。
10.2 SpringBoot整合RabbitMQ实现可靠性
10.2.1 Java方式实现RabbitMQ的Confirm机制
异步 confirm 方法的编程实现最为复杂,也是最高效的。
在客户端 Channel 接口中提供的addConfirmListener方法可以添加 ConfirmListener这个回调接口.
这个ConfirmListener 接口包含两个方法: handleAck、handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack、Basic.Nack 。
在这两个方法中都包含有两个参数 deliveryTag(标记消息的唯一有序序号) 、multiple(是否批量confirm true代表是)
普通confirm:同步等待确认,简单,但吞吐量非常有限。
批量confirm:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
异步confirm:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微有些麻烦。
10.2.2 Java方式实现RabbitMQ的Return机制
在客户端 Channel 接口中提供的addReturnListener方法,可以添加 ReturnListener这个回调接口。
这个ReturnListener接口包含一个方法:handleReturn,用来处理交换机发送消息到队列失败,则执行此方法。
/*
* 参数1:响应code
* 参数2:响应文本
* 参数3:交换机名称
* 参数4:路由key
* 参数5:消息的基本属性集
* 参数6:消息内容
*/
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
这里的basicPublish()方法要选带有mandatory的那个重载,并设置为true。mandatory的默认值是:false。
当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列时, 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。
当 mandatory参数设置为 false 时,出现上述情形,则消息直接被丢弃。
10.2.3 SpringBoot方式实现RabbitMQ的confirm与return机制
编写配置文件
spring:
rabbitmq:
host: 47.144.116.28
port: 5672
username: admin
password: admin
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 默认是auto manual:表示手动ACK
publisher-confirm-type: simple # 开启confirm机制
publisher-returns: true # 开启return机制
# 关于publisher-confirm-type的取值
NONE: 禁用发布确认模式,是默认值
CORRELATED: 发布消息成功到交换器后会触发回调方法
SIMPLE: 经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待
broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
//创建关于 SpringBoot 实现可靠性 的配置类
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // 表示在初始化PublisherConfirmAndReturnConfig对象时,会执行该方法。
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
// 基于confirm的回调 此方法用于监听消息是否发送到交换机
// correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
// ack:消息投递到broker 的状态,true表示成功。
// cause:表示投递失败的原因
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已经送达到Exchange");
}else{
System.out.println("消息没有送达到Exchange");
}
}
@Override // 基于return的回调 // 消息没有送达队列时才会执行
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息没有送达到Queue");
}
}
生产者发布消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() throws IOException {
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
消费者消费消息
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Message message, Channel channel,String msg) throws IOException {
System.out.println("接收到消息:" + message);
// 手动ACK 告诉rabbitMQ我消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
11. 如何对消息设置过期时间
https://www.cnblogs.com/itlihao/p/14961534.html
12. 什么是死信交换机与死信队列
出处:https://juejin.cn/post/6976778266472366087
DLX ,全称为 Dead-Letter-Exchange ,可以称之为死信交换机。它其实也是一个正常的交换机,和一般交换机没什么区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时 RabbitMQ 就会自动地将这个消息新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。我们可以监听这个死信交换机的死信队列中的消息、以进行相应的处理。
当消息在一个队列中变成死信之后,如果你对该队列配置了死信队列,那么它能被重新发送到另一个交换机中,这个交换器就是 DLX(死信交换机),而于 DLX 绑定的队列就称之为死信队列。
那种消息会变成死信?
- 消息过期,也就是笔者在上篇提到的 TTL。消息在队列的存活时间超过所设置的 TTL 时间。
- 消息被拒绝。调用了 channel.basicNack 或 channel.basicReject方法,井且设置 requeue 参数为false。 requeue: 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 会立即把消息从队列中移除。
- 队列的接收消息数长度达到最大长度。
基于消息过期配置死信队列案例:
mq:
queueBinding:
queue: prod_queue_pay
dlQueue: dl-queue
exchange:
name: exchang_prod_pay
dlTopicExchange: dl-topic-exchange
key: prod_pay
dlRoutingKey: dl-routing-key
//==============创建死信交换机并于死信队列进行绑定====================
@Value("${mq.queueBinding.exchange.dlTopicExchange}")
private String dlTopicExchange;
@Value("${mq.queueBinding.dlRoutingKey}")
private String dlRoutingKey;
@Value("${mq.queueBinding.dlQueue}")
private String dlQueue;
//创建死信交换机 【可以是任意类型的交换机,这里采用的是topic类型的】
@Bean
public TopicExchange dlTopicExchange(){
return new TopicExchange(dlTopicExchange,true,false);
}
//创建死信队列
@Bean
public Queue dlQueue(){
return new Queue(dlQueue,true);
}
//死信队列与死信交换机进行绑定
@Bean
public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
//==============创建要执行我们义务的交换机与队列====================
//==============我们要基于队列设置过期时间=========================
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
private final String dle = "x-dead-letter-exchange"; // 必须叫这个名称
private final String dlk = "x-dead-letter-routing-key"; // 必须叫这个名称
private final String ttl = "x-message-ttl";
/**
* 业务队列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//设置队列的过期时间
//队列中所有消息都有相同的过期时间
params.put(ttl,10000);
//==============================================================声明当前队列绑定的死信交换机============================================================================================
params.put(dle,dlTopicExchange);
//声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键。因为我们指定的死信交换机是topic,所以会有路由键。如果是finaot模式,就可不配路由键。
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(exchangeName,true,false);
}
//队列与交换机进行绑定
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
// 生产者发送消息
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
// 将消息发送给业务交换机
rabbitTemplate.convertAndSend(exchangeName,key,msg);
}
}
启动服务,可以看到同时创建了业务队列、业务交换机以及死信队列、死信交换机。而且可以看到业务队列上带了 DLX、DLK标签。
然后调用接口:http://localhost:8080/?msg=红红火火 ,消息会被发送到 prod_queue_pay这
如果 10s 内没有消费者消费这条消息,那么判定这条消息为过期消息。由于设置了 DLX ,过期时消息被丢给 dlxExchange 交换机中,根据所配置的dlRoutingKey 找到与 dlxExchange 匹配的队列 dlQueue后,消息被存储在 dlxQueue这个死信队列中。
13. RabbitMQ 的重试机制
原文:https://juejin.cn/post/6979390382371143694#heading-0
前言:以下2种情况,是没有开启手动ack的前提下。
情况一:消费者在处理消息的过程中可能会发生异常,此时,rabbitMQ会不断重试。由于没有我们没有给rabbirMQ明确重试次数,会造就无限重试,这是一个致命的问题,最终导致宕机。
情况二:手动告诉MQ拒绝消息,channel.basicNack()并设置requeue为true。 requeue:参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。
但是为了保证消息被消费与解决上述问题,我们控制MQ让它知道他该重试几次。所以一般有如下方式解决:
- 我们一般用Springboot提供retry功能告诉rabbitMQ重试几次,还不行最后将配合死信队列来解决。【默认重试后还是失败时,会自动删除该消息,就导致消息丢失了】
- 用redis,使用redis记数,若超过指定次数,直接拒绝消息,并且设置不让其回到队列。并把该消息记录下,后期来由人工处理。==
下面展示retry功能的开启:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者进行重试 默认情况下是 false ====配置yml重试策略=====
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
default-requeue-rejected: false # 重试次数超过上面的设置后,是否丢弃 默认true,指定为false时,应编写相应代码将该消息加入死信队列
# 重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系
14. 如何保证RabbitMQ重复消费消息
14.1 为什么要解决重复消费问题
幂等性操作:就是指比如删除操作,这类操作执行多少次都没影响。
非幂等性操作:添加操作,而且数据库还是自增的,这类操作执行多次和执行一次差别是很大的!
所以,针对非幂等性操作,一定要保证消息不被重复消费。
14.2 重复消费消息的原因
- 生产时消息重复
- 消费时消息重复
生产消息时重复生产
原文:https://www.cnblogs.com/zhixie/p/13444213.html
由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。
生产者中如果消息未被确认,或确认失败,我们可以使用定时任务+(redis/db)来进行消息重试。
消费消息时重复消费
情况一:消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认【手动ack】。为了保证消息被消费(我们配置了重试机制),MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
情况二:生产者将消息发送给消费者后,断开了连接,等连接恢复后,生产者又重新发送消息给消费者。这时候消费者就接收到了两条一样的消息。
解决思路:
让每个消息携带一个全局的唯一ID,引入redis,在消息被消费之前,将消息的唯一ID放到redis中,并设置它的值,如值为0:正在执行业务,值为1:执行业务成功。
注意:一个比较极端的情况,消费者设置redis值为0后,执行业务,出现死锁,一直执行下去。所以,我们可以为这个redis设置一个过期时间,比如10秒之后,这个redis就消失。
具体消费过程为:
- 消费者获取到消息后先根据id去查询redis/db是否存在该消息
- 如果不存在,则设置redis的值为0(执行业务中,并设置过期时间),消费完毕后设置redis值为1(执行业务成功,并设置过期时间),并ack告诉MQ。
- 如果存在,判断其状态,若为1则证明消息被消费过,直接ack,若为0不执行任何操作。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者进行重试
max-attempts: 5 # 最大重试次数
initial-interval: 3000 # 重试时间间隔
redis:
host: 192.168.199.109
port: 6379
// 生产者发布消息
@Test
void contextLoads() throws IOException {
// 用于创建消息的唯一标识
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
// rabbitMQ的convertAndSend() 方法就有一个带消息唯一标识的重载
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
System.in.read();
}
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId 是从消息头里面的 spring_returned_message_correlation 获得的
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis setIfAbsent:就相当于 setnx【在指定的 key 不存在时,为 key 设置指定的值】
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
5. 延迟队列
延迟队列:消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
注意:在RabbitMQ中,没有延迟队列的概念,但是我们可以利用ttl和死信队列达到延迟的效果。这种需求往往在某些应用场景中出现。当然还可以使用插件。
实现原理:
- 生产者生产一个消息到队列1
- 队列1中的消息过期转发到死信队列
- 消费者获取死信队列的信息进行消费
场景举例:
下订单后,30分钟如果还没支付,则取消订单回滚库存。
5.1 使用插件实现延迟队列
如果我现在有不同的场景,比如分别5s、10s、15s之后延迟消费,那就需要创建三个队列。每次有一个不同的时间段的需求过来,我都需要创建一个队列,这肯定不行。
https://juejin.cn/post/6977516798828609567#heading-9