1.概述
我们知道在使用RabbitMQ时,生产者将消息发布出去之后,消息是否顺利到达broker代理服务器呢?默认情况下发布操作没有任何信息返回给生产者,也就是生产者是不知道消息有没有顺利到达broker。如果在消息到达broker之前已经丢失了,那发布的消息就更不会到达队列并被消费者消费。如果出现上述情况,就会造成生产者发布的消息与消费者消费的消息不一致的问题。
2.RabbitMQ自带的解决方案
RabbitMQ提供以下两种方式解决上述问题。
2.1事务机制
事务机制能够解决生产者与broker之间消息确认的问题,只有消息成功被broker接受,事务才能提交成功,否则就进行事务回滚操作并进行消息重发。但是使用事务机制会降低RabbitMQ的消息吞吐量,不适用于需要发布大量消息的业务场景。
2.2消息确认机制
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。
与事务机制相比较,确认机制采用异步回调方式来处理确认消息,性能得到了较大的提升,可以确保数据同步的一致性。
3.新的解决方案
为了最大限度的提升MQ数据同步的性能,自己制定了一个更好的解决方案,现分享如下。
解决方案:MQ+Redis+接口。
MQ:作为消息队列中间件负责同步数据;
Redis:负责存储每天(或每批次等)生产者已发送数据的唯一标识,即全量存储已发送数据唯一标识,方便消费者检查并同步失败数据;
接口:作为补偿措施,用于消费者获取同步失败的数据。
下面分两个使用场景说明。
4.单表数据同步场景
(1)生产者发送数据至MQ Server,同时记录已发送数据的唯一标识(如id),每同步一批次(比如N条)后,再把该批次的唯一标识存入Redis。
(2)存储唯一标识的key及过期时间,需要根据数据的同步策略具体制定。比如:若每天同步一次数据,就可以以“队列名称+日期”为key,把这一天所有生产者已发送数据的唯一标识存入同一个list中。
(3)消费者消费数据后,负责检查已消费数据唯一标识与Redis中唯一标识是否有差异,若存在差异,则说明有数据同步失败。
(4)对于同步失败数据,消费者调用生产者提供的接口实时获取。接口以唯一标识为入参,并控制每次请求的数据量,比如每次最多同步200条等。
5.复杂业务数据同步场景
复杂业务数据是指生成者需要一定的业务逻辑处理产生的数据。
关于复杂业务数据的同步,考虑到同步失败的场景,需要持久化这类数据。然后按单表数据同步场景进行数据的同步。