zoukankan      html  css  js  c++  java
  • consumeQueue 如何知道最大的 offset

    consumeQueue 如何知道最大的 offset,或者说启动一个 broker 后,从哪个地方开始写入新的数据?

    consumeQueue 文件固定 20 字节存储一个 entry,新建一个 consumeQueue 文件,只有写入 entry 的地方有值,其他地方是 0 字节。

    1. 调用 fileChannel.map 函数,可以指定文件大小,同时文件空白的地方以 0 字节填充

    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

    2. broker 启动时检查 consumeQueue 文件

    // org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
    // org.apache.rocketmq.store.ConsumeQueue#recover
    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
    
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;
    
            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            // 文件名即当前文件开始的物理偏移量
            long processOffset = mappedFile.getFileFromOffset();
            // 当前文件的 entry 的总字节数
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
    
                    if (offset >= 0 && size > 0) {
                        // 发现一个 entry
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        this.maxPhysicOffset = offset + size;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }
    
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {
    
                        log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }
    
            // 计算总的 entry 的位移
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            // 后续写入的开始点
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
  • 相关阅读:
    Linux内核之数据双链表
    程序员必读:Linux内存管理剖析
    大型网站系统架构演化之路
    高流量站点NGINX与PHP-fpm配置优化
    LVS负载均衡集群服务搭建详解(二)
    LVS负载均衡集群服务搭建详解(一)
    安装 openSUSE Leap 42.1 之后要做的 8 件事
    【Linux基础】VI命令模式下删除拷贝与粘贴
    【Linux基础】VI命令模式下大小写转换
    【Linux基础】VI 编辑器基本使用方法
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12205000.html
Copyright © 2011-2022 走看看