一、为什么使用消息队列?
为什么使用?其实就是在实际业务中,有个具体的场景,如果不使用MQ,可能会有很多麻烦,用了MQ之后带给我们很多好处。场景其实有很多,常见的有三个:1.解耦、2.异步、3.削峰
1.解耦
A系统要发送一条数据到BCD三个系统,接口调用发送,如果新增个E系统,也需要这条数据呢?如果C系统现在不需要这条数据了呢?如果A系统又要发送第二种数据了呢?而且A系统要时刻关注BCD系统的状态,BCD挂了怎么办?要不要重发?要不要把数据存起来?
如果系统中存在类似情况,你可以考虑这个调用是不是必须要同步调用?如果用MQ来解耦,会省去很多麻烦。
2.异步
A系统接收一个请求,需要在ABCD四个数据库进行写库操作,A本地写库需要30ms,B库需要100ms,C库需要200ms,C库需要200ms,则一共需要30+100+200+200ms。如果用MQ来进行异步操作,则只需要30ms后即可返回,BCD异步写入即可,A不用考虑。
3.削峰
每天0~11点,A系统风平浪静,每秒并发100,12点时,并发暴增到10000,系统每秒只能处理1000个请求,怎么办?这时可以用MQ进行流量削峰。
二、消息队列的优缺点
优点已经说了,解耦,异步,削峰,接下来说消息队列的缺点:
1.系统可用性降低
本来只需要考虑系统本身可用性,现在引入了MQ,如果MQ挂了怎么办?
2.系统复杂性提高
MQ加进来了,消息丢失怎么办?重复投递怎么办?重复消费怎么办?消息的顺序性怎么保证?
3.一致性问题
A处理完返回成功了,调用者以为请求成功了,可是BC成功,D失败了怎么办,数据就不一致了。
所以,消息队列其实结构非常复杂,引入它会带来很多好处,但是同时需要规避很多问题。
三、应对MQ缺点的办法
1.消息丢失怎么办?(消息的可靠性传输)
消息的丢失可能会出现在三个地方:
(1)生产者弄丢数据
生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。怎么解决?
①事务:生产者发送数据之前开启RabbitMQ事务(channel.txSelect),然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,可以提交事务(channel.txCommit)。但是问题是,RabbitMQ事务机制一搞,基本上吞吐量会下来,因为太耗性能。
②confirm模式:在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
所以一般在生产者这块避免数据丢失,都是用confirm机制的。
(2)Mq弄丢数据
就是RabbitMQ自己弄丢了数据,这个你必须开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。
设置持久化有两个步骤:
①第一个是创建queue和交换器的时候将其设置为持久化,这样就可以保证RabbitMQ持久化相关的元数据,但是不会持久化queue里的数据;
②第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去
必须要同时设置这两个持久化才行
持久化可以和生产者的confirm结合,当持久化成功后,再ack生产者。如果持久化之前RabbitMQ挂了,生产者没收到ack,会重发。
(3)消费者弄丢数据
RabbitMQ如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。
这个时候得用RabbitMQ提供的ack机制,简单来说,就是你关闭RabbitMQ自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。
2.消费者顺序消费
从根本上说,异步消息是不应该有顺序依赖的。在MQ上估计是没法解决。要实现严格的顺序消息,简单且可行的办法就是:保证生产者- MQServer -消费者是一对一对一的关系。
如果有顺序依赖的消息,要保证消息有一个hashKey,类似于数据库表分区的的分区key列。保证对同一个key的消息发送到相同的队列。A用户产生的消息(包括创建消息和删除消息)都按A的hashKey分发到同一个队列。只需要把强相关的两条消息基于相同的路由就行了,也就是说经过m1和m2的在路由表里的路由是一样的,那自然m1会优先于m2去投递。而且一个queue只对应一个consumer
3.消息的重复
分为两大类情况:1、生产者消息重复发送; 2.MQ向消费者投递时重复投递
终极解决办法:幂等性
1. MVCC:
多版本并发控制,乐观锁的一种实现,在生产者发送消息时进行数据更新时需要带上数据的版本号,消费者去更新时需要去比较持有数据的版本号,版本号不一致的操作无法成功。例如博客点赞次数自动+1的接口: public boolean addCount(Long id, Long version); update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一个version只有一次执行成功的机会,一旦失败了生产者必须重新获取数据的最新版本号再次发起更新。
2. 去重表:
利用数据库表单的特性来实现幂等,常用的一个思路是在表上构建唯一性索引,保证某一类数据一旦执行完毕,后续同样的请求不再重复处理了(利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。)
以电商平台为例子,电商平台上的订单id就是最适合的token。当用户下单时,会经历多个环节,比如生成订单,减库存,减优惠券等等。每一个环节执行时都先检测一下该订单id是否已经执行过这一步骤,对未执行的请求,执行操作并缓存结果,而对已经执行过的id,则直接返回之前的执行结果,不做任何操作。这样可以在最大程度上避免操作的重复执行问题,缓存起来的执行结果也能用于事务的控制等。