zoukankan      html  css  js  c++  java
  • RocketMQ 解析

    1、RocketMQ网络部署图

    • RocketMQ的几个关键角色和配置
    • Producer

    两种消息发送方式:

    1.同步发送:发送成功后继续执行代码逻辑。

    2.异步发送:发送后,不管成功失败执行代码逻辑。成功后调用回调方法。

    • Broker

    两种刷盘方式,flushDiskType配置,SYNC_FLUSH,ASYNC_FLUSH。

    1.同步刷盘方式:消息写入磁盘后再返回成功状态。

    2.异步刷盘方式:消息写入内存后就返回成功状态。

    两种复制方式,表示消息从Master复制到Slave的方式,brokerRole,ASYNC_MASTER(异步master),SYNC_MASTER(同步master),SLAVE(slave)。

    1.同步复制方式:等Master和Slave都写入成功后才返回写入成功。

    2.异步复制方式:Master写入成功后就返回写入成功。

    • Consumer

    三种消费方式 :

    1.Push(服务端主动推送消息),RocketMQ服务器收到消息后自动调用消费者函数来处理消息,自动维护Offset。支持两种消息模式,Clustering模式,同一个ConsumerGroup的每个Consumer消费订阅消息的一部分内容,broadcasting模式,同一个ConsumerGroup的每个Consumer都消费所订阅的的全部消息。

    2.Pull (客户端主动拉取消息),Client端循环从Server端拉取消息。需要客户端自己维护Offset。

    3.长轮询消费方式,Client发送消息请求,Server端接受请求,如果发现Server队列里没有新消息,Server端不立即返回,而是持有这个请求一段时间(通过设置超时时间来实现),在这段时间内轮询Server队列内是否有新的消息,如果有新消息,就利用现有的连接返回消息给消费者;如果这段时间内没有新消息进入队列,则返回空。

    深入了解了上面三个角色,我们来总结下双master,双slave模式下的整个发送,消费流程。生产者发送消息,消息会负载均衡到两个Master上,如果master的刷盘方式是同步刷盘方式,复制方式是同步复制方式,需要消息写到master和slave的硬盘上后,服务器才会放回发送消息成功。消息存储到服务器后,消费者根据自己的消费方式来消费消息,如果是Push,消息到达服务器后马上推送消息到消费者,如果是pull,消费拉取消息后再消费。

    1.1 RocketMQ网络部署特点:

    • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。相对来说,nameserver的稳定性非常高。原因有二:

    1)nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。无状态
    2)nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
    • Broker部署相对复杂,Broker四种集群方式

    Broker分为Master与Slave(Slave不可写,但可读,类似于MySQL的主备方式),一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有 NameServer。

    1)单个master:这是一种风险比较大的集群方式,因为一旦Borker重启或宕机期间,将会导致这个服务不可用,因此是不建议线上环境去使用的。
    2)多个master:
    一个集群全部都是Master,没有Slave。

      优点:配置简单,单个Master宕机或者是重启维护对应用没有什么影响的,在磁盘配置为RAID10时,即使机器宕机不可恢复的情况下,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘则是一条都不会丢失),性能最高

      缺点:当单个Broker宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响

    3)多master多salve异步复制,每个Master配置一个Slave,有多对的Master-Slave,HA采用的是异步复制方式,主备有短暂的消息延迟,毫秒级别的(Master收到消息之后立刻向应用返回成功标识,同时向Slave写入消息)。

      优点:即使是磁盘损坏了,消息丢失的非常少,且消息实时性不会受到影响,因为Master宕机之后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式机会一样。

      缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息

    4)多master多salve同步双写,每个Master配置一个Slave,有多对的Master-Slave,HA采用的是同步双写模式,主备都写成功,才会向应用返回成功。

      优点:数据与服务都无单点,Master宕机的情况下,消息无延迟,服务可用性与数据可用性都非常高

      缺点:性能比异步复制模式略低,大约低10%左右,发送单个Master的RT会略高,目前主机宕机后,Slave不能自动切换为主机,后续会支持自动切换功能。

    • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
    • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    1.2 安装及使用步骤:

    1、下载并安装
      根目录执行 mvn clean package -Dmaven.test.skip=true或mvn -Preplease-all -DskipTests clean install -U
      并在distribution/target/apache-rocketmq 目录下找到打好的包.解压至指定目录
     
    2、rocketmq的启动
      启动namesrv服务:nohup sh bin/mqnamesrv &  查看日志:tail -f ~/logs/rocketmqlogs/namesrv.log
      启动broker服务:nohup sh bin/mqbroker &   查看日志:tail -f ~/logs/rocketmqlogs/broker.log
      nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true & 默认自动创建topic,否则报错no route info of this topic
     
    3、rocketmq服务关闭
      关闭namesrv服务:sh bin/mqshutdown namesrv
      关闭broker服务 :sh bin/mqshutdown broker

     2.  RocketMQ配置文件 - broker

    众所周知,RocketMQ有多种集群部署方式,它们的配置文件也是分开的,在安装包conf目录下有官方自带配置文件模版自上而下依次是:两主两从异步,两主两从同步,两主

    broker.conf,这个是相当于配置的简单模板,另外其他的配置也可以到上面说的2m-2s那些目录中去参考一下,这个broker.conf是不能直接使用的,因为broker启动的时候用-c参数传入配置文件,这里只认识*.properties的配置文件,所以这里应该分别执行:

    cp broker.conf broker1.properties
    cp broker1.properties broker2.properties

    如下:

    说明:

    • 2m-noslave: 多Master模式
    • 2m-2s-sync: 多Master多Slave模式,同步双写
    • 2m-2s-async:多Master多Slave模式,异步复制

    其中namesrvAddr:主机地址,brokerClusterName:集群名称,brokerName :分片名称 ,deleteWhen=04:删除文件时间点,默认是凌晨4点
    ,fileReservedTime=120:文件保留时间,默认48小时,brokerId:分片id编号 ;brokerRole分片角色。

    注意:其中主从之间的分片名称相同。主从区分是brokerId 主 0,从 1。brokerRole 主MASTER从SLAVE。

    • RocketMQ默认提供的配置文件都是最基本的,很多配置都是默认值,生产环境中需要根据实际情况进行修改。
    • #所属集群名字 brokerClusterName=rocketmq-cluster
    • #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a|broker-b
    • #0表示Master,>0表示Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
    • #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4
    • #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true
    • #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true
    • #Broker 对外服务的监听端口 listenPort=10911
    • #删除文件时间点,默认凌晨 4点 deleteWhen=04
    • #文件保留时间,默认 48 小时 fileReservedTime=120
    • #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824
    • #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000
    • #destroyMapedFileIntervalForcibly=120000
    • #redeleteHangedFileInterval=120000
    • #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88
    • #存储路径 storePathRootDir=/usr/local/alibaba-rocketmq/store
    • #commitLog 存储路径 storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
    • #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
    • #消息索引存储路径 storePathIndex=/usr/local/alibaba-rocketmq/store/index
    • #checkpoint 文件存储路径 storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
    • #abort 文件存储路径 abortFile=/usr/local/alibaba-rocketmq/store/abort
    • #限制的消息大小 maxMessageSize=65536
    • #flushCommitLogLeastPages=4
    • #flushConsumeQueueLeastPages=2
    • #flushCommitLogThoroughInterval=10000
    • #flushConsumeQueueThoroughInterval=60000
    • #Broker 的角色

        - ASYNC_MASTER 异步复制Master

        - SYNC_MASTER 同步双写Master

        - SLAVE brokerRole=ASYNC_MASTER

    • 刷盘方式 #- ASYNC_FLUSH
    • 异步刷盘 #- SYNC_FLUSH
    • 同步刷盘 flushDiskType=ASYNC_FLUSH
    • #checkTransactionMessageEnable=false
    • #发消息线程池数量 #sendMessageThreadPoolNums=128
    • #拉消息线程池数量 #pullMessageThreadPoolNums=128
    • Broker向namesrv注册
    1.      获取namesrv的地址列表(是乱序的)
    
    2.      遍历向每个namesrv注册topic的配置信息topicconfig
    
    

    •  Topic在broker文件上的存储json格式

      Namesrv接收Broker注册的topic信息, namesrv只存内存,但是broker有任务定时推送

      1.   接收数据向RouteInfoManager注册。

        Broker初始化加载本地配置,配置信息是以json格式存储在本地, rocketmq强依赖fastjson作转换, RocketMq通过ConfigMananger来管理配置加载以及持久化

    复制代码
     
    1.      加载topic配置${user.home}/store/config/topics.json
    {
    "dataVersion":{
               "counter":2,
               "timestatmp":1393729865073
    },
    "topicConfigTable":{
               //根据consumer的group生成的重试topic
               "%RETRY% group_name":{
                        "perm":6,
                        "readQueueNums":1,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"%RETRY%group_name",
                        "writeQueueNums":1
               },

            "TopicTest":{
                "perm":6,  // 100读权限 , 10写权限   6是110读写权限
                        "readQueueNums":8,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"TopicTest",
                        "writeQueueNums":8
               }
    
    }
    
    }

    2.加载消费进度偏移量  ${user.home}/store/config/consumerOffset.json
    {
    
    "offsetTable":{
           //重试队列消费进度为零
          "%RETRY% group_name@group_name":{0:0}, 
          //分组名group_name消费topic为TopicTest的进度为:
          // 队列queue=0  消费进度23
          // 队列 queue=2  消费进度为22  等等…

           "TopicTest@ group_name":{0:23,1:23,2:22,3:22,4:21,5:18,6:18,7:18}
    }
    }
     
    3. 加载消费者订阅关系 ${user.home}/store/config/subscriptionGroup.json
    {
    
             "dataVersion":{
                       "counter":1,
                       "timestatmp":1393641744664
             },
             "group_name":{
                                "brokerId":0,  //0代表这台broker机器为master,若要设为slave值大于0
                                "consumeBroadcastEnable":true,
                                "consumeEnable":true,
                                "consumeFromMinEnable":true,
                                "groupName":"group_name",
                                "retryMaxTimes":5,
                                "retryQueueNums":1,
                                "whichBrokerWhenConsumeSlowly":1
                       }
             }
    }
     
    复制代码

     2.1 broker的消息存储

     存储特点:

    如上图所示:
    (1)消息主体以及元数据都存储在**CommitLog**当中
    (2)Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
    (3)每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。

    Rocketmq的消息的存储是由consume queue和 commitLog 配合完成的。

    ConsumeQueue是定长的结构,每1条记录固定的20个字节。

    Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再读CommitLog得到消息内容。

    2.1.1  CommitLog文件(物理队列

    CommitLog是用于存储真实的物理消息的结构,保存消息元数据,所有消息到达Broker后都会保存到commitLog文件,这里需要强调的是所有topic的消息都会统一保存在commitLog中。
    举个例子:当前集群有TopicA, TopicB,
    这两个Toipc的消息会按照消息到达的先后顺序保存到同一个commitLog中,而不是每个Topic有自己独立的commitLog
    onsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在本结构中。
    首先这里会使用CommitLog.this.topicQueueTable.put(key, queueOffset),其中的key是 topic-queueId, queueOffset是当前这个key中的消息数,每增加一个消息增加一(不会自减);
    这里queueOffset的用途如下:每次用户请求putMessage的时候,将queueOffset返回给客户端使用,这里的queueoffset表示逻辑上的队列偏移。

    消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分
    • commitlog文件的存储地址:$HOMEstorecommitlog${fileName}
    • 一个消息存储单元长度是不定的,顺序写但是随机读
    • 每个commitLog文件的默认大小为 1G =1024*1024*1024,满1G之后会自动新建CommitLog文件做保存数据用
    • commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,

        第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648消息存储的时候会顺序写入文件,

        当文件满了,写入下一个文件。

      CommitLog的清理机制:

    • 按时间清理,rocketmq默认会清理3天前的commitLog文件;
    • 按磁盘水位清理:当磁盘使用量到达磁盘容量75%,开始清理最老的commitLog文件。
    
    

    1)、CommitLog 文件生成规则

    偏移量:每个 CommitLog 文件的大小为 1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为 1073741824 (1G = 1073741824byte)。

    2)、怎么知道消息存储在哪个 CommitLog 文件上?

    假设 1073742827 为物理偏移量(物理偏移量也即全局偏移量),则其对应的相对偏移量为 1003(1003 = 1073742827 - 1073741824),并且该偏移量位于第二个 CommitLog。

    index 和 ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上。

    复制代码
    文件地址:${user.home} store${commitlog}${fileName}
    
    消息存储结构:
      flag  这个标志值rocketmq不做处理,只存储后透传
      QUEUEOFFSET这个值是个自增值不是真正的consume queue的偏移量,可以代表这个队列中消息的个数,要通过这个值查找到consume queue中数据,QUEUEOFFSET * 20才是偏移地址
      PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量
      SYSFLAG消息标志,指明消息是事物事物状态等等消息特征
      BORNTIMESTAMP 消息产生端(producer)的时间戳
      BORNHOST     消息产生端(producer)地址(address:port)
      STORETIMESTAMP 消息在broker存储时间
      STOREHOSTADDRESS 消息存储到broker的地址(address:port)
      RECONSUMETIMES消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了
      Prepared Transaction Offset 表示是prepared状态的事物消息
    复制代码

    2.1.2 ConsumeQueue文件组织:

    ConsumerQueue相当于CommitLog的索引文件,消费者消费时会先从ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元数据。

    如果某个消息只在CommitLog中有数据,没在ConsumerQueue中, 则消费者无法消费,Rocktet的事务消息就是这个原理

    Consumequeue类对应的是每个topic和queuId下面的所有文件,相当于字典的目录用来指定消息在消息的真正的物理文件commitLog上的位置

    每条数据的结构如下图所示:

    消息的起始物理偏移量physical offset(long 8字节)+消息大小size(int 4字节)+tagsCode(long 8字节)。

    • 每个topic下的每个queue都有一个对应的consumequeue文件。
    • 文件默认存储路径:${user.home} storeconsumequeue${topicName}${queueId}${fileName}
    • 每个文件由30W条数据组成,每条数据的大小为20个字节,从而每个文件的默认大小为600万个字节(consume queue中存储单元是一个20字节定长的数据)是顺序写顺序读
    • commitLogOffset是指这条消息在commitLog文件实际偏移量
    • size就是指消息大小
    • 消息tag的哈希值
    ConsumeQueue几个重要的字段
      private final String topic; private final int queueId;//队列id private final ByteBuffer byteBufferIndex;// 写索引时用到的ByteBuffer private long maxPhysicOffset = -1;// 最后一个消息对应的物理Offset

    每个cosumequeue文件的名称fileName,名字长度为20位,左边补零,剩余为起始偏移量;

    比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为600W,

    当第一个文件满之后创建的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,

    第三个文件名字为00000000000012000000,起始偏移量为12000000,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。

    • topic queueId来组织的:比如TopicA配了读写队列0、1,那么TopicA和Queue=0组成一个ConsumeQueue, TopicA和Queue=1组成一个另一个ConsumeQueue.
    • 按消费端group分组重试队列,如果消费端消费失败,发送到retry消费队列中
    • 按消费端group分组死信队列,如果消费端重试超过指定次数,发送死信队列
    • 每个ConsumeQueue可以由多个文件组成无限队列被MapedFileQueue对象管理

     2.1.3  MapedFile 是PageCache文件封装

    操作物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os系统的页大小为4k, 消息刷盘根据参数(commitLog默认至少刷4页, consumeQueue默认至少刷2页)才刷 

    以下io对象构建了物理文件映射内存的对象
    FileChannel fileChannel = new RandomAccessFile(file,“rw”).getChannel();
    MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize);
    构建mapedFile对象需要两个参数
    fileSize: 映射的物理文件的大小
    commitLog每个文件的大小默认1G =1024*1024*1024
    ConsumeQueue每个文件默认存30W条 = 300000 *CQStoreUnitSize(每条大小)
    filename: filename文件名称但不仅仅是名称还表示文件记录的初始偏移量, 文件名其实是个long类型的值

    2.1.4  MapedFileQueue 存储队列,数据定时删除,无限增长。

    队列有多个文件(MapedFile)组成,由集合对象List表示升序排列,前面讲到文件名即是消息在此文件的中初始偏移量,排好序后组成了一个连续的消息队     

     当消息到达broker时,需要获取最新的MapedFile写入数据,调用MapedFileQueue的getLastMapedFile获取,此函数如果集合中一个也没有创建一个,如果最后一个写满了也创建一个新的。
     MapedFileQueue在获取getLastMapedFile时,如果需要创建新的MapedFile会计算出下一个MapedFile文件地址,通过预分配服务AllocateMapedFileService异步预创建下一个MapedFile文件,这样下次创建新文件请求就不要等待,因为创建文件特别是一个1G的文件还是有点耗时的,
     getMinOffset获取队列消息最少偏移量,即第一个文件的文件起始偏移量
     getMaxOffset获取队列目前写到位置偏移量
     getCommitWhere刷盘刷到哪里了

    2.1.5 消息存储及消费过程

    1)消息发送流程:

    • Broker启动时,向NameServer注册信息
    • 客户端调用producer发送消息时,会先从NameServer获取该topic的路由信息。消息头code为GET_ROUTEINFO_BY_TOPIC
    • 从NameServer返回的路由信息,包括topic包含的队列列表和broker列表
    • Producer端根据查询策略,选出其中一个队列,用于后续存储消息
    • 每条消息会生成一个唯一id,添加到消息的属性中。属性的key为UNIQ_KEY
    • 对消息做一些特殊处理,比如:超过4M会对消息进行压缩
    • producer向Broker发送rpc请求,将消息保存到broker端。消息头的code为SEND_MESSAGE或SEND_MESSAGE_V2(配置文件设置了特殊标志)

    消息存储流程

    • Broker端收到消息后,将消息原始信息保存在CommitLog文件对应的MappedFile中,然后异步刷新到磁盘
    • ReputMessageServie线程异步的将CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
    • ConsumerQueue和IndexFile只是原始文件的索引信息

    1)消息消费过程:

    现在我们再来看 Broker 服务器端。首先我们应该知道,消息往 Broker 存储就是在向 CommitLog 消息文件中写入数据的一个过程。

    在 Broker 启动过程中,其会启动一个叫做 ReputMessageService 的服务,这个服务每隔 1 秒会检查一下这个 CommitLog 是否有新的数据写入。

    ReputMessageService 自身维护了一个偏移量 reputFromOffset,用以对比和 CommitLog 文件中的消息总偏移量的差距。

    当这两个偏移量不同的时候,就代表有新的消息到来了,在有新的消息到来之后,doReput() 函数会取出新到来的所有消息,每一条消息都会封装为一个 DispatchRequest 请求,

    进而将这条请求分发给不同的请求消费者,我们在这篇文章中只会关注利用消息创建消费队列的服务 CommitLogDispatcherBuildConsumeQueue,

    CommitLogDispatcherBuildConsumeQueue 服务会根据这条请求按照不同的队列 ID 创建不同的消费队列文件,并在内存中维护一份消费队列列表。

    然后将 DispatchRequest 请求中这条消息的消息偏移量、消息大小以及消息在发送时候附带的标签的 Hash 值写入到相应的消费队列文件中去。

    3)客户端如何记录自己所消费的队列消费到哪里了呢?

    答案就是:消费队列偏移量

    集群模式:由于每个客户端所消费的消息队列不同,所以每个消息队列已经消费到哪里的消费偏移量是记录在 Broker 服务器端的。

    广播模式:由于每个客户端分配消费这个话题的所有消息队列,所以每个消息队列已经消费到哪里的消费偏移量是记录在客户端本地的。

    (1) 集群模式

    在集群模式下,消费者客户端在内存中维护了一个 offsetTable 表,同样在 Broker 服务器端也维护了一个偏移量表,在消费者客户端,RebalanceService 服务会定时地 (默认 20 秒) 从 Broker 服务器获取当前客户端所需要消费的消息队列,并与当前消费者客户端的消费队列进行对比,看是否有变化。对于每个消费队列,会从 Broker 服务器查询这个队列当前的消费偏移量。然后根据这几个消费队列,创建对应的拉取请求 PullRequest 准备从 Broker 服务器拉取消息,当从 Broker 服务器拉取下来消息以后,只有当用户成功消费的时候,才会更新本地的偏移量表。本地的偏移量表再通过定时服务每隔 5 秒同步到 Broker 服务器端,而维护在 Broker 服务器端的偏移量表也会每隔 5 秒钟序列化到磁盘中(文件地址:${user.home} /store/config/consume/consumerOffset.json)

    保存的格式如下所示:

    broker_offset_table

    (2) 广播模式

    对于广播模式而言,每个消费队列的偏移量肯定不能存储在 Broker 服务器端,因为多个消费者对于同一个队列的消费可能不一致,偏移量会互相覆盖掉。因此,在广播模式下,每个客户端的消费偏移量是存储在本地的,然后每隔 5 秒将内存中的 offsetTable 持久化到磁盘中。当首次从服务器获取可消费队列的时候,偏移量不像集群模式下是从 Broker 服务器读取的,而是直接从本地文件中读取

    这里提一下,在广播模式下,消息队列的偏移量默认放在用户目录下的 .rocketmq_offsets 目录下

    存储格式如下:

    broadcasting_offset_table_persist

    3. load、recover

    Broker启动的时候需要加载一系列的配置,启动一系列的任务,主要分布在BrokerController 的initialize()和start()方法中

    复制代码
    1.加载topic配置
    2.加载消费进度consumer offset
    3.加载消费者订阅关系consumer subscription
    4.加载本地消息messageStore.load()
      Load 定时进度,Load commit log,commitLog其实调用存储消费队列mapedFileQueue.load()方法来加载的。
      遍历出${user.home} store${commitlog}目录下所有commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每个文件构建一个MapedFile对象, 在MapedFileQueue中用集合list把这些MapedFile文件组成一个逻辑上连续的队列
    
      Load consume Queue
      遍历${user.home} storeconsumequeue下的所有文件夹(每个topic就是一个文件夹)
      遍历${user.home} storeconsumequeue${topic}下的所有文件夹(每个queueId就是一个文件夹)
      遍历${user.home} storeconsumequeue${topic}${queueId}下所有文件,根据topic, queueId, 文件来构建ConsueQueue对象
      DefaultMessageStore中存储结构Map<topic,Map<queueId, CosnueQueue>>
      每个Consumequeue利用MapedFileQueue把mapedFile组成一个逻辑上连续的队列
    
      加载事物模块
      加载存储检查点
      加载${user.home} storecheckpoint 这个文件存储了3个long类型的值来记录存储模型最终一致的时间点,这个3个long的值为
      physicMsgTimestamp为commitLog最后刷盘的时间
      logicMsgTimestamp为consumeQueue最终刷盘的时间
      indexMsgTimestamp为索引最终刷盘时间
      checkpoint作用是当异常恢复时需要根据checkpoint点来恢复消息
    
      加载索引服务indexService
      recover尝试数据恢复
      判断是否是正常恢复,系统启动的启动存储服务(DefaultMessageStore)的时候会创建一个临时文件abort, 当系统正常关闭的时候会把这个文件删掉,这个类似在Linux下打开vi编辑器生成那个临时文件,所有当这个abort文件存在,系统认为是异常恢复 
    复制代码

     

    复制代码
    1)  先按照正常流程恢复ConsumeQueue
    
    什么是恢复ConsumeQueue, 前面不是有步骤load了ConsumeQueue吗,为什么还要恢复?
    
    前面load步骤创建了MapedFile对象建立了文件的内存映射,但是数据是否正确,现在文件写到哪了(wrotePosition),
    Flush到了什么位置(committedPosition)?恢复数据来帮我解决这些问题。 每个ConsumeQueue的mapedFiles集合中,从倒数第三个文件开始恢复(为什么只恢复倒数三个文件,看消息消费程度),
    因为consumequeue的存储单元是20字节的定长数据,所以是依次分别取了 Offset long类型存储了commitLog的数据偏移量 Size int类型存储了在commitLog上消息大小 tagcode tag的哈希值 目前rocketmq判断存储的consumequeue数据是否有效的方式为判断offset>= 0 && size > 0 如果数据有效读取下20个字节判断是否有效 如果数据无效跳出循环,记录此时有效数据的偏移量processOffset 如果读到文件尾,读取下一个文件 proccessOffset是有效数据的偏移量,获取这个值的作用什么? (1)proccessOffset后面的数据属于脏数据,后面的文件要删除掉 (2)设置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值为 proccessOffset%mapedFileSize 2正常恢复commitLog文件 步骤跟流程恢复Consume Queue 判断消息有效, 根据消息的存储格式读取消息到DispatchRequest对象,获取消息大小值msgSize   大于 0 正常数据   等于-1 文件读取错误 恢复结束   等于0 读到文件末尾 3) 异常数据恢复,OSCRASH或者JVM CRASH或者机器掉电 当${user.home}storeabort文件存在,代表异常恢复 读取${user.home} storecheckpoint获取最终一致的时间点 判断最终一致的点所在的文件是哪个 从最新的mapedFile开始,获取存储的一条消息在broker的生成时间,大于checkpoint时间点的放弃找前一个文件,小于等于checkpoint时间点的说明checkpoint
    在此mapedfile文件中 从checkpoint所在mapedFile开始恢复数据,它的整体过程跟正常恢复commitlog类似,最重要的区别在于 (1)读取消息后派送到分发消息服务DispatchMessageService中,来重建ConsumeQueue以及索引 (2)根据恢复的物理offset,清除ConsumeQueue多余的数据 4)恢复TopicQueueTable=Map<topic-queueid,offset> (1)恢复写入消息时,消费记录队列的offset (2)恢复每个队列的最小offset 初始化通信层 初始化线程池 注册broker端处理器用来接收client请求后选择处理器处理 启动每天凌晨00:00:00统计消费量任务 启动定时刷消费进度任务 启动扫描数据被删除了的topic,offset记录也对应删除任务 如果namesrv地址不是指定的,而是从静态服务器取的,启动定时向静态服务器获取namesrv地址的任务 如果broker是master,启动任务打印slave落后master没有同步的bytes 如果broker是slave,启动任务定时到mastser同步配置信息
    复制代码

    3. master slave

    在broker启动的时候BrokerController如果是slave,配置了master地址更新,没有配置所有broker会想namesrv注册,从namesrv获取haServerAddr,然后更新到HAClient

    当HAClient的MasterAddress不为空的时候(因为broker  master和slave都构建了HAClient)会主动连接master获取SocketChannel Master监听Slave请求的端口,默认为服务端口+1

    接收slave上传的offset long类型 int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8) 

    //没有理解意图

     long readOffset =this.byteBufferRead.getLong(pos - 8);  this.processPostion = pos;

    主从复制从哪里开始复制:如果请求时0 ,从最后一个文件开始复制

    Slave启动的时候brokerController开启定时任务定时拷贝master的配置信息

    复制代码
    SlaveSynchronize类代表slave从master同步信息(非消息)
    
             syncTopicConfig       同步topic的配置信息
    
             syncConsumerOffset       同步消费进度
    
             syncDelayOffset                同步定时进度
    
             syncSubcriptionGroupConfig  同步订阅组配7F6E 
    复制代码

    HaService类实现了HA服务,负责同步双写,异步复制功能, 这个类master和slave的broker都会实例化,

    Master通过AcceptSocketService监听slave的连接,每个masterslave连接都会构建一个HAConnection对象搭建他们之间的桥梁,对于一个master多slave部署结构的会有多个HAConnection实例,

    Master构建HAConnection时会构建向slave写入数据服务线程对象WriteSocketService对象和读取Slave反馈服务线程对象ReadSocketService

    WriteSocketService

    向slave同步commitLog数据线程,
    
    slaveRequestOffset是每次slave同步完数据都会向master发送一个ack表示下次同步的数据的offset。
    
    如果slave是第一次启动的话slaveRequestOffset=0, master会从最近那个commitLog文件开始同步。(如果要把master上的所有commitLog文件同步到slave的话, 把masterOffset值赋为minOffset)

    向socket写入同步数据: 传输数据协议<Phy Offset> <Body Size> <Body Data>

    ReadSocketService:

    4 ReadSocketService

    复制代码
       读取slave通过HAClient向master返回同步commitLog的物理偏移量phyOffset值
    
             通知前端线程,如果是同步复制的话通知是否复制成功
    
     
    
    Slave 通过HAClient建立与master的连接, 
    
    来定时汇报slave最大物理offset,默认5秒汇报一次也代表了跟master之间的心跳检测
    
    读取master向slave写入commitlog的数据, master向slave写入数据的格式是
    复制代码

    复制代码
    Slave初始化DefaultMessageStore时候会构建ReputMessageService服务线程并在启动存储服务的start方法中被启动
    
    ReputMessageService的作用是slave从物理队列(由commitlog文件构成的MapedFileQueue)加载数据,并分发到各个逻辑队列
    
    HA同步复制, 当msg写入master的commitlog文件后,判断maser的角色如果是同步双写SYNC_MASTER, 等待master同步到slave在返回结果
    复制代码

    5 HA异步复制

    6.索引服务

    1索引结构

    IndexFile 存储具体消息索引的文件,文件的内容结构如图:

    索引文件由索引文件头IndexHeader, 槽位Slot和消息的索引内容三部分构成

    IndexHeader:索引文件头信息40个字节的数据组成

     

    复制代码
     beginTimestamp    8位long类型,索引文件构建第一个索引的消息落在broker的时间
    
    endTimestamp         8位long类型,索引文件构建最后一个索引消息落broker时间
    
    beginPhyOffset         8位long类型,索引文件构建第一个索引的消息commitLog偏移量
    
    endPhyOffset            8位long类型,索引文件构建最后一个索引消息commitLog偏移量
    
    hashSlotCount    4位int类型,构建索引占用的槽位数(这个值貌似没有具体作用)
    
    indexCount                4位int类型,索引文件中构建的索引个数
    复制代码

    槽位slot, 默认每个文件配置的slot个数为500万个,每个slot是4位的int类型数据

    计算消息的对应的slotPos=Math.abs(keyHash)%hashSlotNum

    消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE

    Slot存储的值为消息个数索引

    消息的索引内容是20位定长内容的数据

           

    复制代码
     4位int值, 存储的是key的hash值
    
             8位long值     存储的是消息在commitlog的物理偏移量phyOffset
    
             4位int值        存储了当前消息跟索引文件中第一个消息在broker落地的时间差
    
             4位int值        如果存在hash冲突,存储的是上一个消息的索引地址
    复制代码

    7. 索引服务IndexService线程

    复制代码
    1.      索引配置:hashSlotNum哈希槽位个数、indexNum存储索引的最大个数、storePath索引文件indexFile存储的路径
    
    2.      Load broker启动的时候加载本地IndexFile,
    
    如果是异常启动删除之后storeCheckPoint文件,因为commitLog根据storeCheckPoint会重建之后的索引文件,
    
    3.      Run方法,任务从阻塞队列中获取请求构建索引
    
    4.      queryOffset 根据topic key 时间跨度来查询消息
    
    倒叙遍历所有索引文件
    
    每一个indexfile存储了第一个消息和最后一个消息的存储时间,根据传入时间范围来判断索引是否落在此索引文件
    复制代码

    8. 构建索引服务

    复制代码
    分发消息索引服务将消息位置分发到ConsumeQueue中后,加入IndexService的LinkedBlockingQueue队列中,IndexService通过任务向队列中获取请求来构建索引
    
    剔除commitType或者rollbackType消息,因为这两种消息都有对应的preparedType的消息
    
    构建索引key(topic + "#" + key)
    
    根据key的hashcode计算槽位,即跟槽位最大值取余数
    
    计算槽位在indexfile的具体偏移量位置
    
    根据槽位偏移量获取存储的上一个索引
    
    计算消息跟文件头存储开始时间的时间差
    
    根据消息头记录的存储消息个数计算消息索引存储的集体偏移量位置
    
    写入真正的索引,内容参考上面索引内容格式
    
    将槽位中的更新为此消息索引
    
    更新索引头文件信息
    复制代码

     

     

    9.  Broker与client(comsumer ,producer)之间的心跳

    复制代码
    一:Broker接收client心跳ClientManageProcessor处理client的心跳请求
    
    1.      构建ClientChannelInfo对象
    
    1)  持有channel对象,表示与客户端的连接通道
    
    2)  ClientID表示客户端
    
    …..
    
    2.      每次心跳会更新ClientChannelInfo的时间戳,来表示client还活着
    
    3.      注册或者更新consumer的订阅关系(是以group为单位来组织的, group下可能有多个订阅关系)
    
    4.      注册producer,其实就是发送producer的group(这个在事物消息中才有点作用)
    
    二:ClientHouseKeepingService线程定时清除不活动的连接
    
    1)  ProducerManager.scanNotActiveChannel    默认两分钟producer没有发送心跳清除
    
    2)  ConsumerManager.scanNotActiveChannel   默认两份中Consumer没有发送心跳清除
    复制代码

    10. Broker与namesrv之间的心跳

    复制代码
    1)  namesrv接收borker心跳DefaultRequestProcessor的REGISTER_BROKE事件处理,
    
    (1)      注册broker的topic信息
    
    (2)      构建或者更新BrokerLiveInfo的时间戳
    
    NamesrvController初始化时启动线程定时调用RouteInfoManger的scanNotActiveBroker方法来定时不活动的broker(默认两分钟没有向namesrv发送心跳更新时间戳的) 
    复制代码
     

     

     
     
  • 相关阅读:
    OpenJDK源码研究笔记(十二):JDBC中的元数据,数据库元数据(DatabaseMetaData),参数元数据(ParameterMetaData),结果集元数据(ResultSetMetaDa
    Java实现 LeetCode 257 二叉树的所有路径
    Java实现 LeetCode 257 二叉树的所有路径
    Java实现 LeetCode 257 二叉树的所有路径
    Java实现 LeetCode 242 有效的字母异位词
    Java实现 LeetCode 242 有效的字母异位词
    Java实现 LeetCode 242 有效的字母异位词
    Java实现 LeetCode 241 为运算表达式设计优先级
    Java实现 LeetCode 241 为运算表达式设计优先级
    Java实现 LeetCode 241 为运算表达式设计优先级
  • 原文地址:https://www.cnblogs.com/mzcx/p/11412508.html
Copyright © 2011-2022 走看看