zoukankan      html  css  js  c++  java
  • HBase源代码分析之compact请求发起时机、推断条件等详情(一)

            一般说来,不论什么一个比較复杂的分布式系统。针对可以使得其性能得到大幅提升的某一内部处理流程,必定有一个定期检查机制,使得该流程在满足一定条件的情况下,可以自发的进行。这样才可以非常好的体现出复杂系统的自我适应与自我调节能力。我们知道,HBase内部的compact处理流程是为了解决MemStore Flush之后,文件数目太多,导致读数据性能大大下降的一种自我调节手段。它会将文件依照某种策略进行合并。大大提升HBase的数据读性能。

    那么,基于我刚才的陈述,compact流程是否有一个定期检查机制呢?在满足什么条件的情况下,会触发compact请求呢?

            针对第一个问题。回答当然是肯定的。在HRegionServer内部,有一个成员变量,定义例如以下:

      /*
       * Check for compactions requests.
       * 检查合并请求
       */
      Chore compactionChecker;
             单从凝视。我们就能够看出,这个compactionChecker成员变量就是一个检查合并请求的Chore。那么什么是Chore呢?先来看下它的定义、成员变量以及构造函数。先来看下类的定义,代码例如以下:

    /**
     * Chore is a task performed on a period in hbase.  The chore is run in its own
     * thread. This base abstract class provides while loop and sleeping facility.
     * If an unhandled exception, the threads exit is logged.
     * Implementers just need to add checking if there is work to be done and if
     * so, do it.  Its the base of most of the chore threads in hbase.
     *
     * <p>Don't subclass Chore if the task relies on being woken up for something to
     * do, such as an entry being added to a queue, etc.
     * 
     * Chore是定期在HBase中运行的一个任务。

    Chore在它所在的线程内运行。

    这个基础抽象类提供了loop循环和sleep机制。 */ @InterfaceAudience.Private public abstract class Chore extends HasThread { }

            首先。从类的定义我们能够看到,Chore继承自HasThread类,而HasThread类是一个实现了Runnable接口的抽象类,而且定义了一个抽象的run()方法。

    自然,Chore就是一个线程了。而通过凝视,我们能够非常清晰的知道下面三点:1、Chore是定期在HBase中运行的一个任务;2、Chore在它所在的线程内运行。3、这个基础抽象类提供了loop循环和sleep机制。

            再来看下它的成员变量。主要包括下面几个:

    private final Sleeper sleeper;// 睡眠器
    protected final Stoppable stopper;
            上面提到,Chore提供了sleep机制,那么这个机制就是依靠Sleeper类型的sleeper这个成员变量来实现的,而stopper则是实现了Stoppable接口的不论什么实例,实际上是工作线程所依附的可停止执行的载体,比方HRegionServer,载体停止执行后,工作线程。

    等到分析其run()方法时,我们再详细分析这两个变量。

            然后。我们再看下Chore的构造方法,代码例如以下:

      /**
       * @param p Period at which we should run.  Will be adjusted appropriately
       * should we find work and it takes time to complete.
       * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
       * cleanup and exit cleanly.
       * 
       * 构造方法。须要name、p和stopper三个參数
       * p为run方法循环的周期
       * 
       */
      public Chore(String name, final int p, final Stoppable stopper) {
        super(name);
        if (stopper == null){
          throw new NullPointerException("stopper cannot be null");
        }
        this.sleeper = new Sleeper(p, stopper);
        this.stopper = stopper;
      }
            它须要name、p和stopper三个參数。name非常easy。String类型的线程名字而已,关键在于这个int类型的p和Stoppable类型的stopper。构造函数利用p和stopper生成了一个睡眠期sleeper,并将stopper赋值给其同名成员变量。

            以下,我们来看下这个sleeper的实现吧!Sleeper类中定义了4个关键变量和两个关键方法。实现了一个简单的睡眠器,其4个关键成员变量例如以下:

      private final int period;
      private final Stoppable stopper;
      private final Object sleepLock = new Object();
      private boolean triggerWake = false;
            当中,period代表了睡眠周期,它是由上诉參数p赋值的。而stopper的含义与Chore中同名变量一样。

    sleepLock不过一个Object对象,依靠它的wait()方法,我们能够实现对象等待一段时间;triggerWake是一个标志位,依靠它被设置为true,我们能够跳出睡眠,又一次复苏。


            再来看下它的终于要的两个方法,第一个便是睡眠器最基本的功能性方法--睡眠sleep(),代码例如以下:

      /**
       * Sleep for period adjusted by passed <code>startTime<code>
       * @param startTime Time some task started previous to now.  Time to sleep
       * will be docked current time minus passed <code>startTime<code>.
       */
      public void sleep(final long startTime) {
        
    	// 假设stopper已停止,直接返回
    	if (this.stopper.isStopped()) {
          return;
        }
        
        // 当前时间now
        long now = System.currentTimeMillis();
        
        // 计算最新的须要等待的时间,循环周期减去已过去的时间
        long waitTime = this.period - (now - startTime);
        
        // 假设等待时间waitTime已超过周期period,那么直接将period赋值给waitTime。并记录警告信息
        if (waitTime > this.period) {
          LOG.warn("Calculated wait time > " + this.period +
            "; setting to this.period: " + System.currentTimeMillis() + ", " +
            startTime);
          waitTime = this.period;
        }
        
        // 当等待时间waitTime大于0时。一直循环
        while (waitTime > 0) {
          long woke = -1;
          try {
        	  
        	// 推断标志位triggerWake。假设为true,
        	// 即假设其它线程已唤醒该睡眠期,跳出循环,复位triggerWake为fale。直接返回,不再睡眠
            synchronized (sleepLock) {
              if (triggerWake) break;
              // 否则,依靠sleepLock等待waitTime时间
              sleepLock.wait(waitTime);
            }
            
            // 计算已睡眠时间slept
            woke = System.currentTimeMillis();
            long slept = woke - now;
            
            // 假设slept时间已超出周期10s,记录警告信息
            if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) {
              LOG.warn("We slept " + slept + "ms instead of " + this.period +
                  "ms, this is likely due to a long " +
                  "garbage collecting pause and it's usually bad, see " +
                  "http://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired");
            }
          } catch(InterruptedException iex) {
            // We we interrupted because we're meant to stop?

    If not, just // continue ignoring the interruption if (this.stopper.isStopped()) { return; } } // 又一次计算等待时间:等待周期减去已睡眠时间 // Recalculate waitTime. woke = (woke == -1)?

    System.currentTimeMillis(): woke; waitTime = this.period - (woke - startTime); } // 标志位triggerWake复位为false,须要在sleepLock上用synchronizedkeyword进行同步 synchronized(sleepLock) { triggerWake = false; } }

            这种方法会依据传入的參数睡眠的起始时间startTime,结合睡眠器构造时设定好的睡眠周期period。以及当前时间now。计算出等待时间waitTime。而后。在一个等待时间waitTime大于0的while循环内,首先推断标志位triggerWake。假设其为true。则break,复位triggerWake并停止休眠,否则,利用sleepLock的wait()方法休眠指定时间waitTime,直到时间结束或者有其它线程设置triggerWake标志位为true并通过sleepLock的notifyAll()方法唤醒sleepLock对象,让其wait()方法抛出InterruptedException异常。继而又一次计算等待时间,并进入下一个循环。此时。标志位triggerWake已设置为true,则直接跳出循环,结束休眠。而在休眠时间未到的情况下结束休眠的一种手段,就是通过调用另外一个非常关键的方法skipSleepCycle()来实现的,代码非常easy。不做解释:

      /**
       * If currently asleep, stops sleeping; if not asleep, will skip the next
       * sleep cycle.
       */
      public void skipSleepCycle() {
        synchronized (sleepLock) {
          // 标志位triggerWake设置为true
          triggerWake = true;
          // 唤醒等待在sleepLock上的其他线程
          sleepLock.notifyAll();
        }
      }

            接下来,再看下Chore中最重要的run()方法。定义例如以下:

      /**
       * @see java.lang.Thread#run()
       */
      @Override
      public void run() {
        try {
          boolean initialChoreComplete = false;
          
          // 仅仅要stopper不停止,while循环就继续啊
          while (!this.stopper.isStopped()) {
        	// 開始时间
            long startTime = System.currentTimeMillis();
            try {
              // 假设是第一次循环,完毕初始化工作
              if (!initialChoreComplete) {
                initialChoreComplete = initialChore();
              } else {
            	// 第一次后的每次循环,则周期性的调用chore()方法
                chore();
              }
            } catch (Exception e) {
              LOG.error("Caught exception", e);
              if (this.stopper.isStopped()) {
                continue;
              }
            }
            
            // 睡眠期睡眠一定的时间,然后再去调用chore()方法
            this.sleeper.sleep(startTime);
          }
        } catch (Throwable t) {
          LOG.fatal(getName() + "error", t);
        } finally {
          LOG.info(getName() + " exiting");
          cleanup();
        }
      }
            这个run()方法的运行逻辑很easy。仅仅要stopper不停止,while循环就持续进行,首先。第一次进入run()方法时,标志位initialChoreComplete初始化为false,标志着Chore尚未初始化完毕,此时调用initialChore()做初始化工作,并返回初始化结果赋值给标志位initialChoreComplete,这个initialChore()眼下是一个空方法,仅仅返回true,而 第一次后的每次循环,则周期性的调用chore()方法,每次调用完chore()方法后,都通过睡眠器sleeper的sleep()方法,从每次进入while循环时获取的时刻startTime開始,休眠Chore构造函数传入的p时间,休眠过后再次运行chore()方法。假设stopper已停止,或者发生Throwable异常,则Chore调用cleanup()完毕清理工作。

            好了,Chore的执行机制到这里。已经给大家解说清楚了。那么,再回到文章的初始,HRegionServer中名为compactionChecker的这个Chore,究竟是怎样初始化,而且都做了哪些事情呢?让我们继续往下看。

            在前面解说compact合并线程CompactSplitThread的文章中。我们了解过HRegionServer的initializeThreads()方法,它负责初始化工作在HRegionServer上的各种线程,包含CompactSplitThread,当然也就包含CompactionChecker。代码例如以下:

        this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
            它是通过构造一个CompactionChecker对象来完毕初始化的。

    其构造方法例如以下:

        // 构造函数
        CompactionChecker(final HRegionServer h, final int sleepTime,
            final Stoppable stopper) {
          
          // 调用父类Chore的构造方法
          super("CompactionChecker", sleepTime, h);
          
          // 将载体HRegionServer赋值给instance变量
          this.instance = h;
          LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
    
          /* MajorCompactPriority is configurable.
           * If not set, the compaction will use default priority.
           */
          // 设置major合并优先级。取參数hbase.regionserver.compactionChecker.majorCompactPriority。默觉得Integer.MAX_VALUE
          this.majorCompactPriority = this.instance.conf.
            getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
            DEFAULT_PRIORITY);
        }
            非常easy,调用父类Chore的构造方法,设置上面提到的线程工作周期period和stopper,而这个工作周期period就是HRegionServer的threadWakeFrequency变量,它取自參数hbase.server.thread.wakefrequency,默觉得10s,它是HBase上众多后台工作线程通用的工作频率,比方周期性MemStore刷新线程等。然后,构造方法还会将载体HRegionServer赋值给instance变量,并设置major合并优先级,取參数hbase.regionserver.compactionChecker.majorCompactPriority,默觉得Integer.MAX_VALUE。

            不止如此,在HRegionServer上的startServiceThreads()方法中,会将该线程设置为一个后台线程。目的就是为了方便虚拟机管理,当全部用户线程退出后。该后台线程也会自己主动退出,代码例如以下:

        Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
          ".compactionChecker", uncaughtExceptionHandler);
            至此,compactionChecker的初始化已完毕。那么它是怎样工作的呢?换句话,为了确保回答问题的全面性,也就是上面我们提到的第二个还没回答的问题:在满足什么条件的情况下,会触发compact请求呢?既然是个Chore,我们看下CompactionChecker的chore()方法,代码例如以下:

        @Override
        // 线程的run方法会一直调用的函数chore()
        protected void chore() {
          
          // 循环检測HRegionServer的onlineRegions中的每一个HRegion
          for (HRegion r : this.instance.onlineRegions.values()) {
            
        	// 相应HRegion为null的话。进入下一个HRegion的循环
        	if (r == null)
              continue;
            
        	// 取出每一个Region中的Store
            for (Store s : r.getStores().values()) {
              try {
            	
            	// 调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier
                long multiplier = s.getCompactionCheckMultiplier();
                
                // 合并检查倍增器multiplier必须确保大于0
                assert multiplier > 0;
                
                // 未到整数倍,跳过,每当迭代因子iteration为合并检查倍增器multiplier的整数倍时,才会发起检查
                if (iteration % multiplier != 0) continue;
                
                if (s.needsCompaction()) {// 须要合并的话。发起SystemCompaction请求
                	
                  // Queue a compaction. Will recognize if major is needed.
                  this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                      + " requests compaction");
                } else if (s.isMajorCompaction()) {// 假设是Major合并的话,依据配置的major合并优先级majorCompactPriority确定发起合并请求
                  
                  // 假设工作线程中设置的合并优先级为Integer.MAX_VALUE,即默认,或者HRegion的合并优先级小于设置值的话
                  if (majorCompactPriority == DEFAULT_PRIORITY
                      || majorCompactPriority > r.getCompactPriority()) {
                	  
                	// 使用默认优先级发起合并请求
                    this.instance.compactSplitThread.requestCompaction(r, s, getName()
                        + " requests major compaction; use default priority", null);
                  } else {
                	  
                	// 使用设置的优先级发起合并请求
                    this.instance.compactSplitThread.requestCompaction(r, s, getName()
                        + " requests major compaction; use configured priority",
                      this.majorCompactPriority, null);
                  }
                }
              } catch (IOException e) {
                LOG.warn("Failed major compaction check on " + r, e);
              }
            }
          }
          
          // 迭代计数器设置。累加1
          iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
        }

            整个工作流程非常easy,chore()方法周期性的检測HRegionServer中全部在线Region的每一个HStore,调用Store的getCompactionCheckMultiplier()方法。获取合并检查倍增器multiplier。当迭代因子iteration为合并检查倍增器multiplier的整数倍时,发起针对该HStore是否须要compact的检查。假设须要合并,则依据合并的种类,确定发起何种合并请求,并且假设是Major合并的话,则须要确定优先级。毕竟Major是最耗费资源的compact,为了合理有效的利用资源,也为了防止系统性能瓶颈。添加优先级就显得十分有必要了。整个流程比較清晰,并且上述代码凝视也非常具体,读者可自行补脑。

            以下,我们针对几个要点进行简要说明:

            1、onlineRegions是HRegionServer上存储的全部可以提供有效服务的在线Region集合;

            2、整个检查过程是先轮询HRegion。然后针对HRegion上每一个HStore进行的。

    而且,很重要的是。它并非对每一个HRegion上全部HStore挨个检查,而是利用取余算法,对Region上的HStore进行检查。而这个过程的关键。就是上述代码中的合并检查倍增器multiplier,该值假设配置为1的话,则是挨个检查,假设配置成2的话,则是隔一个检查一个。依次类推。这个multiplier的获取。是通过HStore的getCompactionCheckMultiplier()方法获取的,它实际上是获取的HStore的compactionCheckMultiplier变量,而其初始化。则是取參数hbase.server.compactchecker.interval.multiplier,默觉得1000。代码例如以下:

      @Override
      public long getCompactionCheckMultiplier() {
        return this.compactionCheckMultiplier;
      }
        // 取參数hbase.server.compactchecker.interval.multiplier,默觉得1000
        this.compactionCheckMultiplier = conf.getInt(
            COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
        if (this.compactionCheckMultiplier <= 0) {
          LOG.error("Compaction check period multiplier must be positive, setting default: "
              + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
          this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
        }
            3、对于是否须要合并。则是通过HStore的needsCompaction()方法推断的,代码例如以下:

      @Override
      public boolean needsCompaction() {
        return this.storeEngine.needsCompaction(this.filesCompacting);
      }
            而通过StoreEngine的一种实现DefaultStoreEngine。还有CompactionPolicy的一种实现RatioBasedCompactionPolicy等一系列调用,终于实现为例如以下代码:

      public boolean needsCompaction(final Collection<StoreFile> storeFiles,
          final List<StoreFile> filesCompacting) {
    	 // storeFile的总数减去正在合并的文件的数目
        int numCandidates = storeFiles.size() - filesCompacting.size();
        // 假设这个数目超过配置中合并文件的最小值
        return numCandidates >= comConf.getMinFilesToCompact();
      }
            非常easy。storeFile的总数减去正在合并的文件的数目,假设这个数目超过配置中合并文件的最小值,则视为须要发起合并请求。这个配置中合并文件的最小值,就是通过例如以下代码设置的:

        // 先取新參数hbase.hstore.compaction.min。未配置的话,再去旧參数hbase.hstore.compactionThreshold。
        // 再未配置的话则默觉得3,可是终于不能小于2
        minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY,
              /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
            4、须要合并的话,则调用CompactSplitThread的requestSystemCompaction()方法发起SystemCompaction请求。而假设是Major合并的话。则须要依据配置的major合并优先级majorCompactPriority确定发起合并请求。继而调用CompactSplitThread的requestCompaction()方法发起合并请求。

            那么。怎样认定一个合并为Major合并呢?它的推断须要下面几个条件:

                  4.1、HStore下所有存储文件的Reader必须不为null,也就是所有文件必须处于打开状态,否则直接返回false。

                  4.2、依据合并策略来确定,以RatioBasedCompactionPolicy为例:

                           4.2.1、获取下一次须要Major合并的时间mcTime;

                           4.2.2、假设待合并的所有文件为空,或者下一次须要Major合并的时间为0,直接返回false;

                           4.2.3、获取待合并文件里最小的时间戳lowTimestamp,并获取当前时间now;

                     4.2.4、假设最小时间戳lowTimestamp大于0,且小于当前时间now-减去下一次须要Major合并的时间:

                                       4.2.4.1、获取列簇的TTL。即cfTtl;

                                       4.2.4.2、假设存在多个待合并文件:直接返回true;

                                       4.2.4.3、假设仅仅存在一个待合并文件:则首先获取文件的最小时间戳minTimestamp,然后计算文件存留时间oldest。假设该文件不是元数据相关文件,且假设列簇的TTL为FOREVER。且文件保留时间仍在TTL内,那么我们须要依据数据块的位置索引与參数hbase.hstore.min.locality.to.skip.major.compact大小来推断是否仅仅针对一个文件做compact,此时的这个compact理解为压缩比合并更好点,这部分后面再讲合并策略时再着重描写叙述。

            至此,我们把HRegionServer内部一个合并检查线程的初始化、工作方式及compact检查机制等统统讲完了。那么是否仅仅要有这个定期检查工作线程就能够保证compact及时、正常执行。就能保证HBase的高性能了呢?

            No,No。No,等着HBase源代码分析之compact请求发起时机、推断条件等详情(二)吧!

    O(∩_∩)O哈哈~










  • 相关阅读:
    clearfix
    css浮动
    css常识
    给数组排序方法2
    定时器
    数组
    redhat 7.6 iptables 配置
    redhat 7.6 流量监控命令、软件(3)nethogs 监控进程实时流量
    redhat 7.6 流量监控命令、软件(2) iftop 监控网络IP实时流量
    redhat 7.6 流量监控命令、软件(1) ethstatus
  • 原文地址:https://www.cnblogs.com/wgwyanfs/p/7083241.html
Copyright © 2011-2022 走看看