zoukankan      html  css  js  c++  java
  • rocketmq刷盘过程

     本文基于rocketmq4.0版本,结合CommitlLog的刷盘过程,对消息队列的刷盘过程源码进行分析,进而对RocketMQ的刷盘原理和过程进行了解。
     
    rocketmq 4.0版本中刷盘类型和以前的版本一样有两种:
    public enum FlushDiskType {
        // 同步刷盘
        SYNC_FLUSH,
        // 异步刷盘    
        ASYNC_FLUSH
    }
     
    刷盘方式有三种:
    线程服务 场景 写消息性能
    CommitRealTimeService 异步刷盘 && 开启内存字节缓冲区 第一
    FlushRealTimeService 异步刷盘  第二
    GroupCommitService 同步刷盘 第三

    其中CommitRealTimeService是老一些版本中没有的,它为开启内存字节缓存的刷盘服务。

    介绍各个线程工作之前,先需要重点了解一下waitForRunning方法,因为在三个刷盘服务线程中都频繁使用该方法:

    protected void waitForRunning(long interval) {
            if (hasNotified.compareAndSet(true, false)) {
                this.onWaitEnd();
                return;
            }
            //entry to wait
            waitPoint.reset();
    
            try {
                waitPoint.await(interval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            } finally {
                hasNotified.set(false);
                this.onWaitEnd();
            }
        }
    这里要注意一下 waitPoint 这个共享变量,它是CountDownLatch2类型,,没有细看CountDownLatch2的原理,猜测它和CountDownLatch类似,根据CountDownLatch的使用原理,大致可以猜测waitPoint的作用。
     
    回顾一下CountDownLatch相关知识:
    CountDownLatch能够使一个线程等待其他线程完成各自的工作后再执行自己的任务,CountDownLatch是通过一个计数器来实现的,计数器的初始值为需要等待的线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
     
    因此,可以猜测waitForRunning的业务逻辑大致为:
    (1). 通过闭锁没执行依次waitPoint.countDown(),当计数器值到达0时,结束阻塞状态,开始执行等待线程的任务;
    (2). 等待一定时间之后,结束阻塞状态,开始执行等待线程的任务。
     
    在rocketmq4.0版本中,调用了waitPoint.countDown()的地方有三处:
    shutdown()
    stop()
    wakeup()
    

     这里我们关心的是wakeup()方法,调用wakeup方法的几处如下

    其中与commitLog刷盘相关的有:
    service.wakeup()、flushCommitLogService.wakeup()、commitLogService.wakeup(),其中service.wakeup()的service是GroupCommitService类型。
     
    由此引入了本文所要讲述的FlushRealTimeService、CommitRealTimeService以及GroupCommitService三个线程刷盘服务。
     
     
    GroupCommitService
    broker启动后,会启动许多服务线程,包括刷盘服务线程,如果刷盘服务线程类型是SYNC_FLUSH (同步刷盘类型:对写入的数据同步刷盘,只在broker==master时使用),则开启GroupCommitService服务,该服务线程启动后每隔10毫秒或该线程调用了wakeup()方法后停止阻塞,执行doCommit()方法。doCommit里面执行具体的刷盘逻辑业务。GroupCommitService服务线程体如下:
     
    public void run() {
                CommitLog.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        this.waitForRunning(10);
                        this.doCommit();
                    } catch (Exception e) {
                        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                // Under normal circumstances shutdown, wait for the arrival of the
                // request, and then flush
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    CommitLog.log.warn("GroupCommitService Exception, ", e);
                }
    
                synchronized (this) {
                    this.swapRequests();
                }
    
                this.doCommit();
    
                CommitLog.log.info(this.getServiceName() + " service end");
            }
     
    当broker类型为master时,每写入一条消息成功写入mapedFile文件后,调用handleDiskFlush方法,如果该消息满足messageExt.isWaitStoreMsgOK(),则将这一条成功写入的消息生成GroupCommitRequest对象,将该对像放入GroupCommitService的requestsWrite列表中(List<GroupCommitRequest>),等待刷盘线程调用doCommit,对列表中的消息进行刷盘,doCommit中每对一个request处理完成后,会调用wakeupCustomer。等待时间5s后或者request的countDownLatch记数为0时,则将这条消息是否已经刷盘成功进行汇报,如果没有刷盘成功,则再日志中记录错误,并将putMessageResult设置为FLUSH_DISK_TIMEOUT。代码如下:
     
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                            + " client address: " + messageExt.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }
                } else {
                    service.wakeup();
                }
    }
    

    这条消息是否已经刷盘成功进行汇报的逻辑 -- waitForFlush方法:

    public static class GroupCommitRequest {
            private final long nextOffset;
            private final CountDownLatch countDownLatch = new CountDownLatch(1);
            private volatile boolean flushOK = false;
    
            public GroupCommitRequest(long nextOffset) {
                this.nextOffset = nextOffset;
            }
    
            public long getNextOffset() {
                return nextOffset;
            }
    
            public void wakeupCustomer(final boolean flushOK) {
                this.flushOK = flushOK;
                this.countDownLatch.countDown();
            }
    
            public boolean waitForFlush(long timeout) {
                try {
     // 阻塞当前工作线程,等待时间5s后或者countDownLatch记数为0时,停止阻塞,执行下一条语句
                    this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                    return this.flushOK;
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                    return false;
                }
            }
    }
    对GroupCommitRequest类中的两个方法的说明:
    在waitForFlush方法阻塞的时候,doCommit方法对写入requestsWrite列表中(List<GroupCommitRequest>)所有GroupCommitRequest对象依次进行了 wakeupCustomer方法调用,wakeupCustomer调用后,countDownLatch 闭锁记数减一,等待时间5s后或者countDownLatch记数为0时,返回调用wakeupCustomer的GroupCommitRequest对应的消息的刷盘结果。
     
    GroupCommitService的doCommit方法
    说明一下:分析doCommit方法之前,先提及一下swapRequests这个方法,之前提过,GroupCommitService服务线程该每隔10毫秒或调用了该线程的wakeup()方法后执行doCommit()方法,具体地要涉及到waitForRunning方法,waitForRunning方法中onWaitEnd的作用在这里就可以提及一下了,它的作用就是将requestsWrite 转换为requestsRead ,这个与消息存储过程中处理dispatchRequest是类似的。
     private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
    }
     doCommit代码:
    private void doCommit() {
        //
                synchronized (this.requestsRead) {
                    if (!this.requestsRead.isEmpty()) {
                        for (GroupCommitRequest req : this.requestsRead) {
                            // There may be a message in the next file, so a maximum of
                            // two times the flush
                            boolean flushOK = false;
                            for (int i = 0; i < 2 && !flushOK; i++) {
                                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
    
                                if (!flushOK) {
                                    CommitLog.this.mappedFileQueue.flush(0);
                                }
                            }
    
                            req.wakeupCustomer(flushOK);
                        }
    
                        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                        if (storeTimestamp > 0) {
                            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                        }
    
                        this.requestsRead.clear();
                    } else {
                        // Because of individual messages is set to not sync flush, it
                        // will come to this process
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
    }
     
    其中,具体的刷盘核心代码:CommitLog.this.mappedFileQueue.flush(0); 接下来看其他两个刷盘服务线程,对CommitLog.this.mappedFileQueue.flush(0)下文将具体讲解。
     
     
     
    FlushRealTimeService
    // 刷新策略(默认是实时刷盘)
    flushCommitLogTimed
    // 刷盘时间间隔(默认0.5s)
    interval = flushIntervalCommitLog
    // 每次刷盘至少需要多少个page(默认是4个)
    flushPhysicQueueLeastPages
    // 彻底刷盘间隔时间(默认10s)
    flushPhysicQueueThoroughInterval
     
    大致逻辑:
    -- 如果 当前时间 >(最后一次刷盘时间 + 彻底刷盘间隔时间(10s)),则将最新一次刷盘时间更新为当前时间
     
    -- 如果是实时刷盘,每隔一定时间间隔,该线程休眠500毫秒
    如果不是实时刷盘,则调用waitForRunning,即每隔500毫秒或该刷盘服务线程调用了wakeup()方法之后结束阻塞。
     
    -- 调用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
     
     
    CommitRealTimeService
    CommitRealTimeService 比较特殊,它会包含提交和异步刷盘逻辑,专门为开启内存字节缓冲区的刷盘服务。
    // 提交到FileChannel的时间间隔,只在TransientStorePool 打开的情况下使用,默认0.2s
    interva l= commitIntervalCommitLog
    //每次提交到File至少需要多少个page(默认是4个)
    commitDataLeastPages = commitCommitLogLeastPages
    / 提交完成间隔时间(默认0.2s)
    commitDataThoroughInterval
     
    大致逻辑:
    如果 当前时间 >(最后一次提交时间 + 提交完成间隔时间),更新lastCommitTimestamp之后,执行提交的核心逻辑:
    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    如果result == false 则意味着有新的数据 committed,此时需要wakeup刷盘线程,即:
    flushCommitLogService.wakeup(); 进行异步刷盘处理。
     
    可知道,刷盘的下一层核心逻辑:
    mappedFileQueue.flush
    mappedFileQueue.commit
     
      flush
     public boolean flush(final int flushLeastPages) {
            boolean result = true;
            MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
            if (mappedFile != null) {
                long tmpTimeStamp = mappedFile.getStoreTimestamp();
                int offset = mappedFile.flush(flushLeastPages);
                long where = mappedFile.getFileFromOffset() + offset;
                result = where == this.flushedWhere;
                this.flushedWhere = where;
                if (0 == flushLeastPages) {
                    this.storeTimestamp = tmpTimeStamp;
                }
            }
    
            return result;
        }
    

      commit

    public boolean commit(final int commitLeastPages) {
            boolean result = true;
            MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
            if (mappedFile != null) {
                int offset = mappedFile.commit(commitLeastPages);
                long where = mappedFile.getFileFromOffset() + offset;
                result = where == this.committedWhere;
                this.committedWhere = where;
            }
    
            return result;
        }
     
    从上代码可以看出,刷盘过程与MappedFile有很大关系,通过findMappedFileByOffset方法找到要刷盘的MappedFile,然后MappedFile中采用数据刷盘技术将数据刷入到磁盘

    MappedFile的刷盘方式有两种:
    1. 写入内存字节缓冲区(writeBuffer) ----> 从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) ----> 文件通道(fileChannel)flush到磁盘
    2.写入映射文件字节缓冲区(mappedByteBuffer) ----> 映射文件字节缓冲区(mappedByteBuffer)flush
     
    (MappedFile的刷盘方式待具体分析,待补充...)
     
     
  • 相关阅读:
    jqurey技术总结
    ie浏览器兼容问题小结
    FIS的合并压缩技术
    对js中数组的一些总结
    浅谈如何面向对象进行封装
    13th week blog
    12th week blog
    11th week blog
    10th week blog
    9th Week blog
  • 原文地址:https://www.cnblogs.com/chenjunjie12321/p/8331740.html
Copyright © 2011-2022 走看看