consumeQueue 如何知道最大的 offset,或者说启动一个 broker 后,从哪个地方开始写入新的数据?
consumeQueue 文件固定 20 字节存储一个 entry,新建一个 consumeQueue 文件,只有写入 entry 的地方有值,其他地方是 0 字节。
1. 调用 fileChannel.map 函数,可以指定文件大小,同时文件空白的地方以 0 字节填充
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
2. broker 启动时检查 consumeQueue 文件
// org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue // org.apache.rocketmq.store.ConsumeQueue#recover public void recover() { final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { int index = mappedFiles.size() - 3; if (index < 0) index = 0; int mappedFileSizeLogics = this.mappedFileSize; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); // 文件名即当前文件开始的物理偏移量 long processOffset = mappedFile.getFileFromOffset(); // 当前文件的 entry 的总字节数 long mappedFileOffset = 0; long maxExtAddr = 1; while (true) { for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { long offset = byteBuffer.getLong(); int size = byteBuffer.getInt(); long tagsCode = byteBuffer.getLong(); if (offset >= 0 && size > 0) { // 发现一个 entry mappedFileOffset = i + CQ_STORE_UNIT_SIZE; this.maxPhysicOffset = offset + size; if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; } } else { log.info("recover current consume queue file over, " + mappedFile.getFileName() + " " + offset + " " + size + " " + tagsCode); break; } } if (mappedFileOffset == mappedFileSizeLogics) { index++; if (index >= mappedFiles.size()) { log.info("recover last consume queue file over, last mapped file " + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset(); mappedFileOffset = 0; log.info("recover next consume queue file, " + mappedFile.getFileName()); } } else { log.info("recover current consume queue queue over " + mappedFile.getFileName() + " " + (processOffset + mappedFileOffset)); break; } } // 计算总的 entry 的位移 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); // 后续写入的开始点 this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); if (isExtReadEnable()) { this.consumeQueueExt.recover(); log.info("Truncate consume queue extend file by max {}", maxExtAddr); this.consumeQueueExt.truncateByMaxAddress(maxExtAddr); } } }