zoukankan      html  css  js  c++  java
  • RocketMq源码学习-消息存储

    Jdk基础

    ByteBuffer

    • position
    • limit
    • capacity
    • DirectByteBuffer与HeapByteBuffer

    MappedByteBuffer

    在深入MappedByteBuffer之前,先看看计算机内存管理的几个术语:参考

    • MMC:CPU的内存管理单元。
    • 物理内存:即内存条的内存空间。
    • 虚拟内存:计算机系统内存管理的一种技术。它使得应用程序认为它拥有连续的可用的内存(一个连续完整的地址空间),而实际上,它通常是被分隔成多个物理内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换。
    • 页面文件:操作系统反映构建并使用虚拟内存的硬盘空间大小而创建的文件,在windows下,即pagefile.sys文件,其存在意味着物理内存被占满后,将暂时不用的数据移动到硬盘上。
    • 缺页中断:当程序试图访问已映射在虚拟地址空间中但未被加载至物理内存的一个分页时,由MMC发出的中断。如果操作系统判断此次访问是有效的,则尝试将相关的页从虚拟内存文件中载入物理内存。

    java nio中引入了一种基于MappedByteBuffer操作大文件的方式,通过内存映射的方法访问硬盘上的文件。一般来说,从硬盘上将文件读入内存,都要经过文件系统进行数据拷贝,并且数据拷贝操作是由文件系统和硬件驱动实现的。

    • 传统的read() 操作是系统调用,需要将文件从硬盘拷贝到内核空间的缓冲区,再将数据拷贝到用户空间,实际上进行两次数据拷贝。
    • map()也是系统调用,但不需要数据拷贝,当发生缺页中断时,直接将数据从硬盘拷贝到用户空间,只进行一次数据拷贝。

    参考资料

    NIO进阶篇:Page Cache、零拷贝、顺序读写、堆外内存

    MappedByteBuffer的一点优化

    Understanding Java Buffer Pool Memory Space

    关于mapperbytebuffer自己的一点总结:

    • Page cache 是操作系统级别,用于平衡io慢于cpu速度的一种机制。读写文件时,不直接访问磁盘,而是先写入内核空间的Page chche,操作系统会异步将dirty page写入磁盘中。
    • MappedByteBuffer底层调用系统级别的mmap即内存映射,MappedByteBuffer.map()基于mmap技术。
    • mmap可以将磁盘文件直接映射内存中,即程序可以在用户空间直接访问page cache。正常的read/write需要将page cache再拷贝一次到用户空间,程序再操作用户空间的数据。
    • MappedByteBuffer子类DirectByteBuffer与HeapByteBuffer,由于HeapByteBuffer存在于堆空间中,受GC影响可能会在物理内存地址中移动,所以不能直接用于IO写入。需要先将HeapByteBuffer拷贝至DirectByteBuffer,再写入page cache。

    示例1

    /**
     * 1.创建文件
     * 消息存储在commitlog,MappedFile使用RandomAccessFile创建MappedByteBuffer
     * 参考代码:
     *
     * @see MappedFileQueue#getLastMappedFile(long, boolean)
     * @see MappedFile#MappedFile(java.lang.String, int)
     * 以下示例会创建一个mappedBufFile.txt文件,并使用MappedByteBuffer映射为10M大小的文件。
     * RocketMq通过移动文件中的offset写入数据
     */
    @Test
    public void test1MappedByteBuffer() throws IOException {
        final RandomAccessFile randomAccessFile = new RandomAccessFile(rootPath + "testFile/mappedBufFile.txt", "rw");
        final MappedByteBuffer buffer = randomAccessFile.getChannel().map(MapMode.READ_WRITE, 0, 10 * 1024 * 1024);
        randomAccessFile.close();
    }
    

    如代码示例,通过MappedByteBuffer会再硬盘创建一个10M大小的空白文件。虽然没有写入任何数据,但已经占用了10M空间。RocketMq也是通过MappedByteBuffer默认创建1G大小的文件保存每条消息,再通过操作MappedByteBuffer中的位置来写入和读取数据。

    示例2

    /**
         * 2.写入数据
         * Rocket提供两种方式写入:
         * 1)直接操作offset写入数据
         * 2)通过AppendMessageCallback写入数据
         *
         * @see MappedFile#appendMessage(byte[])
         * @see MappedFile#appendMessage(MessageExtBrokerInner, AppendMessageCallback)
         */
    @Test
    public void test2WriteMsg() throws IOException {
        //删除旧文件
        final String name = rootPath + "testFile/mappedBufFile2.txt";
        final Path path = new File(name).toPath();
        if (Files.exists(path)) {
            Files.delete(path);
        }
        // 新建文件,并完成内存映射
        // MappedByteBuffer一旦创建就于FileChannel无关
        final RandomAccessFile randomAccessFile = new RandomAccessFile(name, "rw");
        final MappedByteBuffer buffer = randomAccessFile.getChannel().map(MapMode.READ_WRITE, 0, 10 * 1024 * 1024);
        // 写入数据
        /*        int writePosition = 0;
            final ByteBuffer allocate = ByteBuffer.allocate(1024);
            final byte[] bytes = "hello world!!!".getBytes();
            allocate.put(bytes);
            allocate.flip();*/
        // #####  FileInputStream和FileChannel最终均调用了native的ReadFile方法,本质时一样的 ###########
        // #####  所以下面的代码不正确 #######
        /*        final FileChannel channel = randomAccessFile.getChannel();
            channel.position(writePosition);
            channel.write(allocate);
            buffer.put("hello world!!!".getBytes());*/
        //        assert allocate.position() == bytes.length;
        // 指定文件写入位置,吸入数据
        /*        writePosition = allocate.position() + 100;
            channel.position(writePosition);
            allocate.flip();
            channel.write(allocate);
            channel.close();
            randomAccessFile.close();*/
    
        // 通过map内存映射才有速度优势
        buffer.put("I love china".getBytes());
    
        //读入数据
        final RandomAccessFile readFile = new RandomAccessFile(name, "rw");
        final FileChannel readChannel = readFile.getChannel();
        final ByteBuffer readAllocate = ByteBuffer.allocate(8096);
        readChannel.read(readAllocate);
        readAllocate.flip();
        final byte[] readBytes = new byte[8096];
        readAllocate.get(readBytes);
        //这里byte为0没有输入,所以这里输出的hello world是连续的
        System.out.println(new String(readBytes));
        /*        final byte[] charByte = new byte[1];
            //读取第二次写入的字符
            System.arraycopy(readBytes, writePosition, charByte,0 ,1);
            System.out.println(new String(charByte));*/
    }
    

    通过MappedByteBuffer内存映射,并指定position完成数据写入,如果position位置原来就有数据则会覆盖。

    这里给出第一个示例时错误的。即代码注释部分。

    FileInputStream和 FileChannel 最终均调用了native的ReadFile方法,所以FileChannel读写并没有优势,都需要将内核空间page cache中的内存copy到用户空间。

    只有使用map()方法通过底层mmap()内存映射,直接将内核空间page cache映射到用户空间,用户空间可以直接操作pagecache,避免一次copy才有读写优势。

    关键对象

    1. DefaultMessageStore:消息存储的入口,封装了所有涉及存储相关的类,如CommitLog写数据,FlushConsumeQueueService刷盘,IndexService索引文件服务
    2. MessageStoreConfig:消息存储相关的所有配置,如文件大小1G
    3. CommitLog:负责文件相关操作,如调用MappedFileQueue写入数据,删除过期文件,PutMessageLock上锁等
    4. MappedFileQueue:维护MappedFile列表,负责创建新的MappedFile等
    5. ReputMessageService:定时获取commitlog最新的消息,并转发写入ComsumeQueue和IndexFile
    6. ConsumeQueue:每个topic会有单独的ConsumeQueue文件,记录该topic下的消息对应到commitLog的offset
    7. IndexFile:commitlog所有msg的索引文件,可以直接根据topic+msgId查具体的msg
    8. MappedFile:对应具体的消息文件,负责消息写入和读取相关操作。
    9. Message:Produce发送的消息结构
    10. MessageExt/MessageExtInner:RocketMq系统内部流转的消息结构,增加如消息id,创建时间,存储时间等必要信息。
    11. TransientStorePool:堆外内存池,由MessageStoreConfig配置的transientStorePoolEnable是否启用。如果启用,MappedFile写数据时使用TransientStorePool创建的堆外内存。
    12. MessageDecoder:见名识义,提供消息的编/解码,如解码存储在commitLog中的消息的properties。在org.apache.rocketmq.common.message包下

    消息结构

    UML

    image-20210710140739022

    • Message:Producer发送的消息定义为Message类,如发送消息需要指定topic,body,tag标签存储在properties的Map中
    • MessageExt/MessageExtBrokerInner:在RocketMQ内部流转的消息结构,记录了更多必要的信息,如生成消息的id,消息创建时间,存储时间等

    MessageExt

    /**
     * 对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,
     * 还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等等。
     */
    public class MessageExt extends Message {
    
        private static final long serialVersionUID = 5720810158625748049L;
    
        private String brokerName;
        /**
         * 记录MessageQueue编号,消息会被发送到Topic下的MessageQueue
         */
        private int queueId;
        /**
         * 记录消息在Broker存盘大小
         */
        
        private int storeSize;
        /**
         * 记录在ConsumeQueue中的偏移
         *
         */
        private long queueOffset;
        /**
         * 记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
         * @see MessageSysFlag
         */
        private int sysFlag;
        /**
         * 消息创建时间,在Producer发送消息时设置
         */
        private long bornTimestamp;
        /**
         * 消息创建的主机地址
         */
        private SocketAddress bornHost;
        /**
         * 消息存储时间
         */
        private long storeTimestamp;
        /**
         * 记录存储该消息的Broker地址
         */
        private SocketAddress storeHost;
        /**
         * 消息Id.
         * msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成的
         * @see MessageDecoder#createMessageId(java.nio.ByteBuffer, java.nio.ByteBuffer, long)
         * broker address + commitlog offset
         */
        private String msgId;
        /**
         * 	记录在Broker中存储偏移
         */
        private long commitLogOffset;
        /**
         * 消息内容CRC校验值
         */
        private int bodyCRC;
        /**
         * 消息重试消费次数
         */
        private int reconsumeTimes;
        /**
         * 事务详细相关字段
         */
        private long preparedTransactionOffset;
    }
    

    CommitLog消息存储图示:来源

    CommitLog 存储结构:

    第几位 字段 说明 数据类型 字节数
    1 MsgLen 消息总长度 Int 4
    2 MagicCode MESSAGE_MAGIC_CODE Int 4
    3 BodyCRC 消息内容CRC Int 4
    4 QueueId 消息队列编号 Int 4
    5 Flag flag Int 4
    6 QueueOffset 消息队列位置 Long 8
    7 PhysicalOffset 物理位置。在 CommitLog 的顺序存储位置。 Long 8
    8 SysFlag MessageSysFlag Int 4
    9 BornTimestamp 生成消息时间戳 Long 8
    10 BornHost 生效消息的地址+端口 Long 8
    11 StoreTimestamp 存储消息时间戳 Long 8
    12 StoreHost 存储消息的地址+端口 Long 8
    13 ReconsumeTimes 重新消费消息次数 Int 4
    14 PreparedTransationOffset Long 8
    15 BodyLength + Body 内容长度 + 内容 Int + Bytes 4 + bodyLength
    16 TopicLength + Topic Topic长度 + Topic Byte + Bytes 1 + topicLength
    17 PropertiesLength + Properties 拓展字段长度 + 拓展字段 Short + Bytes 2 + PropertiesLength

    indexFile结构

    • indexFile的文件名为起始时间戳

    • IndexHeader

      beginTimestampIndex 第一个消息的存储时间
      endTimestampIndex 最后一个消息的存储时间
      beginPhyoffsetIndex 索引文件第一个消息在CommitLog的物理位置偏移量
      endPhyoffsetIndex 索引文件最后一个消息在CommitLog的物理位置偏移量
      hashSlotcountIndex 索引文件hash slot已写入的个数
      indexCountIndex 索引文件已写入索引个数
    • hash slot:hash 槽

    • index enrty

      hashcode hash值
      phyOffset message在CommitLog的物理文件地址
      timeDiff 消息存储时间与header里的beginTimestamp的差值(节省空间)
      prevIndex 保存上条消息的位置,形成访问链,因为hash slot只保存最后的index entry位置

    写入流程

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        //key是 DispatchRequest 中的uniqKey(uuid)+Topic生成key
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //即key的hash值
            int keyHash = indexKeyHashMethod(key);
            //取余得到hash slot的位置
            int slotPos = keyHash % this.hashSlotNum;
            // 计数hash slot的位置,注意这里是slot
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
            FileLock fileLock = null;
    
            try {
    
                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // 取出当前hash槽的值,这里slot的值总是保存为当前indexHeader的IndexCount
                // 初始值为0,如果取出不为0,这说明hash冲突
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                 //计算时间差
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                timeDiff = timeDiff / 1000;
    
                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                // 根据当前的index count计算index entry的位置,这里是index
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
    
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                // 保存为上一个slot value,形成链表,可以通过此slot value访问hash冲突的值
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                // 将hash 槽的值保存为当前的index count
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                // 更新header
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
    
                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);
    
                return true;
            } 
            //省略部分代码。。。。。
        } 
    
        return false;
    }
    

    读出流程,则是根据key取余运算,通过hash slot的value,作为链表的末端不断向前遍历,获得所有的phyOffset。调用者拿到所有的phyOffset读出commitlog的数据后再过滤一次。

    ConsumeQueue

    所有的消息都会依次存储在相同的CommitLog文件,所以消费消息时,无法直接使用CommitLog找到对应topic和tag的消息,而是依赖于ConsumeQueue文件。

    依赖于ReputMessageService服务不断拉取CommitLog最新的消息,并通过CommitLogDispatcherBuildConsumeQueue异步将消息在CommitLog的偏移量更新到ConsumerQueue文件中。

    ConsumerQueue的存储结构时,CommitLogOffset,msgSize,tagsCode如下图:

    image-20210711234123034

    所以根据一个逻辑的偏移量logicOffset,如第几条消息,可以通过公式:logicOffset*20获得消息在CommitLog相应的物理偏移量physicalOffset以及消息长度,就可以通过这两个参数取出commitlog中的数据。

    存储目录结构

    image-20210711232244614

    DefaultMessageStore

    public class DefaultMessageStore implements MessageStore {
        //存储相关的配置,例如存储路径、commitLog文件大小,刷盘频次等等
        private final MessageStoreConfig messageStoreConfig;
        // CommitLog
        //核心处理类,消息存储在commitlog文件中
        private final CommitLog commitLog;
        //topic的消费队列
        private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
        //ConsumeQueue刷盘服务线程
        private final FlushConsumeQueueService flushConsumeQueueService;
        //commitLog 定时清除服务现场
        private final CleanCommitLogService cleanCommitLogService;
        //consumeQueue 定时清除服务现场
        private final CleanConsumeQueueService cleanConsumeQueueService;
        // indexService 索引服务
        private final IndexService indexService;
        //MappedFile分配线程,RocketMQ使用内存映射处理commitlog,consumeQueue文件
        private final AllocateMappedFileService allocateMappedFileService;
        //重试存储消息服务现场
        private final ReputMessageService reputMessageService;
        //主从同步实现服务
        private final HAService haService;
        // 定时任务调度器,执行定时任务,主要是处理定时任务。
        private final ScheduleMessageService scheduleMessageService;
        //存储统计服务
        private final StoreStatsService storeStatsService;
        //DataBuffer池,后文会详细使用
        private final TransientStorePool transientStorePool;
        // 存储服务状态
        private final RunningFlags runningFlags = new RunningFlags();
        private final SystemClock systemClock = new SystemClock();
    
        private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
        // Broker统计服务
        private final BrokerStatsManager brokerStatsManager;
        //消息达到监听器
        private final MessageArrivingListener messageArrivingListener;
        //broker 配置文件
        private final BrokerConfig brokerConfig;
    
        private volatile boolean shutdown = true;
        // 检查点
        private StoreCheckpoint storeCheckpoint;
    
        private AtomicLong printTimes = new AtomicLong(0);
        //转发comitlog日志,主要是从commitlog转发到consumeQueue、commitlog、 index。
        private final LinkedList<CommitLogDispatcher> dispatcherList;
        //数据存储的文件夹下有一个lock文件
        private RandomAccessFile lockFile;
        //启动后给文件上锁
        private FileLock lock;
    
        boolean shutDownNormal = false;
    }
    
    • commitLog:处理消息存储

    • IndexService:索引服务

    MappedFile

    public class MappedFile extends ReferenceResource {
    
        /**
         * 操作系统每页大小,默认4k
         */
        public static final int OS_PAGE_SIZE = 1024 * 4;
        protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
        /**
         * 注意这里是static变量,由所有实例对象共享
         *
         * @see MappedFile#init(java.lang.String, int, org.apache.rocketmq.store.TransientStorePool)
         * 每次创建文件都会把文件大小累加到这里,注意是long类型
         * 2**63=9223372036854775808,为19位,而mappedfile文件名长度为20位
         */
        private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
        /**
         * MappedFile对象个数,也即commitLog文件个数
         */
        private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
        /**
         * 当前文件的写指针位置
         */
        protected final AtomicInteger wrotePosition = new AtomicInteger(0);
        /**
         * 当前文件的写指针位置
         */
        protected final AtomicInteger committedPosition = new AtomicInteger(0);
        /**
         * 数据已刷盘的指针位置
         */
        private final AtomicInteger flushedPosition = new AtomicInteger(0);
        /**
         * MappedFile文件大小,默认1G
         */
        protected int fileSize;
        /**
         * 文件的NIO通道
         */
        protected FileChannel fileChannel;
        /**
         * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
         * 如果开启了TransientStorePool,会启用TransientStorePool,负责创建堆外内存池,写数据时,
         * 数据会先写入TransientStorePool创建的(DirectBuffer)ByteBuffer
         *
         * @see TransientStorePool
         */
        protected ByteBuffer writeBuffer = null;
        /**
         * 堆外内存池
         */
        protected TransientStorePool transientStorePool = null;
        /**
         * 文件名长度位20,对应当前文件的第一个数据索引大小
         */
        private String fileName;
        /**
         * 文件名对应的偏移量
         */
        private long fileFromOffset;
        /**
         * 物理文件
         */
        private File file;
        /**
         * 未开始TransientStorePool时,直接由File创建MappedByteBuffer内存映射
         */
        private MappedByteBuffer mappedByteBuffer;
        /**
         *  文件的最后写入时间
         */
        private volatile long storeTimestamp = 0;
        /**
         * 是否时MappeFileQueue队列的第一个文件
         */
        private boolean firstCreateInQueue = false;
    }
    
    • 如果开启TransientStorePool,数据先写入writeBuffer,否则写入mappedByteBuffer对应数据文件的内存映射。
    • wrotePosition指向写入数据的位置

    CommitLog写入和读取示例

    public class MessageStoreTest {
    
        @ImportantField
        private String storePathRootDir = System.getProperty("user.dir") + File.separator + "store";
    
        //The directory in which the commitlog is kept
        @ImportantField
        private String storePathCommitLog = System.getProperty("user.dir") + File.separator + "store"
            + File.separator + "commitlog";
        private int QUEUE_TOTAL = 100;
        private AtomicInteger QueueId = new AtomicInteger(0);
        private SocketAddress BornHost;
        private SocketAddress StoreHost;
        private byte[] MessageBody;
        private MessageStore messageStore;
    
        @Before
        public void init() throws Exception {
            StoreHost    = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
            BornHost     = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
            messageStore = buildMessageStore();
            boolean load = messageStore.load();
            assertTrue(load);
            messageStore.start();
        }
    
        @Test
        public void testMessageWrite() throws Exception {
            final MessageExtBrokerInner msgExtInner = new MessageExtBrokerInner();
            msgExtInner.setTopic("MsgWriteTopic");
            msgExtInner.setTags("Tag1");
    //        msgExtInner.setKeys("key1");
            msgExtInner.setKeys(String.valueOf(System.currentTimeMillis()));
            msgExtInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
            msgExtInner.setSysFlag(MessageSysFlag.TRANSACTION_NOT_TYPE);
            msgExtInner.setBornTimestamp(System.currentTimeMillis());
            msgExtInner.setStoreHost(StoreHost);
            msgExtInner.setBornHost(BornHost);
    //        msgExtInner.setDelayTimeLevel(1);
            for (int i = 0; i < 10; i++) {
                msgExtInner.setBody(("hello world!!!" + i).getBytes());
                msgExtInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExtInner.getProperties()));
                messageStore.putMessage(msgExtInner);
            }
            //
            StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
            StoreTestUtil.flushConsumeIndex((DefaultMessageStore) messageStore);
        }
    
        @Test
        public void testMsgRead() {
            //这里的offset相当于是消息的索引(自增id),先取indexFile找到index对应到commitlog的offset
            //找到对应的文件,计算在文件具体的postion,并读取数据
    //        final String schedule_topic_xxxx = "SCHEDULE_TOPIC_XXXX";
            final GetMessageResult message = messageStore.getMessage("FirstGroup", "MsgWriteTopic", 0, 2, 1024,
                                                                     null);
            final List<ByteBuffer> messageBufferList = message.getMessageBufferList();
            System.out.println(messageBufferList.size());
            for (ByteBuffer byteBuffer : messageBufferList) {
                final MessageExt msg = MessageDecoder.decode(byteBuffer);
                System.out.println(msg.getTags());
                System.out.println(msg.getKeys());
                System.out.println(new String(msg.getBody()));
            }
        }
    
        private MessageStore buildMessageStore() throws Exception {
            MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            //存储路径
            messageStoreConfig.setStorePathCommitLog(storePathCommitLog);
            messageStoreConfig.setStorePathRootDir(storePathRootDir);
            //文件大小
            messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
            messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
            messageStoreConfig.setMaxHashSlotNum(10000);
            messageStoreConfig.setMaxIndexNum(100 * 100);
            //异步刷盘
            messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
            messageStoreConfig.setFlushIntervalConsumeQueue(1);
            return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"),
                                           new MyMessageArrivingListener(), new BrokerConfig());
        }
    
        private class MyMessageArrivingListener implements MessageArrivingListener {
    
            @Override
            public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
                                 byte[] filterBitMap, Map<String, String> properties) {
                System.out.println(topic + "==" + queueId);
            }
        }
    }
    
    

    CommitLog写入消息图示:

    根据类型以及方法名应该大致能知道具体是做什么的,提几点需要注意的:

    • MapperFile中有两个ByteBuffer分别是:writeBuffer和mappedByteBuffer,如果启用堆外内存,则使用transientStorePool生成writeBuffer,否则直接使用文件映射到堆内存的mappedByteBuffer。
    • MapperFile最后写入数据由AppendMessageCallback的实现类完成,即图示中的doAppend方法,会将MessageExtBrokerInner消息编码为CommitLog的格式写入上述的ByteBuffer中
    • 最后handlerDiskFlush()方法根据flushDiskType刷盘类型判断是否为异步刷盘。如果没有开启堆外内存,写入数据就是文件通过内存映射的mappedByteBuffer,那么在写操作时,其实以及将数据写入磁盘了。而开启了堆外内存,则写入的是transientStorePool生成writeBuffer,此时刷盘操作就是将wirtebuffer中的内容通过nio写入磁盘。todo
    • 主从服务handleHA
    • 这里仅仅是CommitLog的数据写入,还没有包括IndexFile和ConsumerQueue数据的写入

    CommitLog读取消息图示

    核心流程:根据逻辑offset(即第几笔)去取ComsumerQueue获取topic和queueid对应存储的数据,从上文可知包含消息再commitlog中的物理offset和消息长度,从而获取到commitlog中存储的具体消息,消息过滤,返回结果。消息过滤较复杂,单独分析。

    CommitLogDispatcher

    在DefaultMessageStore的构造方法里,会初始化创建ReputMessageService,从名称就可以知道用于再次处理接受到的消息的。

    在ReputMessageService维护reputFromOffset,即已处理的消息对应CommitLog的偏移量。ReputMessageService继承与ServiceThread本身就是一个线程,线程启动后会一直请求CommitLog获取CommitLog中的confirmOffset,并将reputFromOffset之后的消息封装成DispatchRequest,通过DefaultMessageStore#doDispatch分发给CommitLogDispatcher实现类处理。

    CommitLogDispatcher实现类包括CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex,核心逻辑就是按照将消息按上文提及的数据结构写入ComsumerQueue和IndexFile

    ReputMessageService核心代如下:

    class ReputMessageService extends ServiceThread {
        private void doReput() {
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }
                /*
                * 这里将reputFromOffset传递给CommitLog,获取CommitLog在reputFromOffset处存储的消息。
                */
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();
    
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog
                                .checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ?
                                dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    
                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    // 分发到所有的CommitLogDispatcher实现类处理
                                    // 这里是CommitLogDispatcherBuildConsumeQueue
                                    // 以及CommitLogDispatcherBuildIndex
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                                    //省略messageArrivingListener监听消息的处理
                                    // 更新reputFromOffset,设置为下次需要拉取的消息在CommitLog中的偏移。
                                    this.reputFromOffset += size;
                                    readSize += size;
                                    
                                    //省略storeStatsService统计的代码
                                } else if (size == 0) {//todo size不同的含义
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog
                                        .rollNextFile(this.reputFromOffset);
                                    readSize             = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {
                                //省略请求不成功的处理:跳过这个范围的消息,继续循环
                                // 这里日志记录为bug,应该是理论上不会进入的
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }
    }
    

    RocketMQ 消息过滤流程

    SQL 匹配

    在发送消息的时候,可以为每一条消息附带一个或者多个属性值,SQL 匹配指的就是依据这些属性值TAG 标签是否满足一定的 SQL 语句条件,来过滤消息。用户如果想要开启 SQL 匹配,那么需要在 Broker 启动的时候,启用如下几个配置信息:

    brokerConfig.setEnablePropertyFilter(true);
    brokerConfig.setEnableCalcFilterBitMap(true);
    
    messageStoreConfig.setEnableConsumeQueueExt(true);
    

    资料

    1. 官方文档
    2. 源码分析RocketMQ之消费队列、Index索引文件存储结构与存储机制-上篇
    3. RocketMQ 消息过滤流程
  • 相关阅读:
    信息安全系统设计基础实验三报告
    信息安全系统设计基础第十二周学习总结
    信息安全系统设计基础实验二报告
    信息安全系统设计基础第十一周学习总结
    家庭作业汇总
    信息安全系统设计基础实验一报告
    信息安全系统设计基础第十周学习总结
    第十章家庭作业
    20182319彭淼迪第一周学习总结
    java预备作业
  • 原文地址:https://www.cnblogs.com/froggengo/p/15055261.html
Copyright © 2011-2022 走看看