RocketMQ Storage Mechanism: File Recovery & Expired-File Deletion

    Broker expired-file deletion mechanism

    RocketMQ runs the file-cleanup task every 10 seconds (cleanResourceInterval), scheduled with an initial delay of one minute:

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    DefaultMessageStore.this.cleanFilesPeriodically();
                }
            }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

    The task mainly deletes expired commitlog and consumequeue files:

        private void cleanFilesPeriodically() {
            this.cleanCommitLogService.run();
            this.cleanConsumeQueueService.run();
        }

    Commitlog file deletion

    Expired-file deletion is triggered when any of the following conditions holds (a simplified sketch of the first two checks follows this list):

    1) the configured deletion time of day is reached;

    2) disk usage exceeds 85%;

    3) a delete is triggered manually.
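
    Before looking at run(), here is a minimal standalone sketch of the first two checks, the time trigger and the space trigger. This is an illustration only, not the actual isTimeToDelete/isSpaceToDelete code; the deleteWhenHour and maxUsedRatio parameters stand in for the broker's deleteWhen and disk-ratio settings.

        import java.io.File;
        import java.util.Calendar;

        // Simplified sketch of the two automatic cleanup triggers (illustrative only).
        public class CleanTriggerSketch {

            // Time trigger: true when the current hour matches the configured deletion hour, e.g. "04".
            static boolean isTimeToDelete(String deleteWhenHour) {
                String currentHour = String.format("%02d", Calendar.getInstance().get(Calendar.HOUR_OF_DAY));
                return deleteWhenHour.contains(currentHour);
            }

            // Space trigger: true when used/total disk space of the store path exceeds the ratio, e.g. 0.85.
            static boolean isSpaceToDelete(File storePath, double maxUsedRatio) {
                long total = storePath.getTotalSpace();
                if (total <= 0) {
                    return false;
                }
                double used = total - storePath.getUsableSpace();
                return used / total > maxUsedRatio;
            }

            public static void main(String[] args) {
                System.out.println("timeup=" + isTimeToDelete("04")
                    + ", spacefull=" + isSpaceToDelete(new File("/tmp"), 0.85));
            }
        }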

          public void run() {
                try {
                    // delete files that have already expired
                    this.deleteExpiredFiles();
                    // why can there be "hanging" files to re-delete?
                    /**
                     * The first delete attempt may fail, e.g. another thread still holds a reference
                     * to the expired file, or unmapping the memory-mapped region fails.
                     * Once the file has been closed, a delete that failed its pre-check can succeed on the second attempt.
                     */
                    this.redeleteHangedFile();
                } catch (Throwable e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

           private void deleteExpiredFiles() {
                int deleteCount = 0;
                // file retention time, in hours (72 by default)
                long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
                // 100 ms: interval between deleting two physical files
                int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
                // 1000 * 120 ms = 120 s: grace period before a mapped file is destroyed forcibly
                int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
                // has the configured deletion time (04:00 by default) been reached?
                boolean timeup = this.isTimeToDelete();
                // has disk usage reached the threshold?
                boolean spacefull = this.isSpaceToDelete();
                // manual deletion: the request counter grants up to 20 scheduled passes
                boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    
                if (timeup || spacefull || manualDelete) {
    
                    if (manualDelete)
                        this.manualDeleteFileSeveralTimes--;
    
                    boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
    
                    log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
                            fileReservedTime, //
                            timeup, //
                            spacefull, //
                            manualDeleteFileSeveralTimes, //
                            cleanAtOnce);
    
                    fileReservedTime *= 60 * 60 * 1000;
    
                    deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                            destroyMapedFileIntervalForcibly, cleanAtOnce);
                    if (deleteCount > 0) {
                    } else if (spacefull) {  // failed to delete files although disk is nearly full
                        log.warn("disk space will be full soon, but delete file failed.");
                    }
                }
            }

    Deletion targets files older than 3 days (the 72-hour retention period), or kicks in regardless of age once current disk usage exceeds 85%.
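
    These thresholds come from the broker configuration. A broker.conf fragment for tuning them might look like the following sketch; the property names mirror the MessageStoreConfig getters used above and the values shown are the usual defaults (the 85% figure mentioned above is a separate forced-clean ratio checked on top of diskMaxUsedSpaceRatio):

        # retention time of commitlog files, in hours (checked against each file's last modification time)
        fileReservedTime=72
        # hour of day at which the scheduled deletion may run
        deleteWhen=04
        # disk usage percentage above which cleanup is triggered regardless of file age
        diskMaxUsedSpaceRatio=75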

        public int deleteExpiredFileByTime(final long expiredTime,    // 72h
            final int deleteFilesInterval,  // 0.1s
            final long intervalForcibly,   // 120s
            final boolean cleanImmediately) {
            // the commitlog may be written to at any time; work on a copy so writes are not affected
            Object[] mfs = this.copyMappedFiles(0);
    
            if (null == mfs)
                return 0;
    
            int mfsLength = mfs.length - 1;
            int deleteCount = 0;
            // holds the MappedFiles that were destroyed and should be removed from the queue
            List<MappedFile> files = new ArrayList<MappedFile>();
            if (null != mfs) {
                for (int i = 0; i < mfsLength; i++) {
                    MappedFile mappedFile = (MappedFile) mfs[i];
                    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                    // the file's last modification is more than 72 hours old, or cleanImmediately is set (disk usage above 85%); one of the three triggers (time, space, manual) has already fired to get here
                    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                        // the actual deletion logic
                        if (mappedFile.destroy(intervalForcibly)) {
                            files.add(mappedFile);
                            deleteCount++;
    
                            // stop once DELETE_FILES_BATCH_MAX (10) files have been deleted in this pass
                            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                                break;
                            }
    
                            // fewer than ten deleted so far and there are still files left to scan
                            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                                try {
                                    // wait 100 ms before deleting the next file
                                    Thread.sleep(deleteFilesInterval);
                                } catch (InterruptedException e) {
                                }
                            }
                        } else {
                            break;
                        }
                    }
                }
            }
    
            deleteExpiredFile(files);
    
            return deleteCount;
        }

    Next, look at the MappedFile destroy process in detail:

    public boolean destroy(final long intervalForcibly) {
            this.shutdown(intervalForcibly);
    
            if (this.isCleanupOver()) {
                try {
                // close the file channel
                    this.fileChannel.close();
                    log.info("close file channel " + this.fileName + " OK");
    
                    long beginTime = System.currentTimeMillis();
                // delete the file on disk
                    boolean result = this.file.delete();
                    log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                        + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                        + this.getFlushedPosition() + ", "
                        + UtilAll.computeEclipseTimeMilliseconds(beginTime));
                } catch (Exception e) {
                    log.warn("close file channel " + this.fileName + " Failed. ", e);
                }
    
                return true;
            } else {
                log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                    + " Failed. cleanupOver: " + this.cleanupOver);
            }
    
            return false;
        }

    MappedFile shutdown and the memory release process:

    public void shutdown(final long intervalForcibly) {
            if (this.available) {
                this.available = false;
                this.firstShutdownTimestamp = System.currentTimeMillis();
                this.release();
        } else if (this.getRefCount() > 0) {   // shutdown has already been called, but references remain; force cleanup once intervalForcibly has elapsed
                if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                // force the reference count negative so the memory is reclaimed
                    this.refCount.set(-1000 - this.getRefCount());
                    this.release();
                }
            }
        }
    
    // if references remain, just return and wait for a later attempt
        public void release() {
            long value = this.refCount.decrementAndGet();
            if (value > 0)
                return;
    
        // value <= 0: no references remain, or a forced cleanup has been requested
            synchronized (this) {
            // clean up all the mapped memory objects and release the memory
                this.cleanupOver = this.cleanup(value);
            }
        }
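
    destroy() therefore only removes the file once every hold() has been matched by a release(), or the count has been forced negative. Below is a minimal standalone sketch of this reference-counting protocol, modeled loosely on RocketMQ's ReferenceResource; it is an illustration, not the exact implementation.

        import java.util.concurrent.atomic.AtomicLong;

        // Simplified reference-counted resource: readers call hold() before touching the
        // mapped memory and release() afterwards; the last release runs cleanup().
        abstract class RefCountedResourceSketch {
            protected final AtomicLong refCount = new AtomicLong(1); // 1 = owned by the file queue itself
            protected volatile boolean available = true;
            protected volatile boolean cleanupOver = false;

            public synchronized boolean hold() {
                if (!available) {
                    return false;               // already shut down, refuse new readers
                }
                if (refCount.getAndIncrement() > 0) {
                    return true;                // acquired successfully
                }
                refCount.getAndDecrement();     // roll back: the resource was already fully released
                return false;
            }

            public void release() {
                if (refCount.decrementAndGet() > 0) {
                    return;                     // other readers still hold a reference
                }
                synchronized (this) {
                    cleanupOver = cleanup(refCount.get()); // e.g. unmap the MappedByteBuffer
                }
            }

            protected abstract boolean cleanup(long currentRefCount);
        }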

    ConsumeQueue file deletion

            private void deleteExpiredFiles() {
                // 0.1s
                int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
    
            // starting physical offset of the first commitlog file
                long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            if (minOffset > this.lastPhysicalMinOffset) {  // the minimum offset has advanced since last time, i.e. commitlog files have been deleted
                    this.lastPhysicalMinOffset = minOffset;
    
                    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
    
                    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                        for (ConsumeQueue logic : maps.values()) {
                        // clean one consume queue; the argument is the commitlog's minimum physical offset
                            int deleteCount = logic.deleteExpiredFile(minOffset);
    
                            if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                                try {
                                // after one ConsumeQueue is cleaned successfully, wait 100 ms before processing the next one
                                    Thread.sleep(deleteLogicsFilesInterval);
                                } catch (InterruptedException ignored) {
                                }
                            }
                        }
                    }
    
                // delete expired index files
                    DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
                }
            }

    public int deleteExpiredFile(long offset) {
            int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
        // recompute the minimum logical offset
            this.correctMinOffset(offset);
            return cnt;
        }

    The deleteExpiredFileByOffset method iterates over the consume queue files starting from the first one. For each file it reads the last store unit to get the physical offset it points to and compares it with the commitlog's current minimum physical offset; if it is smaller, that consume queue file is deleted.

    correctMinOffset then scans the first remaining file entry by entry, starting from the first offset, until it finds an entry whose physical offset is >= the commitlog's current minimum physical offset; that position becomes the new minimum logical offset. A simplified sketch of both steps is shown below.
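
    The sketch below illustrates those two steps, assuming each consume queue entry is a 20-byte unit (8-byte commitlog offset, 4-byte size, 8-byte tags hash code). It shows the logic only; it is not the actual deleteExpiredFileByOffset/correctMinOffset code.

        import java.nio.ByteBuffer;

        // Illustrative sketch of how stale consume queue files are identified once the
        // commitlog's minimum physical offset has moved forward.
        class ConsumeQueueCleanSketch {
            static final int CQ_STORE_UNIT_SIZE = 20; // 8-byte phyOffset + 4-byte size + 8-byte tagsCode

            // A whole file can be deleted when even its last entry points below the
            // commitlog's minimum physical offset.
            static boolean canDeleteFile(ByteBuffer fileBuffer, int fileSize, long commitLogMinOffset) {
                long lastEntryPhyOffset = fileBuffer.getLong(fileSize - CQ_STORE_UNIT_SIZE);
                return lastEntryPhyOffset < commitLogMinOffset;
            }

            // Recompute minLogicOffset: scan the first remaining file entry by entry until
            // one points at or beyond the commitlog's minimum physical offset.
            static long correctMinLogicOffset(ByteBuffer firstFileBuffer, int fileSize,
                                              long fileFromOffset, long commitLogMinOffset) {
                for (int pos = 0; pos + CQ_STORE_UNIT_SIZE <= fileSize; pos += CQ_STORE_UNIT_SIZE) {
                    if (firstFileBuffer.getLong(pos) >= commitLogMinOffset) {
                        return fileFromOffset + pos; // new minimum logical offset
                    }
                }
                return fileFromOffset + fileSize;    // every entry in the file is stale
            }
        }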

    Index file deletion

    Index files are deleted using the same principle as consume queue files.

        public void deleteExpiredFile(long offset) {
            Object[] files = null;
            try {
                this.readWriteLock.readLock().lock();
                if (this.indexFileList.isEmpty()) {
                    return;
                }
    
                long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
            if (endPhyOffset < offset) {  // the first index file ends before the commitlog's minimum offset, so stale index data exists
                    files = this.indexFileList.toArray();
                }
            } catch (Exception e) {
                log.error("destroy exception", e);
            } finally {
                this.readWriteLock.readLock().unlock();
            }
    
        // iterate over the index files; any file whose last physical offset is smaller than the current minimum physical offset can be deleted
            if (files != null) {
                List<IndexFile> fileList = new ArrayList<IndexFile>();
                for (int i = 0; i < (files.length - 1); i++) {
                    IndexFile f = (IndexFile) files[i];
                    if (f.getEndPhyOffset() < offset) {
                        fileList.add(f);
                    } else {
                        break;
                    }
                }
    
                this.deleteExpiredFile(fileList);
            }
        }

    Broker storage file recovery

    private void recover(final boolean lastExitOK) {
        // recover the consume queues
            this.recoverConsumeQueue();
    
    
            if (lastExitOK) {
            // the broker exited normally last time
                this.commitLog.recoverNormally();
            } else {
            // the broker exited abnormally last time
                this.commitLog.recoverAbnormally();
            }
    
        // restore the write progress (logical offsets) of each consume queue
        this.recoverTopicQueueTable();
    }

    The consume queue recovery logic is straightforward: starting from the third-to-last file, iterate entry by entry; if the physical offset read is non-negative and the message size is greater than 0, the entry is valid.

        public void recover() {
            final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
            if (!mappedFiles.isEmpty()) {
    
                int index = mappedFiles.size() - 3;
                if (index < 0)
                    index = 0;
    
                int mappedFileSizeLogics = this.mappedFileSize;
                MappedFile mappedFile = mappedFiles.get(index);
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                long processOffset = mappedFile.getFileFromOffset();
                long mappedFileOffset = 0;
                long maxExtAddr = 1;
                while (true) {
                    for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                        long offset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        long tagsCode = byteBuffer.getLong();
    
                        if (offset >= 0 && size > 0) {
                            mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                            this.maxPhysicOffset = offset;
                            if (isExtAddr(tagsCode)) {
                                maxExtAddr = tagsCode;
                            }
                        } else {
                            log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                                + offset + " " + size + " " + tagsCode);
                            break;
                        }
                    }
    
                // finished reading one consume queue file
                    if (mappedFileOffset == mappedFileSizeLogics) {
                        index++;
                        if (index >= mappedFiles.size()) {
    
                            log.info("recover last consume queue file over, last maped file "
                                + mappedFile.getFileName());
                            break;
                        } else {
                        // move on to the next file
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next consume queue file, " + mappedFile.getFileName());
                        }
                    } else {
                        log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                            + (processOffset + mappedFileOffset));
                        break;
                    }
                }
    
                processOffset += mappedFileOffset;
                this.mappedFileQueue.setFlushedWhere(processOffset);
                this.mappedFileQueue.setCommittedWhere(processOffset);
                this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
                if (isExtReadEnable()) {
                    this.consumeQueueExt.recover();
                    log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                    this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
                }
            }
        }

    Commitlog recovery distinguishes between a normal exit and an abnormal exit.

    After a normal exit all commitlog data has been flushed, so recovery only needs to start from the third-to-last file: iterate over every message and verify its CRC.

    After an abnormal exit recovery starts from the last files, because problems usually occur in the last file. The broker checks whether the first message of a file has a store timestamp earlier than the minimum timestamp recorded in the checkpoint; if so, that file is the starting file for recovery. It then verifies the CRC of each message and dispatches the messages that pass the check to the consume queue and index files.
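
    The following is a simplified sketch of the normal-exit recovery loop, assumed to live inside CommitLog. It is illustrative only, not the actual CommitLog.recoverNormally source; readAndCheckMessage is a hypothetical stand-in for the real per-message check, returning the message size, 0 at end of file, or -1 for a corrupt message.

        // Sketch of normal-exit commitlog recovery (illustrative, not the real implementation).
        private void recoverNormallySketch(List<MappedFile> mappedFiles) {
            int index = Math.max(mappedFiles.size() - 3, 0);   // start from the third-to-last file
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;

            while (true) {
                int size = readAndCheckMessage(byteBuffer);    // hypothetical CRC + format check
                if (size > 0) {                                // valid message, keep scanning
                    mappedFileOffset += size;
                } else if (size == 0) {                        // end of this file, move to the next one
                    index++;
                    if (index >= mappedFiles.size()) {
                        break;
                    }
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                } else {                                       // corrupt message: recovery stops here
                    break;
                }
            }

            // Everything beyond the last valid message is treated as dirty and truncated.
            long validOffset = processOffset + mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(validOffset);
            this.mappedFileQueue.setCommittedWhere(validOffset);
            this.mappedFileQueue.truncateDirtyFiles(validOffset);
        }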

    Finally, the current write progress of each consume queue (its logical offset) is saved:

    private void recoverTopicQueueTable() {
            HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        // starting physical offset of the first commitlog file
            long minPhyOffset = this.commitLog.getMinOffset();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    String key = logic.getTopic() + "-" + logic.getQueueId();
                // record this consume queue's write progress (max logical offset)
                    table.put(key, logic.getMaxOffsetInQueue());
                // use the commitlog's minimum physical offset to recompute the queue's minimum logical offset (minLogicOffset)
                    logic.correctMinOffset(minPhyOffset);
                }
            }
    
            // <String/* topic-queueid */, Long/* logicOffset */>
            this.commitLog.setTopicQueueTable(table);
        }