zoukankan      html  css  js  c++  java
  • RocketMQ源码解析之Broker消息存储(刷盘机制)

      在上篇RocketMQ源码解析之Broker消息存储(消息存储)中分析了消息如何从Broker存储到MappedFile内存缓冲区,但还没有存储到文件中,即还没有刷盘,本篇将介绍RocketMQ如何进行刷盘的
      RocketMQ 的读写时基于NIO的内存映射机制的,进行消息存储时先将消息追加到MappedFile,然后根据不同的刷盘策略进行刷盘。同步刷盘是先将消息追加到内存后,将同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回;异步刷盘在将消息追加到内存后,不等待刷盘结果立即将刷盘成功的消息返回给消息发送端;

    CommitLog#submitFlushRequest 分别针对同步和异步情况进行刷盘
    /**
     * 刷盘
     *
     * @param result
     * @param messageExt
     * @return
     */
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        /**
         * flushCommitLogService 在CommitLog构造函数中实例化,按照同步和异步分别实例化对应的子类
         */
        // Synchronization flush 同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // GroupCommitService 同步刷盘服务类
            /**将请求被提交给GroupCommitService后,GroupCommitService并不是立即处理,而是
             * 先放到内部的一个请求队列中
             * */
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 刷盘操作,
            if (messageExt.isWaitStoreMsgOK()) {    // 进行同步刷盘
                // 数据准备
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                /**
                 * 将数据对象放到requestWrite中
                 */
                service.putRequest(request);
                // 利用future模式的原理,阻塞等待
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // Asynchronous flush 异步刷盘
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    
    • 第13行—第29行是针对同步刷盘,第32行—第36行是针对异步刷盘,同步刷盘的服务类是GroupCommitService ;异步刷盘服务类则是FlushRealTimeService

    CommitLog在初始化的时候,会根据配置,启动两种不同的刷盘服务:

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
    	......
        // 如果是同步刷盘
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            // 异步刷盘
            this.flushCommitLogService = new FlushRealTimeService();
        }
        ......
    }
    
    GroupCommitService#putRequest 将刷盘任务放入到requestsWrite中,

    requestsWrite:写队列,主要用于向该线程中添加刷盘任务

    requestsRead:读队列,主要用于执行特定的刷盘任务,读写分离

    /**
     * GroupCommitRequest刷盘任务放入到 requestWrite 中,就返回了
     *
     * @param request
     */
    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) { // 写队列
            this.requestsWrite.add(request);
        }
        /**
         * 设置成true,然后计数器-1 ,对应的run方法才会进行数据交换
         */
        this.wakeup();
    }
    
    public void wakeup() {
        /**
         * ** 设置成true,然后计数器-1 ,对应的run方法才会进行数据交换
         */
        if (hasNotified.compareAndSet(false, true)) {
            // waitPoint 是利用CountDownLatch自定义的CountDownLatch2对象
            waitPoint.countDown(); // notify
        }
    }
    

    GroupCommitService 被唤醒后,便会将requestsWrite 中的请求交换到requestsRead中,避免产生锁竞争,doCommit函数会遍历requestsRead的请求进行处理

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
                /**
                 *  一个线程一直的处理同步刷写任务,
                 */
                this.waitForRunning(10);
                /**
                 * 真正的刷盘逻辑
                 */
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        // 每处理一个循环后等待 10 毫秒,
        // 一旦新任务到达,立即唤醒执行任务
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }
    
        synchronized (this) {
            this.swapRequests();
        }
    
        this.doCommit();
    
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    private void doCommit() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        /**
                         * 执行刷盘操作
                         */
                        CommitLog.this.mappedFileQueue.flush(0);
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    }
                    // 唤醒用户线程
                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
    
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    // 保存刷盘检测点的commitLog 文件刷盘时间
                                 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
    
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
    
    • 第49行是具体的刷盘操作,详情解析见MappedFileQueue#flush
    FlushRealTimeService 异步刷盘
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        /**
         * 周期性地将内存缓冲区中的内容刷到文件中
         */
        while (!this.isStopped()) {
            // 主要是休眠策略,下面会使用这个,true则使用Thread.sleep,false则使用waitForRunning,默认false
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    
            // interval:获取刷盘的间隔时间,默认500ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // flushPhysicQueueLeastPages:每次刷盘最少需要刷新的页,每页大小为4k,默认每次要刷新4页
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // 两次刷写之间的最大时间间隔,默认10s
            int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    
            boolean printFlushProgress = false;
    
            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            /**
             * flushPhysicQueueThoroughInterval:两次刷写之间的最大时间间隔
             * 如果当前时间距离上次刷盘时间超出设置的两次刷盘最大间隔
             * 则改变 flushPhysicQueueLeastPages =0,并每10次输出异常刷新进度
             * flushPhysicQueueLeastPages:每次刷盘最少需要刷新的页
             */
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
    
            try {
                /**
                 * 根据不同的休眠策略,进行休眠等待
                 * 两者区别是啥-2021/10/19
                 */
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
    
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
    
                long begin = System.currentTimeMillis();
                // 调用刷盘操作
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    /**
                     * 设置检测点的StoreCheckpoint 的physicMsgTimestamp(commitlog文件的检测点,也就是记录最新刷盘的时间戳)
                     */
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
    
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
    
        this.printFlushProgress();
    
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    MappedFileQueue#flush 刷盘具体实现
    /**
     * 刷盘具体实现
     *
     * @param flushLeastPages
     * @return
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        /**
         * 根据上次刷新的位置,得到当前的MappedFile对象
         */
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);     // 执行刷新,返回当前刷新后的位置
            long where = mappedFile.getFileFromOffset() + offset;   // 更新刷新的位置
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
    
        return result;
    }
    
    • 第12行是根据上次刷盘的位置,重新得到当前的MappedFile对象,详情解析见MappedFileQueue#findMappedFileByOffset
    • 第15行—第18行,执行刷盘,并更新刷盘位置,详情解析见MappedFile#flush
    MappedFileQueue#findMappedFileByOffset
    /**
     * Finds a mapped file by offset.
     * 根据 committedWhere 找到需要刷盘的 MappedFile 文件
     * -2021/10/19
     *
     * @param offset                Offset.
     * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
     * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
     */
    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            /**
             * 此处得到的第一个MappedFile的文件起始offset,
             * 不一定是0,之前的文件可能已经被清除了
             */
            MappedFile firstMappedFile = this.getFirstMappedFile();
            /**
             * 获取队列中最后一个映射文件
             */
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                            offset,
                            firstMappedFile.getFileFromOffset(),
                            lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                            this.mappedFileSize,
                            this.mappedFiles.size());
                } else {
                    /**
                     * 找到映射文件在队列中的索引位置
                     */
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        /**
                         * 获取索引文件大小
                         */
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }
                    // offset在目标文件的起始offset和结束offset范围内
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                            && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }
                    // 如果按索引在队列中找不到映射文件就遍历队列查找映射文件
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                                && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }
    
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }
    
        return null;
    }
    
    MappedFile#flush
    /**
     * @return The current flushed position 返回当前刷新的位置
     * 就是调用FileChannel或者mappedByteBuffer的force方法
     */
    public int flush(final int flushLeastPages) {
        // 判断是否满足刷盘条件:即当前剩余未刷盘内容长度是否超过最小刷盘长度
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
    
                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    // 如果writeBuffer不为空,说明开启了transientStorePoolEnable,
                    // 消息会先写入堆外内存,然后提交到PageCache并最终刷写到磁盘,
                    // 则表明消息是先提交到writeBuffer中的
                    // (如果writeBuffer不为空,则在调用MappedFile(,,transientStorePool)#init时就已经初始化writeBuffer了)
                    // 已经将消息从writeBuffer提交到fileChannel,直接调用fileChannel.force()
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        // 刷盘 NIO
                        this.fileChannel.force(false);
                    } else {
                        // 反之,消息是直接存储在文件映射缓存区MappedByteBuffer中,直接调用它的force()即可
                        // 刷盘 NIO
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
    
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        // 当前刷新到磁盘的指针
        return this.getFlushedPosition();
    }
    
    • 第7行:判断当前剩余未刷盘内容长度是否超过最小刷盘长度,是否满足刷盘条件
    • 第18行至第24行,如果开启了transientStorePoolEnable模式,消息会先写入堆外内存,然后提交到PageCache并最终刷写到磁盘.如果消息是直接存储在文件映射缓存区MappedByteBuffer中,直接调用它的force()即可
    • 第38行,返回当前刷新到磁盘的指针
    FlushRealTimeService(异步不使用暂存池刷盘) 和 CommitRealTimeService(异步使用暂存池刷盘)的区别?
    TransientStorePool的作用

    TransientStorePool 相当于在内存层面做了读写分离,写走内存磁盘,读走pagecache,同时最大程度消除了page cache的锁竞争,降低了毛刺。它还使用了锁机制,避免直接内存被交换到swap分区
    参考:https://github.com/apache/rocketmq/issues/2466

    FileChannel.force VS MappedByteBuffer.force区别
  • 相关阅读:
    【LOJ】#3034. 「JOISC 2019 Day2」两道料理
    vue学习笔记(七)组件
    vue学习笔记(五)条件渲染和列表渲染
    vue学习笔记(一)入门
    JavaScript学习总结之函数
    JavaScript学习总结之对象的深拷贝和浅拷贝
    javascript学习总结之Object.assign()方法详解
    ES6学习总结之变量的解构赋值
    ES6学习总结之let和const命令
    javascript学习总结之Object.keys()方法详解
  • 原文地址:https://www.cnblogs.com/fyusac/p/15357263.html
Copyright © 2011-2022 走看看