zoukankan      html  css  js  c++  java
  • rocketMq 消息偏移量 Offset

    消息偏移量 Offset

    queue0  offset 0   0-20  offset 1  20-40

    纠错:每条消息的tag对应的HashCode.

    queue1  offset 0  0-20  offset 1  20-40

    queue2  offset 0  0-20  offset 1  20-40

    queue3  offset 0 0-20  offset 1  20-40

    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2BFB0000, offsetMsgId=C0A81F9800002A9F0000000000000000(每条消息偏移量,以十六进制表示消息长度,转换成十进制0), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0(队列偏移量,当前队列增加1条就自增1)]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DE20001, offsetMsgId=C0A81F9800002A9F00000000000000CB(十六进制表示消息长度,转换成十进制203), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0002, offsetMsgId=C0A81F9800002A9F0000000000000196(十六进制表示消息长度,转换成十进制406), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0003, offsetMsgId=C0A81F9800002A9F0000000000000261(十六进制表示消息长度,转换成十进制609), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60004, offsetMsgId=C0A81F9800002A9F000000000000032C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60005, offsetMsgId=C0A81F9800002A9F00000000000003F7, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60006, offsetMsgId=C0A81F9800002A9F00000000000004C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2E000007, offsetMsgId=C0A81F9800002A9F000000000000058D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]

    概念

    • message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
    • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
    • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费

    类型(父类是OffsetStore):

    • 本地文件类型
      • DefaultMQPushConsumer 的 BROADCASTING 广播模式,各个 Consumer 没有互相干扰,使用 LocalFileOffsetStore,把 Offset 存储在本地
    • Broker 代存储类型
      • DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore

    作用

    • 主要是记录消息的偏移量,有多个消费者进行消费
    • 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
    • 广播模式下采用 LocalFileOffsetStore,消费端存储

    建议采用 pushConsumer,RocketMQ 自动维护 OffsetStore,如果用另外一种 pullConsumer 需要自己进行维护 OffsetStore

    消息存储 CommitLog

    消息存储是由 ConsumeQueue 和 CommitLog 配合完成

    • ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。Topic 下的每个 message queue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。默认地址:store/consumequeue/{topicName}/{queueid}/fileName
    • CommitLog:存储消息真正内容的文件。
      • 生成规则:
        • 每个文件的默认1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824 Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,消息存储的时候会顺序写入文件,当文件满了则写入下一个文件。
      • 判断消息存储在哪个 CommitLog 上
        • 例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。

    Broker 里面一个 Topic 里面有多个 MesssageQueue,每个 MessageQueue 对应一个 ConsumeQueue,ConsumeQueue 里面记录的是消息在 CommitLog 里面的物理存储地址。

    IndexFile 消息索引文件

        ConsumerQueue是通过偏移量offset去CommitLog文件中查找消息,但实际工作应用中,我们想查找某条具体的消息,并不知道offset值,那该怎么办呢?那IndexFile作用就来了。
        IndexFile是消息索引文件,如果一个生产者发送的消息包含key值的话,会使用IndexFile存储消息索引,主要用于使用key来查询消息。文件的内容结构如图

     在Broker端,通过Key来计算Hash槽的位置,从而找到Index索引数据。从Index索引中拿到消息的物理偏移量,最后根据这个物理偏移量,直接到CommitLog文件中去找就可以了。另外说明下,通过IndexFile来查找消息的方法不影响RocketMQ的正常生产-消费流程,它只是查询定位消息的方法而已。

  • 相关阅读:
    BZOJ1862: [Zjoi2006]GameZ游戏排名系统
    BZOJXXXX: [IOI2000]邮局——四边形不等式优化初探
    BZOJ1801: [Ahoi2009]chess 中国象棋
    BZOJ1505: [NOI2004]小H的小屋
    BZOJ1899: [Zjoi2004]Lunch 午餐
    BZOJ1057: [ZJOI2007]棋盘制作
    BZOJ1100: [POI2007]对称轴osi
    BZOJ1123: [POI2008]BLO
    线性规划之单纯形讲解
    BZOJ 3265 志愿者招募加强版(单纯形)
  • 原文地址:https://www.cnblogs.com/jrkl/p/14167543.html
Copyright © 2011-2022 走看看