  • HBase flusher source code analysis (the full flush code path)


    Before diving into the HBase flush source code, let's first outline the logic so the code is easier to follow. The overall flush flow has three phases:

      1. Phase one: the prepare phase, which takes a snapshot of the current memstore's in-memory structure. HBase's in-memory write structures (the memstore and its snapshot) are skip lists, backed by the JDK's ConcurrentSkipListMap. This step essentially hands the memstore over to the snapshot and constructs a new, empty memstore (a toy sketch of this swap follows this overview).

      2. Phase two: the flushcache phase, which flushes the snapshot produced in phase one to disk. Note that the data is written to a temp file here; the resulting HFile is not yet moved under the store's actual column-family directory. That move happens in phase three.

      3. Phase three: the commit phase, which moves the HFile produced in phase two to its final, correct location.

    That is the logical flow of an HBase flush. Flush is performed at the region level and touches many classes; below we walk through the flush-related operations.
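  Before diving into the code, here is a minimal, self-contained sketch of the phase-one idea: the memstore hands its skip list over to the snapshot by reference and starts a fresh one, so no data is copied. The class and method names below are illustrative only, not HBase's real classes.

    import java.util.concurrent.ConcurrentSkipListMap;

    // Toy model of phase one (prepare): hand the current skip list over to the
    // snapshot and replace it with a fresh one. Not HBase code.
    public class ToyMemStore {
        private volatile ConcurrentSkipListMap<String, byte[]> cellSet = new ConcurrentSkipListMap<>();
        private volatile ConcurrentSkipListMap<String, byte[]> snapshot = new ConcurrentSkipListMap<>();

        public void put(String key, byte[] value) {
            cellSet.put(key, value);
        }

        // Phase one: swap the references; no cells are copied.
        public ConcurrentSkipListMap<String, byte[]> snapshot() {
            if (snapshot.isEmpty()) {
                snapshot = cellSet;
                cellSet = new ConcurrentSkipListMap<>();
            }
            return snapshot;
        }

        // Once phase three has committed the HFile, the snapshot can be dropped.
        public void clearSnapshot() {
            snapshot = new ConcurrentSkipListMap<>();
        }
    }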

    Starting the flush threads

    • When a regionserver starts, it calls startServiceThreads to launch a number of service threads, including the cache flusher:
    // Cache flushing
    protected MemStoreFlusher cacheFlusher;
    // ... other fields omitted ...
    private void startServiceThreads() throws IOException {
      // ... other code omitted ...
      this.cacheFlusher.start(uncaughtExceptionHandler);
    }
    •  cacheFlusher is an instance of MemStoreFlusher. Before walking through the logic above, two MemStoreFlusher fields are worth introducing:
    •  // A BlockingQueue<FlushQueueEntry>.
        // It mainly holds FlushRegionEntry flush requests, plus WakeupFlushThread wake-up tokens.
        private final BlockingQueue<FlushQueueEntry> flushQueue =
          new DelayQueue<FlushQueueEntry>();
        // Every request added to flushQueue is also recorded in regionsInQueue.
        private final Map<HRegion, FlushRegionEntry> regionsInQueue =
          new HashMap<HRegion, FlushRegionEntry>();
    • MemStoreFlusher's start method looks like this:
     synchronized void start(UncaughtExceptionHandler eh) {
        ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
            server.getServerName().toShortString() + "-MemStoreFlusher", eh);
        for (int i = 0; i < flushHandlers.length; i++) {
          flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
          flusherThreadFactory.newThread(flushHandlers[i]);
          flushHandlers[i].start();
        }
      }
    

      It creates as many FlushHandler threads as configured by the flush handler count setting and calls start on each of them. Let's look at FlushHandler next.

    private class FlushHandler extends HasThread {
      private FlushHandler(String name) {
        super(name);
      }

      @Override
      public void run() {
        // Keep looping as long as the regionserver has not been stopped
        while (!server.isStopped()) {
          FlushQueueEntry fqe = null;
          try {
            wakeupPending.set(false); // allow someone to wake us up again
            // Poll the delay queue; blocks until an entry is ready or the timeout expires
            fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
            if (fqe == null || fqe instanceof WakeupFlushThread) {
              // No flush request, or the entry is a wake-up token for a global memstore check
              if (isAboveLowWaterMark()) {
                // Total memstore usage exceeds max_heap * hbase.regionserver.global.memstore.lowerLimit
                // (default 0.35); flush the region with the largest memstore
                LOG.debug("Flush thread woke up because memory above low water="
                    + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
                if (!flushOneForGlobalPressure()) {
                  // Wasn't able to flush any region, but we're above low water mark
                  // This is unlikely to happen, but might happen when closing the
                  // entire server - another thread is flushing regions. We'll just
                  // sleep a little bit to avoid spinning, and then pretend that
                  // we flushed one, so anyone blocked will check again
                  Thread.sleep(1000);
                  wakeUpIfBlocking();
                }
                // Enqueue another one of these tokens so we'll wake up again
                wakeupFlushThread();
              }
              // Also continue after a poll timeout
              continue;
            }
            // A normal flush request: a single region's memstore exceeded
            // hbase.hregion.memstore.flush.size (default 128M), so flush it
            FlushRegionEntry fre = (FlushRegionEntry) fqe;
            if (!flushRegion(fre)) {
              break;
            }
          } catch (InterruptedException ex) {
            continue;
          } catch (ConcurrentModificationException ex) {
            continue;
          } catch (Exception ex) {
            LOG.error("Cache flusher failed for entry " + fqe, ex);
            if (!server.checkFileSystem()) {
              break;
            }
          }
        }
        // The thread is shutting down, usually because the regionserver is stopping;
        // this part is outside the while loop
        synchronized (regionsInQueue) {
          regionsInQueue.clear();
          flushQueue.clear();
        }

        // Signal anyone waiting, so they see the close flag
        wakeUpIfBlocking();
        LOG.info(getName() + " exiting");
      }
    }

      Now let's summarize the logic of FlushHandler's run method:

    1. As long as the regionserver is not stopped, keep looping and checking for flush requests.
    2. Block on flushQueue.poll; since flushQueue is a blocking (delay) queue, the call waits while the queue is empty, up to the timeout.
    3. If an entry is available, take the request out and call MemStoreFlusher.flushRegion(fre). (A simplified, runnable sketch of this delay-queue poll loop follows.)
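  To make the queue mechanics concrete, here is a small, runnable sketch built from plain JDK classes (names are illustrative, not HBase's) of how a DelayQueue-based handler loop distinguishes wake-up tokens from real flush requests and only sees an entry once its delay has expired:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    // Toy model of flushQueue + FlushHandler. A null region marks a wake-up token.
    public class FlushQueueDemo {
        static class Entry implements Delayed {
            final String region;       // null means "wake-up token"
            final long readyAtMillis;

            Entry(String region, long delayMillis) {
                this.region = region;
                this.readyAtMillis = System.currentTimeMillis() + delayMillis;
            }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed other) {
                return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
            }
        }

        public static void main(String[] args) throws InterruptedException {
            DelayQueue<Entry> flushQueue = new DelayQueue<>();
            flushQueue.add(new Entry(null, 0));          // wake-up token, visible immediately
            flushQueue.add(new Entry("region-A", 200));  // flush request, delayed 200 ms

            // Handler loop: poll with a timeout, then tell tokens and requests apart.
            for (int i = 0; i < 5; i++) {
                Entry e = flushQueue.poll(100, TimeUnit.MILLISECONDS);
                if (e == null || e.region == null) {
                    System.out.println("timeout or wake-up token: would check global memstore pressure");
                    continue;
                }
                System.out.println("would flush " + e.region);
            }
        }
    }

  The requeue used later for the "too many store files" case works the same way: the same kind of entry goes back into the delay queue with a fresh delay.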

    The flush flow
      So the actual flush goes through MemStoreFlusher.flushRegion; let's follow flushRegion and see what it does.
    private boolean flushRegion(final FlushRegionEntry fqe) {
        //Get the region from the FlushQueueEntry
        HRegion region = fqe.region;
        //If the region is not a meta region and has too many store files, delay (block) the flush for a while.
        //isTooManyStoreFiles uses hbase.hstore.blockingStoreFiles as the threshold; if unset, the default is 7.
        if (!region.getRegionInfo().isMetaRegion() &&
            isTooManyStoreFiles(region)) {
    
          //Check whether we have already waited up to the configured blocking time
          if (fqe.isMaximumWait(this.blockingWaitTime)) {
            LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
              "ms on a compaction to clean up 'too many store files'; waited " +
              "long enough... proceeding with flush of " +
              region.getRegionNameAsString());
          } else {
            // If this is first time we've been put off, then emit a log message.
            //If this is the first time this request has been put off (requeue count is 0)
            if (fqe.getRequeueCount() <= 0) {
              // Note: We don't impose blockingStoreFiles constraint on meta regions
              LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
                "store files; delaying flush up to " + this.blockingWaitTime + "ms");
              //Before delaying the flush, check whether the region should split; if not, request a system compaction since there are too many store files
              if (!this.server.compactSplitThread.requestSplit(region)) {
                try {
                  this.server.compactSplitThread.requestSystemCompaction(
                      region, Thread.currentThread().getName());
                } catch (IOException e) {
                  LOG.error(
                    "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
                    RemoteExceptionHandler.checkIOException(e));
                }
              }
            }
    
            // Put back on the queue.  Have it come back out of the queue
            // after a delay of this.blockingWaitTime / 100 ms.
            //Requeue the entry; it comes back out of the flushQueue after blockingWaitTime/100 ms and a handler picks it up to flush
            this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
            // Tell a lie, it's not flushed but it's ok
            return true;
          }
        }
        //Normal flush path
        return flushRegion(region, false, fqe.isForceFlushAllStores());
      }
    

      This method checks whether the region to be flushed has too many HFiles; if so, the flush is postponed by requeueing the entry with a delay. Once the wait is over (or if there was no backlog to begin with), the overloaded MemStoreFlusher.flushRegion(region, false, fqe.isForceFlushAllStores()) is called, so let's follow that overload.

    private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
          boolean forceFlushAllStores) {
        long startTime = 0;
        //Acquire the lock
        synchronized (this.regionsInQueue) {
          //Remove this region from regionsInQueue
          FlushRegionEntry fqe = this.regionsInQueue.remove(region);
          // Use the start time of the FlushRegionEntry if available
          if (fqe != null) {
            startTime = fqe.createTime;
          }
          if (fqe != null && emergencyFlush) {
            // Need to remove from region from delay queue.  When NOT an
            // emergencyFlush, then item was removed via a flushQueue.poll.
            flushQueue.remove(fqe);
         }
        }
        if (startTime == 0) {
          // Avoid getting the system time unless we don't have a FlushRegionEntry;
          // shame we can't capture the time also spent in the above synchronized
          // block
          startTime = EnvironmentEdgeManager.currentTime();
        }
        lock.readLock().lock();
        try {
          notifyFlushRequest(region, emergencyFlush);
          //Ultimately this calls the region's flushcache
          HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
          boolean shouldCompact = flushResult.isCompactionNeeded();
          // We just want to check the size
          boolean shouldSplit = region.checkSplit() != null;
          if (shouldSplit) {
            this.server.compactSplitThread.requestSplit(region);
          } else if (shouldCompact) {
            server.compactSplitThread.requestSystemCompaction(
                region, Thread.currentThread().getName());
          }
          if (flushResult.isFlushSucceeded()) {
            long endTime = EnvironmentEdgeManager.currentTime();
            server.metricsRegionServer.updateFlushTime(endTime - startTime);
          }
        } catch (DroppedSnapshotException ex) {
          // Cache flush can fail in a few places. If it fails in a critical
          // section, we get a DroppedSnapshotException and a replay of wal
          // is required. Currently the only way to do this is a restart of
          // the server. Abort because hdfs is probably bad (HBASE-644 is a case
          // where hdfs was bad but passed the hdfs check).
          server.abort("Replay of WAL required. Forcing server shutdown", ex);
          return false;
        } catch (IOException ex) {
          LOG.error("Cache flush failed" +
            (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
            RemoteExceptionHandler.checkIOException(ex));
          if (!server.checkFileSystem()) {
            return false;
          }
        } finally {
          lock.readLock().unlock();
          wakeUpIfBlocking();
        }
        return true;
      }
    

      We won't go through the unrelated code; the core is the call to region.flushcache(forceFlushAllStores), which again shows that flush happens at the region level. After the flush completes, the method checks whether the region should split, and if not, whether a compaction is needed. Let's follow flushcache and see what happens inside.

    //flush cache; the parameter indicates whether all stores must be flushed
        public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
            // fail-fast instead of waiting on the lock
            //Check whether the region is currently closing
            if (this.closing.get()) {
                String msg = "Skipping flush on " + this + " because closing";
                LOG.debug(msg);
                return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
            }
            MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
            status.setStatus("Acquiring readlock on region");
            // block waiting for the lock for flushing cache
            //A lock is taken here
            lock.readLock().lock();
            try {
                if (this.closed.get()) {
                    String msg = "Skipping flush on " + this + " because closed";
                    LOG.debug(msg);
                    status.abort(msg);
                    return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
                }
                if (coprocessorHost != null) {
                    status.setStatus("Running coprocessor pre-flush hooks");
                    coprocessorHost.preFlush();
                }
                // TODO: this should be managed within memstore with the snapshot, updated only after flush
                // successful
                if (numMutationsWithoutWAL.get() > 0) {
                    numMutationsWithoutWAL.set(0);
                    dataInMemoryWithoutWAL.set(0);
                }
                synchronized (writestate) {
                    //Check that the region is not already flushing and that writes are still enabled
                    if (!writestate.flushing && writestate.writesEnabled) {
                        this.writestate.flushing = true;
                    } else { //Otherwise the region is already flushing or writes are disabled, so abort this flush
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("NOT flushing memstore for region " + this
                                    + ", flushing=" + writestate.flushing + ", writesEnabled="
                                    + writestate.writesEnabled);
                        }
                        String msg = "Not flushing since "
                                + (writestate.flushing ? "already flushing"
                                : "writes not enabled");
                        status.abort(msg);
                        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
                    }
                }
    
                try {
                    //Depending on forceFlushAllStores, either flush every store or let the flush policy pick which stores to flush.
                    //For a non-forced flush, the policy picks stores whose memstore exceeds
                    //hbase.hregion.percolumnfamilyflush.size.lower.bound (default 16M), or whose memstore is old enough even if smaller.
                    Collection<Store> specificStoresToFlush =
                            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
                    //Call internalFlushcache to do the actual flush
                    FlushResult fs = internalFlushcache(specificStoresToFlush, status);

                    if (coprocessorHost != null) {
                        status.setStatus("Running post-flush coprocessor hooks");
                        coprocessorHost.postFlush();
                    }

                    status.markComplete("Flush successful");
                    return fs;
                } finally {
                    synchronized (writestate) {
                        writestate.flushing = false;
                        this.writestate.flushRequested = false;
                        writestate.notifyAll();
                    }
                }
            } finally {
                lock.readLock().unlock();
                status.cleanup();
            }
        }

      The core logic is FlushResult fs = internalFlushcache(specificStoresToFlush, status);, which covers the three phases described earlier: phase one (prepare) is implemented by region.internalPrepareFlushCache(), while phase two (flush) and phase three (commit) go through internalFlushCacheAndCommit(). Let's look at internalFlushcache:

    protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
                                                 final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
            //internalPrepareFlushCache takes the memstore snapshot
            PrepareFlushResult result
                    = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
            //When prepare succeeds, result.result is null, so internalFlushCacheAndCommit is called to run phases two and three.
            if (result.result == null) {
                return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
            } else {
                return result.result; // early exit due to failure from prepare stage
            }
        }
    

      Now let's look at phase one: internalPrepareFlushCache. It takes a region-level updatesLock. There is a lot of code here, so the less important parts can be skimmed on a first pass; a toy illustration of the lock discipline it relies on follows.
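  Before reading the method, here is a toy illustration of the updatesLock discipline: normal writes take the lock in shared (read) mode so they can run concurrently, while the snapshot step takes it in exclusive (write) mode just long enough to swap the skip lists, which guarantees no edit lands in both the memstore and the snapshot. This is a sketch with made-up names, not HBase code.

    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Toy model of the updatesLock pattern used around the snapshot.
    public class UpdatesLockDemo {
        private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
        private volatile ConcurrentSkipListMap<String, byte[]> cellSet = new ConcurrentSkipListMap<>();
        private volatile ConcurrentSkipListMap<String, byte[]> snapshot = new ConcurrentSkipListMap<>();

        public void put(String key, byte[] value) {
            updatesLock.readLock().lock();   // many writers may hold the read lock at once
            try {
                cellSet.put(key, value);
            } finally {
                updatesLock.readLock().unlock();
            }
        }

        public void prepareSnapshot() {
            updatesLock.writeLock().lock();  // exclusive: blocks writers only for the swap
            try {
                snapshot = cellSet;
                cellSet = new ConcurrentSkipListMap<>();
            } finally {
                updatesLock.writeLock().unlock();
            }
        }
    }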

     //This method performs the prepare (snapshot) phase of the flush
        protected PrepareFlushResult internalPrepareFlushCache(
                final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
                MonitoredTask status, boolean isReplay)
                throws IOException {
    
            if (this.rsServices != null && this.rsServices.isAborted()) {
                // Don't flush when server aborting, it's unsafe
                throw new IOException("Aborting flush because server is aborted...");
            }
        //Record the start time so the flush duration can be measured
            final long startTime = EnvironmentEdgeManager.currentTime();
            // If nothing to flush, return, but we need to safely update the region sequence id
        //If the memstore is empty, skip the flush, but still update the sequence id
            if (this.memstoreSize.get() <= 0) {
                // Take an update lock because am about to change the sequence id and we want the sequence id
                // to be at the border of the empty memstore.
                MultiVersionConsistencyControl.WriteEntry w = null;
                this.updatesLock.writeLock().lock();
                try {
                    if (this.memstoreSize.get() <= 0) {
                        // Presume that if there are still no edits in the memstore, then there are no edits for
                        // this region out in the WAL subsystem so no need to do any trickery clearing out
                        // edits in the WAL system. Up the sequence number so the resulting flush id is for
                        // sure just beyond the last appended region edit (useful as a marker when bulk loading,
                        // etc.)
                        // wal can be null replaying edits.
                        if (wal != null) {
                            w = mvcc.beginMemstoreInsert();
                            long flushSeqId = getNextSequenceId(wal);
                            FlushResult flushResult = new FlushResult(
                                    FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
                            w.setWriteNumber(flushSeqId);
                            mvcc.waitForPreviousTransactionsComplete(w);
                            w = null;
                            return new PrepareFlushResult(flushResult, myseqid);
                        } else {
                            return new PrepareFlushResult(
                                    new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
                                    myseqid);
                        }
                    }
                } finally {
                    this.updatesLock.writeLock().unlock();
                    if (w != null) {
                        mvcc.advanceMemstore(w);
                    }
                }
            }
    
            if (LOG.isInfoEnabled()) {
                LOG.info("Started memstore flush for " + this + ", current region memstore size "
                        + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
                        + stores.size() + " column families' memstores are being flushed."
                        + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
                // only log when we are not flushing all stores.
                //When flushing only a subset of the stores, log each store's memstore size
                if (this.stores.size() > storesToFlush.size()) {
                    for (Store store : storesToFlush) {
                        LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
                                + " which was occupying "
                                + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
                    }
                }
            }
            // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
            // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
            // allow updates again so its value will represent the size of the updates received
            // during flush
        //Updates are blocked until the memstore snapshot completes
            MultiVersionConsistencyControl.WriteEntry w = null;
            // We have to take an update lock during snapshot, or else a write could end up in both snapshot
            // and memstore (makes it difficult to do atomic rows then)
            status.setStatus("Obtaining lock to block concurrent updates");
            // block waiting for the lock for internal flush
        //Take the updatesLock write lock
            this.updatesLock.writeLock().lock();
            status.setStatus("Preparing to flush by snapshotting stores in " +
                    getRegionInfo().getEncodedName());
        //Accumulates the total memstore size of all the stores being flushed
            long totalFlushableSizeOfFlushableStores = 0;
        //Records the column family names of all the stores being flushed
            Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
            for (Store store : storesToFlush) {
                flushedFamilyNames.add(store.getFamily().getName());
            }
        //Of the variables storeFlushCtxs, committedFiles and storeFlushableSize, the important ones are
        //storeFlushCtxs and committedFiles. Both are TreeMaps keyed by column family: one holds the
        //per-CF flush executor (StoreFlusherImpl), the other the HFiles that end up being written.
        //StoreFlusherImpl, the implementation of StoreFlushContext, contains the core flush operations:
        //prepare, flushCache, commit, abort, etc.
        //So storeFlushCtxs keeps one flush instance per store; the flush is driven through these
        //StoreFlushContext objects later on.
        TreeMap<byte[], StoreFlushContext> storeFlushCtxs
                = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
        //Maps each store (column family) to the HDFS paths of its committed files
        TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
                Bytes.BYTES_COMPARATOR);
        // The sequence id of this flush operation which is used to log FlushMarker and pass to
        // createFlushContext to use as the store file's sequence id.
        long flushOpSeqId = HConstants.NO_SEQNUM;
        // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
        // passed to HMaster.
        long flushedSeqId = HConstants.NO_SEQNUM;
        byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();

        long trxId = 0;
        try {
            try {
                w = mvcc.beginMemstoreInsert();
                if (wal != null) {
                    if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
                        // This should never happen.
                        String msg = "Flush will not be started for ["
                                + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
                        status.setStatus(msg);
                        return new PrepareFlushResult(
                                new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg), myseqid);
                    }
                    flushOpSeqId = getNextSequenceId(wal);
                    long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
                    // no oldestUnflushedSeqId means we flushed all stores.
                    // or the unflushed stores are all empty.
                    flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM)
                            ? flushOpSeqId : oldestUnflushedSeqId - 1;
                } else {
                    // use the provided sequence Id as WAL is not being used for this flush.
                    flushedSeqId = flushOpSeqId = myseqid;
                }

                //Iterate over the stores being flushed and create a StoreFlusherImpl for each.
                //Taking the memstore snapshot means calling prepare on each StoreFlusherImpl; the
                //flush and commit steps in internalFlushCacheAndCommit likewise go through each
                //store's flushCache and commit methods.
                for (Store s : storesToFlush) {
                    //Accumulate the flushable memstore size of each store (not the snapshot's getCellsCount())
                    totalFlushableSizeOfFlushableStores += s.getFlushableSize();
                    //Create each store's own StoreFlusherImpl
                    storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
                    //No flushed HFile path exists yet at this point
                    committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
                }

                // write the snapshot start to WAL
                if (wal != null && !writestate.readOnly) {
                    FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
                            getRegionInfo(), flushOpSeqId, committedFiles);
                    // no sync. Sync is below where we do not hold the updates lock.
                    //Only the begin-flush marker is written here; the actual sync happens later,
                    //because the updatesLock write lock is held here and expensive work must stay
                    //out of this section.
                    trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                            desc, sequenceId, false);
                }

                // Prepare flush (take a snapshot). The StoreFlushContext here is StoreFlusherImpl.
                for (StoreFlushContext flush : storeFlushCtxs.values()) {
                    //For each store of the region, hand the memstore's cell set over to the memstore
                    //snapshot (clearing the cell set) and expose it to the HStore.
                    flush.prepare(); // prepare calls the store's memstore snapshot method
                }
            } catch (IOException ex) {
                if (wal != null) {
                    if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
                        try {
                            FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
                                    getRegionInfo(), flushOpSeqId, committedFiles);
                            WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                                    desc, sequenceId, false);
                        } catch (Throwable t) {
                            LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
                                    + StringUtils.stringifyException(t));
                            // ignore this since we will be aborting the RS with DSE.
                        }
                    }
                    // we have called wal.startCacheFlush(), now we have to abort it
                    wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
                    throw ex; // let upper layers deal with it.
                }
            } finally {
                //Release the lock once the snapshot is done; reads and writes are no longer blocked
                this.updatesLock.writeLock().unlock();
            }
            String s = "Finished memstore snapshotting " + this
                    + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
            status.setStatus(s);
            if (LOG.isTraceEnabled()) LOG.trace(s);

            // sync unflushed WAL changes
            // see HBASE-8208 for details
            if (wal != null) {
                try {
                    wal.sync(); // ensure that flush marker is sync'ed
                } catch (IOException ioe) {
                    LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
                            + StringUtils.stringifyException(ioe));
                }
            }

            // wait for all in-progress transactions to commit to WAL before
            // we can start the flush. This prevents
            // uncommitted transactions from being written into HFiles.
            // We have to block before we start the flush, otherwise keys that
            // were removed via a rollbackMemstore could be written to Hfiles.
            w.setWriteNumber(flushOpSeqId);
            mvcc.waitForPreviousTransactionsComplete(w);
            // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
            w = null;
        } finally {
            if (w != null) {
                // in case of failure just mark current w as complete
                mvcc.advanceMemstore(w);
            }
        }
        return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
                flushedSeqId, totalFlushableSizeOfFlushableStores);
    }
    
    

      Before looking at StoreFlushContext.prepare() itself, here is the StoreFlushContext interface. As mentioned above, StoreFlusherImpl is an inner class of HStore and implements StoreFlushContext.

    interface StoreFlushContext {
    
    
      void prepare();
    
    
      void flushCache(MonitoredTask status) throws IOException;
    
    
      boolean commit(MonitoredTask status) throws IOException;
    
      
      void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException;
    
    
      void abort() throws IOException;
    
    
      List<Path> getCommittedFiles();
    }
    

      Now back to the flush.prepare() call in internalPrepareFlushCache:

    public void prepare() {
        //As noted above, the region calls StoreFlusherImpl.prepare() while holding the region's
        //updatesLock write lock, so any expensive work here stalls the ongoing reads and writes.
        //memstore.snapshot() itself only hands the memstore's skip list over to the snapshot, but
        //when building the returned MemStoreSnapshot it calls snapshot.size().
        this.snapshot = memstore.snapshot();
        //MemStoreSnapshot.getCellsCount() returns the snapshot.size() value computed inside
        //memstore.snapshot(); that call is O(n).
        this.cacheFlushCount = snapshot.getCellsCount();
        this.cacheFlushSize = snapshot.getSize();
        committedFiles = new ArrayList<Path>(1);
    }
    

      Let's look at the memstore's snapshot method:

    public MemStoreSnapshot snapshot() {
        // If snapshot currently has entries, then flusher failed or didn't call
        // cleanup.  Log a warning.
        if (!this.snapshot.isEmpty()) {
          LOG.warn("Snapshot called again without clearing previous. " +
              "Doing nothing. Another ongoing flush or did we fail last attempt?");
        } else {
          this.snapshotId = EnvironmentEdgeManager.currentTime();
          //The memory size currently used by the memstore
          this.snapshotSize = keySize();
          if (!this.cellSet.isEmpty()) {
            //cellSet holds the data currently in the memstore
            this.snapshot = this.cellSet;
            //Construct a new cellSet to receive subsequent writes
            this.cellSet = new CellSkipListSet(this.comparator);
            this.snapshotTimeRangeTracker = this.timeRangeTracker;
            this.timeRangeTracker = new TimeRangeTracker();
            // Reset heap to not include any keys
            this.size.set(DEEP_OVERHEAD);
            this.snapshotAllocator = this.allocator;
            // Reset allocator so we get a fresh buffer for the new memstore
            if (allocator != null) {
              String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
              this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
                  new Class[] { Configuration.class }, new Object[] { conf });
            }
            timeOfOldestEdit = Long.MAX_VALUE;
          }
        }
        // ... the method then builds and returns a MemStoreSnapshot, recording snapshot.size()
        // as its cell count (the call discussed below) ...
      }

      Let's dwell on the snapshot.getCellsCount() call in prepare(). HBase's in-memory store is a skip list, implemented with the JDK's ConcurrentSkipListMap. The default MemStore implementation (DefaultMemStore) has two fields of this type, cellSet and snapshot: cellSet holds the data written into the memstore, and snapshot is what cellSet is handed over to in phase one of the flush. getCellsCount() therefore boils down to ConcurrentSkipListMap.size(), and ConcurrentSkipListMap does not maintain an atomic counter of its size, both to keep concurrent updates cheap and because the operation is rarely needed. As a result, ConcurrentSkipListMap.size() walks the whole skip list to compute the size; the small demo below makes this concrete.
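  A quick demo (the map size here is arbitrary) shows the cost: size() on a large ConcurrentSkipListMap takes time proportional to the number of entries, because it walks the map instead of reading a stored counter.

    import java.util.concurrent.ConcurrentSkipListMap;

    // Demonstrates that ConcurrentSkipListMap.size() traverses the entries.
    public class SkipListSizeCost {
        public static void main(String[] args) {
            ConcurrentSkipListMap<Integer, Integer> map = new ConcurrentSkipListMap<>();
            for (int i = 0; i < 2_000_000; i++) {
                map.put(i, i);
            }
            long t0 = System.nanoTime();
            int n = map.size();          // O(n): walks the skip list
            long t1 = System.nanoTime();
            System.out.println("size=" + n + " took " + (t1 - t0) / 1_000_000 + " ms");
        }
    }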

      Back in internalPrepareFlushCache: once prepare has been called on every store, the updatesLock is released and a PrepareFlushResult is returned. Moving back up to internalFlushcache, after internalPrepareFlushCache completes we go into internalFlushCacheAndCommit. Let's follow it:
    protected FlushResult internalFlushCacheAndCommit(
                final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
                final Collection<Store> storesToFlush)
                throws IOException {
    
            // prepare flush context is carried via PrepareFlushResult
            //Map from CF name to the StoreFlusherImpl of each store being flushed
            TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
            //Paths of the HFiles produced by the flush; the CF keys exist already but the List<Path> values are still null, as initialized in internalPrepareFlushCache
            TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
            long startTime = prepareResult.startTime;
            long flushOpSeqId = prepareResult.flushOpSeqId;
            long flushedSeqId = prepareResult.flushedSeqId;
            long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
    
            String s = "Flushing stores of " + this;
            status.setStatus(s);
            if (LOG.isTraceEnabled()) LOG.trace(s);
    
            // Any failure from here on out will be catastrophic requiring server
            // restart so wal content can be replayed and put back into the memstore.
            // Otherwise, the snapshot content while backed up in the wal, it will not
            // be part of the current running servers state.
            boolean compactionRequested = false;
            try {
                // A.  Flush memstore to all the HStores.
                // Keep running vector of all store files that includes both old and the
                // just-made new flush store file. The new flushed file is still in the
                // tmp directory.
                //Iterate over every store of the region and call HStore's StoreFlusherImpl.flushCache,
                //which flushes the store's snapshot into an HFile. Note the data is written to a temp
                //file here; commit later moves it to the correct path.
                for (StoreFlushContext flush : storeFlushCtxs.values()) {
                    flush.flushCache(status);
                }
    
                // Switch snapshot (in memstore) -> new hfile (thus causing
                // all the store scanners to reset/reseek).
                Iterator<Store> it = storesToFlush.iterator();
                // stores.values() and storeFlushCtxs have same order
                for (StoreFlushContext flush : storeFlushCtxs.values()) {
                    boolean needsCompaction = flush.commit(status);
                    if (needsCompaction) {
                        compactionRequested = true;
                    }
                    committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
                }
                storeFlushCtxs.clear();
    
                // Set down the memstore size by amount of flush.
                this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
    
                if (wal != null) {
                    // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
                    FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
                            getRegionInfo(), flushOpSeqId, committedFiles);
                    WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                            desc, sequenceId, true);
                }
            } catch (Throwable t) {
                // An exception here means that the snapshot was not persisted.
                // The wal needs to be replayed so its content is restored to memstore.
                // Currently, only a server restart will do this.
                // We used to only catch IOEs but its possible that we'd get other
                // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
                // all and sundry.
                if (wal != null) {
                    try {
                        FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
                                getRegionInfo(), flushOpSeqId, committedFiles);
                        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                                desc, sequenceId, false);
                    } catch (Throwable ex) {
                        LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
                                StringUtils.stringifyException(ex));
                        // ignore this since we will be aborting the RS with DSE.
                    }
                    wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
                }
                DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
                        Bytes.toStringBinary(getRegionName()));
                dse.initCause(t);
                status.abort("Flush failed: " + StringUtils.stringifyException(t));
                throw dse;
            }
    
            // If we get to here, the HStores have been written.
            if (wal != null) {
                wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
            }
    
            // Record latest flush time
            for (Store store : storesToFlush) {
                this.lastStoreFlushTimeMap.put(store, startTime);
            }
    
            // Update the oldest unflushed sequence id for region.
            this.maxFlushedSeqId = flushedSeqId;
    
            // C. Finally notify anyone waiting on memstore to clear:
            // e.g. checkResources().
            synchronized (this) {
                notifyAll(); // FindBugs NN_NAKED_NOTIFY
            }
    
            long time = EnvironmentEdgeManager.currentTime() - startTime;
            long memstoresize = this.memstoreSize.get();
            String msg = "Finished memstore flush of ~"
                    + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
                    + totalFlushableSizeOfFlushableStores + ", currentsize="
                    + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
                    + " for region " + this + " in " + time + "ms, sequenceid="
                    + flushOpSeqId + ", compaction requested=" + compactionRequested
                    + ((wal == null) ? "; wal=null" : "");
            LOG.info(msg);
            status.setStatus(msg);
    
            return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
                    FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
        }
    

      We will only look at two of these calls: flush.flushCache and flush.commit, where flush is a StoreFlusherImpl. flushCache performs phase two and commit performs phase three.

     public void flushCache(MonitoredTask status) throws IOException {
            //Flush the snapshot to temp files; tempFiles records the paths written, which commit later moves to the final location
                tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
            }
    

      Now on to HStore's flushCache method:

    protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
                                        MonitoredTask status) throws IOException {
            // If an exception happens flushing, we let it out without clearing
            // the memstore snapshot.  The old snapshot will be returned when we say
            // 'snapshot', the next time flush comes around.
            // Retry after catching exception when flushing, otherwise server will abort
            // itself
            StoreFlusher flusher = storeEngine.getStoreFlusher();
            IOException lastException = null;
            for (int i = 0; i < flushRetriesNumber; i++) {
                try {
                    //Call StoreFlusher.flushSnapshot to flush the snapshot to a temp file
                    List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
                    Path lastPathName = null;
                    try {
                        for (Path pathName : pathNames) {
                            lastPathName = pathName;
                            validateStoreFile(pathName);
                        }
                        return pathNames;
                    } catch (Exception e) {
                        LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
                        if (e instanceof IOException) {
                            lastException = (IOException) e;
                        } else {
                            lastException = new IOException(e);
                        }
                    }
                } catch (IOException e) {
                    LOG.warn("Failed flushing store file, retrying num=" + i, e);
                    lastException = e;
                }
                if (lastException != null && i < (flushRetriesNumber - 1)) {
                    try {
                        Thread.sleep(pauseTime);
                    } catch (InterruptedException e) {
                        IOException iie = new InterruptedIOException();
                        iie.initCause(e);
                        throw iie;
                    }
                }
            }
            throw lastException;
        }
    

     The key logic: storeEngine.getStoreFlusher() obtains the flusher instance, which wraps the writer that appends the data and syncs it to disk; we won't expand on that here. Let's focus on the flusher.flushSnapshot call inside the retry loop, which involves the cellsCount variable discussed earlier.

    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
          MonitoredTask status) throws IOException {
        ArrayList<Path> result = new ArrayList<Path>();
        //snapshot.getCellsCount() is called here; it was singled out earlier because computing it is one of the more expensive parts of the prepare phase.
        int cellsCount = snapshot.getCellsCount();
        if (cellsCount == 0) return result; // don't flush if there are no entries
    
        // Use a store scanner to find which rows to flush.
        long smallestReadPoint = store.getSmallestReadPoint();
        InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
        if (scanner == null) {
          return result; // NULL scanner returned from coprocessor hooks means skip normal processing
        }
    
        StoreFile.Writer writer;
        try {
          // TODO:  We can fail in the below block before we complete adding this flush to
          //        list of store files.  Add cleanup of anything put on filesystem if we fail.
          synchronized (flushLock) {
            status.setStatus("Flushing " + store + ": creating writer");
            // Write the map out to the disk
            //The cellsCount passed in here is not actually used downstream; perhaps a placeholder?
            writer = store.createWriterInTmp(
                cellsCount, store.getFamily().getCompression(), false, true, true);
            writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
            IOException e = null;
            try {
              //This actually writes the snapshot to the temp file
              performFlush(scanner, writer, smallestReadPoint);
            } catch (IOException ioe) {
              e = ioe;
              // throw the exception out
              throw ioe;
            } finally {
              if (e != null) {
                writer.close();
              } else {
                finalizeWriter(writer, cacheFlushId, status);
              }
            }
          }
        } finally {
          scanner.close();
        }
        LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
            + StringUtils.humanReadableInt(snapshot.getSize()) +
            ", hasBloomFilter=" + writer.hasGeneralBloom() +
            ", into tmp file " + writer.getPath());
        result.add(writer.getPath());
        return result;
      }
    

      We can see the variable is handed to store.createWriterInTmp; let's keep following:

    public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
                                                  boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
                throws IOException {
        // ... unimportant logic omitted ...
        //The maxKeyCount passed in here ends up unused
        StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, this.getFileSystem())
                .withFilePath(fs.createTempName())
                .withComparator(comparator)
                .withBloomType(family.getBloomFilterType())
                .withMaxKeyCount(maxKeyCount)
                .withFavoredNodes(favoredNodes)
                .withFileContext(hFileContext)
                .build();
        return w;
    }

      So cellsCount is passed to the writer as a parameter. performFlush then iterates over the snapshot with the scanner and writes the data to disk through the HFile writer. Let's see what the Writer actually does with cellsCount; in the whole Writer it is only used in two places:

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
              conf, cacheConf, bloomType,
              (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
    this.deleteFamilyBloomFilterWriter = BloomFilterFactory
                .createDeleteBloomAtWrite(conf, cacheConf,
                    (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);  

    Following these two calls:

     public static BloomFilterWriter createDeleteBloomAtWrite(Configuration conf,
          CacheConfig cacheConf, int maxKeys, HFile.Writer writer) {
        if (!isDeleteFamilyBloomEnabled(conf)) {
          LOG.info("Delete Bloom filters are disabled by configuration for "
              + writer.getPath()
              + (conf == null ? " (configuration is null)" : ""));
          return null;
        }
    
        float err = getErrorRate(conf);
    
        int maxFold = getMaxFold(conf);
        // In case of compound Bloom filters we ignore the maxKeys hint.
        CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(getBloomBlockSize(conf),
            err, Hash.getHashType(conf), maxFold, cacheConf.shouldCacheBloomsOnWrite(),
            KeyValue.RAW_COMPARATOR);
        writer.addInlineBlockWriter(bloomWriter);
        return bloomWriter;
      }
    

      As we can see, maxKeys is ignored here (the other factory method behaves the same way), so cellsCount is not actually used in phase two of the flush.

      So far we have established that cellsCount is unused in phase two. On to phase three: back to flush.commit(status) in internalFlushCacheAndCommit.
    public boolean commit(MonitoredTask status) throws IOException {
                if (this.tempFiles == null || this.tempFiles.isEmpty()) {
                    return false;
                }
                List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
                for (Path storeFilePath : tempFiles) {
                    try {
                        storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
                    } catch (IOException ex) {
                        LOG.error("Failed to commit store file " + storeFilePath, ex);
                        // Try to delete the files we have committed before.
                        for (StoreFile sf : storeFiles) {
                            Path pathToDelete = sf.getPath();
                            try {
                                sf.deleteReader();
                            } catch (IOException deleteEx) {
                                LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
                                Runtime.getRuntime().halt(1);
                            }
                        }
                        throw new IOException("Failed to commit the flush", ex);
                    }
                }
    
                for (StoreFile sf : storeFiles) {
                    if (HStore.this.getCoprocessorHost() != null) {
                        HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
                    }
                    committedFiles.add(sf.getPath());
                }
    
                HStore.this.flushedCellsCount += cacheFlushCount;
                HStore.this.flushedCellsSize += cacheFlushSize;
    
                // Add new file to store files.  Clear snapshot too while we have the Store write lock.
                return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
            }
    

      Phase three is fairly simple: it moves the flushed files to their correct HDFS path (the generic write-to-temp-then-move pattern is sketched below). This is also where cellsCount finally gets used: it is added to the store's flushedCellsCount, which only feeds the flushedCellsCount/flushedCellsSize metrics. In my experience those metrics can be ignored; I have never used them.
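  The commit step is an instance of the common write-to-temp-then-move pattern. Here is a generic sketch on the local filesystem (HBase does the equivalent with renames on HDFS under the store's temp directory); paths and names are illustrative.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Generic temp-then-commit sketch: write the data to a temp file first, then
    // move the finished file into its final directory in one step.
    public class TmpThenCommitDemo {
        public static void main(String[] args) throws IOException {
            Path tmpDir = Files.createTempDirectory("store-tmp");
            Path finalDir = Files.createTempDirectory("store-cf");

            // Phase two: flush the snapshot contents into a temp file.
            Path tmpFile = tmpDir.resolve("hfile-0001.tmp");
            Files.write(tmpFile, "flushed cells".getBytes());

            // Phase three: commit by moving the finished file to its final location.
            Path committed = Files.move(tmpFile, finalDir.resolve("hfile-0001"));
            System.out.println("committed file: " + committed);
        }
    }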

    Summary

    The reason cellsCount keeps coming up is that computing it through ConcurrentSkipListMap.size() is the most expensive part of phase one of the flush, and it happens while the region-level updatesLock is held; yet, as this walkthrough shows, the value is hardly used and could be dropped. Left as is, it causes latency spikes and a high p99. A patch for this already exists, but it applies to the 2.x versions; a sketch of the fix direction follows.
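  For reference, the direction of that fix can be sketched in a few lines: keep a cell counter that is maintained on every add, so the snapshot's cell count is available in O(1) and no skip-list traversal happens while the update lock is held. This mirrors the idea only, not the actual upstream patch.

    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the fix direction: count cells as they are added instead of
    // calling ConcurrentSkipListMap.size() at snapshot time.
    public class CountedMemStore {
        private final ConcurrentSkipListMap<String, byte[]> cellSet = new ConcurrentSkipListMap<>();
        private final AtomicLong cellCount = new AtomicLong();

        public void add(String key, byte[] value) {
            if (cellSet.put(key, value) == null) {
                cellCount.incrementAndGet();   // only count newly inserted cells
            }
        }

        public long getCellsCount() {
            return cellCount.get();            // O(1), no traversal
        }
    }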

    That wraps up the whole flush flow. If anything here is wrong, corrections are welcome. Feel free to add me on WeChat to discuss: 940184856
