  • RocketMQ (4.8.0): Broker expired-file deletion mechanism

    Broker expired-file deletion mechanism

      RocketMQ stores three main kinds of data files: CommitLog, Consume Queue, and Index File. Because memory and disk are finite resources, a Broker cannot keep all data forever, so data past its retention period is deleted periodically. RocketMQ uses a configured expiration time to decide which data files to remove. The logic lives in cleanFilesPeriodically(), a method that runs on a schedule once DefaultMessageStore.start() has been called. Source path: D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java.
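      To make the scheduling idea concrete, here is a minimal sketch of a periodic cleanup task built on the JDK's ScheduledExecutorService. It is not RocketMQ code: the class name PeriodicCleanupSketch, the stub methods cleanCommitLog() and cleanConsumeQueueAndIndex(), and the delay and period values are all assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker's periodic cleanup; not RocketMQ code.
class PeriodicCleanupSketch {
    // Counts completed passes so the behaviour can be observed.
    final AtomicInteger runs = new AtomicInteger();

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    // One cleanup pass: CommitLog first, then the index-style files.
    void cleanFilesOnce() {
        try {
            cleanCommitLog();
            cleanConsumeQueueAndIndex();
        } catch (Throwable t) {
            // An exception escaping a scheduled task cancels all future runs,
            // so it is caught and logged here instead.
            System.err.println("cleanup pass failed: " + t);
        } finally {
            runs.incrementAndGet();
        }
    }

    // Stubs (assumed names); a real store would delete files here.
    void cleanCommitLog() { }
    void cleanConsumeQueueAndIndex() { }

    // Run cleanFilesOnce() repeatedly at a fixed rate.
    void start(long initialDelayMs, long periodMs) {
        scheduler.scheduleAtFixedRate(
            this::cleanFilesOnce, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicCleanupSketch sketch = new PeriodicCleanupSketch();
        sketch.start(0, 50);   // illustrative values only
        Thread.sleep(300);
        sketch.shutdown();
        System.out.println("cleanup passes: " + sketch.runs.get());
    }
}
```

      Catching Throwable inside the task mirrors the try/catch in CleanCommitLogService.run() shown in the next section: a single failed pass should not stop later ones.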

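     1. Deleting CommitLog files

      CommitLog files are deleted by CleanCommitLogService, an inner class of DefaultMessageStore (D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java) that provides a service whose run() method is executed periodically. The code is as follows: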

      1     class CleanCommitLogService {
      2 
      3         private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
      4         private final double diskSpaceWarningLevelRatio =
      5             Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
      6 
      7         private final double diskSpaceCleanForciblyRatio =
      8             Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
      9         private long lastRedeleteTimestamp = 0;
     10 
     11         private volatile int manualDeleteFileSeveralTimes = 0;
     12 
     13         private volatile boolean cleanImmediately = false;
     14 
     15         public void excuteDeleteFilesManualy() {
     16             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
     17             DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
     18         }
     19 
     20         public void run() {
     21             try {
     22                 this.deleteExpiredFiles();  // delete expired files
     23 
     24                 this.redeleteHangedFile();
     25             } catch (Throwable e) {
     26                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
     27             }
     28         }
     29 
     30         private void deleteExpiredFiles() {
     31             int deleteCount = 0;
     32             long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
     33             int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
     34             int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
     35 
     36             boolean timeup = this.isTimeToDelete();
     37             boolean spacefull = this.isSpaceToDelete();
     38             boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
     39 
     40             if (timeup || spacefull || manualDelete) {
     41 
     42                 if (manualDelete)
     43                     this.manualDeleteFileSeveralTimes--;
     44 
     45                 boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
     46 
     47                 log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
     48                     fileReservedTime,
     49                     timeup,
     50                     spacefull,
     51                     manualDeleteFileSeveralTimes,
     52                     cleanAtOnce);
     53 
     54                 fileReservedTime *= 60 * 60 * 1000;
     55 
     56                 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
     57                     destroyMapedFileIntervalForcibly, cleanAtOnce);
     58                 if (deleteCount > 0) {
     59                 } else if (spacefull) {
     60                     log.warn("disk space will be full soon, but delete file failed.");
     61                 }
     62             }
     63         }
     64 
     65         private void redeleteHangedFile() {
     66             int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
     67             long currentTimestamp = System.currentTimeMillis();
     68             if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
     69                 this.lastRedeleteTimestamp = currentTimestamp;
     70                 int destroyMapedFileIntervalForcibly =
     71                     DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
     72                 if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
     73                 }
     74             }
     75         }
     76 
     77         public String getServiceName() {
     78             return CleanCommitLogService.class.getSimpleName();
     79         }
     80 
     81         private boolean isTimeToDelete() {
     82             String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
     83             if (UtilAll.isItTimeToDo(when)) {
     84                 DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
     85                 return true;
     86             }
     87 
     88             return false;
     89         }
     90 
     91         private boolean isSpaceToDelete() {
     92             double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
     93 
     94             cleanImmediately = false;
     95 
     96             {
     97                 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
     98                 if (physicRatio > diskSpaceWarningLevelRatio) {
     99                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
    100                     if (diskok) {
    101                         DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
    102                     }
    103 
    104                     cleanImmediately = true;
    105                 } else if (physicRatio > diskSpaceCleanForciblyRatio) {
    106                     cleanImmediately = true;
    107                 } else {
    108                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    109                     if (!diskok) {
    110                         DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
    111                     }
    112                 }
    113 
    114                 if (physicRatio < 0 || physicRatio > ratio) {
    115                     DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
    116                     return true;
    117                 }
    118             }
    119 
    120             {
    121                 String storePathLogics = StorePathConfigHelper
    122                     .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
    123                 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
    124                 if (logicsRatio > diskSpaceWarningLevelRatio) {
    125                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
    126                     if (diskok) {
    127                         DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
    128                     }
    129 
    130                     cleanImmediately = true;
    131                 } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
    132                     cleanImmediately = true;
    133                 } else {
    134                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    135                     if (!diskok) {
    136                         DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
    137                     }
    138                 }
    139 
    140                 if (logicsRatio < 0 || logicsRatio > ratio) {
    141                     DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
    142                     return true;
    143                 }
    144             }
    145 
    146             return false;
    147         }
    148 
    149         public int getManualDeleteFileSeveralTimes() {
    150             return manualDeleteFileSeveralTimes;
    151         }
    152 
    153         public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
    154             this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
    155         }
    156         public boolean isSpaceFull() {
    157             String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    158             double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
    159             double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    160             if (physicRatio > ratio) {
    161                 DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
    162             }
    163             if (physicRatio > this.diskSpaceWarningLevelRatio) {
    164                 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
    165                 if (diskok) {
    166                     DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full");
    167                 }
    168 
    169                 return true;
    170             } else {
    171                 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    172 
    173                 if (!diskok) {
    174                     DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok");
    175                 }
    176 
    177                 return false;
    178             }
    179         }
    180     }

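    this.deleteExpiredFiles() performs deletion when any one of the following three conditions holds:

    • First, the current hour matches the configured deletion time (deleteWhen).
    • Second, disk usage exceeds the configured diskMaxUsedSpaceRatio (75 by default). Usage above 85% (diskSpaceCleanForciblyRatio) additionally sets cleanImmediately, which forces deletion of unexpired files when cleanFileForciblyEnable is on.
    • Third, deletion was requested manually.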
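
    The three triggers can be sketched as pure functions, which makes the thresholds easier to reason about. This is a simplified illustration rather than the broker's code: the class DeleteTriggerSketch and its method signatures are hypothetical, and the parsing of deleteWhen as a ';'-separated list of hours is an assumption about the configuration format. The 0.85 constant is taken from the listing above.

```java
// Hypothetical, simplified model of the checks in deleteExpiredFiles().
class DeleteTriggerSketch {
    // Default of diskSpaceCleanForciblyRatio in the listing above.
    static final double FORCE_CLEAN_RATIO = 0.85;

    // Assumption: deleteWhen is a ';'-separated list of hours, e.g. "04" or "04;16".
    static boolean isTimeToDelete(String deleteWhen, int currentHour) {
        for (String hour : deleteWhen.split(";")) {
            if (Integer.parseInt(hour.trim()) == currentHour) {
                return true;
            }
        }
        return false;
    }

    // Same comparison as isSpaceToDelete(): a negative ratio (read error)
    // or usage above the configured maximum triggers deletion.
    static boolean isSpaceToDelete(double usedRatio, int diskMaxUsedSpaceRatio) {
        double ratio = diskMaxUsedSpaceRatio / 100.0;
        return usedRatio < 0 || usedRatio > ratio;
    }

    // Above the forced-clean ratio, files may be removed even if not expired.
    static boolean cleanImmediately(double usedRatio) {
        return usedRatio > FORCE_CLEAN_RATIO;
    }

    // Any single trigger is enough to start a deletion pass.
    static boolean shouldDelete(String deleteWhen, int currentHour,
                                double usedRatio, int diskMaxUsedSpaceRatio,
                                int manualTimesLeft) {
        return isTimeToDelete(deleteWhen, currentHour)
            || isSpaceToDelete(usedRatio, diskMaxUsedSpaceRatio)
            || manualTimesLeft > 0;
    }
}
```

    For example, shouldDelete("04", 5, 0.80, 75, 0) is true because 0.80 exceeds 0.75 even though the hour does not match, while cleanImmediately(0.80) is still false because usage has not crossed 0.85.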

    In the code above, line 56 calls DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); the call site is in D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java. That method simply returns this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); this call is in D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java. Let's look at how this.mappedFileQueue.deleteExpiredFileByTime() deletes CommitLog files. The code is as follows:

    public int deleteExpiredFile(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately
    ) {
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
    }

    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);  // all CommitLog files

        if (null == mfs)
            return 0;

        int mfsLength = mfs.length - 1;  // the last (newest) file is never considered
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();  // files already deleted
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                // Delete condition: the file has expired, or it must be deleted immediately.
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    if (mappedFile.destroy(intervalForcibly)) {  // unmap the file and delete the physical file
                        files.add(mappedFile);
                        deleteCount++;

                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }

                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }

        deleteExpiredFile(files);  // remove the deleted files' entries from memory

        return deleteCount;
    }

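    deleteExpiredFileByTime() works in two steps:

    • Copy the list of all CommitLog files. A CommitLog file may be written to at any moment, so the method works on a copy to avoid disturbing normal writes.
    • Check each CommitLog file in turn for expiry; an expired file is deleted immediately by calling destroy(). Before the deletion, a series of checks and cleanup steps is carried out: the file's reference count is checked, all mapped in-memory objects are cleaned up, and the memory is released. Once cleanup is complete, the physical file is deleted.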
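
    The two steps can be sketched against an in-memory model, with no real files involved. Everything named here (ExpireByTimeSketch, FakeFile, the now parameter) is hypothetical, and the batch limit of 10 is an assumed value, since the listing references DELETE_FILES_BATCH_MAX without showing it. The sketch keeps the three properties that matter: it iterates over a snapshot, it never touches the newest file, and it stops at the first file that is not eligible so that no gap appears in the middle of the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of time-based expiry; no real files are touched.
class ExpireByTimeSketch {
    // Stand-in for a mapped file: a name plus its last-modified time.
    static final class FakeFile {
        final String name;
        final long lastModified;

        FakeFile(String name, long lastModified) {
            this.name = name;
            this.lastModified = lastModified;
        }
    }

    // Assumed value for illustration.
    static final int DELETE_FILES_BATCH_MAX = 10;

    // Removes expired files from 'live' (ordered oldest first) and returns them.
    static List<FakeFile> deleteExpired(List<FakeFile> live, long now,
                                        long expiredTime, boolean cleanImmediately) {
        // Step 1: snapshot the references so the loop is unaffected by later appends.
        Object[] snapshot = live.toArray();
        List<FakeFile> deleted = new ArrayList<>();

        // Step 2: walk oldest to newest, skipping the last file (still being written).
        for (int i = 0; i < snapshot.length - 1; i++) {
            FakeFile file = (FakeFile) snapshot[i];
            boolean expired = now >= file.lastModified + expiredTime;
            if (!(expired || cleanImmediately)) {
                break; // avoid deleting files in the middle
            }
            deleted.add(file);
            if (deleted.size() >= DELETE_FILES_BATCH_MAX) {
                break;
            }
        }

        // Finally drop the deleted entries from the live list.
        live.removeAll(deleted);
        return deleted;
    }
}
```

    With four files last modified at 0, 100, 900 and 950, now = 1000 and expiredTime = 500, only the first two are removed: the third file is not yet expired, so the loop stops there.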

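     2. Deleting Consume Queue and Index File files

      Consume Queue and Index File are both index files. Once a CommitLog file has been deleted, the index files that point into it no longer serve any purpose and only take up disk space, so they should be deleted as well.

      RocketMQ's deletion strategy is a scheduled check: when the deletion condition is met, expired or meaningless files are deleted.

      The program calls deleteExpiredFiles() in D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java. The code is as follows: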

        private void deleteExpiredFiles() {
            int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

            // Minimum physical offset across all CommitLog files.
            long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            // lastPhysicalMinOffset is the minimum physical offset seen by the previous check.
            // When minOffset > lastPhysicalMinOffset, there is new data that has not been checked yet,
            // so org.apache.rocketmq.store.MappedFileQueue.deleteExpiredFileByOffset() is called
            // (via ConsumeQueue.deleteExpiredFile()) to check and delete.
            if (minOffset > this.lastPhysicalMinOffset) {
                this.lastPhysicalMinOffset = minOffset;

                ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

                for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                    for (ConsumeQueue logic : maps.values()) {
                        int deleteCount = logic.deleteExpiredFile(minOffset);

                        if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                            try {
                                Thread.sleep(deleteLogicsFilesInterval);
                            } catch (InterruptedException ignored) {
                            }
                        }
                    }
                }

                DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
            }
        }

      org.apache.rocketmq.store.MappedFileQueue.deleteExpiredFileByOffset() is located in D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\MappedFileQueue.java. The code is as follows:

    public int deleteExpiredFileByOffset(long offset, int unitSize) {
        Object[] mfs = this.copyMappedFiles(0);

        List<MappedFile> files = new ArrayList<MappedFile>();
        int deleteCount = 0;
        if (null != mfs) {

            int mfsLength = mfs.length - 1;

            for (int i = 0; i < mfsLength; i++) {
                boolean destroy;
                MappedFile mappedFile = (MappedFile) mfs[i];
                // Read the last entry of this Consume Queue file.
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                if (result != null) {
                    long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                    result.release();
                    // maxOffsetInLogicQueue: the largest offset recorded in this Consume Queue file.
                    // offset: the minimum offset being checked against.
                    // If maxOffsetInLogicQueue < offset, the file has expired and can be deleted.
                    destroy = maxOffsetInLogicQueue < offset;
                    if (destroy) {
                        log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                            + maxOffsetInLogicQueue + ", delete it");
                    }
                } else if (!mappedFile.isAvailable()) {
                    // Handle hanged file: the store service has been shut down,
                    // or an earlier attempt to delete this file failed.
                    log.warn("Found a hanged consume queue file, attempting to delete it.");
                    destroy = true;
                } else {
                    log.warn("this being not executed forever.");
                    break;
                }

                if (destroy && mappedFile.destroy(1000 * 60)) {
                    files.add(mappedFile);
                    deleteCount++;
                } else {
                    break;
                }
            }
        }

        deleteExpiredFile(files);

        return deleteCount;
    }
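
      The offset comparison at the heart of deleteExpiredFileByOffset() can be shown with a small in-memory model. The names below (ExpireByOffsetSketch, QueueFile, lastEntryCommitLogOffset) are hypothetical. Each queue file is reduced to a single number, the CommitLog offset stored in its last entry, which is what the real method reads from the end of the mapped file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of offset-based expiry for index-style files.
class ExpireByOffsetSketch {
    // Stand-in for a Consume Queue file: only the CommitLog offset
    // recorded in its last entry matters for the expiry decision.
    static final class QueueFile {
        final String name;
        final long lastEntryCommitLogOffset;

        QueueFile(String name, long lastEntryCommitLogOffset) {
            this.name = name;
            this.lastEntryCommitLogOffset = lastEntryCommitLogOffset;
        }
    }

    // Removes files whose newest entry points below the CommitLog's minimum offset.
    static List<QueueFile> deleteExpired(List<QueueFile> live, long commitLogMinOffset) {
        Object[] snapshot = live.toArray();
        List<QueueFile> deleted = new ArrayList<>();

        // The newest file is skipped, as in the listing above.
        for (int i = 0; i < snapshot.length - 1; i++) {
            QueueFile file = (QueueFile) snapshot[i];
            if (file.lastEntryCommitLogOffset < commitLogMinOffset) {
                // Every entry in this file refers to CommitLog data that is gone.
                deleted.add(file);
            } else {
                break; // later files point at even newer data
            }
        }

        live.removeAll(deleted);
        return deleted;
    }
}
```

      Given files whose last entries point at offsets 100, 200, 300 and 400 and a CommitLog minimum offset of 250, the first two files are removed and the other two stay.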
  • Original article: https://www.cnblogs.com/zuoyang/p/14465764.html