  • Message Queues (5): RocketMQ Message Storage, Part 3

    Questions:

    • How does consumeQueue work?
    • How does the flush-to-disk mechanism work?

    Overview

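    This section looks at how consumeQueue works. First, an overview of the message-sending flow. [Figure: overview of the message-sending flow] Why is consumeQueue needed? There is only a single commitLog shared by all topics, so finding the messages of one topic would mean scanning the entire commitLog. consumeQueue exists to make looking up and fetching messages cheap. Here is the layout of the consumeQueue directory: TopicTest and TopicTest1 are topics, and the entries 0 to 3 under each are that topic's four queues.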

    ├─TopicTest
    │  ├─0
    │  ├─1
    │  ├─2
    │  └─3
    └─TopicTest1
        ├─0
        ├─1
        ├─2
        └─3
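
    To make the motivation concrete, here is a minimal sketch of the idea, not RocketMQ code: one shared log plus a per-(topic, queueId) index of positions. The class ToyConsumeQueueIndex and its methods are hypothetical names invented for this illustration, and the "log" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names): a shared log plus one index per topic/queue.
class ToyConsumeQueueIndex {
    // Shared "commit log": messages of every topic are appended here in arrival order.
    private final List<String> commitLog = new ArrayList<>();
    // Key "topic/queueId" -> positions of that queue's messages inside commitLog.
    private final Map<String, List<Integer>> consumeQueues = new HashMap<>();

    void put(String topic, int queueId, String body) {
        int position = commitLog.size();
        commitLog.add(body);
        consumeQueues.computeIfAbsent(topic + "/" + queueId, k -> new ArrayList<>()).add(position);
    }

    // Reads one queue by following its index instead of scanning the whole log.
    List<String> read(String topic, int queueId) {
        List<String> out = new ArrayList<>();
        for (int position : consumeQueues.getOrDefault(topic + "/" + queueId, new ArrayList<>())) {
            out.add(commitLog.get(position));
        }
        return out;
    }

    public static void main(String[] args) {
        ToyConsumeQueueIndex store = new ToyConsumeQueueIndex();
        store.put("TopicTest", 0, "a");
        store.put("TopicTest1", 0, "b");
        store.put("TopicTest", 0, "c");
        System.out.println(store.read("TopicTest", 0)); // prints [a, c]
    }
}
```

    The lookup cost depends only on the number of messages in the requested queue, not on the size of the shared log.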
    
    

    ConsumeQueue

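    During initialization, DefaultMessageStore starts a thread whose job is to write each newly produced message into the consumeQueue; the consumeQueue is then flushed back to disk for persistence. So there are two steps here: the update and the flush.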

    Let's see where this thread is started, beginning with the start method that starts the message store:

        public void start() throws Exception {
            if (this.messageStore != null) {
                this.messageStore.start();
            }
    
            ...
    
        }
    
    

    The DefaultMessageStore#start method

            // No.3: start reputMessageService
            if (this.getMessageStoreConfig().isDuplicationEnable()) {
                this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
            } else {
                this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
            }
            this.reputMessageService.start();
    
    
    
    

    The reputMessageService#run method

            @Override
            public void run() {
                DefaultMessageStore.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        Thread.sleep(1);
                        // keep calling doReput in a loop
                        this.doReput();
                    } catch (Exception e) {
                        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                DefaultMessageStore.log.info(this.getServiceName() + " service end");
            }
    
    

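    The summary below is taken from the reference articles (it will be removed on request). doReput does the following:

    1: Fetch the new messages stored in the CommitLog.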

    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
    

    reputFromOffset records the CommitLog offset of the message to be fetched in this round. It is passed to the CommitLog to obtain the data stored at that offset.

    2: If step 1 returned data, a new message has been stored in the CommitLog, so the ConsumeQueue is notified to update its message offsets.

    DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    ......
    

    DefaultMessageStore.this.doDispatch(dispatchRequest);

    3: Update reputFromOffset so that it points to the CommitLog offset of the next message to fetch.

    this.reputFromOffset = result.getStartOffset();
    ......
    int size = dispatchRequest.getMsgSize();
    ......
    this.reputFromOffset += size;
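
    The three steps can be sketched as a small loop over a toy log. This is only an illustration under stated assumptions: ToyReput, its record layout (a 4-byte total size followed by the payload) and the dispatched list are hypothetical and do not reflect RocketMQ's real message format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of the reput loop (hypothetical names and record layout).
class ToyReput {
    private final ByteBuffer log = ByteBuffer.allocate(1024); // stands in for the CommitLog
    private int reputFromOffset = 0;                           // next offset to dispatch
    final List<String> dispatched = new ArrayList<>();         // stands in for doDispatch

    // Appends one record: [4-byte total size][payload bytes].
    void append(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        log.putInt(4 + payload.length).put(payload);
    }

    // Step 1: read at reputFromOffset. Step 2: dispatch. Step 3: advance by the record size.
    void doReput() {
        while (reputFromOffset < log.position()) {
            int size = log.getInt(reputFromOffset); // absolute read, does not move position
            byte[] payload = new byte[size - 4];
            for (int i = 0; i < payload.length; i++) {
                payload[i] = log.get(reputFromOffset + 4 + i);
            }
            dispatched.add(reputFromOffset + ":" + new String(payload, StandardCharsets.UTF_8));
            reputFromOffset += size;
        }
    }

    int getReputFromOffset() {
        return reputFromOffset;
    }

    public static void main(String[] args) {
        ToyReput reput = new ToyReput();
        reput.append("hello");
        reput.append("mq");
        reput.doReput();
        System.out.println(reput.dispatched);           // prints [0:hello, 9:mq]
        System.out.println(reput.getReputFromOffset()); // prints 15
    }
}
```

    Because the offset only moves forward by the size of what was dispatched, calling doReput again with no new data is a no-op.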
    
    

    The key point is step 2, which calls DefaultMessageStore.this.doDispatch(dispatchRequest) to notify the ConsumeQueue.

    DefaultMessageStore holds a dispatcherList containing several CommitLogDispatcher objects, all of which are notified when a new message is stored in the CommitLog.

    this.dispatcherList = new LinkedList<>();
    // dispatches to ConsumeQueue
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    // dispatches to Index
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
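
    As a rough sketch of this fan-out, each registered dispatcher receives every request in registration order. ToyDispatch, Request and Dispatcher below are hypothetical stand-ins for DefaultMessageStore, DispatchRequest and CommitLogDispatcher; the two lambdas merely record what a ConsumeQueue builder and an Index builder would receive.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Toy model of the dispatcher list (hypothetical names).
class ToyDispatch {
    // Stand-in for DispatchRequest: only the fields this sketch needs.
    static final class Request {
        final String topic;
        final long commitLogOffset;

        Request(String topic, long commitLogOffset) {
            this.topic = topic;
            this.commitLogOffset = commitLogOffset;
        }
    }

    // Stand-in for CommitLogDispatcher.
    interface Dispatcher {
        void dispatch(Request request);
    }

    final List<String> consumeQueueEntries = new ArrayList<>();
    final List<String> indexEntries = new ArrayList<>();
    private final LinkedList<Dispatcher> dispatcherList = new LinkedList<>();

    ToyDispatch() {
        // Registration order matters: the ConsumeQueue builder runs before the Index builder.
        dispatcherList.addLast(r -> consumeQueueEntries.add(r.topic + "@" + r.commitLogOffset));
        dispatcherList.addLast(r -> indexEntries.add(r.topic + "@" + r.commitLogOffset));
    }

    // Hands the same request to every registered dispatcher.
    void doDispatch(Request request) {
        for (Dispatcher dispatcher : dispatcherList) {
            dispatcher.dispatch(request);
        }
    }

    public static void main(String[] args) {
        ToyDispatch store = new ToyDispatch();
        store.doDispatch(new Request("TopicTest", 0L));
        System.out.println(store.consumeQueueEntries); // prints [TopicTest@0]
        System.out.println(store.indexEntries);        // prints [TopicTest@0]
    }
}
```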
    
    

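    From this we can also see that the commitLog has two dispatchers: one for ConsumeQueue and one for Index. The ConsumeQueue dispatcher and the methods it calls look like this: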

        class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    
            @Override
            public void dispatch(DispatchRequest request) {
                final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        DefaultMessageStore.this.putMessagePositionInfo(request);
                        break;
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        break;
                }
            }
        }
    
    
        public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
            // find the ConsumeQueue for this topic and queue id
            ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
            // delegate to the ConsumeQueue's storage logic
            cq.putMessagePositionInfoWrapper(dispatchRequest);
        }
    
    
        // wrapper around the store operation (adds retries and the optional ext unit)
        public void putMessagePositionInfoWrapper(DispatchRequest request) {
            final int maxRetries = 30;
            boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
            for (int i = 0; i < maxRetries && canWrite; i++) {
                long tagsCode = request.getTagsCode();
                if (isExtWriteEnable()) {
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    cqExtUnit.setFilterBitMap(request.getBitMap());
                    cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                    cqExtUnit.setTagsCode(request.getTagsCode());
    
                    long extAddr = this.consumeQueueExt.put(cqExtUnit);
                    if (isExtAddr(extAddr)) {
                        tagsCode = extAddr;
                    } else {
                        log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                            topic, queueId, request.getCommitLogOffset());
                    }
                }
                // the actual store operation
                boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                    request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
                if (result) {
                    this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                    return;
                } else {
                    // XXX: warn and notify me
                    log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                        + " failed, retry " + i + " times");
    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.warn("", e);
                    }
                }
            }
    
            // XXX: warn and notify me
            log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
            this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
        }
    
        // the actual store operation
        private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
            final long cqOffset) {
    
            if (offset <= this.maxPhysicOffset) {
                return true;
            }
    
            this.byteBufferIndex.flip();
            this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    
            //offset -> size -> tagsCode 
            this.byteBufferIndex.putLong(offset);
            this.byteBufferIndex.putInt(size);
            this.byteBufferIndex.putLong(tagsCode);
    
            final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    
            // find the MappedFile that covers the expected logical offset
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
            if (mappedFile != null) {
    
                if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                    this.minLogicOffset = expectLogicOffset;
                    this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                    this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                    this.fillPreBlank(mappedFile, expectLogicOffset);
                    log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                        + mappedFile.getWrotePosition());
                }
    
                if (cqOffset != 0) {
                    long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
    
                    if (expectLogicOffset < currentLogicOffset) {
                        log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                            expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                        return true;
                    }
    
                    if (expectLogicOffset != currentLogicOffset) {
                        LOG_ERROR.warn(
                            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                            expectLogicOffset,
                            currentLogicOffset,
                            this.topic,
                            this.queueId,
                            expectLogicOffset - currentLogicOffset
                        );
                    }
                }
                this.maxPhysicOffset = offset;
                // append the entry directly through the MappedFile
                return mappedFile.appendMessage(this.byteBufferIndex.array());
            }
            return false;
        }
     
    

    From the code above we can see that consumeQueue storage is built on MappedFile.
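
    The entry written by putMessagePositionInfo has a fixed size: the three put calls write an 8-byte offset, a 4-byte size and an 8-byte tagsCode, 20 bytes in total, and the position of an entry is cqOffset * CQ_STORE_UNIT_SIZE. The sketch below encodes and decodes one such entry. ToyCqUnit and its methods are hypothetical helpers written for this illustration, and the value 20 is derived from the field widths rather than quoted from the listing.

```java
import java.nio.ByteBuffer;

// Toy encoder/decoder for one fixed-size consume queue entry (hypothetical helper).
class ToyCqUnit {
    // 8-byte commit log offset + 4-byte message size + 8-byte tags code.
    static final int CQ_STORE_UNIT_SIZE = 8 + 4 + 8;

    // Same field order as in the listing: offset -> size -> tagsCode.
    static byte[] encode(long offset, int size, long tagsCode) {
        ByteBuffer buffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buffer.putLong(offset).putInt(size).putLong(tagsCode);
        return buffer.array();
    }

    // Returns {offset, size, tagsCode}; array elements are evaluated left to right.
    static long[] decode(byte[] unit) {
        ByteBuffer buffer = ByteBuffer.wrap(unit);
        return new long[] {buffer.getLong(), buffer.getInt(), buffer.getLong()};
    }

    // Byte position of the cqOffset-th entry inside the logical queue.
    static long expectLogicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        byte[] unit = encode(4096L, 200, 12345L);
        long[] fields = decode(unit);
        System.out.println(unit.length);          // prints 20
        System.out.println(fields[0]);            // prints 4096
        System.out.println(expectLogicOffset(3)); // prints 60
    }
}
```

    Fixed-size entries are what make the queue addressable by index: the n-th message of a queue is found with a multiplication instead of a search.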

    Flush mechanism

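    Flushing to disk can be asynchronous or synchronous; RocketMQ defaults to asynchronous flushing. Based on the earlier analysis, messages ultimately land on disk, so the flush path should end in a call to MappedFile's flush method.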

    Asynchronous flush

    The service responsible for asynchronous flushing is FlushRealTimeService, which lives inside the CommitLog class. Its run method is shown below.

            public void run() {
                CommitLog.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    
                    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    
                    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    
                    int flushPhysicQueueThoroughInterval =
                            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    
                    boolean printFlushProgress = false;
    
                    // Print flush progress
                    long currentTimeMillis = System.currentTimeMillis();
                    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                        this.lastFlushTimestamp = currentTimeMillis;
                        flushPhysicQueueLeastPages = 0;
                        printFlushProgress = (printTimes++ % 10) == 0;
                    }
    
                    // everything above reads the flush timing parameters
                    try {
                        if (flushCommitLogTimed) {
                            Thread.sleep(interval);
                        } else {
                            this.waitForRunning(interval);
                        }
    
                        if (printFlushProgress) {
                            this.printFlushProgress();
                        }
    
                        long begin = System.currentTimeMillis();
                        // core logic
                        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                        if (storeTimestamp > 0) {
                            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                        }
                        long past = System.currentTimeMillis() - begin;
                        if (past > 500) {
                            log.info("Flush data to disk costs {} ms", past);
                        }
                    } catch (Throwable e) {
                        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                        this.printFlushProgress();
                    }
                }
    
                // Normal shutdown, to ensure that all the flush before exit
                boolean result = false;
                for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                    result = CommitLog.this.mappedFileQueue.flush(0);
                    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
                }
    
                this.printFlushProgress();
    
                CommitLog.log.info(this.getServiceName() + " service end");
            }
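
    The loop above passes flushPhysicQueueLeastPages to flush and resets it to 0 once flushPhysicQueueThoroughInterval has elapsed. The listing does not show how flush uses that threshold, so the sketch below is an assumption about what a "least pages" policy means: flush only when enough dirty pages have accumulated, and flush anything dirty when the threshold is 0. ToyFlushPolicy, its method names and the 4096-byte page size are all hypothetical.

```java
// Toy flush policy (hypothetical; assumes a 4 KiB page and a "dirty pages >= threshold" rule).
class ToyFlushPolicy {
    static final int OS_PAGE_SIZE = 4096;

    // leastPages > 0: flush only if at least that many pages are dirty.
    // leastPages == 0: flush as soon as there is any unflushed data.
    static boolean shouldFlush(long flushedPosition, long writtenPosition, int leastPages) {
        if (leastPages > 0) {
            long dirtyPages = writtenPosition / OS_PAGE_SIZE - flushedPosition / OS_PAGE_SIZE;
            return dirtyPages >= leastPages;
        }
        return writtenPosition > flushedPosition;
    }

    // Same check as in the loop above: once the thorough interval has passed, force the threshold to 0.
    static int effectiveLeastPages(long now, long lastFlushTimestamp, long thoroughInterval, int leastPages) {
        return now >= lastFlushTimestamp + thoroughInterval ? 0 : leastPages;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(0, 3 * 4096, 4));             // prints false
        System.out.println(shouldFlush(0, 4 * 4096, 4));             // prints true
        System.out.println(effectiveLeastPages(10_000, 0, 10_000, 4)); // prints 0
    }
}
```

    Under this reading, the threshold trades durability latency for fewer disk writes, and the thorough interval bounds how long a small amount of unflushed data can wait.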
    
    

    Synchronous flush

    Synchronous flushing is handled by GroupCommitService.
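
    The article does not show GroupCommitService's code, so the following is only a sketch of the general group-commit idea that the name suggests: producers queue a flush request and wait, and a single service thread flushes once per batch and then releases every waiter. ToyGroupCommit, FlushRequest, submit and flushOnce are hypothetical names, and the disk write is replaced by a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy group commit (hypothetical; not the real GroupCommitService).
class ToyGroupCommit {
    static final class FlushRequest {
        final long nextOffset; // the request is satisfied once data up to this offset is flushed
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<FlushRequest> pending = new ArrayList<>();
    private long flushedWhere = 0;
    int flushCalls = 0; // counts simulated disk flushes

    // Called by producers: enqueue a request and get a future to wait on.
    synchronized CompletableFuture<Boolean> submit(long nextOffset) {
        FlushRequest request = new FlushRequest(nextOffset);
        pending.add(request);
        return request.done;
    }

    // Called by the single service thread: take the whole batch, flush once, wake every waiter.
    void flushOnce(long writtenWhere) {
        List<FlushRequest> batch;
        synchronized (this) {
            batch = pending;
            pending = new ArrayList<>();
        }
        if (batch.isEmpty()) {
            return;
        }
        flushedWhere = writtenWhere; // stands in for forcing the mapped file to disk
        flushCalls++;
        for (FlushRequest request : batch) {
            request.done.complete(flushedWhere >= request.nextOffset);
        }
    }

    public static void main(String[] args) {
        ToyGroupCommit service = new ToyGroupCommit();
        CompletableFuture<Boolean> first = service.submit(100);
        CompletableFuture<Boolean> second = service.submit(200);
        service.flushOnce(200);
        System.out.println(first.join() + " " + second.join() + " " + service.flushCalls); // prints true true 1
    }
}
```

    The point of batching is that two waiting producers cost one flush rather than two.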

    Summary

    References

    • https://zhuanlan.zhihu.com/p/59516998
    • https://zhuanlan.zhihu.com/p/58728454
    • https://juejin.im/post/5d3f00aaf265da03e1685097 (in-depth analysis)
  • Original article: https://www.cnblogs.com/Benjious/p/11893270.html