zoukankan      html  css  js  c++  java
  • rocketmq消息存储概述

    了解消息存储部分首先需要关注的几个方法,load()--Load previously stored messages、start()--Launch this message store、putMessage--Store a(or batch) message into store.
    以及一些关键词:
      commitLog:      消息的物理存储相关
      consumeQueue:    逻辑队列存储相关
      IndexFile:           消息存储索引
      刷盘:             将写入内存的消息持久化
      主从同步(HAService):       Master中的数据同步到Slave中
      load()方法:         用于重启时,加载数据
    load()方法用于重启时,加载数据,初始化boker时,boker中的initialize方法中会调用messageStore.load(),包括:commitLog.load()、 loadConsumeQueue()、 indexService.load、 recover(lastExitOK)
     
    正文:
    (一)消息存储开启基础服务 -- 在后台运行,时刻准备为存储服务
    boker启动时,会初始化DefaultMessageStore,调用DefaultMessageStore.start()服务。
    I. 初始化DefaultMessageStore时开启的服务
      预分配MapedFile对象服务(线程):AllocateMapedFileService
      分发消息索引服务(线程):   DispatchMessageService --(注:rockemq4.0版本中抛弃了该服务,对应变成了CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex内部类)
      消息索引服务(线程):     IndexService
    II. DefaultMessageStore.start()会开启的服务(或服务线程)
      逻辑队列刷盘服务(线程):  FlushConsumeQueueService
      物理队列刷盘服务(线程):  FlushCommitLogService (该服务在初始化commitlog对象时开启)
      运行时数据统计服务(线程):    StoreStatsService
      从物理队列解析消息重新发送到逻辑队列服务(线程):ReputMessageService
      HA服务: HAService
      定时服务:ScheduleMessageService,如定时删除过期文件--cleanFilesPeriodically等。
    (注:这些服务对象基本都在初始化DefaultMessageStore实例对象时被创建)
     
     
    (二) 存储过程
    rockemq4.0数据存储的过程与之前的版本存入过程与有很大的不同:
      如rocketmq 3.2.4中只有角色为SLAVE的boker会开启ReputMessageService服务。
      如rockemq4.0中将之前版本中废除了处理分发消息索引服务DispatchMessageService服务,更改为这两个类CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex。
      如rocketmq 3.2.4可以通过是否开启消息索引功能可以控制是否执行 Index索引。
     
    2.1 rocketmq 3.2.4版本的数据存入过程:
    I. commitLog数据存入
    • 如果boker角色为MASTER
      生产者每写入一条数据,boker端接受到消息后,DefaultMessageStore.putMessage调用Commit.putMessage方法,PutMessage首先要检查一些条件,比如:
        1. 每条数据第一写入的broker的属性必须为master,否则回返回PutMessageStatus.SERVICE_NOT_AVAILABLE状态,“message store is slave mode, so putMessage is forbidden ”. 
        2. 这条msg是否具有被写入的权限,否则回返回PutMessageStatus.SERVICE_NOT_AVAILABLE状态,"message store is not writeable, so putMessage is forbidden ".
        3. message topic长度校验
        4. message properties长度校验
      Commit.putMessage 首先将数据写入到commitlog对应的mapedFile中,每写入一条消息,通过mapedFile.appendMessage追加到MapedFile文件中,当MapedFile写满后,生成一个新的MapedFile,然后向这个MapedFile中追加消息,如此不断 ... ...,这些MapedFile装在MapedFileQueue中。
      commitLog中每向mapedFile中写入一个消息后,会返回一个AppendMessageResult对象,根据AppendMessageResult与msg消息信息,生成一个DispatchRequest对象,调用commit的内部类DispatchMessageService.putRequest(dispatchRequest)方法,将写入的消息对应dispatchRequest写入到定义的List<DispatchRequest> requestsWrite列表中。
    • 如果boker角色为SLAVE
      没有putMessage过程,数据加载通过HAService进行主从同步,同步MASTER中的逻辑队列,向commitLog存入数据。(过程比较复杂,有机会以后单独成文分析)
    II. consumeQueue数据存入(indexFile数据存入可选)
    • 如果boker角色为MASTER
    DispatchMessageService线程在后台一直运行,不断执行doDispatch()
    while (!this.isStoped()) {
      try {
        this.waitForRunning(0);
        this.doDispatch();
      } catch (Exception e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
      }
    }
    doDispatch()会将requestsWrite列表中的dispatchRequest处理,将它们转换成consumeQueue单元结构对应数据,这些数据追加到consumerQueue对应的MapedFile中。然后添加到consumerQueue的MapedFileQueue中。如果开启了消息索引功能即:isMessageIndexEnable==true,则将requestsWrite列表中的dispatchRequest传给indexService服务,然后indexService将这些消息写入IndexFile中。
    • 如果boker角色为SLAVE
      由于生产者产生消息不会直接到SLAVE,因此在SLAVE不会执行putMessage逻辑,它主要靠ReputMessageService 服务线程,从物理队列(commitlog)解析消息重新发送到逻辑队列,大致过程为: 从物理队列解析数据,生成dispatchRequest,如果数据正常,则将dispatchRequest传入给DispatchMessageService的List<DispatchRequest> requestsWrite,之后DispatchMessageService处理dispatchRequest的过程与上文一样。
     
    2.2. rocketmq 4.0 版本的数据存入过程
    I. commitLog数据存入过程基本不变
      不同的是,commit.putMessage过程并不会根据AppendMessageResult与msg消息信息,生成一个DispatchRequest对象,该版本中DispatchRequest对象的生成过程放在了ReputMessageService中,通过ReputMessageService生成DispatchRequest对象。该版本中ReputMessageService服务线程不像rocketmq 3.2.4中那样只为boker角色为SLAVE单独开设。
    II. consumeQueue与indexFile数据存入
      rocketmq4.0中此过程的核心服务是ReputMessageService,与之前版本不同的是在rocketmq4.0版本中,consumeQueue与indexFile数据存入的服务线程独立出来了,分别使用CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex类处理,初始化DefaultMessageStore时,将这两个类存放入dispatcherList列表中:
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
    在ReputMessageService服务线程启开后,不断从commitLog中解析数据,生成dispatchRequest :
    DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
     之后向dispatcherList中的所有分发器分发dispatcherList:
    DefaultMessageStore.this.doDispatch(dispatchRequest);
    即执行:
    CommitLogDispatcherBuildConsumeQueue.doDispatch(dispatchRequest)生成consumeQueue数据
    CommitLogDispatcherBuildIndex.doDispatch(dispatchRequest)生成IndexFile数据
     
     
    (三) 数据写入内存小结
    即:内存映射
    生成commitLog数据的核心接口:
    this.commitLog.putMessage(msg)
    将数据写入到commitlog对应的MapedFiLe对象中。
    生成consumeQueue数据的核心接口:
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
      ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
      cq.putMessagePositionInfoWrapper(dispatchRequest);
    }
    将数据写入到consumeQueue对应的MapedFiLe对象中。
    生成IndexFile数据的核心接口:
    DefaultMessageStore.this.indexService.buildIndex(request);
    将数据写入到IndexFile对应的MapedFiLe对象中。
     
     
    (四)  内存文件落地 -- 刷盘
    上文介绍了数据如何写入到逻辑队列、物理队列、索引的MapedFiLe中,这里介绍如何将逻辑队列、物理队列内存数据持久化到磁盘(索引文件的写入可以在以后的文章中单独分析)。
     
    逻辑队列、物理队列内存文件刷盘方式相同,它们生成的MapedFile文件会放在各自对应的MapedFileQueue对象中,通过刷盘的方式,将MapedFileQueue持久化到物理磁盘上。
    初始化DefaultMessageStore的时候会开启: 逻辑队列刷盘服务线程--FlushConsumeQueueService、将ConsumeQueue.mapedFileQueues刷入磁盘;
    初始化commitlog对象时开启:物理队列刷盘服务线程--FlushCommitLogService,将commitlog.mapedFileQueues刷入磁盘。
    这两个线程会分别将MapedFileQueue持久化到物理磁盘上。
    对于commitlog的刷盘策略:
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
      this.flushCommitLogService = new GroupCommitService();
    } else {
      this.flushCommitLogService = new FlushRealTimeService();
    }
    异步刷盘使用的是FlushRealTimeService,同步刷盘使用的是GroupCommitService
    刷盘过程要涉及到MapedFile,MapedFile以及java NIO相关的知识如MappedByteBuffer、FileChannel,可以学到的到东西很多,具体的刷盘实现过程见(下一篇)。
     
     
  • 相关阅读:
    指针如何影响结构体,细节的思考
    【转】oracle null
    【转】JavaScript闭包
    【转】Ext JS xtype
    【转】EXT JS MVC开发模式
    【转】Ext.ajax.request 中的success和failure
    【转】Oracle job procedure 存储过程定时任务
    JDK重要包和Java学习方法论
    rownum
    【转】Js获取当前日期时间及格式化操作
  • 原文地址:https://www.cnblogs.com/chenjunjie12321/p/8260840.html
Copyright © 2011-2022 走看看