zoukankan      html  css  js  c++  java
  • 快速弄明白RocketMQ的CommitLog、ConsumeQueue、indexFile、offset

    CommitLog

    消息内容原文的存储文件,同Kafka一样,消息是变长的,顺序写入

    生成规则:

    每个文件的默认1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824, 消息存储的时候会顺序写入文件,当文件满了则写入下一个文件

     ConsumeQueue

    ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQueue其实是CommitLog的一个索引文件。

    一个ConsumeQueue文件对应topic下的一个队列

     ConsumeQueue是定长的结构,每1条记录固定的20个字节。很显然,Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再读CommitLog得到消息内容

     ConsumeQueue的作用

    1. 通过broker保存的offset可以在ConsumeQueue中获取消息,从而快速的定位到commitLog的消息位置
    2. 过滤tag是也是通过遍历ConsumeQueue来实现的(先比较hash(tag)符合条件的再到consumer比较tag原文)
    3. 并且ConsumeQueue还能保存于操作系统的PageCache进行缓存提升检索性能

    下面是我解析的ConsumeQueue

      public static void main(String[] args) throws IOException {
            decodeCQ(new File("D:\00000000000000000000"));
        }
    
        static void decodeCQ(File consumeQueue) throws IOException {
            FileInputStream fis = new FileInputStream(consumeQueue);
            DataInputStream dis = new DataInputStream(fis);
    
            long preTag = 0;
            long count = 1;
            while (true) {
                long offset = dis.readLong();
                int size = dis.readInt();
                long tag = dis.readLong();
    
                if (size == 0) {
                    break;
                }
                preTag = tag;
                System.out.printf(" %d %d %d
    ",   tag, size, offset);
            }
            fis.close();
      }
    hash(tag)|size|offset(commitLog)
    3552231 191 180081 3552231 191 180654 3552231 191 180845 3552231 191 182182 3552231 192 182565 121074 201 182757 3552231 245 190411 3552231 245 190656 3552231 245 190901 3552231 245 191146 3552231 245 191391 3552231 245 191636 3552231 245 191881 99 197 219910 99 197 220107 99 197 220304

    indexFile

    如果我们需要根据消息ID,来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,indexFile就是为了解决这个问题的文件

    offset

    offset就是message queue的下标(和commitLog的offset不是一回事,这个offset是ConsumeQueue文件的下标/行数),一条消息进入队列下标就会+1

    offset持久化

    类型(父类是OffsetStore):

    • 本地文件类型

    DefaultMQPushConsumer的BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地

    • Broker代存储类型

    DefaultMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore

    {
        "offsetTable":{
            "zxp_test_topic@zxp_test_group2":{0:16,1:17,2:23,3:43
            },
            "TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250
            },
            "%RETRY%zxp_test_group2@zxp_test_group2":{0:3
            },
            "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
            },
            "%RETRY%zxp_test_group3@zxp_test_group3":{0:0
            },
            "order_topic@zxp_test_group3":{0:0,1:3,2:3,3:3
            }
        }
    }

  • 相关阅读:
    InterView
    单例设计模式(附AS3)
    网络游戏的分类
    flex的动态链接库和静态链接库
    flash 中位图九宫格的用法
    删除Flex Builder中没用的工作空间
    一个3D游戏DEMO, 同步教程更新
    LVS NET部署
    zabbix安装
    MySQL+keepalived高可用配置
  • 原文地址:https://www.cnblogs.com/zxporz/p/12336476.html
Copyright © 2011-2022 走看看