丢消息的情况有多种,可能是生产者丢了,可能是MQ丢了,可能是消费者丢了。
1、rabbitMQ 一般是用来承载核心业务的,数据是绝对不能丢的。解决方案的核心是接收消息方给发送消息方返回ack
一句话总结:生产者开启confirm模式 + MQ持久化消息 + 消费者关闭autoAck,手动提交 ack
三种情况丢数据:
(1)生产者弄丢了数据:
场景:写消息的过程中,消息都没到rabbitMQ,在网络传输过程中就丢了;或者是消息到了rabbitMQ但是MQ内部出错了没保存;
推荐 解决方案1:开启confirm模式:在生产者那里设置开启confirm模式后,生产者发送消息给rabbitMQ,
若rabbitMQ接收到了消息,则回调生产者提供的回调接口,返回一个ack,通知生产者说消息已经收到了;
若rabbitMQ在接收消息时报错了,则回调生产者的回调接口,返回一个nack,通知生产者消息接收失败,可以尝试重发。
此种方案是异步的,发消息后不会阻塞生产者发送新的消息。
解决方案2:选择用rabbitMQ提供的事务功能:生产者发送数据之前开启rabbitMQ事务(channel.txselect),然后发送消息,
若rabbitMQ接收到消息,那么可以提交事务(channel.txCommit);
若rabbitMQ未接收到消息,生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重发
问题是:rabbitMQ事务机制一搞,吞吐量会下来,太耗性能
因为事务机制是同步的,生产者发送消息会同步阻塞卡住等待是成功还是失败,那吞吐量就降下来了。
(2)rabbitMQ弄丢了数据:
场景:rabbitMQ接受到了消息之后先暂存在自己的内存里,结果消费者还没来得及消费,rabbitMQ自己就挂掉了,就导致暂存在内存里的数据丢失了。
解决方案是:开启rabbitMQ的持久化。就是消息写入后会持久化到磁盘,哪怕rabbitMQ挂了,恢复之后会自动读取之前存储的数据
两个步骤设置持久化:1 创建queue时将其设置为持久化的,可保证rabbitMQ持久化queue的元数据;
2发送消息时将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitMQ会将消息写入磁盘;
注意:必须要同时设置这两个持久化,才会在rabbitMQ重启时从磁盘恢复queue及其数据。
持久化可以结合生产者的confirm机制使用,只有当消息被持久化到磁盘后,才通知生产者ack了,否则生产者都是可以重发消息的。
(3)消费者弄丢了数据:
场景:若打开了消费者的autoAck机制。消费者消费到了这个消息就自动autoAck了,通知rabbitMQ说这消息已经消费了,但是还没来得及处理(例如写库)自己就挂了,
但是rabbitMQ以为这个消息已经消费了处理完了。
解决方案:消费者将autoAck关闭,每次确定处理完一条消息后,在发送ack给rabbitMQ。
若rabbitMQ没收到你发的ack消息,会将这条消息重新分配给其他消费者处理。
2、kafka
在用kafka作为消息队列使用是,主要是要处理消费端和kafka环节丢失数据的问题。
一句话总结:消费端关闭自动提交offset,改为手动提交offset + MQ设置4个参数
(1)消费端弄丢了数据
场景:在解决重复消费问题的时候,我们知道消费者会自动提交offset到zookeeper,来通知说已经消费到哪条数据了,但是此时还没处理这个消息就自己挂了,此时这条消息就丢了。因为已经自动提交了offset,kafka会以为你已经消费过了,实际上你没处理成功。
解决方案:既然是因为消费端自动提交offset造成的的可能丢失消息,那就关闭自动提交offset,在处理完消息后再自己手动提交offset,就可以保证数据不会丢。
但是可能有另一种情况就是,处理完消息了,offset还没来得及提交就自己挂了,会重复消费一次,此时要自己去保证幂等性。
(2)kafka弄丢了数据
kafka高可用性的时候我们知道,一个topic可以被划分成多个partition,每个partition又有多个replica副本存在于多台机器broker节点上。
场景:那么,broker的某个作为leader的partition如果挂了,而他的其他follower还没同步完数据,此时根据kafka本身的机制,会从剩下的follower中重新选举出一个新的leader来用,那么新选出的leader,可能数据就是丢失了某些的。
解决方案:一般要求设置4个参数:
1 给这个topic设置replication.factor参数:值必须大于1,要求每个partition必须至少有2个副本;
2 在kafka服务端设置min.insync.replicas参数:值必须大于1,这是要求一个leader至少感知到有至少一个follower还是跟自己保持联系的,没掉队,这样才能确保leader挂了还有一个follower;
3 在producer端设置acks = all:要求每条数据必须的写入所有replica后,才能认为是写入成功
4 在producer端设置retries=MAX:要求一旦写入失败,就无限重试,卡在这里了。
设置了3即保证了生产者不会丢数据,因为所有副本都写入了数据。