zoukankan      html  css  js  c++  java
  • RocketMQ集群

    一、Broker主从同步模式:异步复制、同步双写

    二、刷盘策略:同步刷盘、异步刷盘

    三、消息存储

      1. 生产者发送消息,Broker收到后,将消息持久化,返回ACK。

      2. 虽然消费方式有push和pull,但在RocketMQ中实际上都是由消费者主动去获取的,因为当消费者非常多时,服务端的压力会非常大。

      对于push,消费者封装了轮询过程,并注册到监听器中,取到消息后,唤醒MessageListener的consumeMessage()来消费。消费者轮询方式为长轮询,若没有数据,服务端会阻塞请求,而不会立刻返回,直到有数据或超时才返回给客户端。

      对于pull,获取消息的过程由用户实现。对Topic的MessageQueue集合遍历,针对每一个MessageQueue批量取消息,每取一次,记录下下次要取的offset。

      3. 消息的删除依赖CommitLog的清理机制

      push实时性高,增加了服务端负载,如果push速度过快,消费端会出现很多问题。pull控制权在消费端,但是拉取的时间间隔不好控制,容易造成消息积压或者多次空请求。

    四、存储介质

      1. 关系型数据库 2.文件系统

      RocketMQ采用文件系统,顺序写保证存储效率

      消息存储结构

      RocketMQ的存储由consumequeue和commitlog配合完成的

      1. commitlog存放真正的消息文件,请参考:https://blog.csdn.net/GAMEloft9/article/details/100562191

      2. consumequeue类似索引,存储的是消息在commitlog中的地址。每个messageQueue有对应的consumeQueue文件。

      消费者只需要遍历consumeQueue就能快速找到需要消费的消息,而不是从commitLog中遍历。

      按文件写入,文件对象是MappedFile,大小1G(1024*1024*1024=1073741824),写满一个写下一个。文件名记录偏移量,初始文件00000000000000000000,第二个文件为00000000001073741824。新文件名在前一个文件名上加1073741824。

      

      生产者的消息先持久化到commitLog中,然后通过异步线程记录到consumequeue中。

      consumequeue中每条数据的结构:offset记录位移,size记录消息长度,tagCode记录的是tag的哈希值

      

      3. IndexFile文件(底层实现是HashMap)

      通过key(生产者设置的msgId和用户设置的keys)可以快速定位到消息在commitlog中的物理偏移量,通过文件名筛选出存放的文件,然后根据物理偏移量-文件名得到起始位置,消息前4个字节存储的是消息大小,可以快速定位到结束位置。 

      4. CommitLog清理机制 

    1. 按照时间清理,默认清理三天前的commitLog 文件
    2. 按照磁盘水位清理,当已用容量达到75%时,清理最老的commitLog文件    

    五、零拷贝技术

      两种方式:

      1. 使用mmap+write方式      

      优点:即使频繁使用,使用小块文件传输,效率也很高

      缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题

      2. 使用sendfile方式

      优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全问题

      缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO

    六、刷盘机制  

      消息存储时,先将消息存储到内存,再根据不同的刷盘策略进行刷盘

      同步刷盘:同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回告知发送端

      异步刷盘:立刻返回发送端发送成功,有单独的线程执行刷盘

      请见:https://www.jianshu.com/p/6ef2f03c0ff6

    七、高可用机制

      7.1 发送消息的高可用

      创建topic时,将topic的多个Message Queue建立在多个Broker组上。当一个Broker组的Master不可用时,只要其他组的Master可用,则依然可以发送消息。目前RocketMQ不支持自动主从切换。如需要将Slave转化成Master,需要手动停止Slave角色的Broker,更改配置文件,用新的配置启动Broker。

      消息发送时默认选择重试机制(默认2次)保证发送消息的高可用。非同步发送模式下,只发送一次不会重试 DefaultMQProducerImpl#sendDefaultImpl

    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

      7.2 消费消息的高可用

      消费者可以从Master和Slave读取消息,支持自动切换

      7.3 消息主从复制

        7.3.1 同步复制

          等Master和Slave均写入成功后,才反馈给客户端写入成功。Master和Slave数据高度一致,容易恢复,但是会增大数据写入延迟,降低系统吞吐量。        

        7.3.2 异步复制   

           Master写入成功后,即反馈客户端写入成功状态。

        结合刷盘机制,一般生产中将刷盘设置为异步刷盘,主从复制设置为同步复制。

    八、消息投递机制

      8.1 默认投递方式:基于Queue队列轮询算法投递

      保证每个队列的消息数量尽可能均匀。

      发送端发送消息时,轮询Topic下所有的MessageQueue,依次发送(DefaultMQProducerImpl#selectOneMessageQueue方法)。注意不同的queue可能存在不同的broker上,在源码中有优化,轮询时,会尽量避免上次失败的broker

       8.2 增强版

      某些Queue由于自身数量积压等问题,投递时间较长,会影响后续的投递效果。所以RocketMQ每发送一个MQ消息后,都会统计一下时间延迟,选择延迟最小的发送策略

       8.3 顺序消息的投递方式

      生产者将消息放置在同一个queue队列中,消费者采用一定的策略(一个线程独立处理一个queue)保证顺序性

         8.3.1. MessageQueueSelector#select方法选择消息投递队列,有3个实现类

          1. SelectMessageQueueByRandom:随机分配策略

          2. SelectMessageQueueByHash:基于Hash分配策略

          3. SelectMessageQueueByMachineRoom:基于机器机房位置分配策略    

         8.3.2. 如何为消费者分配queue队列

          8.3.2.1 广播模式下

            所有的consumer会分到所有的queue

          8.3.2.2 集群模式下

            将queue指定给某个consumer。RocketMQ主动拉取时,需要指定拉取哪一条message queue。

            AllocateMessageQueueStrategy#allocate方法选择消费的queue,有6个实现类

            1. AllocateMessageQueueAveragely:平均分配算法

            2. AllocateMessageQueueAveragelyByCircle:基于环形平均分配算法

            3. AllocateMachineRoomNearby:基于机房临近原则算法

            4. AllocateMessageQueueByMachineRoom:基于机房分配算法

            5. AllocateMessageQueueConsistenHash:基于一致性hash算法

            6. AllocateMessageQueueByConfig:基于配置分配算法

       8.4 Consumer模式

        consumer被分为两类:MQPullConsumer和MQPushConsumer。

    九、消息重试

      集群模式下,消费失败后,broker通过消息重试重新投递消息

      消费失败的类型:(1)消费端返回ConsumeConcurrentlyStatus.RECONSUME_LATER (2)消费端返回null (3)消费端抛出异常,并没有捕获

      超过最大重试次数,消息会投递到死信队列。

       9.1 顺序消息的重试

      消费失败后会自动不断进行重试,重试时间间隔1秒,期间会阻塞消息的消费。使用顺序消息要确保完善的处理消费失败的情况

      9.2 无序消息的重试

      包括普通、定时、延时、事务消息。只针对集群模式,广播模式不提供重试。默认重试16次,重试时常总共4小时46分钟,消息重试时,消息ID是不变的。

      9.3 消息重试和延迟消息

      延迟消息默认有18个级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。重试消息选用了后16个级别,实际是基于延迟消息实现的

    十、死信队列

      死信Topic命令:%DLQ%+Consumer组名,如 %DLQ%test。每个消费者组(同一个GroupId)都对应一个死信队列(必须先产生死信,否则不会创建)。

      死信队列无法被正常消费,内部消息保存三天。

    十一、消息幂等

      11.1 消息重复的场景:  

        1. 发送时消息重复

        超时时间内,发送者发送失败会重新发送一条一样的消息(包括消息ID和消息内容),注意只有同步发送模式才会有重试机制,否则即使设置了重试次数也不生效

        只有首次发送时会设置消息ID

        还有一点需要注意,发送端的重试不仅有次数限制还有超时限制,当执行时间+重试时间超过超时时间,直接抛出异常,不再重试

        2. 投递时消息重复

        消费者收到消息后应答服务端的确认丢失,服务器会再次投递

        3. 负载均衡时消息重复(网络抖动、broker重启、消费者重启等)

        当broker或客户端重启、扩容或者收缩,触发Reblance,消费者可能收到重复消息

      11.2 实现幂等

        1. 消息幂等的两要素:

        1. 幂等令牌:业务中具备唯一性表示的字符串,比如订单号。

        2. 处理唯一性的确保:一个业务逻辑一定不会重复执行成功多次,比如一个订单只能支付成功一次。

          一般使用缓存去重+数据库唯一索引实现幂等

          数据库唯一索引保证数据唯一性,将支付记录存放在缓存,可以避免多次数据库调用,以应对高并发场景。

        消息ID可以作为去重ID吗?参考 https://www.jianshu.com/p/fa80604054a3

        生产者的消息ID生成方式(msgId):IP + 进程id + 生产消息的自增id + 从当月开始计算的时间戳。IP保证了分布式下的唯一,进程id保证了单机多客户端实例的唯一,自增id保证了单实例生产消息的唯一,时间戳保证了月内重启的唯一。但是月初重启,这个月和上个月的的消息ID会重复,那么CommitLog中会有重复的消息ID吗?并不会,因为RocketMQ只保存三天内的消息。

        服务端的消息ID生成方式(offsetMsgId):服务端IP地址+消息的文件偏移量(commitLog的offset)

    public class SendResult {
        private SendStatus sendStatus;
        private String msgId;// 客户端生成的id
        private MessageQueue messageQueue;
        private long queueOffset;
        private String transactionId;
        private String offsetMsgId;//服务端生成的id
        private String regionId;
        private boolean traceOn = true;
      
        .....
    }

        生产端:去重方式,在发送消息时,用业务唯一标识设置消息的key。消费端根据消息的key做幂等。 

        消费端:

          1. 业务操作之前进行状态查询,比如订单支付状态已完成则不允许再次支付,直接返回消费成功

          2. 业务操作前进行数据检索,根据唯一标识查到库中已经存在记录则直接返回成功

          3. 唯一性约束保证,数据库对唯一标识添加唯一索引针对单机情况

          4. 引入锁机制,分布式锁保证分布式场景的去重,状态机保证状态的正确变迁(或每条消息附加一个时间戳,保证不乱序消费)

    十二、消息堆积    

      1. 消息堆积的本质:生产者的生产速度大于消费者的处理速度

        1. 生产者生产速度骤增,比如突发流量

        2. 消费者速度变慢,比如消费者实例I/O阻塞严重或者宕机

      2. 如何处理消息堆积

        如何通过解决系统问题、优化代码来避免消息堆积?

        消息已经堆积,线上如何快速处理?

        2.1 消费端性能优化

          1. 增加单个消费者的处理能力,代码优化/提高配置

          2. 水平扩容消费者个数 

        发生大量堆积应该怎么处理?先分析问题,是否是消费者代码产生了bug或者在消费流程中依赖其他服务但是那个服务挂了,先恢复正常消费。

        1. 消费端扩容:建立一个临时的topic,这个topic下的queue是原先的几倍,并且分布在不同broker中,这样整个的消费能力就上来了。写一个分发任务将原先消息存放到新的topic下,然后启用消费者消费。消费完成后在还原

        2. 服务降级,快速失败

        3. 跳过不重要的消息

        参考:https://www.jianshu.com/p/f265258477e7

    十三、消息的查询

       三种查询条件:

      1. 按照Message Key查询:程序代码明确指定的key,setkeys方法,可能存在多条记录,原因和2一致

      2. 按照Unique Key查询(msgId):生产者在发送消息前,自动生成一个UNIQ_KEY,自动重试的时候这个值是一样的。所以可能存在多条记录

      3. 按照Message Id查询(offsetMsgId):这里指的是Broker端生成的,前8个字节是Broker的IP和端口,后8个字节是消息在CommitLog中的偏移量,唯一定位一条消息

      通过1,2查询需要用到索引文件(其中记录了CommitLog的偏移量),所以3查询效率最高

      客户端查询API:org.apache.rocketmq.client.MQAdmin这个接口定义了方法

    /**
         * Query message according tto message id
         *
         * @param offsetMsgId message id
         * @return message
         */
        MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
            InterruptedException, MQClientException;
    
        /**
         * Query messages
         *
         * @param topic message topic
         * @param key message key index word
         * @param maxNum max message number
         * @param begin from when
         * @param end to when
         * @return Instance of QueryResult
         */
        QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
            final long end) throws MQClientException, InterruptedException;
    
        /**
         * @return The {@code MessageExt} of given msgId
         */
        MessageExt viewMessage(String topic,
            String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

        而生产者和消费者都实现了这个接口,在服务上就可以通过接口的方式查询。

        之前讲到,发送者重试,msgId是一样的,服务端有可能收到多条一样的消息,那么根据msgId查询是怎么处理的呢

        public MessageExt queryMessageByUniqKey(String topic,
            String uniqKey) throws InterruptedException, MQClientException {
    
            QueryResult qr = this.queryMessage(topic, uniqKey, 32,
                MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
            if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
                return qr.getMessageList().get(0);
            } else {
                return null;
            }
        }

        可以看到这种情况,只返回了其中一条。  

    十四、Rebalance

      详解:http://www.tianshouzhi.com/api/tutorials/rocketmq/409

      将一个Topic下的多个队列在同一个消费者组下的多个消费者实例重新分配。通常发生在增加消费者或者消费者下线的情况。

      Reblance危害:

        1. 消费暂停:在rebalance期间,受到影响的消费者需要先暂停,等到重新分配完成后才能再次消费

        2. 重复消费:消费者消费完成之后,offset是异步通知给broker的,有延迟。所以切换的时候,可能新的消费者又消费了之前消费过的消息

        3. 消费突增:由1,2引起,暂停引发了大量消息积压或者重复消费了大量消息,会造成消费突增

      14.1 rebalance触发时机

        1. broker端变化:broker宕机、broker升级、队列扩容和缩容

        2. 消费者组变化:消费者宕机、消费者与broker断开连接、主动进行消费者数量扩容/缩容、Topic订阅信息发生变化

        Broker在rebalance中充当一个协调者的角色,在其内部通过元数据管理器维护了Rebalance元数据信息(内部实现是Map),当元数据信息发生变化时,主动通知消费者进行rebalance。

        问题一、消费者自己重新选择queue,如何确保彼此不冲突?

        问题二、broker的rebalance通知丢失了怎么办?每个consumer会定时触发rebalance。

        问题三、重新分配后消费进度如何确认?ConsumerOffsetManager保存每个队列的消费信息。消费者发送UPDATE_CONSUMER_OFFSET更新消费进度、发送QUERY_CONSUMER_OFFSET查询消费进度

        消费者触发rebalance:

        1. 启动时触发 DefaultMQPushConsumerImpl#start方法

          启动5个步骤

          1. 启动准备工作

          2. 从nameserver更新topic路由信息,收集rebalance需要的queue信息

          3. 检查consumer配置

          4. 向每个broker发送心跳,将自己加入消费者组(注册到consumerManage中),broker收到后会通知其他消费者rebalance

          5. 立即触发rebalance 

         2. 运行时触发

          1. 监听broker通知

          2. 周期性触发rebalance,一个定时任务,默认每20秒一次

        3. 停止时触发 DefaultMQPushConsumerImpl#shutdown方法

          停止的5个步骤

          1. 停止正在消费的消息

          2. 持久化offset,向broker同步自己消费的offset

          3. 取消注册consumer,将自己移除消费者组(从consumerManager中移除),broker收到后会通知其他消费者rebalance

          4. 关闭与nameserver和broker的连接

          5. 丢弃尚未处理的消息

      14.2 Consumer Rebalance流程

        MQClientInstance#doRebalance方法

        public void doRebalance() {
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }

        只要这台机器上有一个消费者触发了rebalance,所有的消费者都会执行rebalance。维度是按照topic执行的,每次rebalance一个topic,分配时有不同策略(8.3.2.2)。如果一个消费者组订阅了多个topic,那么可能出现分配不均的情况,因为每次排序和选取的规则是一样的。

        单个的Rebalance流程:

        1. 获取Rebalance元数据信息,Topic的队列信息(从缓存的Topic路由中获取)和消费者组实例id

        2. 进行队列分配,分配策略见8.3.2.2。回答问题一:1. 分配前对queue和消费者id进行排序 2. 选用相同的分配策略

        3. 分配结果处理

        RebalanceImpl#updateProcessQueueTableInRebalance方法

        消费者新增了队列,要先计算从哪个位置开始消费,从这个位置拉取消息

        消费者移除了队列,清除缓存消息,停止拉取消息,并持久化offset(将offset同步给broker)

        

    面试题:https://zhuanlan.zhihu.com/p/161661032

    RocketMQ重试机制:https://blog.csdn.net/hejingyuan6/article/details/108365294

    水平扩容和负载均衡:https://zhuanlan.zhihu.com/p/25140744

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    idea用法
    pagehelper用法
    mybatis
    多线程2
    radio 标签状态改变时 触发事件
    多线程
    a标签点击后,给a标签添加样式
    servlet
    mybatis 查询
    springmvc 发送PUT 和 DELETE 请求
  • 原文地址:https://www.cnblogs.com/walker993/p/14563978.html
Copyright © 2011-2022 走看看