zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——Broker消息存储机制

    Broker消息存储机制

      RocketMQ 使用 CommitLog 文件将消息存储到磁盘上,那么 RocketMQ 存储消息到磁盘的过程是怎么样的呢?

      RocketMQ 首先将消息数据写入操作系统 PageCache,然后定时将数据刷入磁盘。

    一、Broker 消息存储的流程是什么?

      下面主要介绍 RocketMQ 是如何接收发送消息请求并将消息写入 PageCache 的,整个过程如下:

    (1)Broker 接收客户端发送消息的请求并做预处理。

      SendMessageProcessor.processRequest()方法会自动被调用者接收、解析客户端请求为消息实例。

      • 解析请求参数
      • 执行发送处理前的 Hook
      • 调用保存方法存储消息
      • 执行发送处理后的 Hook

    (2)Broker存储前预处理消息。

      预处理方法:org.apache.rocketmq.broker.processor.SendMessageProcessor.sendMessage(),

      首先,设置请求处理返回对象标志,代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerprocessorSendMessageProcessor.java,代码如下:

    1     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    2     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    3 
    4     response.setOpaque(request.getOpaque());

      Netty 是异步执行的,请求发送到 Broker 被处理后,返回结果时,在客户端的处理线程已经不再是发送请求的线程,那么客户端如何确定返回结果对应哪个请求呢? 通过返回标志来判断。

      其次,做一些列存储前发送请求的数据检查,比如死信消息处理、Broker 是否拒绝事务消息处理、消息基本检查等。消息基本检查方法代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerprocessorAbstractSendMessageProcessor.java 中 msgCheck(),该方法的主要功能如下:

      • 校验 Broker 是否配置可写
      • 校验 Topic 名字是否为莫认证
      • 校验 Topic 配置是否存在
      • 校验 queueId 与读写队列数是否匹配
      • 校验 Broker 是否支持事务消息(msgCheck之后进行的校验)

     (3)执行 DefaultMessageStore.putMessage() 方法进行消息校验和存储模块检查

      在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对 Broker 做是否 Slave 的检查等,总结如下:

      • 校验存储模块是否已经关闭
      • 校验 Broker 是否是Slave
      • 校验存储模块运行标记
      • 校验 Topic 长度
      • 校验扩展信息的长度
      • 校验操作系统 PageCache 是否繁忙。

      (4)执行 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java putMessage() 方法,将消息写入 CommitLog。

      存储消息的核心处理过程如下:

      • 设置消息保存时间为当前时间戳,设置消息完整性校验码 CRC(循环冗余码)。
      • 延迟消息处理。如果发送的消息是延迟消息,这里会单独设置延迟消息的数据字段,比如修改 Topic 为延迟消息特有的 Topic —— SCHEDULE_TOPIC_XXXX,并且备份原来的 Topic 和 queueId,以便延迟消息在投递后被消费者消费。

    延迟消息的处理代码如下:

     1         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
     2         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
     3                 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
     4             // Delay Delivery
     5             if (msg.getDelayTimeLevel() > 0) {
     6                 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
     7                     msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
     8                 }
     9 
    10                 topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    11                 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    12 
    13                 // Backup real topic, queueId
    14                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    15                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    16                 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    17 
    18                 msg.setTopic(topic);
    19                 msg.setQueueId(queueId);
    20             }
    21         }
    • 获取最后一个 CommitLog 文件实例 MappedFile,锁住该 MappedFile。默认为自旋锁,也可以通过 useReentrantLockWhenPutMessage 进行配置、修改和使用 ReentrantLock。
    • 校验最后一个 MappedFile,如果结果为空或已写满,则新创建一个 MappedFile 返回。
    • 调用 MappedFile.appendMessage(final MessageExtBrokerInner msg,final AppendMessageCallback cb),将消息写入 MappedFile。

    根据消息是单个消息还是批量消息来调用 AppendMessageCallback.doAppend()方法,并将消息写入 Page Cache,该方法的功能包含以下几点:

    1. 查找即将写入的消息物理机 Offset
    2. 事务消息单独处理。这里主要处理 Prepared 类型和 Rollback 类型的消息,设置消息 queueOffset 为 0 。
    3. 序列化消息,并将序列化结果保存到 ByteBuffer 中(文件内存映射的 Page Cache 或 Direct Memory,简称 DM)。特别地,如果将刷盘设置为异步刷盘,那么当ransientStorePoolEnablTrue时,会先写入DM,DM中的数据再异步写入文件内存映射的Page Cache 中,因为消费者始终是从 Page Cache 中读取消息消费的,所以这个机制也称为 "读写分离"。
    4. 更新消息所在 Queue 的位点。

      在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中 handleDiskFlush() 方法,代码如下

     1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
     2         // Synchronization flush
     3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
     4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
     5             if (messageExt.isWaitStoreMsgOK()) {
     6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
     7                 service.putRequest(request);
     8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
     9                 PutMessageStatus flushStatus = null;
    10                 try {
    11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
    12                             TimeUnit.MILLISECONDS);
    13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
    14                     //flushOK=false;
    15                 }
    16                 if (flushStatus != PutMessageStatus.PUT_OK) {
    17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
    18                         + " client address: " + messageExt.getBornHostString());
    19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    20                 }
    21             } else {
    22                 service.wakeup();
    23             }
    24         }
    25         // Asynchronous flush
    26         else {
    27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    28                 flushCommitLogService.wakeup();
    29             } else {
    30                 commitLogService.wakeup();
    31             }
    32         }
    33     }

    和handleHA()方法,代码如下:

     1     public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
     2         if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
     3             HAService service = this.defaultMessageStore.getHaService();
     4             if (messageExt.isWaitStoreMsgOK()) {
     5                 // Determine whether to wait
     6                 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
     7                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
     8                     service.putRequest(request);
     9                     service.getWaitNotifyObject().wakeupAll();
    10                     PutMessageStatus replicaStatus = null;
    11                     try {
    12                         replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
    13                                 TimeUnit.MILLISECONDS);
    14                     } catch (InterruptedException | ExecutionException | TimeoutException e) {
    15                     }
    16                     if (replicaStatus != PutMessageStatus.PUT_OK) {
    17                         log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
    18                             + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
    19                         putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
    20                     }
    21                 }
    22                 // Slave problem
    23                 else {
    24                     // Tell the producer, slave not available
    25                     putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
    26                 }
    27             }
    28         }
    29 
    30     }

      在 Broker 处理发送消息请求时,由于处理器 SendMessageProcessor 本身是一个线程池服务,所以设计了快速失败逻辑,方便在高峰时自我保护,代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerlatencyBrokerFastFailure.java中的cleanExpiredRequest()方法,代码如下:

     1     private void cleanExpiredRequest() {
     2         while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
     3             try {
     4                 if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
     5                     final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
     6                     if (null == runnable) {
     7                         break;
     8                     }
     9 
    10                     final RequestTask rt = castRunnable(runnable);
    11                     rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
    12                 } else {
    13                     break;
    14                 }
    15             } catch (Throwable ignored) {
    16             }
    17         }
    18 
    19         cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
    20             this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
    21 
    22         cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
    23             this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
    24 
    25         cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
    26             this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
    27 
    28         cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
    29             .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
    30     }

      在 BrokerController 启动 BrokerFastFailure 服务时,会启动一个定时任务处理快速失败的异常,启动及扫描代码路径:D: ocketmq-masterrokersrcmainjavaorgapache ocketmqrokerlatencyBrokerFastFailure.java,具体代码如下:

        public void start() {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                        cleanExpiredRequest(); #每间隔 10ms 执行一次该方法,清理非法、过期的请求。
                    }
                }
            }, 1000, 10, TimeUnit.MILLISECONDS);
        }

    cleanExpiredRequest()方法处理方式有3种:

    1. 系统繁忙时发送消息请求快速失败处理。当操作系统 PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作队列中取出来,直接返回SYSTEM_BUSY。如果此种情况发生说明系统已经不堪重负,需要增加系统资源或者扩容来减轻当前 Broker 的压力。
    2. 发送请求超时处理。
    3. 拉取消息请求超时处理。

      第2种和第3种的代码逻辑和第1种代码逻辑处理类似,如果出现了,说明请求在线程池的工作队列中的排队时间超过预期配置的时间,那么增加排队等待时间即可。如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容。

    二、Broker如何保证高效存储?——内存映射机制与高效写磁盘

      RocketMQ 在存储设计中通过内存映射、顺序写文件等方式实现了高吞吐。

      那么这些怎么实现的呢?

      RocketMQ 的基本数据结构:

      org.apache.rocketmq.store.CommitLog:RocketMQ 对存储消息的物理文件的抽象实现,也就是物理 CommitLog 文件的具体实现。

      org.apache.rocketmq.store.MappedFile:CommitLog 文件在内存中的映射文件,映射文件同时具有内存的写入速度和磁盘一样可靠的持久化方式。

      org.apache.rocketmq.store.MappedFileQueue:映射文件队列中有全部的 CommitLog 映射文件,第一个映射文件为最先过期的文件,最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件中。

      CommitLog、MappedFile、MappedFileQueue 与物理 CommitLog 文件的关系如下:

      每个 MappedFileQueue 包含多个 MappedFile,就是真实的物理 CommitLog文件,Java 通过 java.nio.MappedByteBuffer 来实现文件的内存映射,即文件读写都是通过 MappedByteBuffer(其实是 Page Cache)来操作的。

      写入数据时先加锁,然后通过 Append 方式写入最新 MappedFile。对于读取消息,大部分情况下用户只关心最新数据,而这些数据都在 Page Cache 中,也就是说,读写文件就是在 Page Cache 中进行的,其速度几乎等于志杰操作内存的速度。

    三、文件刷盘机制

      消息存储完成后,会被操作系统持久化到磁盘,也就是刷盘。

      RocketMQ 支持2种刷盘方式,在 Broker 启动时:

      • 配置 flushDiskType = SYNC_FLUSH 表示同步刷盘
      • 配置 flushDiskType = ASYNC_FLUSH 表示异步刷盘

                                                     GroupCommitService 就是 org.apahce.rocketmq.store.CommitLog.GroupCommitServie —— 同步刷盘服务。在 Broker 存储消息到 Page Cache 后,同步将 Page Cache 刷到磁盘,再返回客户端消息并写入结果,具体过程如下所示: 

      FlushRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.FlushRealTimeService —— 异步刷盘服务。在 Broker 存储消息到 Page Cache 后,立即返回客户端写入结果,然后异步刷盘服务将 Page Cache 异步刷盘到磁盘。

      CommitRealTimeService 就是 org.apahce.rocketmq.store.CommitLog.CommitRealTimeService —— 异步转存服务。Broker 通过配置读写分离将消息写入直接内存(Direct Memory),简称 DM),然后通过异步转存服务,将 DM 中的数据再次存储到 Page Cache 中,以供异步刷盘服务将 Page Cache 刷到磁盘中,转存服务过程如下:

      将消息成功保存到 CommitLog 映射文件后,调用 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中 handleDiskFlush() 方法处理刷盘逻辑,代码如下:

     1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
     2         // Synchronization flush
     3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
     4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
     5             if (messageExt.isWaitStoreMsgOK()) {
     6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
     7                 service.putRequest(request);
     8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
     9                 PutMessageStatus flushStatus = null;
    10                 try {
    11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
    12                             TimeUnit.MILLISECONDS);
    13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
    14                     //flushOK=false;
    15                 }
    16                 if (flushStatus != PutMessageStatus.PUT_OK) {
    17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
    18                         + " client address: " + messageExt.getBornHostString());
    19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    20                 }
    21             } else {
    22                 service.wakeup();
    23             }
    24         }
    25         // Asynchronous flush
    26         else {
    27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    28                 flushCommitLogService.wakeup();
    29             } else {
    30                 commitLogService.wakeup();
    31             }
    32         }
    33     }

      通过以上代码可知,同步刷盘、异步刷盘都是在这里发起的。异步刷盘的实现根据是否配置读写分离机制而稍有不同。

      接下来我们介绍两种刷盘方式:

      (1)同步刷盘:

      同步刷盘是一个后台线程服务 ,消息进行同步刷盘的流程如下图:

      存储消息线程:主要负责将消息存储到 Page Cache 或者 DM 中,存储成功后通过调用 handleDiskFlush() 方法将同步刷盘请求 "发送" 给 GroupCommitService 服务,并在该刷盘请求上执行锁等待,代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中 handleDiskFlush(),具体代码如下:

     1     public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
     2         // Synchronization flush
     3         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
     4             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
     5             if (messageExt.isWaitStoreMsgOK()) {           #客户端可以设置,默认为True
     6                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
     7                 service.putRequest(request);               #保存同步磁盘请求
     8                 CompletableFuture<PutMessageStatus> flushOkFuture = request.future(); #请求同步锁等待
     9                 PutMessageStatus flushStatus = null;
    10                 try {
    11                     flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
    12                             TimeUnit.MILLISECONDS);
    13                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
    14                     //flushOK=false;
    15                 }
    16                 if (flushStatus != PutMessageStatus.PUT_OK) {
    17                     log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() #记录刷盘超时设置
    18                         + " client address: " + messageExt.getBornHostString());
    19                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    20                 }
    21             } else {
    22                 service.wakeup();    #异步刷盘,不用同步返回
    23             }
    24         }
    25         // Asynchronous flush
    26         else {
    27             if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    28                 flushCommitLogService.wakeup();
    29             } else {
    30                 commitLogService.wakeup();
    31             }
    32         }
    33     }

      同步刷盘服务线程:通过 GroupCommitService 类实现的同步刷盘服务。

      具体同步刷盘是怎么执行的,执行完成后又是如何将刷盘结果通知存储数据线程的呢?

      正常同步刷盘线程会间隔 10ms 执行一次 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法,该方法循环每一个同步刷盘请求,如果刷盘成功,那么唤醒等待刷盘请求锁的存储消息线程,并告知刷盘成功。

      由于操作系统刷盘耗时及每次刷多少字节数据到磁盘等,都不是 RocketMQ 进程能掌控的,所以在每次刷盘前都需要做必要的检查,以确认当前同步刷盘请求对应位点的消息是否已经被刷盘,如果已经被刷盘,当前刷盘请求就不需要执行。

      在 RocketMQ 进程正常关闭时,如果有同步刷盘请求未执行完,那么数据会丢失吗?

      答案是:不会的。在上图,我们得知,关闭刷盘服务时,会执行 Thread.sleep(10) 等待所有的同步刷盘请求保存到刷盘请求队列中后,交换保存刷盘请求的队列,再执行 doCommit() 方法。

    (2)异步刷盘:

      如果 Broker 配置读写分离,则异步刷盘过程包含异步转存数据和真正的异步刷盘操作。

      异步转存数据是通过 org.apache.rocketmq.store.CommitLog.GroupCommitServcie.doCommit()方法实现的。

      下面将介绍异步转存数据服务的核心的执行过程。

      (1)获取转存参数。整个转存过程的参数都是可配置的。

      (2)执行转存数据。

      (3)转存失败,唤醒异步刷盘线程。转存数据失败,并不代表没有数据被转存到 Page Cache 中,而是说明有部分数据转存成功,部分数据转存失败。所以可以唤醒刷盘线程执行刷盘操作。而如果转存成功,则正常执行异步刷盘即可。

      在异步转存服务和存储服务把消息写入 Page Cache 后,由异步刷盘将消息刷入磁盘中。异步刷盘服务的主要功能是将 Page Cache 中的数据异步刷入磁盘,并记录 Checkpoint 信息。异步刷盘的实现代码主要在 org.apache.rocketmq.store.CommitLog.FlushRealTimeService.run() 方法中,步骤拆解如下:

      第一步:获取刷盘参数;

      第二步:等待刷盘间隔;

      第三步:执行刷盘;

      第四步:记录 CheckPoint 和耗时日志。这里主要记录最后刷盘成功过时间和刷盘耗时超过 500ms 的情况。

    总结:

  • 相关阅读:
    log4j2分析总结(一)
    Idea(三)常用插件以及快捷键总结
    设计模式(七)_模板方法模式
    Idea(二) 解决IDEA卡顿问题及相关基本配置
    设计模式(六)_观察者模式
    SpringBoot(十一)_springboot热部署
    UML类图学习
    设计模式(五)_工厂方法模式
    设计模式(四)_简单工厂模式
    设计模式(三)_装饰器模式
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14447942.html
Copyright © 2011-2022 走看看