Questions:
- How does consumeQueue work?
- How does flushing to disk work?
Overview
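This section covers how consumeQueue works, starting with a rough picture of how a message is sent. Why does consumeQueue exist at all? Messages of every topic are written into the same commitLog, so finding the messages of one topic would otherwise mean scanning the entire commitLog. consumeQueue makes looking up and fetching messages cheap. First, the layout of the consumeQueue directory: TopicTest and TopicTest1 are topics, and the numbered directories 0~3 under each are that topic's 4 queues.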
    ├─TopicTest
    │  ├─0
    │  ├─1
    │  ├─2
    │  └─3
    └─TopicTest1
       ├─0
       ├─1
       ├─2
       └─3
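To make the motivation concrete, here is a toy Java model, not RocketMQ code: one shared list plays the commitLog, and a per-(topic, queueId) list of positions plays the consumeQueue. The names (MiniStore, findByScan, findByIndex) are hypothetical, and the real consumeQueue stores byte offsets, sizes and tag hash codes rather than list indexes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model (not RocketMQ code): one shared, append-only "commitLog"
// plus a per-(topic, queueId) list of commitLog positions standing in for consumeQueue.
class MiniStore {
    static final class Msg {
        final String topic;
        final int queueId;
        final String body;

        Msg(String topic, int queueId, String body) {
            this.topic = topic;
            this.queueId = queueId;
            this.body = body;
        }
    }

    private final List<Msg> commitLog = new ArrayList<>();                    // all topics, interleaved
    private final Map<String, List<Integer>> consumeQueues = new HashMap<>(); // "topic@queueId" -> positions

    // Append to the shared log and record the position in that queue's index.
    int put(String topic, int queueId, String body) {
        int position = commitLog.size();
        commitLog.add(new Msg(topic, queueId, body));
        consumeQueues.computeIfAbsent(topic + "@" + queueId, k -> new ArrayList<>()).add(position);
        return position;
    }

    // Without an index: walk the whole log, skipping every other topic/queue. O(total messages).
    Msg findByScan(String topic, int queueId, int queueOffset) {
        int seen = 0;
        for (Msg m : commitLog) {
            if (m.topic.equals(topic) && m.queueId == queueId) {
                if (seen == queueOffset) {
                    return m;
                }
                seen++;
            }
        }
        return null;
    }

    // With the index: one lookup in the queue's list, then one direct read of the log. O(1).
    Msg findByIndex(String topic, int queueId, int queueOffset) {
        List<Integer> cq = consumeQueues.get(topic + "@" + queueId);
        if (cq == null || queueOffset < 0 || queueOffset >= cq.size()) {
            return null;
        }
        return commitLog.get(cq.get(queueOffset));
    }
}
```

findByScan has to walk past every message of every topic, while findByIndex jumps straight to the entry; that difference is the reason consumeQueue exists.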
About ConsumeQueue
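When DefaultMessageStore starts, it launches a thread whose job is to **copy each newly stored message (one a producer has just sent) into the consumeQueue**; the consumeQueue is later flushed back to disk for persistence. So the work splits into two parts: building the consumeQueue and flushing it.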
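Let's see where this thread is started. The broker's start method (BrokerController#start) starts the message store: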
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        ...
    }
In DefaultMessageStore#start:
    // No.3: start reputMessageService
    // start dispatching from the confirmed offset when duplication is enabled,
    // otherwise from the current end of the commitLog
    if (this.getMessageStoreConfig().isDuplicationEnable()) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
    } else {
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
    }
    this.reputMessageService.start();
The ReputMessageService#run method:
    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                // keep calling doReput
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
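The loop above is the usual "run until stopped" background-service pattern. A minimal sketch of it follows, assuming a volatile stop flag in place of RocketMQ's ServiceThread#isStopped; LoopingService and doWork are hypothetical names, with doWork standing in for doReput.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical skeleton of the loop in ReputMessageService#run (not RocketMQ's ServiceThread API).
class LoopingService implements Runnable {
    private volatile boolean stopped = false; // written by another thread, so volatile
    final AtomicLong rounds = new AtomicLong();

    void shutdown() {
        stopped = true;
    }

    // Stand-in for doReput(): one round of work.
    void doWork() {
        rounds.incrementAndGet();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                Thread.sleep(1); // back off about 1 ms between rounds, as the original does
                doWork();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // like the original: report the exception and keep looping
                System.err.println("service has exception: " + e);
            }
        }
    }
}
```

Catching and logging inside the loop keeps one bad round from killing the thread, which matters for a service that must keep the consumeQueue in step with the commitLog.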
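The following summary is adapted from the reference articles (credit to their authors; it will be removed on request). doReput does the following:

1: Fetch the new messages stored in the CommitLog.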
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
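reputFromOffset records the CommitLog offset of the next message to pull. It is passed to the CommitLog, which returns the data stored from reputFromOffset onward; that data may contain several messages, and doReput walks through them one at a time.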
2: If step 1 returned data, new messages have been stored in the CommitLog, so the ConsumeQueue is notified to record their offsets.
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); ......
DefaultMessageStore.this.doDispatch(dispatchRequest);
3: Update reputFromOffset to the CommitLog offset of the next message to pull.
    this.reputFromOffset = result.getStartOffset();
    ......
    int size = dispatchRequest.getMsgSize();
    ......
    this.reputFromOffset += size;
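The three steps can be sketched against an in-memory buffer. This is a simplified, hypothetical model: each record is laid out as [int totalSize][body], whereas a real CommitLog record carries many more fields (magic code, CRC, queueId and so on), and MiniReput is an illustrative name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

// Hypothetical, simplified doReput. The "commitLog" is a ByteBuffer of records laid out as
// [int totalSize][body bytes], where totalSize includes the 4-byte header.
class MiniReput {
    final ByteBuffer commitLog = ByteBuffer.allocate(1024);
    long reputFromOffset = 0;

    // Producer side: append one record and return its start offset.
    long append(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = commitLog.position();
        commitLog.putInt(4 + bytes.length).put(bytes);
        return offset;
    }

    // One doReput pass: dispatch every record written since reputFromOffset.
    int doReput(BiConsumer<Long, Integer> dispatcher) {
        int dispatched = 0;
        long maxOffset = commitLog.position(); // end of the data written so far
        while (reputFromOffset < maxOffset) {
            // step 1: read the record stored at reputFromOffset (absolute get, no position change)
            int size = commitLog.getInt((int) reputFromOffset);
            // step 2: notify the dispatcher (consumeQueue / index builders)
            dispatcher.accept(reputFromOffset, size);
            // step 3: advance to the next record
            reputFromOffset += size;
            dispatched++;
        }
        return dispatched;
    }
}
```

Because reputFromOffset only moves forward by each record's size, calling doReput again after new appends picks up exactly where the previous pass stopped.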
The key part is step 2, which calls DefaultMessageStore.this.doDispatch(dispatchRequest) to notify the ConsumeQueue.
DefaultMessageStore holds a dispatcherList of CommitLogDispatcher objects, each of which listens for new messages stored in the CommitLog.
    this.dispatcherList = new LinkedList<>();
    // dispatches to the ConsumeQueue
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    // dispatches to the Index
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
So the CommitLog has two dispatchers: one builds the ConsumeQueue and the other builds the Index.
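The dispatcher list is a plain observer pattern: each new message is handed to every registered dispatcher in turn. A simplified sketch follows; DispatchRequest and CommitLogDispatcher echo the article's names but carry only the fields needed here, and the two lambdas are hypothetical stand-ins that merely record what they would build.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of the dispatcher list behind doDispatch (observer pattern).
class DispatchDemo {
    static final class DispatchRequest {
        final String topic;
        final int queueId;
        final long commitLogOffset;

        DispatchRequest(String topic, int queueId, long commitLogOffset) {
            this.topic = topic;
            this.queueId = queueId;
            this.commitLogOffset = commitLogOffset;
        }
    }

    interface CommitLogDispatcher {
        void dispatch(DispatchRequest request);
    }

    final List<String> events = new ArrayList<>();
    final LinkedList<CommitLogDispatcher> dispatcherList = new LinkedList<>();

    DispatchDemo() {
        // stand-in for CommitLogDispatcherBuildConsumeQueue
        dispatcherList.addLast(req -> events.add("cq " + req.topic + "@" + req.queueId + " -> " + req.commitLogOffset));
        // stand-in for CommitLogDispatcherBuildIndex
        dispatcherList.addLast(req -> events.add("index -> " + req.commitLogOffset));
    }

    // Every registered dispatcher sees every new message, in registration order.
    void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
}
```

The design keeps doReput ignorant of what gets built from a message: adding another derived structure means registering one more dispatcher.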
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                // normal and committed transactional messages get a consumeQueue entry
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                // prepared (half) and rolled-back messages are skipped
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        // find the ConsumeQueue for this topic and queueId
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        // hand over to the ConsumeQueue's storage logic
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }
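The switch in CommitLogDispatcherBuildConsumeQueue is why transactional half messages stay invisible to consumers: only normal and committed messages get a consumeQueue entry. A standalone sketch of that decision follows. The flag values are what MessageSysFlag is believed to define (two transaction bits at positions 2 and 3 of sysFlag); treat them as an assumption and check them against your RocketMQ version.

```java
// Hypothetical re-creation of the transaction-type filter; constant values are assumed.
class TranTypeFilter {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Keep only the two transaction bits; other sysFlag bits (e.g. compression) are ignored.
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // True if the message should get a consumeQueue entry (i.e. become visible to consumers).
    static boolean shouldBuildConsumeQueue(int sysFlag) {
        switch (getTransactionValue(sysFlag)) {
            case TRANSACTION_NOT_TYPE:
            case TRANSACTION_COMMIT_TYPE:
                return true;
            default: // PREPARED (half) and ROLLBACK messages are not indexed
                return false;
        }
    }
}
```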
    // wrapper around the store operation (retries on failure)
    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}",
                        cqExtUnit, topic, queueId, request.getCommitLogOffset());
                }
            }
            // the actual store operation
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " "
                    + request.getCommitLogOffset() + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

    // the actual store operation
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        // this commitLog offset has already been indexed
        if (offset <= this.maxPhysicOffset) {
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        // entry layout: offset -> size -> tagsCode
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        // find the MappedFile that should hold this entry
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset;
            // append the entry directly to the MappedFile
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }
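From the above we can see that consumeQueue storage is built on MappedFile. Each entry is a fixed-size unit of CQ_STORE_UNIT_SIZE (20) bytes: the 8-byte commitLog offset, the 4-byte message size and the 8-byte tagsCode.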
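The 20-byte unit and the `offset <= maxPhysicOffset` guard can be modeled with a plain ByteBuffer. In this hypothetical sketch, MiniConsumeQueue uses one heap buffer in place of a MappedFile, so file rollover, fillPreBlank and the ConsumeQueueExt path are not modeled.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of consumeQueue units: 8-byte commitLog offset + 4-byte size
// + 8-byte tagsCode = 20 bytes. A heap ByteBuffer stands in for the MappedFile.
class MiniConsumeQueue {
    static final int CQ_STORE_UNIT_SIZE = 20;
    final ByteBuffer file;
    long maxPhysicOffset = -1;

    MiniConsumeQueue(int units) {
        file = ByteBuffer.allocate(units * CQ_STORE_UNIT_SIZE);
    }

    // Mirrors putMessagePositionInfo: skip offsets already indexed, write at cqOffset * 20.
    boolean put(long offset, int size, long tagsCode, long cqOffset) {
        if (offset <= maxPhysicOffset) {
            return true; // already indexed, e.g. when the commitLog is replayed
        }
        int pos = (int) (cqOffset * CQ_STORE_UNIT_SIZE);
        if (pos + CQ_STORE_UNIT_SIZE > file.capacity()) {
            return false; // no room (the real code would move on to a new MappedFile)
        }
        file.putLong(pos, offset).putInt(pos + 8, size).putLong(pos + 12, tagsCode);
        maxPhysicOffset = offset;
        return true;
    }

    // Read entry number cqOffset back as {commitLogOffset, size, tagsCode}.
    long[] get(long cqOffset) {
        int pos = (int) (cqOffset * CQ_STORE_UNIT_SIZE);
        return new long[] {file.getLong(pos), file.getInt(pos + 8), file.getLong(pos + 12)};
    }
}
```

Because an entry's position is simply cqOffset * 20, a consumer can jump to the n-th message of a queue without reading anything before it.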
Flushing to disk
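There are two flush modes, asynchronous and synchronous, and RocketMQ flushes asynchronously by default. As analyzed above, messages ultimately land in MappedFiles, so flushing should end up calling MappedFile's flush method.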
Asynchronous flushing
Asynchronous flushing is handled by FlushRealTimeService, an inner class of CommitLog. Its run method:
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
            // everything above only works out the flush interval and page threshold;
            // once the thorough interval has passed, leastPages drops to 0 to force a full flush
            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                // core logic
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }
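The per-round flush decision can be isolated into two small functions. This is a hypothetical model: effectiveLeastPages mirrors the thorough-interval check in run() above, and isAbleToFlush is modeled on MappedFile's dirty-page check, ignoring its "file is full" shortcut. The 4 KB page size is an assumption, and the defaults used in the usage below (4 pages, 10 s) are believed to match MessageStoreConfig but should be verified for your version.

```java
// Hypothetical model of FlushRealTimeService's "should this round flush?" logic.
class FlushPolicy {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size
    final int flushLeastPages;                 // like flushCommitLogLeastPages
    final long thoroughIntervalMs;             // like flushCommitLogThoroughInterval
    long lastFlushTimestamp = 0;

    FlushPolicy(int flushLeastPages, long thoroughIntervalMs) {
        this.flushLeastPages = flushLeastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
    }

    // Same rule as run(): once the thorough interval has passed, drop the page threshold to 0.
    int effectiveLeastPages(long now) {
        if (now >= lastFlushTimestamp + thoroughIntervalMs) {
            lastFlushTimestamp = now;
            return 0;
        }
        return flushLeastPages;
    }

    // Enough whole dirty pages, or anything unflushed at all when leastPages == 0.
    static boolean isAbleToFlush(long wrotePosition, long flushedPosition, int leastPages) {
        if (leastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= leastPages;
        }
        return wrotePosition > flushedPosition;
    }
}
```

Batching by pages trades some durability for throughput: small writes accumulate until at least flushLeastPages pages are dirty, and the thorough interval bounds how long a small unflushed tail can linger.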
Synchronous flushing
Synchronous flushing uses GroupCommitService, another inner class of CommitLog.
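As its name suggests, GroupCommitService batches synchronous flush requests so that several producers waiting on a flush can share one. Below is a simplified, hypothetical model of that group-commit idea: MiniGroupCommit and its methods are illustrative, not RocketMQ's API, and the disk is simulated by copying wrotePosition into flushedWhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical group-commit model: producers enqueue "make everything up to nextOffset durable"
// requests; a single flusher swaps the write/read lists, flushes once, then completes each request.
class MiniGroupCommit {
    static final class Request {
        final long nextOffset;
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();
    private long wrotePosition = 0; // bytes appended in memory
    private long flushedWhere = 0;  // bytes "on disk" (simulated)
    int flushCalls = 0;

    // Producer side: append data and return the end offset of what was written.
    synchronized long append(int bytes) {
        wrotePosition += bytes;
        return wrotePosition;
    }

    // Producer side: ask for durability up to nextOffset; the caller waits on the future.
    synchronized CompletableFuture<Boolean> putRequest(long nextOffset) {
        Request request = new Request(nextOffset);
        requestsWrite.add(request);
        return request.done;
    }

    // Swap under the lock so producers keep enqueuing into the other list meanwhile.
    private synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    private synchronized void flush() {
        flushedWhere = wrotePosition;
        flushCalls++;
    }

    // One round of the flusher loop; call it only from the single flusher thread.
    void doCommit() {
        swapRequests();
        if (requestsRead.isEmpty()) {
            return;
        }
        flush(); // one flush covers every request in the batch
        for (Request request : requestsRead) {
            request.done.complete(flushedWhere >= request.nextOffset);
        }
        requestsRead.clear();
    }
}
```

In RocketMQ the sending thread waits for this result with a timeout (syncFlushTimeout in MessageStoreConfig); batching several requests into one flush is what keeps synchronous flushing from costing a disk flush per message.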
Summary
References
- https://zhuanlan.zhihu.com/p/59516998
- https://zhuanlan.zhihu.com/p/58728454
- https://juejin.im/post/5d3f00aaf265da03e1685097 (in-depth analysis)