zoukankan      html  css  js  c++  java
  • consumeQueue 如何知道最大的 offset

    consumeQueue 如何知道最大的 offset,或者说启动一个 broker 后,从哪个地方开始写入新的数据?

    consumeQueue 文件固定 20 字节存储一个 entry,新建一个 consumeQueue 文件,只有写入 entry 的地方有值,其他地方是 0 字节。

    1. 调用 fileChannel.map 函数,可以指定文件大小,同时文件空白的地方以 0 字节填充

    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

    2. broker 启动时检查 consumeQueue 文件

    // org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
    // org.apache.rocketmq.store.ConsumeQueue#recover
    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
    
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;
    
            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            // 文件名即当前文件开始的物理偏移量
            long processOffset = mappedFile.getFileFromOffset();
            // 当前文件的 entry 的总字节数
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
    
                    if (offset >= 0 && size > 0) {
                        // 发现一个 entry
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        this.maxPhysicOffset = offset + size;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }
    
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {
    
                        log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }
    
            // 计算总的 entry 的位移
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            // 后续写入的开始点
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
  • 相关阅读:
    hdu 4578 线段树 ****
    hdu 4576 概率dp **
    hdu 4622 **
    vue中保存和获取cookie,读写cookie以及设置有效时间等,使用js-cookie
    go语言 strconv.ParseInt 的例子
    【golang】unsafe.Sizeof浅析
    Golang 漫谈之channel妙法
    总结了才知道,原来channel有这么多用法!
    字符集之在UTF-8中,一个汉字为什么需要三个字节?
    什么是Bitmap
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12205000.html
Copyright © 2011-2022 走看看