消息队列
1.1 什么是消息队列
我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。
1.2 为什么要用消息队列
使用消息队列主要有两点好处:
1.通过异步处理提高系统性能(削峰、减少响应所需时间;
2.降低系统耦合性。【结合你自己的项目来回答】
1.2.1 通过异步处理提高系统性能(削峰、减少响应所需时间)
消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。
1.2.2 降低系统耦合性
我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展
性无疑更好一些。
我们最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。如下图所示:
消息队列使利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅
消息。
从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
1.3 使用消息队列带来的一些问题
1.3.1 系统可用性降低
系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!
1.3.2 系统复杂性提高
加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
1.3.3 一致性问题
我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
2 JMS VS AMQP
2.1 JMS
JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。ActiveMQ 就是基于JMS 规范实现的。
2.1.1 JMS两种消息模型
l 点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。
比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
l 发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub)使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
2.1.2 JMS 五种不同的消息正文格式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
l StreamMessage --Java原始值的数据流
l MapMessage--一套名称-值对
l TextMessage--一个字符串对象
l ObjectMessage--一个序列化的Java对象
l BytesMessage--一个字节的数据流
2.2 AMQP
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于AMQP 协议实现的。
2.3 JMS vs AMQP
总结:
l AMQP 为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。在Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
l JMS 支持TextMessage、MapMessage 等复杂的消息类型;而AMQP 仅支持byte[] 消息类型(复杂的类型可序列化后发送)。
l 由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而JMS 仅支持队列和主题/订阅方式两种。
2.4 市面上比较常用的消息队列的对比
3 ActiveMQ的高可用
ActiveMQ提供了master-slave、broker cluste两种部署方式结合来满足分布式和高可用的需求。
3.1 master-slave+消息持久化解决高可用
使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker可以提供服务,被视为Master,其他的Broker 处于待机状态,被视为Slave。如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。
Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的Slaves。如果Master宕了,得到了最新更新的Slave会成为Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。
是不是觉得和Redis Sentinel主从高可用的方式很像,这里的zookeeper起到的作用和reids里的sentinel作用差不多。
ActiveMQ有三种持久化方式(在activemq.xml可配):分别是基于共享文件,基于JDBC,基于LevelDB
3.2 Broker-Cluster解决负载和分布式
Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。
Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。
3.3 MQ异常导致的问题
发送方异常:使用消息的持久化机制保障重启即可
接收方异常:可能会出现重复消费问题,只要我们能搞高正保证幂等性,那么重复消费也无所谓。
给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性
幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
其实还是得结合业务来思考,我这里给几个思路:
(1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧
(2)比如你是写redis,那没问题了,反正每次都是set,天然幂等性
(3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据
如何保证MQ的消费是幂等性的,需要结合具体的业务来看
3.1 队列积压如何解决
3.1.1 没有设置过期时间
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
1)先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉
2)新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
3.1.2 设置了过期时间
假设你用的是ActiveMQ是可以设置过期时间的,就是TTL,
1)message过期则客户端不能接收
2)ttlCeiling:表示过期时间上限(程序写的过期时间不能超过此时间,超过则以此时间为准)
3)zeroExpirationOverride:表示过期时间(给未分配过期时间的消息分配过期时间
如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后再重新导入。