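The previous article, "RocketMQ Source Code Analysis: Broker Message Storage", covered how the Broker appends a message to the MappedFile memory buffer. At that point the message has not yet been written to the file on disk, that is, it has not been flushed. This article explains how RocketMQ performs the flush.
RocketMQ reads and writes through the NIO memory-mapping mechanism. When storing a message, it first appends the message to a MappedFile and then flushes according to the configured flush policy. With synchronous flushing, after the message is appended to memory, MappedByteBuffer's force() method is invoked and the sender waits for the flush result before the response is returned. With asynchronous flushing, a success response is returned to the sender as soon as the message has been appended to memory, without waiting for the flush result.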
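To make the difference between appending and flushing concrete, here is a minimal Java sketch. It is not RocketMQ code: the class name MappedFlushSketch and the method appendAndForce are made up for illustration, and it uses a temporary file instead of a commitlog file. It maps the file, appends bytes to the mapping, and then calls force().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedFlushSketch {

    /** Appends payload to a memory-mapped temp file, forces it to disk, and returns the file's bytes. */
    static byte[] appendAndForce(byte[] payload) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer mapped =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, payload.length);
                mapped.put(payload); // append: the bytes live in the mapped pages only
                mapped.force();      // flush: ask the OS to write the dirty pages to disk
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] stored = appendAndForce("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(stored, StandardCharsets.UTF_8));
    }
}
```

In these terms, a synchronous flush waits for the force() call before acknowledging the sender, while an asynchronous flush acknowledges right after the put() and leaves force() to a background thread.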
CommitLog#submitFlushRequest handles the synchronous and asynchronous cases separately:
/**
* Submits a flush request.
*
* @param result
* @param messageExt
* @return
*/
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
/**
* flushCommitLogService is instantiated in the CommitLog constructor as the
* subclass that matches the configured flush mode (synchronous or asynchronous).
*/
// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// GroupCommitService is the synchronous flush service.
/**
* A request submitted to GroupCommitService is not processed immediately;
* it is first placed in an internal request queue.
*/
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) { // the sender wants to wait for the flush result
// Build the request: the offset that must be flushed, plus the timeout.
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
/**
* Put the request into requestsWrite.
*/
service.putRequest(request);
// Future pattern: return the future; the caller waits on it for the flush result.
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
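- In the method above, the if branch handles synchronous flushing and the else branch handles asynchronous flushing. The synchronous flush service class is GroupCommitService; the asynchronous flush service class is FlushRealTimeService. When transientStorePoolEnable is on, the asynchronous branch wakes up commitLogService instead.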
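The synchronous branch hands a future back to the caller, and the flush thread completes it later. The following sketch shows that handoff in isolation. It is an illustration under assumptions, not the RocketMQ implementation: Request stands in for GroupCommitRequest, and Status, submitAndWait and the flushedWhere parameter are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FlushFutureSketch {

    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT }

    /** Hypothetical stand-in for GroupCommitRequest: a target offset plus a future. */
    static final class Request {
        final long nextOffset;
        final CompletableFuture<Status> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        /** Called by the flush thread once the outcome is known. */
        void wakeupCustomer(Status status) {
            future.complete(status);
        }
    }

    /** Submits a request, lets a background thread decide the outcome, and waits for it. */
    static Status submitAndWait(long nextOffset, long flushedWhere, long timeoutMillis) {
        Request request = new Request(nextOffset);
        // The "flush thread": reports success only if the flushed position covers the request.
        Thread flusher = new Thread(() -> request.wakeupCustomer(
                flushedWhere >= request.nextOffset ? Status.PUT_OK : Status.FLUSH_DISK_TIMEOUT));
        flusher.start();
        try {
            return request.future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Status.FLUSH_DISK_TIMEOUT;
        } catch (ExecutionException | TimeoutException e) {
            return Status.FLUSH_DISK_TIMEOUT;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait(100, 200, 1000));
        System.out.println(submitAndWait(300, 200, 1000));
    }
}
```

The point of the pattern is that the thread appending the message never calls force() itself; it only waits on the future.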
When CommitLog is initialized, it creates one of two flush services depending on the configuration:
public CommitLog(final DefaultMessageStore defaultMessageStore) {
......
// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
// Asynchronous flush
this.flushCommitLogService = new FlushRealTimeService();
}
......
}
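GroupCommitService#putRequest puts the flush task into requestsWrite.
requestsWrite: the write queue, to which flush tasks for this thread are added.
requestsRead: the read queue, from which the service thread takes the flush tasks it executes. Reads and writes are kept separate.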
/**
* Puts the GroupCommitRequest flush task into requestsWrite and returns immediately.
*
* @param request
*/
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) { // write queue
this.requestsWrite.add(request);
}
/**
* Set hasNotified to true and count down the latch, so that the run() loop wakes up and swaps the queues.
*/
this.wakeup();
}
public void wakeup() {
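/**
* Set hasNotified to true and count down the latch, so that the run() loop wakes up and swaps the queues.
*/
if (hasNotified.compareAndSet(false, true)) {
// waitPoint is a CountDownLatch2, RocketMQ's own resettable variant of CountDownLatch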
waitPoint.countDown(); // notify
}
}
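The handshake between wakeup() and the waiting service thread can be sketched as follows. This is a simplified model, not the ServiceThread source: a Semaphore stands in for CountDownLatch2, drainPermits() plays the role of resetting the latch, and waitForRunning returning a boolean is an assumption made here so the behaviour is observable.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupSketch {

    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    // Stand-in for CountDownLatch2: one permit means "a wakeup is pending".
    private final Semaphore waitPoint = new Semaphore(0);

    /** Producer side: only the first wakeup since the last wait signals the waiter. */
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.release();
        }
    }

    /** Consumer side: returns true if a wakeup arrived, false if the interval elapsed. */
    public boolean waitForRunning(long intervalMillis) {
        // A notification is already pending: consume it and skip the wait entirely.
        if (hasNotified.compareAndSet(true, false)) {
            return true;
        }
        waitPoint.drainPermits(); // "reset the latch" before waiting
        try {
            return waitPoint.tryAcquire(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            hasNotified.set(false);
        }
    }

    public static void main(String[] args) {
        WakeupSketch service = new WakeupSketch();
        service.wakeup();
        System.out.println(service.waitForRunning(10)); // a wakeup was pending
        System.out.println(service.waitForRunning(10)); // nothing pending, waits out the interval
    }
}
```

The compare-and-set keeps repeated wakeup() calls cheap: however many requests arrive while the thread is busy, at most one signal is recorded.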
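Once GroupCommitService is woken up, it swaps the requests in requestsWrite into requestsRead, which avoids lock contention between the threads submitting requests and the flush thread. doCommit then iterates over the requests in requestsRead and processes them.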
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
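/**
* A single thread keeps processing the synchronous flush tasks;
* each round waits up to 10 ms or until wakeup() is called.
*/
this.waitForRunning(10);
/**
* The actual flush logic.
*/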
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
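// (Inside the loop, each round waits up to 10 ms and is woken as soon as a new task arrives.)
// On shutdown, sleep 10 ms so that late requests can arrive, then swap and flush one last time.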
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
/**
* Perform the flush.
*/
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
// Wake up the waiting caller thread by completing its future
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// Record the commitlog flush time in the store checkpoint
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
- The call CommitLog.this.mappedFileQueue.flush(0) performs the actual flush; see the MappedFileQueue#flush section for details.
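The run() loop above calls swapRequests(), which is not shown in the listing. The sketch below illustrates the double-buffer idea behind it under stated assumptions: the class DoubleBufferSketch, the drain method and the use of plain Long offsets instead of GroupCommitRequest objects are all hypothetical, and a single consumer thread is assumed.

```java
import java.util.ArrayList;
import java.util.List;

public class DoubleBufferSketch {

    private List<Long> requestsWrite = new ArrayList<>();
    private List<Long> requestsRead = new ArrayList<>();

    /** Producer side: append to the write list under the lock. */
    public synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    /** Exchanges the two lists; producers now fill the (empty) former read list. */
    private synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /** Consumer side (single thread assumed): swap, then process the batch without the lock. */
    public List<Long> drain() {
        swapRequests();
        List<Long> batch = new ArrayList<>(requestsRead);
        requestsRead.clear();
        return batch;
    }

    public static void main(String[] args) {
        DoubleBufferSketch queue = new DoubleBufferSketch();
        queue.putRequest(128L);
        queue.putRequest(256L);
        System.out.println(queue.drain());
        System.out.println(queue.drain());
    }
}
```

Because the lock is held only for the pointer swap, producers are blocked for a very short time even when the consumer spends a long time flushing the batch.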
FlushRealTimeService: asynchronous flush
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
/**
* Periodically flush the contents of the memory buffer to the file.
*/
while (!this.isStopped()) {
// Wait strategy used below: true uses Thread.sleep, false uses waitForRunning. Defaults to false.
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// interval: the time between flushes, 500 ms by default
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// flushPhysicQueueLeastPages: the minimum number of dirty pages required for a flush; each page is 4 KB and the default is 4 pages
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// The maximum interval between two thorough flushes, 10 s by default
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
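/**
* flushPhysicQueueThoroughInterval: the maximum interval between two thorough flushes.
* If more than that interval has elapsed since the last thorough flush,
* set flushPhysicQueueLeastPages = 0 so that any dirty data is flushed,
* and print the flush progress on every 10th such occasion.
* flushPhysicQueueLeastPages: the minimum number of pages required for a flush.
*/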
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
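/**
* Wait according to the configured strategy. Thread.sleep always waits the
* full interval, whereas waitForRunning returns early when wakeup() is called.
*/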
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
// Perform the flush
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
/**
* Update physicMsgTimestamp in StoreCheckpoint (the commitlog checkpoint,
* that is, the timestamp of the most recent flush).
*/
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
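The interaction between the least-pages threshold and the thorough interval in the loop above can be reduced to one small decision. The sketch below isolates it; the class FlushPolicySketch and the method effectiveLeastPages are hypothetical names, and the numbers in main are the defaults quoted in the comments above.

```java
public class FlushPolicySketch {

    /**
     * Returns the least-pages threshold for this round. Once the thorough interval has
     * elapsed since the last thorough flush, the threshold drops to 0, which means
     * "flush whatever is dirty, however little".
     */
    static int effectiveLeastPages(long nowMillis, long lastFlushMillis,
                                   long thoroughIntervalMillis, int configuredLeastPages) {
        boolean thoroughFlushDue = nowMillis >= lastFlushMillis + thoroughIntervalMillis;
        return thoroughFlushDue ? 0 : configuredLeastPages;
    }

    public static void main(String[] args) {
        // 4 pages by default, 10 s thorough interval by default.
        System.out.println(effectiveLeastPages(9_999, 0, 10_000, 4));
        System.out.println(effectiveLeastPages(10_000, 0, 10_000, 4));
    }
}
```

With a low write rate, fewer than 4 dirty pages may accumulate for a long time; the thorough interval bounds how long such data can stay unflushed.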
MappedFileQueue#flush: the flush implementation
/**
* The flush implementation.
*
* @param flushLeastPages
* @return
*/
public boolean flush(final int flushLeastPages) {
boolean result = true;
/**
* Locate the MappedFile to flush from the last flushed position.
*/
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages); // perform the flush; returns the flushed position within the file
long where = mappedFile.getFileFromOffset() + offset; // absolute flushed position across the queue
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
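- The findMappedFileByOffset call uses the last flushed position (flushedWhere) to locate the MappedFile to flush; see MappedFileQueue#findMappedFileByOffset.
- mappedFile.flush(flushLeastPages) performs the flush, after which flushedWhere is updated to the new position; see MappedFile#flush. Note that the method returns true when the flushed position did not advance.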
MappedFileQueue#findMappedFileByOffset
/**
* Finds a mapped file by offset.
* Finds the MappedFile that contains the given offset (the flush path passes flushedWhere).
*
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
/**
* The starting offset of the first MappedFile is not necessarily 0,
* because earlier files may already have been deleted.
*/
MappedFile firstMappedFile = this.getFirstMappedFile();
/**
* Get the last mapped file in the queue.
*/
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
/**
* Compute the index of the mapped file in the queue.
*/
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
/**
* Fetch the mapped file at that index.
*/
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
// The offset lies between the target file's starting and ending offsets
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
// If the index lookup did not find the file, fall back to scanning the queue
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
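A worked example helps with the index arithmetic in findMappedFileByOffset. The sketch below repeats only that expression; the class and method names are hypothetical, and the 1 GiB file size is an assumption chosen for the example.

```java
public class MappedFileIndexSketch {

    /** Same arithmetic as the index expression above: file number minus first file number. */
    static int indexOf(long offset, long firstFileFromOffset, long mappedFileSize) {
        return (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024;   // assumed size of each mapped file: 1 GiB
        long firstFileFromOffset = 2 * fileSize; // the two oldest files were already deleted
        long offset = 3 * fileSize + 100;        // falls inside the fourth file ever created

        // Absolute file number is 3, the queue starts at file number 2, so the index is 1.
        System.out.println(indexOf(offset, firstFileFromOffset, fileSize));
    }
}
```

Subtracting the first file's number is what keeps the lookup correct after old files have been removed from the head of the queue.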
MappedFile#flush
/**
* Flushes by calling force() on either the FileChannel or the MappedByteBuffer.
*
* @return The current flushed position
*/
public int flush(final int flushLeastPages) {
// Check the flush condition: does the amount of unflushed data reach the minimum flush size?
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
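// If writeBuffer is not null, transientStorePoolEnable is on: messages are
// first written to off-heap memory (writeBuffer), then committed to the
// page cache through fileChannel, and finally flushed to disk.
// (In that case writeBuffer was initialized in MappedFile#init(..., transientStorePool).)
// By this point the data has been committed from writeBuffer to fileChannel,
// so call fileChannel.force() directly.
if (writeBuffer != null || this.fileChannel.position() != 0) {
// Flush through the NIO FileChannel
this.fileChannel.force(false);
} else {
// Otherwise the message was written straight into the mapped buffer
// (MappedByteBuffer), so calling its force() is enough.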
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
// The position up to which data has been flushed to disk
return this.getFlushedPosition();
}
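- isAbleToFlush(flushLeastPages) checks whether the flush condition is met, that is, whether the amount of unflushed data reaches the minimum flush size.
- In the if/else on writeBuffer: if transientStorePoolEnable is on, messages are first written to off-heap memory, then committed to the page cache, and finally flushed to disk through fileChannel.force(). If messages are written directly into the mapped buffer (MappedByteBuffer), calling its force() is enough.
- The final return statement returns the position up to which data has been flushed to disk.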
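The listing calls isAbleToFlush without showing it. The following is a sketch of how such a check can be written in terms of whole pages. It is modeled on the behaviour described above rather than copied from MappedFile, and the explicit parameters (flushedPosition, readPosition, fileSize) are an assumption made so the method can stand alone.

```java
public class AbleToFlushSketch {

    static final int OS_PAGE_SIZE = 4 * 1024; // 4 KB pages, as stated above

    /** Decides whether enough unflushed data has accumulated to justify a flush. */
    static boolean isAbleToFlush(int flushedPosition, int readPosition,
                                 int fileSize, int flushLeastPages) {
        if (readPosition == fileSize) {
            return true; // a full file is always flushed
        }
        if (flushLeastPages > 0) {
            // Compare page numbers, not byte counts.
            return (readPosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        // flushLeastPages == 0: any unflushed byte is enough.
        return readPosition > flushedPosition;
    }

    public static void main(String[] args) {
        int fileSize = 1 << 30;
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE - 1, fileSize, 4)); // just under 4 pages
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, fileSize, 4));     // exactly 4 pages
        System.out.println(isAbleToFlush(0, 1, fileSize, 0));                    // threshold disabled
    }
}
```

This is why the asynchronous service passes 0 once the thorough interval has elapsed, and why the synchronous path always calls flush(0).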
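What is the difference between FlushRealTimeService (asynchronous flush without the transient store pool) and CommitRealTimeService (asynchronous flush with the transient store pool)? In short, FlushRealTimeService forces data that is already in the page cache to disk, while CommitRealTimeService first commits data from the off-heap writeBuffer to the FileChannel (page cache) and leaves the disk flush to the flush service.
The role of TransientStorePool
TransientStorePool effectively separates reads from writes at the memory level: writes go to off-heap (direct) memory, while reads are served from the page cache. This largely eliminates lock contention on the page cache and reduces latency spikes. It also locks its memory so that the direct buffers are not swapped out to the swap partition.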
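The three stages of the transient-pool path (append to off-heap memory, commit to the FileChannel, force to disk) can be sketched with plain NIO. This is an illustration only: TransientPoolSketch and appendCommitFlush are hypothetical names, a temporary file replaces the commitlog, and the memory locking that the real pool performs is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransientPoolSketch {

    /** Stages payload in a direct buffer, commits it to the channel, forces it, returns the file's bytes. */
    static byte[] appendCommitFlush(byte[] payload) {
        try {
            Path file = Files.createTempFile("transient-sketch", ".bin");
            file.toFile().deleteOnExit();

            // Append: the message lands in off-heap memory only.
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(payload.length);
            writeBuffer.put(payload);

            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                // Commit: copy from the off-heap buffer into the file's page cache.
                writeBuffer.flip();
                while (writeBuffer.hasRemaining()) {
                    channel.write(writeBuffer);
                }
                // Flush: force the page cache contents to disk (file content only, no metadata).
                channel.force(false);
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] stored = appendCommitFlush("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(stored, StandardCharsets.UTF_8));
    }
}
```

In this arrangement the write path touches the page cache only during the commit step, which is the read/write separation described above.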
Reference: https://github.com/apache/rocketmq/issues/2466