  • HBase source code: compaction (Part 1)

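    HBase compaction involves many steps, so it is covered over several parts. This part describes how compaction is invoked inside the RegionServer. It does not cover the actual work of reading and merging files, which is left for a later part.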

    When the RegionServer starts, it initializes compactSplitThread and compactionChecker.

    /*
       * Check for compactions requests.
       */
      ScheduledChore compactionChecker;
    
      // Compactions
      public CompactSplitThread compactSplitThread;
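
  Here, compactSplitThread is the class that actually carries out compactions and splits, while compactionChecker periodically checks whether a compaction should run. We start with the compactionChecker thread.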

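  compactionChecker is of type ScheduledChore. As its Javadoc below states, a ScheduledChore is a task that HBase runs periodically. The RegionServer's flushChecker member is also a ScheduledChore. ScheduledChore implements Runnable, so it is a task that a thread executes, and its main logic lives in its run() method.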

    /**
     * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
     * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
     * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
     * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
     * cancellation is logged. Implementers should consider whether or not the Chore will be able to
     * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
     * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
     * thread pool.
     * <p>
     * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
     * an entry being added to a queue, etc.
     */
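    // ScheduledChore implements Runnable, so a chore is a task executed by a thread
    // 1. It is a task HBase runs periodically  2. It runs on a thread of the ChoreService pool  3. Periodic scheduling gives it loop-and-sleep style execution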
    @InterfaceAudience.Private
    public abstract class ScheduledChore implements Runnable {
    }
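Since a ScheduledChore is only a Runnable, the periodic behaviour comes from the ScheduledThreadPoolExecutor mentioned in the Javadoc. The sketch below uses only the JDK, no HBase classes, to show a Runnable invoked repeatedly at a fixed rate and then cancelled. `PeriodicTaskDemo` and `runAtLeast` are hypothetical names; how ChoreService actually configures its pool (initial delay, fixed rate or fixed delay) is not shown here and may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a plain Runnable driven periodically by a ScheduledThreadPoolExecutor.
class PeriodicTaskDemo {

  /** Schedules a counting task every periodMs, waits for `times` runs, cancels, returns the count. */
  static int runAtLeast(int times, long periodMs) {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    AtomicInteger runs = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(times);
    ScheduledFuture<?> handle = pool.scheduleAtFixedRate(() -> {
      runs.incrementAndGet(); // the periodic "chore" body
      latch.countDown();
    }, 0, periodMs, TimeUnit.MILLISECONDS);
    try {
      latch.await(5, TimeUnit.SECONDS); // wait until the task has run `times` times
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      handle.cancel(false); // no further executions, comparable to a chore cancelling itself
      pool.shutdownNow();
    }
    return runs.get();
  }
}
```

The run count can exceed the requested number by one if an extra execution starts before `cancel` takes effect.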
    

  A few of ScheduledChore's more important member variables:

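      // Period between runs
      private final int period;
      // Time of the previous run of this task
      private long timeOfLastRun = -1;
      // Time of the current run
      private long timeOfThisRun = -1;
      // Whether the chore has finished initializing. This happens on the first run, which calls
      // initialChore(); the default implementation just returns true and does nothing else.
      private boolean initialChoreComplete = false;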
    

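  Another important member is stopper, which can be any object implementing the Stoppable interface. As its comment explains, the stopper is a means of stopping a ScheduledChore: once a chore notices that it has been stopped, it cancels itself. When the RegionServer instantiates compactionChecker during initialization, it passes itself (this) as the stopper. This means that when the RS stops, the chore notices and cancels itself, so no further compaction checks run. The relevant code: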

    In ScheduledChore:
    /**
     * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
     * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
     * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
     * command can cause many chores to stop together.
     */
    private final Stoppable stopper;

    In HRegionServer:
    this.compactionChecker = new CompactionChecker(this, this.frequency, this);
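To make the "one stopper, many chores" idea concrete, here is a minimal HBase-free sketch. `SimpleStoppable`, `FakeServer` and `StopAwareChore` are hypothetical stand-ins for Stoppable, HRegionServer and ScheduledChore; the runs are triggered by hand instead of by a ChoreService so the behaviour is deterministic.

```java
// Hypothetical stand-in for HBase's Stoppable interface.
interface SimpleStoppable {
  void stop(String why);
  boolean isStopped();
}

// Plays the role of the RegionServer that passes `this` as the stopper to its chores.
class FakeServer implements SimpleStoppable {
  private volatile boolean stopped = false;
  public void stop(String why) { stopped = true; }
  public boolean isStopped() { return stopped; }
}

// Minimal chore: works while the stopper is running, cancels itself once the stopper is stopped.
class StopAwareChore implements Runnable {
  private final SimpleStoppable stopper;
  private boolean cancelled = false;
  int workDone = 0;

  StopAwareChore(SimpleStoppable stopper) { this.stopper = stopper; }

  @Override
  public void run() {
    if (cancelled) return;
    if (stopper.isStopped()) {
      cancelled = true; // corresponds to cancel(false) + cleanup() in ScheduledChore.run()
      return;
    }
    workDone++;         // corresponds to chore()
  }

  boolean isCancelled() { return cancelled; }
}
```

A single `stop(...)` call on the shared server object is enough for every chore holding it to cancel itself on its next run.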

     

  The core of ScheduledChore is its run() method, which performs a series of checks and then calls chore() on every period. Let's go through it line by line.

    public void run() {
        // Set timeOfLastRun to the current timeOfThisRun, then set timeOfThisRun to the current time
        updateTimeTrackingBeforeRun();
        if (missedStartTime() && isScheduled()) {
          onChoreMissedStartTime();
          if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
        } else if (stopper.isStopped() || !isScheduled()) {
          cancel(false);
          cleanup();
          if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
        } else {
          try {
            if (!initialChoreComplete) {
              initialChoreComplete = initialChore();
            } else {
              chore();
            }
          } catch (Throwable t) {
            if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
            if (this.stopper.isStopped()) {
              cancel(false);
              cleanup();
            }
          }
        }
      }
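The branch structure of run() can be modelled without HBase. In this hypothetical `MiniChore`, the first invocation runs initialChore(), later invocations run chore(), a Throwable from chore() is caught and logged so the chore stays scheduled, and a stopped host cancels the chore. The missed-start-time branch and the ChoreService interaction are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of the control flow in ScheduledChore.run().
abstract class MiniChore implements Runnable {
  private boolean initialChoreComplete = false;
  private boolean cancelled = false;
  final List<String> log = new ArrayList<>();
  protected final BooleanSupplier isStopped;

  MiniChore(BooleanSupplier isStopped) { this.isStopped = isStopped; }

  /** Same default behaviour as described in the text: no work, report success. */
  protected boolean initialChore() { return true; }

  protected abstract void chore();

  @Override
  public void run() {
    if (cancelled) return;
    if (isStopped.getAsBoolean()) { // the else-if branch: host stopped, so cancel
      cancelled = true;
      log.add("stopped");
      return;
    }
    try {
      if (!initialChoreComplete) {
        initialChoreComplete = initialChore(); // first run only
        log.add("initialChore");
      } else {
        chore();                               // every later run
        log.add("chore");
      }
    } catch (Throwable t) {
      log.add("caught " + t.getClass().getSimpleName());
      if (isStopped.getAsBoolean()) cancelled = true; // only cancel if the host is stopping
    }
  }
}
```

An exception in one run does not prevent the following runs, which matches the catch block in the code above.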
    

  run() first calls updateTimeTrackingBeforeRun(). This method is simple: it just updates timeOfLastRun and timeOfThisRun (both initialized to -1), and it runs on every periodic execution.

    /**
       * Update our time tracking members. Called at the start of an execution of this chore's run()
       * method so that a correct decision can be made as to whether or not we missed the start time
       */
      private synchronized void updateTimeTrackingBeforeRun() {
        timeOfLastRun = timeOfThisRun;
        timeOfThisRun = System.currentTimeMillis();
      }
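The time tracking above feeds the missed-start-time check. The sketch below injects the clock value so the behaviour is deterministic. The rule that a run is "missed" when the gap between two runs exceeds 1.5 times the period is an assumption about ScheduledChore's tolerance, stated here for illustration only; check the missedStartTime() implementation of your HBase version. `TimeTracker` is a hypothetical class.

```java
// Hypothetical model of ScheduledChore-style time tracking with an injected clock.
// ASSUMPTION: a start time counts as missed when consecutive runs are more than
// 1.5 periods apart.
class TimeTracker {
  private final long periodMs;
  private long timeOfLastRun = -1; // -1 means "not set yet"
  private long timeOfThisRun = -1;

  TimeTracker(long periodMs) { this.periodMs = periodMs; }

  /** Same shifting as updateTimeTrackingBeforeRun(), but with the time passed in. */
  synchronized void updateBeforeRun(long nowMs) {
    timeOfLastRun = timeOfThisRun;
    timeOfThisRun = nowMs;
  }

  synchronized boolean missedStartTime() {
    if (timeOfLastRun <= 0 || timeOfThisRun <= 0) return false; // need two valid runs to compare
    return timeOfThisRun - timeOfLastRun > 1.5 * periodMs;
  }
}
```

With a 1000 ms period, runs at 10,000 ms and 11,000 ms are on time, while a following run at 13,000 ms (a 2000 ms gap) counts as missed.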
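  Next it evaluates missedStartTime() && isScheduled(). In normal operation the chore has not missed its start time, so this branch is skipped. The else-if branch handles shutdown: if the host the chore depends on (here the RS) has stopped, or the chore is no longer scheduled, the chore cancels itself and exits. Otherwise execution reaches the final else branch. On the first run initialChoreComplete is false, so initialChore() is called, which by default just returns true without doing anything.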
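  Once everything is set up, chore() is called periodically. In the RegionServer, CompactionChecker extends ScheduledChore and implements its own chore() method, which decides whether to issue a compaction request (the request methods are covered next time).
The logic also shows that it first checks whether a compaction is needed; if it is, it does not go on to check whether a major compaction is needed: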
    /*
       * Inner class that runs on a long period checking if regions need compaction.
       */
      private static class CompactionChecker extends ScheduledChore {
        private final HRegionServer instance;
        private final int majorCompactPriority;
        private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
        private long iteration = 0;
    
        CompactionChecker(final HRegionServer h, final int sleepTime,
            final Stoppable stopper) {
          // Call the superclass constructor
          super("CompactionChecker", stopper, sleepTime);
          // Keep a reference to the hosting RegionServer h
          this.instance = h;
          LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
    
          /* MajorCompactPriority is configurable.
           * If not set, the compaction will use default priority.
           */
          // Major compaction priority, read from hbase.regionserver.compactionChecker.majorCompactPriority;
          // defaults to Integer.MAX_VALUE
          this.majorCompactPriority = this.instance.conf.
            getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
            DEFAULT_PRIORITY);
        }
    
        // ScheduledChore.run() keeps calling chore() on every period
        @Override
        protected void chore() {
          // Loop over all online regions of this server.
          // onlineRegions holds every region currently serving requests on this HRegionServer.
          for (HRegion r : this.instance.onlineRegions.values()) {
            if (r == null)
              continue;
            // Check each store of the region
            for (Store s : r.getStores().values()) {
              try {
                // Compactions are normally triggered by events such as a memstore flush, but some
                // compaction policies also need periodic checks. The effective check interval is
                // hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency;
                // the multiplier defaults to 1000.
                long multiplier = s.getCompactionCheckMultiplier();
                assert multiplier > 0;
                // Only check when iteration is a multiple of multiplier; otherwise skip this store
                if (iteration % multiplier != 0) continue;
                if (s.needsCompaction()) {
                  // A compaction is needed, so request a system compaction. needsCompaction()
                  // checks whether the number of store files minus the files already being
                  // compacted reaches the configured minimum number of files to compact.
                  // Queue a compaction. Will recognize if major is needed.
                  this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                      + " requests compaction");
                } else if (s.isMajorCompaction()) {
                  if (majorCompactPriority == DEFAULT_PRIORITY
                      || majorCompactPriority > r.getCompactPriority()) {
                    this.instance.compactSplitThread.requestCompaction(r, s, getName()
                        + " requests major compaction; use default priority", null);
                  } else {
                    this.instance.compactSplitThread.requestCompaction(r, s, getName()
                        + " requests major compaction; use configured priority",
                        this.majorCompactPriority, null);
                  }
                }
              } catch (IOException e) {
                LOG.warn("Failed major compaction check on " + r, e);
              }
            }
          }
          iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
        }
      }
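The per-store decision in chore() can be restated as a pure function, which makes the order of the checks easy to see: the iteration gate first, then needsCompaction(), and only then isMajorCompaction() with its priority choice. This is a hypothetical sketch: `CheckerDecision`, `StoreState` and the returned strings are illustrative, and needsCompaction() is modelled under the assumption that it compares "store files minus files already compacting" against the minimum number of files to compact with `>=`. With illustrative values of 10,000 ms for hbase.server.thread.wakefrequency and the multiplier of 1000 mentioned above, a store would be checked roughly every 10,000 s (about 2.8 hours).

```java
// Hypothetical, HBase-free model of one CompactionChecker.chore() decision for a single store.
class CheckerDecision {
  static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;

  /** Snapshot of the store state the checker looks at (hypothetical type). */
  static final class StoreState {
    final int storeFiles;        // files currently in the store
    final int filesCompacting;   // files already taken by a running compaction
    final int minFilesToCompact; // configured minimum number of files per compaction
    final boolean majorDue;      // what isMajorCompaction() would return

    StoreState(int storeFiles, int filesCompacting, int minFilesToCompact, boolean majorDue) {
      this.storeFiles = storeFiles;
      this.filesCompacting = filesCompacting;
      this.minFilesToCompact = minFilesToCompact;
      this.majorDue = majorDue;
    }
  }

  static String decide(long iteration, long multiplier, StoreState s,
                       int majorCompactPriority, int regionCompactPriority) {
    if (iteration % multiplier != 0) return "skip";             // not this store's turn
    if (s.storeFiles - s.filesCompacting >= s.minFilesToCompact) {
      return "system compaction";                               // needsCompaction() (assumed rule)
    }
    if (s.majorDue) {                                           // isMajorCompaction()
      if (majorCompactPriority == DEFAULT_PRIORITY
          || majorCompactPriority > regionCompactPriority) {
        return "major compaction, default priority";
      }
      return "major compaction, configured priority";
    }
    return "nothing";
  }

  /** Effective check period: wake frequency times the check multiplier. */
  static long effectiveCheckPeriodMs(long wakeFrequencyMs, long multiplier) {
    return wakeFrequencyMs * multiplier;
  }
}
```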

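  Checking whether a compaction is needed is simple; the interesting part is isMajorCompaction(). Its main logic is:

    Get the major compaction interval mcTime via getNextMajorCompactTime(). If mcTime is 0 or there are no files, no major compaction is done.

    Take the modification times of all files to be compacted and find the smallest one, lowTimestamp. If lowTimestamp < now - mcTime, a major compaction is due.

      If there is only one file:

        If the file is itself the product of a major compaction and has not expired, run a major compaction only if its HDFS block locality is below the configured minimum; otherwise skip it.

        Otherwise, run a major compaction only if the file's data has expired, i.e. its oldest cell is older than the column family TTL.

      If there is more than one file, run a major compaction.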
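The rules above can be restated as a pure function over plain numbers, which is handy for reasoning about edge cases. This is a hypothetical sketch, not HBase code: `MajorCompactionRule` is an illustrative name, `TTL_FOREVER` stands in for HConstants.FOREVER, and the locality index and threshold are passed in rather than computed from HDFS.

```java
// Hypothetical, HBase-free restatement of the isMajorCompaction() rules.
class MajorCompactionRule {
  static final long TTL_FOREVER = Long.MAX_VALUE; // stand-in for HConstants.FOREVER

  static boolean isMajorCompaction(int fileCount, long lowestModTimeMs, long nowMs,
      long majorIntervalMs, long ttlMs, boolean singleFileIsMajor, Long singleFileMinTs,
      float locality, float minLocality) {
    if (fileCount == 0 || majorIntervalMs == 0) return false;        // nothing to do
    if (!(lowestModTimeMs > 0 && lowestModTimeMs < nowMs - majorIntervalMs)) {
      return false;                                                  // interval not elapsed yet
    }
    if (fileCount > 1) return true;                                  // several files: compact
    // Single file: age of its oldest cell
    long oldest = (singleFileMinTs == null) ? Long.MIN_VALUE : nowMs - singleFileMinTs;
    if (singleFileIsMajor && (ttlMs == TTL_FOREVER || oldest < ttlMs)) {
      return locality < minLocality;                                 // only to restore locality
    }
    return ttlMs != TTL_FOREVER && oldest > ttlMs;                   // expired data in the file
  }
}
```

For example, with a 7-day interval and "now" at day 10, a set of files whose oldest modification time is day 1 triggers a major compaction, while one whose oldest modification time is day 5 does not.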

    public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
        throws IOException {
      boolean result = false;
      // Get the major compaction interval
      long mcTime = getNextMajorCompactTime(filesToCompact);
      if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
        return result;
      }
      // TODO: Use better method for determining stamp of last major (HBASE-2990)
      // Smallest modification time among the files to compact, and the current time
      long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
      long now = System.currentTimeMillis();
      if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
        // lowTimestamp < (now - mcTime): the oldest file is older than the major
        // compaction interval, so a major compaction is due
        // Major compaction time has elapsed.
        long cfTtl = this.storeConfigInfo.getStoreFileTtl();
        if (filesToCompact.size() == 1) {
          // Single file
          StoreFile sf = filesToCompact.iterator().next();
          Long minTimestamp = sf.getMinimumTimestamp();
          // Age of the oldest cell in the file
          long oldest = (minTimestamp == null)
              ? Long.MIN_VALUE
              : now - minTimestamp.longValue();
          if (sf.isMajorCompaction() &&
              (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
            float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
                RSRpcServices.getHostname(comConf.conf, false)
            );
            if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Major compaction triggered on only store " + this +
                    "; to make hdfs blocks local, current blockLocalityIndex is " +
                    blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                    ")");
              }
              result = true;
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping major compaction of " + this +
                    " because one (major) compacted file only, oldestTime " +
                    oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
                    blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                    ")");
              }
            }
          } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
            // Single HFile whose oldest cell is older than the TTL: the file holds expired
            // data, so run a major compaction
            LOG.debug("Major compaction triggered on store " + this +
                ", because keyvalues outdated; time since last major compaction " +
                (now - lowTimestamp) + "ms");
            result = true;
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Major compaction triggered on store " + this +
                "; time since last major compaction " + (now - lowTimestamp) + "ms");
          }
          result = true;
        }
      }
      return result;
    }

  • Original post: https://www.cnblogs.com/Evil-Rebe/p/11305411.html