  • HBase MemStoreFlusher

    HBase MemStore flushing is implemented by the class org.apache.hadoop.hbase.regionserver.MemStoreFlusher, which HRegionServer holds as the instance variable cacheFlusher. The class is declared as follows:

    class MemStoreFlusher extends HasThread implements FlushRequester {
        ......
    }

    MemStoreFlusher is essentially a thread class.

    HasThread can be understood as a proxy (delegating) class for Thread:

    /**
     * Abstract class which contains a Thread and delegates the common Thread
     * methods to that instance.
     * 
     * The purpose of this class is to workaround Sun JVM bug #6915621, in which
     * something internal to the JDK uses Thread.currentThread() as a monitor lock.
     * This can produce deadlocks like HBASE-4367, HBASE-4101, etc.
     */
    public abstract class HasThread implements Runnable {
    
        private final Thread thread;
    
        public HasThread() {
            this.thread = new Thread(this);
        }
    
        public HasThread(String name) {
            this.thread = new Thread(this, name);
        }
    
        public Thread getThread() {
            return thread;
        }
    
        public abstract void run();
    
        // // Begin delegation to Thread
    
        public final String getName() {
            return thread.getName();
        }
    
        public void interrupt() {
            thread.interrupt();
        }
    
        public final boolean isAlive() {
            return thread.isAlive();
        }
    
        public boolean isInterrupted() {
            return thread.isInterrupted();
        }
    
        public final void setDaemon(boolean on) {
            thread.setDaemon(on);
        }
    
        public final void setName(String name) {
            thread.setName(name);
        }
    
        public final void setPriority(int newPriority) {
            thread.setPriority(newPriority);
        }
    
        public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
            thread.setUncaughtExceptionHandler(eh);
        }
    
        public void start() {
            thread.start();
        }
    
        public final void join() throws InterruptedException {
            thread.join();
        }
    
        public final void join(long millis, int nanos) throws InterruptedException {
            thread.join(millis, nanos);
        }
    
        public final void join(long millis) throws InterruptedException {
            thread.join(millis);
        }
        // // End delegation to Thread
    
    }

    FlushRequester is an interface with a single method:

    /**
     * Request a flush.
     */
    public interface FlushRequester {
    
        /**
         * Tell the listener the cache needs to be flushed.
         * 
         * @param region
         *            the HRegion requesting the cache flush
         */
        void requestFlush(HRegion region);
    
    }

    Core fields

    // These two data members go together. Any entry in the one must have
    // a corresponding entry in the other.
    private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<FlushQueueEntry>();
    
    private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap<HRegion, FlushRegionEntry>();

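    flushQueue: a DelayQueue whose elements are of type FlushQueueEntry, each representing a flush request for some region. The flusher thread repeatedly takes requests from this queue and performs the corresponding region flush.

    regionsInQueue: maintains the mapping between each HRegion instance and its pending FlushRegionEntry request.

    As the comment says, any FlushRegionEntry present in flushQueue must also be present in regionsInQueue. The map may look redundant, but it is not: for example, it makes it cheap to check whether a flush has already been requested for a given region.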
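
    The pairing of the queue and the map can be sketched as follows. This is a simplified illustration, not the HBase implementation: FlushRequestRegistry, its methods, and the use of region names and a plain FIFO queue are all assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for the flusher's request bookkeeping.
// Region names (String) replace HRegion; a FIFO queue replaces the DelayQueue.
final class FlushRequestRegistry {
    private final BlockingQueue<String> flushQueue = new LinkedBlockingQueue<>();
    // region name -> time the pending request was created
    private final Map<String, Long> regionsInQueue = new HashMap<>();

    /** Queues a flush unless one is already pending; returns true if queued. */
    boolean requestFlush(String regionName) {
        synchronized (regionsInQueue) {
            if (regionsInQueue.containsKey(regionName)) {
                return false; // already requested: constant-time check via the map
            }
            regionsInQueue.put(regionName, System.currentTimeMillis());
            flushQueue.add(regionName);
            return true;
        }
    }

    /** Takes the next request (or null) and drops its map entry in the same step. */
    String takeNext() {
        synchronized (regionsInQueue) {
            String next = flushQueue.poll();
            if (next != null) {
                regionsInQueue.remove(next);
            }
            return next;
        }
    }

    int pending() {
        return flushQueue.size();
    }
}

final class FlushRequestRegistryDemo {
    public static void main(String[] args) {
        FlushRequestRegistry registry = new FlushRequestRegistry();
        System.out.println(registry.requestFlush("region-a")); // true
        System.out.println(registry.requestFlush("region-a")); // false, already queued
        System.out.println(registry.pending());                // 1
    }
}
```

    Without the map, detecting a duplicate request would mean scanning the whole queue; with it, the check is a single lookup, at the cost of keeping the two structures in step.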

    private AtomicBoolean wakeupPending = new AtomicBoolean();

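    wakeupPending: used together with flushQueue. flushQueue is a blocking queue, so when it is empty a poll call blocks the thread for a while. In some situations an “empty” element must be added to flushQueue to wake the thread up. However, if an “empty” element has already been added for the thread's current iteration (as shown later, the flusher thread's work is essentially a loop), there is no need to add another one.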
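
    A minimal sketch of this "at most one wake-up token per iteration" idea, assuming an AtomicBoolean guarded by compareAndSet. WakeupGate, WAKEUP_TOKEN, and pollOnce are hypothetical names for illustration, and a LinkedBlockingQueue stands in for the DelayQueue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the wakeupPending flag; not HBase code.
final class WakeupGate {
    static final Object WAKEUP_TOKEN = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean wakeupPending = new AtomicBoolean();

    /** Adds a wake-up token only if none has been added since the last poll. */
    void wakeup() {
        if (wakeupPending.compareAndSet(false, true)) {
            queue.add(WAKEUP_TOKEN);
        }
    }

    /** One loop iteration: re-arm the flag, then wait up to timeoutMs for an element. */
    Object pollOnce(long timeoutMs) {
        wakeupPending.set(false); // allow someone to wake us up again
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    int queued() {
        return queue.size();
    }
}
```

    However many writers call wakeup() between two polls, the queue receives a single token, so the consumer is not flooded with redundant wake-ups.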

    private final long threadWakeFrequency;

    threadWakeFrequency: the maximum time a poll on flushQueue waits; configured by hbase.server.thread.wakefrequency.

    private final HRegionServer server;

    server: the current HRegionServer instance.

    private final ReentrantLock lock = new ReentrantLock();
    
    private final Condition flushOccurred = lock.newCondition();

    lock, flushOccurred: used for synchronization, playing the same role as synchronized with wait, notify, and notifyAll.

    protected final long globalMemStoreLimit;
    
    protected final long globalMemStoreLimitLowMark;
    
    private static final float DEFAULT_UPPER = 0.4f;
    
    private static final float DEFAULT_LOWER = 0.35f;
    
    private static final String UPPER_KEY = "hbase.regionserver.global.memstore.upperLimit";
    
    private static final String LOWER_KEY = "hbase.regionserver.global.memstore.lowerLimit";

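    globalMemStoreLimit, globalMemStoreLimitLowMark: the upper and lower limits on the total MemStore memory of the HRegionServer. Once total MemStore usage reaches the lower limit, the flusher starts flushing regions to relieve memory pressure; once it reaches the upper limit, writes are blocked (both cases are covered below).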

    private long blockingStoreFilesNumber;
    
    private long blockingWaitTime;

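    blockingStoreFilesNumber: when a region is about to be flushed, if any Store in that region already has more than blockingStoreFilesNumber StoreFiles (configured by hbase.hstore.blockingStoreFiles; hbase.hstore.compactionThreshold is consulted only as a fallback), the flush is delayed by at most blockingWaitTime (hbase.hstore.blockingWaitTime).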

    Flush requests

    All region flush requests are placed in a DelayQueue, so every element put into the queue must implement the Delayed interface:

    interface FlushQueueEntry extends Delayed {
    }

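    Flush requests come in two kinds: “empty” requests and real requests. An “empty” request exists mainly to wake the thread up; a real request is an actual region flush request.

    The “empty” request: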

    /**
     * Token to insert into the flush queue that ensures that the flusher does
     * not sleep
     */
    static class WakeupFlushThread implements FlushQueueEntry {
    
        @Override
        public long getDelay(TimeUnit unit) {
            return 0;
        }
    
        @Override
        public int compareTo(Delayed o) {
            return -1;
        }
    
    }

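    An “empty” request only serves as a wake-up call and carries no real content. Its delay is set to 0, so it can be taken from the queue immediately.

    The real request: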

    /**
     * Datastructure used in the flush queue. Holds region and retry count.
     * Keeps tabs on how old this object is. Implements {@link Delayed}. On
     * construction, the delay is zero. When added to a delay queue, we'll come
     * out near immediately. Call {@link #requeue(long)} passing delay in
     * milliseconds before readding to delay queue if you want it to stay there
     * a while.
     */
    static class FlushRegionEntry implements FlushQueueEntry {
    
        private final HRegion region;
    
        private final long createTime;
    
        private long whenToExpire;
    
        private int requeueCount = 0;
    
        FlushRegionEntry(final HRegion r) {
            this.region = r;
    
            this.createTime = System.currentTimeMillis();
    
            this.whenToExpire = this.createTime;
        }
    
        /**
         * @param maximumWait
         * @return True if we have been delayed > <code>maximumWait</code>
         *         milliseconds.
         */
        public boolean isMaximumWait(final long maximumWait) {
            return (System.currentTimeMillis() - this.createTime) > maximumWait;
        }
    
        /**
         * @return Count of times {@link #resetDelay()} was called; i.e this is
         *         number of times we've been requeued.
         */
        public int getRequeueCount() {
            return this.requeueCount;
        }
    
        /**
         * @param when
         *            When to expire, when to come up out of the queue. Specify
         *            in milliseconds. This method adds
         *            System.currentTimeMillis() to whatever you pass.
         * @return This.
         */
        public FlushRegionEntry requeue(final long when) {
            this.whenToExpire = System.currentTimeMillis() + when;
    
            this.requeueCount++;
    
            return this;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.whenToExpire - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed other) {
            return Long.valueOf(
                    getDelay(TimeUnit.MILLISECONDS)
                            - other.getDelay(TimeUnit.MILLISECONDS)).intValue();
        }
    
        @Override
        public String toString() {
            return "[flush region "
                    + Bytes.toStringBinary(region.getRegionName()) + "]";
        }
    
    }

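    region: the HRegion instance that the flush is requested for;

    createTime: the time at which the flush request was created;

    whenToExpire: the time at which the flush request expires, i.e. becomes available from the queue;

    requeueCount: the number of times the flush request has been requeued. Some flush requests have to be delayed, so they are put back on the queue.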
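
    To see how whenToExpire interacts with a DelayQueue, the following sketch queues one zero-delay entry and one requeued entry. DelayedFlush and DelayedFlushDemo are hypothetical names; the entry mirrors the fields described above but identifies the region by name only. Unlike the excerpt above, compareTo uses Long.compare, which avoids the int overflow that subtraction followed by intValue() could hit for very large differences.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified counterpart of FlushRegionEntry for illustration.
final class DelayedFlush implements Delayed {
    final String region;
    private long whenToExpire = System.currentTimeMillis(); // zero delay on construction
    private int requeueCount = 0;

    DelayedFlush(String region) {
        this.region = region;
    }

    /** Pushes the expiry delayMs into the future and counts the requeue. */
    DelayedFlush requeue(long delayMs) {
        whenToExpire = System.currentTimeMillis() + delayMs;
        requeueCount++;
        return this;
    }

    int getRequeueCount() {
        return requeueCount;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

final class DelayedFlushDemo {
    public static void main(String[] args) {
        DelayQueue<DelayedFlush> queue = new DelayQueue<>();
        queue.add(new DelayedFlush("region-a"));                 // available at once
        queue.add(new DelayedFlush("region-b").requeue(60_000)); // held back for a minute
        System.out.println(queue.poll().region); // region-a
        System.out.println(queue.poll());        // null: region-b has not expired yet
        System.out.println(queue.size());        // 1
    }
}
```

    A non-blocking poll() only returns an element whose delay has run out, which is why a requeued request stays invisible to the flusher until its delay elapses.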

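    Constructor

    The MemStoreFlusher constructor mainly initializes the fields above. The most important part is computing the upper and lower limits on the RegionServer's total MemStore memory usage: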

    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
                    .getMax();
    
    this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
            UPPER_KEY, conf);
    
    long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
    
    if (lower > this.globalMemStoreLimit) {
        lower = this.globalMemStoreLimit;
    
        LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit "
                + "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
    }
    
    this.globalMemStoreLimitLowMark = lower;

    The code of the globalMemStoreLimit method is as follows:

    /**
     * Calculate size using passed <code>key</code> for configured percentage of
     * <code>max</code>.
     * 
     * @param max
     * @param defaultLimit
     * @param key
     * @param c
     * @return Limit.
     */
    static long globalMemStoreLimit(final long max, final float defaultLimit,
            final String key, final Configuration c) {
        float limit = c.getFloat(key, defaultLimit);
    
        return getMemStoreLimit(max, limit, defaultLimit);
    }
    
    static long getMemStoreLimit(final long max, final float limit,
            final float defaultLimit) {
        float effectiveLimit = limit;
    
        if (limit >= 0.9f || limit < 0.1f) {
            LOG.warn("Setting global memstore limit to default of "
                    + defaultLimit
                    + " because supplied value outside allowed range of 0.1 -> 0.9");
    
            effectiveLimit = defaultLimit;
        }
    
        return (long) (max * effectiveLimit);
    }
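
    As a worked example of the range check above, the sketch below reproduces the arithmetic in isolation. MemStoreLimits and limit are hypothetical names, and the heap size is an assumed value. Note that the accepted range is half-open: 0.1 is allowed, while 0.9 itself falls back to the default.

```java
// Hypothetical helper that mirrors the range check and multiplication shown above.
final class MemStoreLimits {

    /** Returns maxHeapBytes * fraction, using defaultFraction when configured is outside [0.1, 0.9). */
    static long limit(long maxHeapBytes, float configured, float defaultFraction) {
        float effective = configured;
        if (configured >= 0.9f || configured < 0.1f) {
            effective = defaultFraction; // out of range: fall back to the default
        }
        // long * float is evaluated in float arithmetic, so the result is approximate.
        return (long) (maxHeapBytes * effective);
    }

    public static void main(String[] args) {
        long heap = 4L * 1024 * 1024 * 1024; // assumed 4 GiB heap
        System.out.println("upper limit: " + limit(heap, 0.4f, 0.4f));
        System.out.println("lower limit: " + limit(heap, 0.35f, 0.35f));
        System.out.println("0.95 falls back to default: " + limit(heap, 0.95f, 0.4f));
    }
}
```

    With the defaults, roughly 40% of the heap is the blocking limit and roughly 35% is the mark at which forced flushing starts.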

    The flush loop

    Flush requests are processed inside a loop:

    @Override
    public void run() {
        while (!this.server.isStopped()) {
            FlushQueueEntry fqe = null;    
    
            try {
                ......    
            } catch (InterruptedException ex) {
                continue;
            } catch (ConcurrentModificationException ex) {
                continue;
            } catch (Exception ex) {
                LOG.error("Cache flusher failed for entry " + fqe, ex);
    
                if (!server.checkFileSystem()) {
                    break;
                }
            }
        }
    
        this.regionsInQueue.clear();
    
        this.flushQueue.clear();
    
        // Signal anyone waiting, so they see the close flag
        lock.lock();
    
        try {
            flushOccurred.signalAll();
        } finally {
            lock.unlock();
        }
    
        LOG.info(getName() + " exiting");
    }
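
    As long as the HRegionServer has not been asked to stop, the loop keeps running: it repeatedly takes a request fqe from the request queue and performs the flush. The actual work sits inside a try/catch block. Once the HRegionServer has been asked to stop, the thread clears its data structures and wakes up any other threads that are blocked.

    A single flush iteration
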
    wakeupPending.set(false); // allow someone to wake us up again
    
    fqe = flushQueue.poll(threadWakeFrequency,
            TimeUnit.MILLISECONDS);
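
    This takes one flush request from the queue. If the queue is empty, the thread blocks until the timeout expires. wakeupPending.set(false) means that, under certain conditions, other threads may wake the blocked thread by adding an “empty” request (WakeupFlushThread) to the queue.

    If the result fqe is null or a WakeupFlushThread instance, the following code runs:
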
    if (fqe == null || fqe instanceof WakeupFlushThread) {
        if (isAboveLowWaterMark()) {
            LOG.debug("Flush thread woke up because memory above low water="
                    + StringUtils
                                            .humanReadableInt(this.globalMemStoreLimitLowMark));
    
            if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above
                // low water mark
                // This is unlikely to happen, but might happen when
                // closing the
                // entire server - another thread is flushing
                // regions. We'll just
                // sleep a little bit to avoid spinning, and then
                // pretend that
                // we flushed one, so anyone blocked will check
                // again
                lock.lock();
    
                try {
                    Thread.sleep(1000);
    
                    flushOccurred.signalAll();
                } finally {
                    lock.unlock();
                }
            }
    
            // Enqueue another one of these tokens so we'll wake up
            // again
            wakeupFlushThread();
        }
    
        continue;
    }

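    In this case no real flush request was obtained. The code mainly checks whether the RegionServer's total MemStore usage has reached the lower limit; if so, a region must be picked and flushed to relieve memory pressure.

    The memory check is done by the method isAboveLowWaterMark: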

    /**
     * Return true if we're above the low watermark
     */
    private boolean isAboveLowWaterMark() {
        return server.getRegionServerAccounting().getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
    }

    If isAboveLowWaterMark returns true, the RegionServer's total MemStore usage has reached the lower limit and a region must be picked and flushed to relieve memory pressure. This is done by flushOneForGlobalPressure:

    /**
     * The memstore across all regions has exceeded the low water mark. Pick one
     * region to flush and flush it synchronously (this is called from the flush
     * thread)
     * 
     * @return true if successful
     */
    private boolean flushOneForGlobalPressure() {
        SortedMap<Long, HRegion> regionsBySize = server
                .getCopyOfOnlineRegionsSortedBySize();
    
        Set<HRegion> excludedRegions = new HashSet<HRegion>();
    
        boolean flushedOne = false;
    
        while (!flushedOne) {
            ......    
        }
    
        return true;
    }

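    The idea of the code above is to keep looping until some region has been picked and flushed successfully. Before the loop starts, all regions on the RegionServer have been collected in regionsBySize (a SortedMap keyed by size, ordered from largest to smallest). If flushing a picked region fails because of some error, the region is added to excludedRegions so that it is skipped in later rounds.

    Each iteration of the loop proceeds as follows: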

    // Find the biggest region that doesn't have too many storefiles
    // (might be null!)
    HRegion bestFlushableRegion = getBiggestMemstoreRegion(
            regionsBySize, excludedRegions, true);

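    This picks the region that is currently best suited for flushing. It must satisfy two conditions:

    (1) no Store in the region holds more than the allowed number of StoreFiles;

    (2) it is the largest among all regions that satisfy (1).

    The code that does this is: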

    private HRegion getBiggestMemstoreRegion(
            SortedMap<Long, HRegion> regionsBySize,
            Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
        synchronized (regionsInQueue) {
            for (HRegion region : regionsBySize.values()) {
                // A region in excludedRegions is considered unflushable.
                if (excludedRegions.contains(region)) {
                    continue;
                }
    
                if (checkStoreFileCount && isTooManyStoreFiles(region)) {
                    continue;
                }
                    
                return region;
            }
        }
            
        return null;
    }
    
    private boolean isTooManyStoreFiles(HRegion region) {
        for (Store hstore : region.stores.values()) {
            if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
                return true;
            }
        }
    
        return false;
    }

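    Because the regions in regionsBySize are already ordered from largest to smallest, it is enough to walk through them in order. The first region that is neither in excludedRegions nor holding too many StoreFiles (checkStoreFileCount is true) is bestFlushableRegion.

    bestFlushableRegion may be null (when every region currently has more StoreFiles than the threshold blockingStoreFilesNumber). To cover that case, the largest region is also picked as a fallback, even if it has more StoreFiles than blockingStoreFilesNumber.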

    // Find the biggest region, total, even if it might have too many
    // flushes.
    HRegion bestAnyRegion = getBiggestMemstoreRegion(regionsBySize,
            excludedRegions, false);
    
    if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
    
        return false;
    }

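    Here getBiggestMemstoreRegion is called with checkStoreFileCount set to false, so the selection ignores the number of StoreFiles a region holds.

    If no bestAnyRegion can be found (bestAnyRegion is null), memory pressure is high but there is no region that can be flushed, so the method simply returns false.

    There are usually two reasons why no flushable region can be found:

    (1) during the selection loop, flushing failed for every region (possibly because HDFS is unavailable), so all of them ended up in excludedRegions and the method above returns null;

    (2) the RegionServer has started shutting down.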

    HRegion regionToFlush;
    
    if (bestFlushableRegion != null
            && bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize
                    .get()) {
        // Even if it's not supposed to be flushed, pick a region if
        // it's more than twice
        // as big as the best flushable one - otherwise when we're under
        // pressure we make
        // lots of little flushes and cause lots of compactions, etc,
        // which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
            LOG.debug("Under global heap pressure: "
                    + "Region "
                    + bestAnyRegion.getRegionNameAsString()
                    + " has too many "
                    + "store files, but is "
                    + StringUtils
                            .humanReadableInt(bestAnyRegion.memstoreSize
                                    .get())
                    + " vs best flushable region's "
                    + StringUtils
                            .humanReadableInt(bestFlushableRegion.memstoreSize
                                    .get()) + ". Choosing the bigger.");
        }
    
        regionToFlush = bestAnyRegion;
    } else {
        if (bestFlushableRegion == null) {
            regionToFlush = bestAnyRegion;
        } else {
            regionToFlush = bestFlushableRegion;
        }
    }

    The final choice regionToFlush is decided from bestFlushableRegion and bestAnyRegion:

    (1) if bestFlushableRegion is not null but the MemStore of bestAnyRegion is more than twice the size of the MemStore of bestFlushableRegion, then regionToFlush = bestAnyRegion;

    (2) otherwise, regionToFlush = bestAnyRegion if bestFlushableRegion is null, and regionToFlush = bestFlushableRegion if it is not.
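
    The selection rules can be condensed into a small, self-contained sketch. RegionPicker and its nested Region class are hypothetical stand-ins (a region is reduced to a name, a MemStore size, and a single StoreFile count), so this illustrates the decision logic only, not the HBase classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Hypothetical model of the region selection described above.
final class RegionPicker {

    static final class Region {
        final String name;
        final long memstoreSize;
        final int storeFiles;

        Region(String name, long memstoreSize, int storeFiles) {
            this.name = name;
            this.memstoreSize = memstoreSize;
            this.storeFiles = storeFiles;
        }
    }

    /** First region in descending size order that is not excluded and passes the optional file-count check. */
    static Region biggest(List<Region> bySizeDesc, Set<Region> excluded,
            boolean checkStoreFiles, int blockingStoreFiles) {
        for (Region r : bySizeDesc) {
            if (excluded.contains(r)) {
                continue;
            }
            if (checkStoreFiles && r.storeFiles > blockingStoreFiles) {
                continue;
            }
            return r;
        }
        return null;
    }

    /** Returns the region to flush, or null when nothing is flushable. */
    static Region choose(List<Region> regions, Set<Region> excluded, int blockingStoreFiles) {
        List<Region> sorted = new ArrayList<>(regions);
        sorted.sort(Comparator.comparingLong((Region r) -> r.memstoreSize).reversed());

        Region bestFlushable = biggest(sorted, excluded, true, blockingStoreFiles);
        Region bestAny = biggest(sorted, excluded, false, blockingStoreFiles);

        if (bestAny == null) {
            return null; // above the mark, but no candidate at all
        }
        if (bestFlushable == null) {
            return bestAny;
        }
        // Prefer the overall biggest region only when it is more than twice as large.
        return bestAny.memstoreSize > 2 * bestFlushable.memstoreSize ? bestAny : bestFlushable;
    }
}
```

    For example, with a 300-byte region that has too many StoreFiles and a 100-byte region that does not, the larger one wins (300 > 2 × 100); if the larger one held only 150 bytes, the smaller, "clean" region would be chosen instead.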

    At this point the region to flush, regionToFlush, has been chosen, and all that remains is to flush it:

    Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
    
    LOG.info("Flush of region " + regionToFlush
            + " due to global heap pressure");
    
    flushedOne = flushRegion(regionToFlush, true);
    
    if (!flushedOne) {
        LOG.info("Excluding unflushable region " + regionToFlush
                + " - trying to find a different region to flush.");
    
        excludedRegions.add(regionToFlush);
    }

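    If flushing the region fails, i.e. flushRegion returns false, the region is added to excludedRegions and the loop continues with another selection.

    If flushOneForGlobalPressure returns false, no region could be picked for flushing. As the comment says, a likely cause is that the RegionServer is shutting down, in which case another thread is responsible for flushing regions. The flusher only needs to sleep briefly, pretend that it has flushed a region, and then wake up the threads blocked on memory pressure so that they can check memory usage again (why threads get blocked is explained later).

    If the result fqe is a FlushRegionEntry instance, the following code runs directly: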

    FlushRegionEntry fre = (FlushRegionEntry) fqe;
    
    if (!flushRegion(fre)) {
        break;
    }

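    The region is flushed directly. If an error occurs (one considered unrecoverable), the MemStoreFlusher thread leaves its loop and performs cleanup.

    MemStore and Put

    When a large volume of data is written to HBase, writes may block because of memory. There are two main causes:

    (1) reclaimMemStoreMemory

    This is an instance method of MemStoreFlusher. It is called before the region's batchMutate operation (which performs the write) is executed: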

    HRegion region = getRegion(regionName);
    
    if (!region.getRegionInfo().isMetaTable()) {
        /*
         * This method blocks callers until we're down to a safe
         * amount of memstore consumption.
         * 
         * ******************************************************
         */
        this.cacheFlusher.reclaimMemStoreMemory();
    }

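    As the code shows, for every non-meta table (i.e. ordinary user tables) this method is called before data is actually written, and it may block the write.

    reclaimMemStoreMemory handles two cases: isAboveHighWaterMark and isAboveLowWaterMark.

    isAboveHighWaterMark: the RegionServer's total MemStore usage has reached the upper limit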

    if (isAboveHighWaterMark()) {
        lock.lock();
    
        try {
            boolean blocked = false;
    
            long startTime = 0;
    
            while (isAboveHighWaterMark() && !server.isStopped()) {
                if (!blocked) {
                    startTime = EnvironmentEdgeManager.currentTimeMillis();
    
                    LOG.info("Blocking updates on "
                            + server.toString()
                            + ": the global memstore size "
                            + StringUtils.humanReadableInt(server
                                            .getRegionServerAccounting()
                                            .getGlobalMemstoreSize())
                            + " is >= than blocking "
                            + StringUtils
                                            .humanReadableInt(globalMemStoreLimit)
                            + " size");
                }
    
                blocked = true;
    
                wakeupFlushThread();
    
                try {
                    // we should be able to wait forever, but we've seen a
                    // bug where
                    // we miss a notify, so put a 5 second bound on it at
                    // least.
                    flushOccurred.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
    
            if (blocked) {
                final long totalTime = EnvironmentEdgeManager
                        .currentTimeMillis() - startTime;
    
                if (totalTime > 0) {
                    this.updatesBlockedMsHighWater.add(totalTime);
                }
    
                LOG.info("Unblocking updates for server "
                        + server.toString());
            }
        } finally {
            lock.unlock();
        }
    }

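    Before writing, if memory usage is found to have reached the upper limit, the caller waits in a loop until usage drops below the upper limit. Before each wait, wakeupFlushThread puts an “empty” element into the flush request queue to get the MemStoreFlusher thread working (it may pick a region to flush). The upper limit is checked as follows: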

    /**
     * Return true if global memory usage is above the high watermark
     */
    private boolean isAboveHighWaterMark() {
        return server.getRegionServerAccounting().getGlobalMemstoreSize() >= globalMemStoreLimit;
    }

    isAboveLowWaterMark: the RegionServer's total MemStore usage has reached only the lower limit

    else if (isAboveLowWaterMark()) {
        wakeupFlushThread();
    }

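    In this case the write does not need to be blocked. It is enough to add an “empty” element to the flush request queue to prompt MemStoreFlusher to do some work.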
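
    The high-water-mark case is a classic lock/condition wait with a bounded timeout. The sketch below shows the pattern in isolation. WriteThrottle and its methods are hypothetical, sizes are plain numbers, and, unlike the code above, the sketch stops waiting when the thread is interrupted to keep the example simple.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of "block writers above the high mark until a flush frees memory".
final class WriteThrottle {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flushOccurred = lock.newCondition();
    private final AtomicLong globalMemstoreSize = new AtomicLong();
    private final long highMark;

    WriteThrottle(long highMark) {
        this.highMark = highMark;
    }

    long current() {
        return globalMemstoreSize.get();
    }

    void written(long bytes) {
        globalMemstoreSize.addAndGet(bytes);
    }

    /** Called by the (simulated) flusher: release memory, then wake all waiting writers. */
    void flushed(long bytes) {
        globalMemstoreSize.addAndGet(-bytes);
        lock.lock();
        try {
            flushOccurred.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits while usage is at or above the high mark; returns true if the caller had to wait. */
    boolean reclaim() {
        if (current() < highMark) {
            return false;
        }
        lock.lock();
        try {
            while (current() >= highMark) {
                try {
                    // Bounded wait, so a missed signal cannot block the writer forever.
                    flushOccurred.await(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // simplification: give up waiting when interrupted
                }
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}

final class WriteThrottleDemo {
    public static void main(String[] args) {
        WriteThrottle throttle = new WriteThrottle(100);
        throttle.written(130);
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                return;
            }
            throttle.flushed(100);
        }).start();
        throttle.reclaim();                     // returns once usage is below 100
        System.out.println(throttle.current()); // 30
    }
}
```

    Re-checking the condition in a loop after every wake-up is what makes the timeout safe: a writer that wakes early simply finds usage still too high and waits again.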

    (2) checkResources

    /**
     * Perform a batch of mutations. It supports only Put and Delete mutations
     * and will ignore other types passed.
     * 
     * @param mutationsAndLocks
     *            the list of mutations paired with their requested lock IDs.
     * @return an array of OperationStatus which internally contains the
     *         OperationStatusCode and the exceptionMessage if any.
     * @throws IOException
     */
    public OperationStatus[] batchMutate(
            Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
        BatchOperationInProgress<Pair<Mutation, Integer>> batchOp = new BatchOperationInProgress<Pair<Mutation, Integer>>(
                mutationsAndLocks);
    
        boolean initialized = false;
    
        while (!batchOp.isDone()) {
            checkReadOnly();
            
            // Check if resources to support an update, may be blocked.
            checkResources();
             ......
                
        }
    
        return batchOp.retCodeDetails;
    }

    In the region's batchMutate, checkResources is called before each round of writing, and it may block the current write.

    /*
     * Check if resources to support an update.
     * 
     * Here we synchronize on HRegion, a broad scoped lock. Its appropriate
     * given we're figuring in here whether this region is able to take on
     * writes. This is only method with a synchronize (at time of writing), this
     * and the synchronize on 'this' inside in internalFlushCache to send the
     * notify.
     */
    private void checkResources() throws RegionTooBusyException,
            InterruptedIOException {
        // If catalog region, do not impose resource constraints or block
        // updates.
        if (this.getRegionInfo().isMetaRegion()) {
            return;
        }
    
        boolean blocked = false;
    
        long startTime = 0;
    
        while (this.memstoreSize.get() > this.blockingMemStoreSize) {
            requestFlush();
    
            if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTimeMillis();
    
                LOG.info("Blocking updates for '"
                        + Thread.currentThread().getName()
                        + "' on region "
                        + Bytes.toStringBinary(getRegionName())
                        + ": memstore size "
                        + StringUtils.humanReadableInt(this.memstoreSize.get())
                        + " is >= than blocking "
                        + StringUtils
                                    .humanReadableInt(this.blockingMemStoreSize)
                        + " size");
            }
    
            long now = EnvironmentEdgeManager.currentTimeMillis();
    
            long timeToWait = startTime + busyWaitDuration - now;
    
            if (timeToWait <= 0L) {
                final long totalTime = now - startTime;
    
                this.updatesBlockedMs.add(totalTime);
    
                LOG.info("Failed to unblock updates for region " + this + " '"
                        + Thread.currentThread().getName() + "' in "
                        + totalTime + "ms. The region is still busy.");
    
                throw new RegionTooBusyException("region is flushing");
            }
    
            blocked = true;
    
            synchronized (this) {
                try {
                    wait(Math.min(timeToWait, threadWakeFrequency));
                } catch (InterruptedException ie) {
                    final long totalTime = EnvironmentEdgeManager
                            .currentTimeMillis() - startTime;
    
                    if (totalTime > 0) {
                        this.updatesBlockedMs.add(totalTime);
                    }
    
                    LOG.info("Interrupted while waiting to unblock updates for region "
                            + this
                            + " '"
                            + Thread.currentThread().getName()
                            + "'");
    
                    InterruptedIOException iie = new InterruptedIOException();
    
                    iie.initCause(ie);
    
                    throw iie;
                }
            }
        }
    
        if (blocked) {
            // Add in the blocked time if appropriate
            final long totalTime = EnvironmentEdgeManager.currentTimeMillis()
                    - startTime;
    
            if (totalTime > 0) {
                this.updatesBlockedMs.add(totalTime);
            }
    
            LOG.info("Unblocking updates for region " + this + " '"
                    + Thread.currentThread().getName() + "'");
        }
    }

    From the code above, the blocking condition is

    this.memstoreSize.get() > this.blockingMemStoreSize

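    If this condition holds, the write is blocked until the region's MemStore usage drops below the threshold, or until busyWaitDuration has elapsed, at which point a RegionTooBusyException is thrown.

    Here memstoreSize is the current MemStore size of the region being written to, and blockingMemStoreSize is computed by the following code: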

    long flushSize = this.htableDescriptor.getMemStoreFlushSize();
    
    if (flushSize <= 0) {
        flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
                HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    
    this.memstoreFlushSize = flushSize;
    
    this.blockingMemStoreSize = this.memstoreFlushSize
            * conf.getLong("hbase.hregion.memstore.block.multiplier", 2);

    As can be seen, blockingMemStoreSize is an integer multiple of memstoreFlushSize (hbase.hregion.memstore.block.multiplier, 2 by default).

    MemStoreFlusher flushRegion

    Whenever the MemStoreFlusher thread takes a flush request (a FlushRegionEntry) off the flush queue, it performs the flush through the following method:

    /*
     * A flushRegion that checks store file count. If too many, puts the flush
     * on delay queue to retry later.
     * 
     * @param fqe
     * 
     * @return true if the region was successfully flushed, false otherwise. If
     * false, there will be accompanying log messages explaining why the log was
     * not flushed.
     */
    private boolean flushRegion(final FlushRegionEntry fqe) {
        HRegion region = fqe.region;
    
        if (!fqe.region.getRegionInfo().isMetaRegion()
                && isTooManyStoreFiles(region)) {
            if (fqe.isMaximumWait(this.blockingWaitTime)) {
                LOG.info("Waited "
                        + (System.currentTimeMillis() - fqe.createTime)
                        + "ms on a compaction to clean up 'too many store files'; waited "
                        + "long enough... proceeding with flush of "
                        + region.getRegionNameAsString());
            } else {
                // If this is first time we've been put off, then emit a log
                // message.
                if (fqe.getRequeueCount() <= 0) {
                    // Note: We don't impose blockingStoreFiles constraint on
                    // meta regions
                    LOG.warn("Region " + region.getRegionNameAsString()
                            + " has too many "
                            + "store files; delaying flush up to "
                            + this.blockingWaitTime + "ms");
    
                    if (!this.server.compactSplitThread.requestSplit(region)) {
                        try {
                            this.server.compactSplitThread.requestCompaction(
                                    region, getName());
                        } catch (IOException e) {
                            LOG.error(
                                    "Cache flush failed"
                                            + (region != null ? (" for region " + Bytes
                                                    .toStringBinary(region
                                                            .getRegionName()))
                                                    : ""),
                                    RemoteExceptionHandler.checkIOException(e));
                        }
                    }
                }
    
                // Put back on the queue. Have it come back out of the queue
                // after a delay of this.blockingWaitTime / 100 ms.
                this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
    
                // Tell a lie, it's not flushed but it's ok
                return true;
            }
        }
    
        return flushRegion(region, false);
    }

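    Depending on the situation, the code above may take some special action before the actual flushRegion call.

    If the region is not a meta region (i.e. it belongs to a user table) and it holds too many StoreFiles, the following checks are made:

    (1) if the flush request has already waited for the maximum wait time, it must be handled now, so only a log message is printed and the flush proceeds (this is why the request queue is a DelayQueue, in which every element is ordered by its own “expiry time”);

    (2) if the flush request has not yet waited for the maximum wait time, the flush is delayed because the region already holds too many StoreFiles: the request is put back on the queue with a delay of blockingWaitTime / 100 ms. If this is the first time the request is delayed, a split is requested first and, failing that, a compaction.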
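
    The delay decision can be summarized as a pure function. FlushDelayPolicy, Action, and the method names are hypothetical, and the 90000 ms wait time used in the comments is an assumed configuration value, not something the code above fixes.

```java
// Hypothetical summary of the "flush now or requeue" decision described above.
final class FlushDelayPolicy {

    enum Action { FLUSH_NOW, REQUEUE }

    /**
     * Requeue only for non-meta regions with too many StoreFiles that have not
     * yet waited longer than blockingWaitTimeMs; otherwise flush immediately.
     */
    static Action decide(boolean metaRegion, boolean tooManyStoreFiles,
            long waitedMs, long blockingWaitTimeMs) {
        boolean waitedLongEnough = waitedMs > blockingWaitTimeMs;
        if (!metaRegion && tooManyStoreFiles && !waitedLongEnough) {
            return Action.REQUEUE;
        }
        return Action.FLUSH_NOW;
    }

    /** Delay applied on each requeue: one hundredth of the maximum wait. */
    static long requeueDelayMs(long blockingWaitTimeMs) {
        return blockingWaitTimeMs / 100;
    }

    public static void main(String[] args) {
        long blockingWaitTimeMs = 90_000; // assumed setting
        System.out.println(decide(false, true, 1_000, blockingWaitTimeMs));  // REQUEUE
        System.out.println(decide(false, true, 90_001, blockingWaitTimeMs)); // FLUSH_NOW
        System.out.println(requeueDelayMs(blockingWaitTimeMs));              // 900
    }
}
```

    Under that assumption a blocked request is re-examined roughly every 900 ms, so it can be requeued on the order of a hundred times before the flush is forced through.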

  • Original article: https://www.cnblogs.com/yurunmiao/p/3520077.html