zoukankan      html  css  js  c++  java
  • RocketMQ-存储机制-刷盘机制

    RocketMQ-存储机制-刷盘机制

    在理解RocketMQ刷盘实现之前,先理解一下上图展示的刷盘的2种实现的:

    1)直接通过内存映射文件,通过flush刷新到磁盘

    2)当异步刷盘且启用了对外内存池的时候,先write到writeBuffer,然后commit到Filechannel,最后flush到磁盘

    另外输盘的方式分为异步刷盘 同步刷盘 异步转存刷盘方式。

    初始化过程如下

    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                this.flushCommitLogService = new GroupCommitService();  //同步
            } else {
                this.flushCommitLogService = new FlushRealTimeService(); // 异步
            }
    
            this.commitLogService = new CommitRealTimeService();   异步转存

    在commitlog的putMessage方法最后handleDiskFlush,处理了刷盘的操作.

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    // 提交同步刷盘请求
                    service.putRequest(request);
                    //  同步等待
                    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                            + " client address: " + messageExt.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }
                } else { // 唤醒输盘操作
                    service.wakeup();
                }
            }
            // Asynchronous flush   异步刷盘
            else {
                if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    flushCommitLogService.wakeup();   //异步
                } else {
                    commitLogService.wakeup();  // 异步转存
                }
            }
        }

    同步刷盘GroupCommitService

    调用putRequest之后,实际上,会放到一个写容器中,如果当前不在处理,那就唤醒同步刷盘线程立即处理

    public synchronized void putRequest(final GroupCommitRequest request) {
                synchronized (this.requestsWrite) {
                    this.requestsWrite.add(request);
                }
                if (hasNotified.compareAndSet(false, true)) {
                    waitPoint.countDown(); // notify
                }
            }

    而此时的同步刷盘线程,如果正好检测到有请求过来就会立即执行任务,如果处在等待状态,则被唤醒,等待处理完,又把通知状态设置为false。

    protected void waitForRunning(long interval) {
            // 如果有刷盘请求  则立即返回
            if (hasNotified.compareAndSet(true, false)) {
                this.onWaitEnd();
                return;
            }
    
            //entry to wait
            waitPoint.reset();
    
            try {
                // 如果没有输盘请求  await  直到被唤醒
                // 唤醒之后处理do_commit 且当前hasNotified 已设置成true
                waitPoint.await(interval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            } finally {
                // 最后通知状态设置成false
                hasNotified.set(false);
                this.onWaitEnd();
            }
        }

    实际上具体的刷盘逻辑是在doCommit方法中。因为采用的是读写分离的方式,所以在每次执行刷盘逻辑之前,都会交互读写容器中的数据。

    private void swapRequests() {
                List<GroupCommitRequest> tmp = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = tmp;
            }

    对于刷盘有没有成功的判断是这样的,每一次flush,都会记录好flush的位置,如果发现当前已经flush的位置已经超过了请求flush的位置,那就说明已经刷新成功,而这个过程运行重试2次。刷新完成之后便通知用户

            private void doCommit() {
                synchronized (this.requestsRead) {
                    if (!this.requestsRead.isEmpty()) {
                        for (GroupCommitRequest req : this.requestsRead) {
                            // There may be a message in the next file, so a maximum of
                            // two times the flush
                            boolean flushOK = false;
                            // 确认有没有刷盘成功
                            for (int i = 0; i < 2 && !flushOK; i++) {
                                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
    
                                if (!flushOK) {
                                    CommitLog.this.mappedFileQueue.flush(0);
                                }
                            }
                            // 通知请求客户端,返回刷盘结果
                            req.wakeupCustomer(flushOK);
                        }
    
                        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                        if (storeTimestamp > 0) {
                            // 记录checkoutpoint
                            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                        }
    
                        this.requestsRead.clear();
                    } else {
                        // Because of individual messages is set to not sync flush, it
                        // will come to this process
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
            }

    在深入看一下CommitLog.this.mappedFileQueue.flush(0);中的 mappedFile.flush(flushLeastPages);,实际上就是根据当前flushwhere的点位来找到对应的MappendFile,然后flush当前MapedFile中的writeBuffer或者mappedByteBuffer中的数据到自盘

        public int flush(final int flushLeastPages) {
            if (this.isAbleToFlush(flushLeastPages)) {
                if (this.hold()) {
                    int value = getReadPosition();
    
                    try {
                        //We only append data to fileChannel or mappedByteBuffer, never both.
                        if (writeBuffer != null || this.fileChannel.position() != 0) {
                            this.fileChannel.force(false);
                        } else {
                            this.mappedByteBuffer.force();
                        }
                    } catch (Throwable e) {
                        log.error("Error occurred when force data to disk.", e);
                    }
    
                    this.flushedPosition.set(value);
                    this.release();
                } else {
                    log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                    this.flushedPosition.set(getReadPosition());
                }
            }
            return this.getFlushedPosition();
        }

    异步刷盘FlushRealTimeService

    异步刷盘分为定时异步调度刷盘和实时调度刷盘。

     class FlushRealTimeService extends FlushCommitLogService {
            private long lastFlushTimestamp = 0;
            private long printTimes = 0;
    
            public void run() {
                CommitLog.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    // 实时刷盘还是调度刷盘
                    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    
                    // 异步刷盘时间间隔  500ms
                    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                    // 每次刷新多少的pagecache页
                    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                    // 10s
                    int flushPhysicQueueThoroughInterval =
                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    
                    boolean printFlushProgress = false;
    
                    // Print flush progress
                    long currentTimeMillis = System.currentTimeMillis();
                    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                        this.lastFlushTimestamp = currentTimeMillis;
                        flushPhysicQueueLeastPages = 0;
                        printFlushProgress = (printTimes++ % 10) == 0;
                    }
    
                    try {
                        if (flushCommitLogTimed) {
                            Thread.sleep(interval);  // 异步调度刷盘  还是  实时刷盘?
                        } else {
                            this.waitForRunning(interval);
                        }
    
                        if (printFlushProgress) {
                            this.printFlushProgress();
                        }
    
                        long begin = System.currentTimeMillis();
                        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                        if (storeTimestamp > 0) {
                            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                        }
                        long past = System.currentTimeMillis() - begin;
                        if (past > 500) {
                            log.info("Flush data to disk costs {} ms", past);
                        }
                    } catch (Throwable e) {
                        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                        this.printFlushProgress();
                    }
                }
    
                // Normal shutdown, to ensure that all the flush before exit
                boolean result = false;
                for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                    result = CommitLog.this.mappedFileQueue.flush(0);
                    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
                }
    
                this.printFlushProgress();
    
                CommitLog.log.info(this.getServiceName() + " service end");
            }
    
            @Override
            public String getServiceName() {
                return FlushRealTimeService.class.getSimpleName();
            }
    
            private void printFlushProgress() {
                // CommitLog.log.info("how much disk fall behind memory, "
                // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
            }
    
            @Override
            public long getJointime() {
                return 1000 * 60 * 5;
            }
        }

    这里面会去判断未flush的数据量有没有超过flushLeastPages,处理的方式就是 (flushOffset - writeOffset)  /   os_page_size  >=  flushLeastPages来决定是和否需要flush。

    异步转存commit服务CommitRealTimeService

    转存服务就是为了把对外内存中的数据写进filechannel。在CommitRealTimeService的run方法中:

    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

        public boolean commit(final int commitLeastPages) {
            boolean result = true;
            // 获取committedWhere对应的mappedFile
            MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
            if (mappedFile != null) {
                // commit
                int offset = mappedFile.commit(commitLeastPages);
                long where = mappedFile.getFileFromOffset() + offset;
                result = where == this.committedWhere;
                this.committedWhere = where;
            }
    
            return result;
        }
    protected void commit0(final int commitLeastPages) {
            int writePos = this.wrotePosition.get();
            int lastCommittedPosition = this.committedPosition.get();
    
            if (writePos - this.committedPosition.get() > 0) {
                try {
                    ByteBuffer byteBuffer = writeBuffer.slice();
                    byteBuffer.position(lastCommittedPosition);
                    byteBuffer.limit(writePos);
                    this.fileChannel.position(lastCommittedPosition);
                    // commit主要是读取DM中的数据往文件通道写数据,然后交给异步刷盘服务去flush
                    this.fileChannel.write(byteBuffer);
                    this.committedPosition.set(writePos);
                } catch (Throwable e) {
                    log.error("Error occurred when commit data to FileChannel.", e);
                }
            }
        }

     启动异步转存服务,一定是启用了对外内存池,且设置成异步刷盘的方式。

  • 相关阅读:
    好用的javascript eclipse插件Aptana
    汉字字符串转换成十六进制byte数组,一个汉字存到两个byte里面,大整数存到两个byte里面
    三星 平板手机电脑 Galaxytab2忘记开机密码解决方法
    java float 加减精度问题
    android 增加Addon属性支持的方法
    基于jquery的kendoUI 可以实现快速开发,节省大量web UI开发工作量
    zdz工具箱v1.5 android版本发布了,集成各种个人生活中常用的工具,方便日常使用管理
    存储联系人信息(进程com.motorola.contacts)意外停止 事件提醒eventreminder异常 处理方法
    playframework 同时运行多个项目的方法修改默认端口号
    免费的Git私有代码托管服务
  • 原文地址:https://www.cnblogs.com/gaojy/p/15093529.html
Copyright © 2011-2022 走看看