zoukankan      html  css  js  c++  java
  • Hbase mvcc

    region hlog回放时mvcc的应用

    region open的时候会根据各个HStore下的所有HFile文件记录的maxMemstoreTS,找到最大的maxSeqId,然后根据各个HStore记录的maxSeqId回放HLog日志(从maxMemstoreTS+1开始)。

    数据写入过程中的版本管理

    HRegion.doMiniBatchMutate

        // Excerpt from the body of HRegion.doMiniBatchMutate (method header and the enclosing
        // try/finally are not shown). The steps below lock rows, stamp timestamps, append the
        // edits to the WAL, apply them to the memstore and finally complete the mvcc write entry.

        // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
          // locked rows
          miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);
    
          // We've now grabbed as many mutations off the list as we can
          // Ensure we acquire at least one.
          if (miniBatchOp.getReadyToWriteCount() <= 0) {
            // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
            return;
          }
    
          // Take the read side of updatesLock before touching the WAL/memstore; `locked` records
          // that it was taken (presumably so the elided cleanup code can release it -- confirm).
          lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
          locked = true;
    
          // STEP 2. Update mini batch of all operations in progress with  LATEST_TIMESTAMP timestamp
          // We should record the timestamp only after we have acquired the rowLock,
          // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
          long now = EnvironmentEdgeManager.currentTime();
          batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
    
          // STEP 3. Build WAL edit
          List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);
    
          // STEP 4. Append the WALEdits to WAL and sync.
          // Each non-empty edit is appended via doWALAppend, which returns the mvcc write entry
          // that was started for that append.
          for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
            Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
            walEdit = nonceKeyWALEditPair.getSecond();
            NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
    
            if (walEdit != null && !walEdit.isEmpty()) {
              writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
                  nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
            }
    
            // Complete mvcc for all but last writeEntry (for replay case)
            if (it.hasNext() && writeEntry != null) {
              mvcc.complete(writeEntry);
              writeEntry = null;
            }
          }
    
          // STEP 5. Write back to memStore
          // NOTE: writeEntry can be null here
          writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
    
          // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
          // complete mvcc for last writeEntry
          batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
          // The entry has been handed off for completion above; clear the local reference.
          writeEntry = null;
          success = true;
    

    HRegion.doWALAppend

    
        // Excerpt from the body of HRegion.doWALAppend (method header not shown): builds a WAL
        // key, appends the edit to the WAL, syncs it, and returns the mvcc write entry that the
        // append attached to the key.

        // Using default cluster id, as this can only happen in the originating cluster.
        // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
        // here instead of WALKeyImpl directly to support legacy coprocessors.
        // NOTE(review): the sentence above mentions HLogKey, but the code below constructs
        // WALKeyImpl -- the comment looks stale; confirm against upstream.
        // Replay edits get a key without a replication scope; all other edits pass
        // this.getReplicationScope().
        WALKeyImpl walKey = walEdit.isReplay()?
            new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
                nonceGroup, nonce, mvcc) :
            new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
                this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
                nonceGroup, nonce, mvcc, this.getReplicationScope());
        // For replay, record the original log sequence number on the key.
        if (walEdit.isReplay()) {
          walKey.setOrigLogSeqNum(origLogSeqNum);
        }
        WriteEntry writeEntry = null;
        try {
          long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
          // Call sync on our edit.
          if (txid != 0) {
            sync(txid, durability);
          }
          // Read the write entry back from the key after the append/sync succeeded.
          writeEntry = walKey.getWriteEntry();
        } catch (IOException ioe) {
          // Append or sync failed: if a write entry was already attached to the key, complete it
          // in mvcc before propagating the failure.
          if (walKey != null && walKey.getWriteEntry() != null) {
            mvcc.complete(walKey.getWriteEntry());
          }
          throw ioe;
        }
        return writeEntry;
    
    

    FSHLog:

      /**
       * Appends an edit by delegating to {@code stampSequenceIdAndPublishToRingBuffer}, passing
       * the ring buffer obtained from this log's disruptor.
       *
       * @return the value returned by {@code stampSequenceIdAndPublishToRingBuffer}
       * @throws IOException if the delegate throws
       */
      @Override
      public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
          final boolean inMemstore) throws IOException {
        final long txid = stampSequenceIdAndPublishToRingBuffer(
            hri, key, edits, inMemstore, disruptor.getRingBuffer());
        return txid;
      }
    

    AbstractFSWAL

      /**
       * Starts an mvcc write for {@code key}, claims a ring-buffer slot, and publishes an
       * {@link FSWALEntry} wrapping the edit into that slot.
       *
       * @param hri region the edit belongs to
       * @param key WAL key; its mvcc instance is used to begin the write
       * @param edits the edit to append
       * @param inMemstore passed through to the {@link FSWALEntry}
       * @param ringBuffer ring buffer to claim a slot from and publish to
       * @return the ring-buffer sequence claimed for this entry (the txid)
       * @throws IOException if this log is closed
       */
      protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
          WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
          throws IOException {
        if (this.closed) {
          throw new IOException(
              "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
        }
        // The ring-buffer slot is claimed inside the callback handed to mvcc.begin(), so the
        // slot is taken as part of beginning the mvcc write; the holder carries the claimed
        // sequence out of the lambda.
        MutableLong txidHolder = new MutableLong();
        MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
          txidHolder.setValue(ringBuffer.next());
        });
        long txid = txidHolder.longValue();
        // Current RPC call, kept only when it is a ServerCall with a non-null cell scanner;
        // otherwise null.
        ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
          .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
        try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
          FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
          // Stamp the entry with the write entry obtained from mvcc.begin() above.
          entry.stampRegionSequenceId(we);
          ringBuffer.get(txid).load(entry);
        } finally {
          // Always publish the claimed slot, even if building/loading the entry failed.
          ringBuffer.publish(txid);
        }
        return txid;
      }
    
    

    关键点:

    • mvcc的begin方法把writePoint自增,并以自增后的writePoint生成一个写条目放入到写队列writeQueue
    • 新建FSWALEntry(WAL日志条目)并publish到Disruptor队列中
    • 设置本次写入相关的Cell的sequenceId为自增后的writePoint
    • Disruptor的sequence id作为本次事务ID(txid)
    • 然后发布一个SyncFuture到Disruptor队列,等待写WAL日志完成

    FSWALEntry

     /**
      * Uses the write number of {@code we} as the region sequence id: for a non-replay edit
      * that is destined for the memstore, every cell of the edit is stamped with it. The write
      * entry is then recorded on this entry's key.
      *
      * @param we the mvcc write entry started for this WAL entry
      * @return the write number of {@code we}
      * @throws IOException if stamping a cell fails
      */
     long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
        final long writeNumber = we.getWriteNumber();
        final boolean replay = this.getEdit().isReplay();
        if (!replay && inMemstore) {
          for (Cell cell : getEdit().getCells()) {
            PrivateCellUtil.setSequenceId(cell, writeNumber);
          }
        }

        // Remember the write entry on the key so the appender can retrieve it later.
        getKey().setWriteEntry(we);
        return writeNumber;
      }
    

    MultiVersionConcurrencyControl

      // Can be compared against a Cell's sequenceId to decide whether that Cell is visible.
      final AtomicLong readPoint = new AtomicLong(0);
      // Incremented by 1 for every write transaction.
      final AtomicLong writePoint = new AtomicLong(0);
      // Monitor used to wait for / be notified of readPoint changes.
      private final Object readWaiters = new Object();
    
      /**
       * Begins a new write: while holding the {@code writeQueue} lock, increments the write
       * point, queues a {@link WriteEntry} carrying the new write number, and runs
       * {@code action}.
       *
       * @param action callback executed under the {@code writeQueue} lock after the entry has
       *          been queued
       * @return the newly queued write entry
       */
      public WriteEntry begin(Runnable action) {
        final WriteEntry entry;
        synchronized (writeQueue) {
          entry = new WriteEntry(writePoint.incrementAndGet());
          writeQueue.add(entry);
          action.run();
        }
        return entry;
      }
    
      /**
       * Completes {@code e} and, if {@code complete} reports that the read point has not yet
       * reached it, blocks in {@code waitForRead} until it has.
       *
       * @param e the write entry to complete
       */
      public void completeAndWait(WriteEntry e) {
        final boolean alreadyVisible = complete(e);
        if (alreadyVisible) {
          return;
        }
        waitForRead(e);
      }
    
      /**
       * Marks {@code writeEntry} as completed, then removes every completed entry from the head
       * of {@code writeQueue} and advances {@code readPoint} to the write number of the last
       * entry removed.
       *
       * @param writeEntry the write entry to mark completed
       * @return true if, after this call, {@code readPoint} is at or beyond the write number of
       *         {@code writeEntry}; false if an earlier, still-incomplete entry is holding the
       *         read point back
       * @throws RuntimeException if the queue is empty, or if consecutive entries at the head of
       *         the queue do not have consecutive write numbers
       */
      public boolean complete(WriteEntry writeEntry) {
        synchronized (writeQueue) {
          writeEntry.markCompleted();
          // NONE is defined elsewhere; the "> 0" checks below treat it as "nothing removed yet"
          // (presumably a non-positive sentinel -- confirm).
          long nextReadValue = NONE;
          boolean ranOnce = false;
          // Drain completed entries from the head; stop at the first incomplete one.
          while (!writeQueue.isEmpty()) {
            ranOnce = true;
            WriteEntry queueFirst = writeQueue.getFirst();
    
            // Sanity check: the head must directly follow the entry removed just before it.
            if (nextReadValue > 0) {
              if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
                throw new RuntimeException("Invariant in complete violated, nextReadValue="
                    + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
              }
            }
    
            if (queueFirst.isCompleted()) {
              nextReadValue = queueFirst.getWriteNumber();
              writeQueue.removeFirst();
            } else {
              break;
            }
          }
    
          // The loop body never ran, i.e. the queue was empty on entry.
          if (!ranOnce) {
            throw new RuntimeException("There is no first!");
          }
    
          // At least one entry was removed: publish the new read point and wake up waiters.
          if (nextReadValue > 0) {
            synchronized (readWaiters) {
              readPoint.set(nextReadValue);
              readWaiters.notifyAll();
            }
          }
          return readPoint.get() >= writeEntry.getWriteNumber();
        }
      }
    
    
      /**
       * Blocks until the global readPoint has reached the write number of the given entry.
       * Waits on {@code readWaiters} in 10 ms slices and logs a warning on every 100th
       * iteration. An interrupt does not end the wait; it is remembered and the thread's
       * interrupt flag is restored once the wait is over.
       */
      void waitForRead(WriteEntry e) {
        boolean wasInterrupted = false;
        synchronized (readWaiters) {
          for (int attempts = 0; readPoint.get() < e.getWriteNumber(); attempts++) {
            if (attempts > 0 && attempts % 100 == 0) {
              LOG.warn("STUCK: " + this);
            }
            try {
              readWaiters.wait(10);
            } catch (InterruptedException ie) {
              // Keep waiting until the read point catches up; the interrupt status is
              // re-asserted after the loop.
              wasInterrupted = true;
            }
          }
        }
        if (wasInterrupted) {
          Thread.currentThread().interrupt();
        }
      }
    
    • MultiVersionConcurrencyControl类中定义了readPoint,writePoint两个成员变量
    • 每一个事务操作,writePoint自增1并创建一条WriteEntry加入到writeQueue队列(LinkedList<WriteEntry>)中。
    • complete(WriteEntry writeEntry)方法把传入的writeEntry标记为已完成,并从队列首部不断移除已经完成的WriteEntry条目,并把readPoint更新为最后一个被移除的WriteEntry的writeNumber;返回值表示当前readPoint是否已经跟上或者超过了传入writeEntry的writeNumber。
    • waitForRead(WriteEntry e)在readWaiters上循环等待(每次wait 10毫秒),直到readPoint不小于该写条目e的writeNumber,即e及其之前的写入都已完成。

    读取过程中的版本控制

    Scan类可以设置事务隔离级别:

      /**
       * Sets the isolation level by delegating to the superclass and narrows the returned
       * object to {@link Scan} so calls can be chained on the concrete type.
       *
       * @param level the isolation level to apply
       * @return the superclass's return value, cast to {@link Scan}
       */
      @Override
      public Scan setIsolationLevel(IsolationLevel level) {
        final Scan self = (Scan) super.setIsolationLevel(level);
        return self;
      }
    
    /**
     * Isolation levels that can be selected for a read.
     * The numeric argument given to each constant is accepted by the constructor but is not
     * stored anywhere in this excerpt.
     */
    public enum IsolationLevel {

      /** Declared with code 1. */
      READ_COMMITTED(1),

      /** Declared with code 2. */
      READ_UNCOMMITTED(2);

      IsolationLevel(int value) {
        // Intentionally empty: the value is not retained.
      }

    }
    

    StoreFileScanner

      /**
       * Positions this scanner at or after {@code key}.
       *
       * @param key the cell to seek to
       * @return false if no cell at or after {@code key} exists; otherwise true, or the result
       *         of {@code skipKVsNewerThanReadpoint()} when the file has MVCC info or was bulk
       *         loaded
       * @throws IOException on failure; a {@link FileNotFoundException} is rethrown unchanged,
       *         any other {@link IOException} is wrapped with the scanner and key in the message
       */
      @Override
      public boolean seek(Cell key) throws IOException {
        if (seekCount != null) {
          seekCount.increment();
        }

        try {
          try {
            final boolean positioned = seekAtOrAfter(hfs, key);
            if (!positioned) {
              this.cur = null;
              return false;
            }

            setCurrentCell(hfs.getCell());

            // Read-point filtering is needed when the file carries MVCC info or was bulk
            // loaded; otherwise the current cell is accepted as is.
            if (hasMVCCInfo || this.reader.isBulkLoaded()) {
              return skipKVsNewerThanReadpoint();
            }
            return true;
          } finally {
            // Set on every exit path, including failures.
            realSeekDone = true;
          }
        } catch (FileNotFoundException e) {
          throw e;
        } catch (IOException ioe) {
          throw new IOException("Could not seek " + this + " to key " + key, ioe);
        }
      }
    
    

    SegmentScanner

      /**
       * Advances the iterator to the next cell whose sequence id is not newer than this
       * scanner's read point and stores it in {@code current}; stores null if no such cell is
       * found or if, while skipping, the scan moves past {@code stopSkippingKVsRow} (when
       * {@code stopSkippingKVsIfNextRow} is set). The last cell iterated to is always
       * remembered in {@code last}.
       */
      protected void updateCurrent() {
        Cell visited = null;

        try {
          Cell visible = null;
          while (iter.hasNext()) {
            visited = iter.next();
            if (visited.getSequenceId() <= this.readPoint) {
              // First version this read point is allowed to see.
              visible = visited;
              break;
            }
            // for backwardSeek() stay in the boundaries of a single row
            if (stopSkippingKVsIfNextRow
                && segment.compareRows(visited, stopSkippingKVsRow) > 0) {
              break;
            }
          }

          current = visible; // null when nothing was found
        } finally {
          // in all cases, remember the last KV we iterated to, needed for reseek()
          if (visited != null) {
            last = visited;
          }
        }
      }
    
    

    RegionScannerImpl

          // Excerpt from RegionScannerImpl: choose the mvcc read point this scanner will use and
          // register it in scannerReadPoints.
          IsolationLevel isolationLevel = scan.getIsolationLevel();
          long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
          synchronized (scannerReadPoints) {
            if (mvccReadPoint > 0) {
              // A positive read point carried by the Scan takes precedence.
              this.readPt = mvccReadPoint;
            } else if (nonce == HConstants.NO_NONCE || rsServices == null
                || rsServices.getNonceManager() == null) {
              // No nonce or no nonce manager available: derive the read point from the
              // isolation level.
              this.readPt = getReadPoint(isolationLevel);
            } else {
              // Otherwise ask the nonce manager for the mvcc value recorded for this operation.
              this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
            }
            scannerReadPoints.put(this, this.readPt);
          }
    
    

    HRegion

      /**
       * Returns the read point to use for the given isolation level.
       *
       * @param isolationLevel requested isolation level; may be null
       * @return {@code Long.MAX_VALUE} for {@code READ_UNCOMMITTED}, so that uncommitted
       *         transactions are readable too; otherwise the current mvcc read point
       */
      public long getReadPoint(IsolationLevel isolationLevel) {
        // Reference comparison is null-safe: a null level falls through to the mvcc read point.
        final boolean seesUncommitted = isolationLevel == IsolationLevel.READ_UNCOMMITTED;
        return seesUncommitted ? Long.MAX_VALUE : mvcc.getReadPoint();
      }
    

    StoreFileScanner和SegmentScanner在seek的过程中会根据Cell的sequenceId和mvcc的readPoint进行比较,判断是否需要跳过(skip)该Cell

    备注

    hbase version:2.1.7

  • 相关阅读:
    dell r720服务器raid5安装centos6.5系统
    centos6.5报错:checking filesystems failed问题处理
    配置mysql5.5主从复制、半同步复制、主主复制
    vmware下centos克隆功能对网络的设置
    mysql报错问题解决MySQL server PID file could not be found!
    使用第三方工具Xtrabackup进行MySQL备份
    mysql数据库基于LVM快照的备份
    mysql的日志及利用mysqldump备份及还原
    centos6.5下java和tomcat环境部署
    通达OA数据库优化方案之_历史数据清理
  • 原文地址:https://www.cnblogs.com/andyhe/p/11736724.html
Copyright © 2011-2022 走看看