一、可靠传输
本篇文章主要讲 RabbitMQ 如何保证消息的可靠传输,所以在讲RabbitMQ的实现之前,我们需要先来搞懂一个问题,就是什么是消息的可靠传输。
在 RabbitMQ 中,一个消息从产生到被消费大致需要经过三个步骤,即生产者生产消息,消息投递到 RabbitMQ,RabbitMQ 再将消息推送消费者(或者是消费者拉取),最终消费者将这条消息成功消费。
所以消息丢失也可以划分为三种情况——生产者、消息队列、消费者,如下图所示:
所以消息的可靠传输,就是确保消息能够百分百从生产者发送到服务器,再从服务器发送到消费者。接下来我们就针对这三种情况分别讨论解决方案。
二、生产者投递消息失败
1. 事务机制
使用 RabbitMQ 的事务功能,此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect
,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit
。
// 开启事务
channel.txSelect();
try {
// 发送消息
} catch(Exception e) {
channel.txRollback();
// 重发消息
}
// 提交事务
channel.txCommit();
事务机制可以基本确保生产者投递消息成功,但是这种方式有比较大的缺点,基本上 RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能了。
2. confirm机制
针对上述问题,如果还要确保 RabbitMQ 生产者的消息正确投递,可以开启 confirm 模式,在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ACK 消息,告诉你说这个消息 ok 了。
如果 RabbitMQ 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
try {
channel.confirmSelect(); //将信道置为 publisher confirm 模式
// 之后正常发送消息
channel.basicPushlish("exchange", "routingKey", null,
"publisher confirm test".getBytes());
if (!channel.waitFormConfirms()) {
System.out.println("Send message failed");
// do something else...
} catch (InterruptedException e) {
e.printStackTrace();
}
}
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收之后会异步回调你一个接口通知你这个消息接收到了。
所以一般生产者到 RabbitMQ 这块避免数据丢失,会采用 confirm 机制更多些。
三、消息队列自身丢失
就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
-
创建 queue 的时候将其设置为持久化
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
-
发送消息时将消息的 deliveryMode 设置为 2
就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
同时持久化也可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ACK,你也是可以自己重发的。
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存中的数据丢失。
四、消费者宕机
对于消费者端所产生的情况就是:消费者成功接收到消息,但是还未将消息处理完毕就宕机了。针对这种情况,可以利用 RabbitMQ 提供的 消息确认机制。
1. 消息确认机制
为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数:
- 当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息;
- 当 autoAck 等于 false 时,RabbitMQ 会等待消费者显示地回复确认信号后才从内存(后者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。
所以对于消费者可能发生宕机地情况,我们可以将 autoAck 参数置为 false,消费者就有足够的时间处理这条消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待并且持有这条消息,直到消费者显示调用 Basic.Ack
命令为止。
当 autoAck 参数设置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息,另一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也可能还是原来的那个消费者。