zoukankan      html  css  js  c++  java
  • rocketMq 消息偏移量 Offset

    消息偏移量 Offset

    queue0  offset 0   0-20  offset 1  20-40

    纠错:每条消息的tag对应的HashCode.

    queue1  offset 0  0-20  offset 1  20-40

    queue2  offset 0  0-20  offset 1  20-40

    queue3  offset 0 0-20  offset 1  20-40

    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2BFB0000, offsetMsgId=C0A81F9800002A9F0000000000000000(每条消息偏移量,以十六进制表示消息长度,转换成十进制0), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0(队列偏移量,当前队列增加1条就自增1)]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DE20001, offsetMsgId=C0A81F9800002A9F00000000000000CB(十六进制表示消息长度,转换成十进制203), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0002, offsetMsgId=C0A81F9800002A9F0000000000000196(十六进制表示消息长度,转换成十进制406), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DEC0003, offsetMsgId=C0A81F9800002A9F0000000000000261(十六进制表示消息长度,转换成十进制609), messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60004, offsetMsgId=C0A81F9800002A9F000000000000032C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60005, offsetMsgId=C0A81F9800002A9F00000000000003F7, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2DF60006, offsetMsgId=C0A81F9800002A9F00000000000004C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=C0A81F98545C18B4AAC270AF2E000007, offsetMsgId=C0A81F9800002A9F000000000000058D, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]

    概念

    • message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
    • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
    • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费

    类型(父类是OffsetStore):

    • 本地文件类型
      • DefaultMQPushConsumer 的 BROADCASTING 广播模式,各个 Consumer 没有互相干扰,使用 LocalFileOffsetStore,把 Offset 存储在本地
    • Broker 代存储类型
      • DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore

    作用

    • 主要是记录消息的偏移量,有多个消费者进行消费
    • 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
    • 广播模式下采用 LocalFileOffsetStore,消费端存储

    建议采用 pushConsumer,RocketMQ 自动维护 OffsetStore,如果用另外一种 pullConsumer 需要自己进行维护 OffsetStore

    消息存储 CommitLog

    消息存储是由 ConsumeQueue 和 CommitLog 配合完成

    • ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。Topic 下的每个 message queue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。默认地址:store/consumequeue/{topicName}/{queueid}/fileName
    • CommitLog:存储消息真正内容的文件。
      • 生成规则:
        • 每个文件的默认1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824 Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,消息存储的时候会顺序写入文件,当文件满了则写入下一个文件。
      • 判断消息存储在哪个 CommitLog 上
        • 例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。

    Broker 里面一个 Topic 里面有多个 MesssageQueue,每个 MessageQueue 对应一个 ConsumeQueue,ConsumeQueue 里面记录的是消息在 CommitLog 里面的物理存储地址。

    IndexFile 消息索引文件

        ConsumerQueue是通过偏移量offset去CommitLog文件中查找消息,但实际工作应用中,我们想查找某条具体的消息,并不知道offset值,那该怎么办呢?那IndexFile作用就来了。
        IndexFile是消息索引文件,如果一个生产者发送的消息包含key值的话,会使用IndexFile存储消息索引,主要用于使用key来查询消息。文件的内容结构如图

     在Broker端,通过Key来计算Hash槽的位置,从而找到Index索引数据。从Index索引中拿到消息的物理偏移量,最后根据这个物理偏移量,直接到CommitLog文件中去找就可以了。另外说明下,通过IndexFile来查找消息的方法不影响RocketMQ的正常生产-消费流程,它只是查询定位消息的方法而已。

  • 相关阅读:
    Understanding about Baire Category Theorem
    Isometric embedding of metric space
    Convergence theorems for measurable functions
    Mindmap for "Principles of boundary element methods"
    Various formulations of Maxwell equations
    Existence and uniqueness theorems for variational problems
    Kernels and image sets for an operator and its dual
    [loj6498]农民
    [luogu3781]切树游戏
    [atAGC051B]Three Coins
  • 原文地址:https://www.cnblogs.com/jrkl/p/14167543.html
Copyright © 2011-2022 走看看