zoukankan      html  css  js  c++  java
  • consumeQueue 如何知道最大的 offset

    consumeQueue 如何知道最大的 offset,或者说启动一个 broker 后,从哪个地方开始写入新的数据?

    consumeQueue 文件固定 20 字节存储一个 entry,新建一个 consumeQueue 文件,只有写入 entry 的地方有值,其他地方是 0 字节。

    1. 调用 fileChannel.map 函数,可以指定文件大小,同时文件空白的地方以 0 字节填充

    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

    2. broker 启动时检查 consumeQueue 文件

    // org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
    // org.apache.rocketmq.store.ConsumeQueue#recover
    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
    
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;
    
            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            // 文件名即当前文件开始的物理偏移量
            long processOffset = mappedFile.getFileFromOffset();
            // 当前文件的 entry 的总字节数
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
    
                    if (offset >= 0 && size > 0) {
                        // 发现一个 entry
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        this.maxPhysicOffset = offset + size;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }
    
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {
    
                        log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }
    
            // 计算总的 entry 的位移
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            // 后续写入的开始点
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
  • 相关阅读:
    数据库主从同步相关问题
    前端使用node.js的http-server开启一个本地服务器
    css中height 100vh的应用场景,动态高度百分比布局,浏览器视区大小单位
    通过浏览器F12开发工具快速获取别的网站前端代码的方法
    vue打包app嵌入h5,区分app进入和android,ios显示不同的下载链接
    vue实现验证码倒计时60秒的具体代码
    vue用hbuilderX打包app嵌入h5方式云打包和遇到的问题
    Cookie写不进去问题深入调查 https Secure Cookie
    vue配置手机通过IP访问电脑开发环境
    区块链名词解析:ICO、IFO、IEO和IMO,分别是什么呢?
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12205000.html
Copyright © 2011-2022 走看看