MVCC during region HLog replay
When a region is opened, it collects the maxMemstoreTS recorded in every HFile under each HStore to determine the largest sequence id (maxSeqId) that has already been persisted for that store, and then replays the HLog from that point on, i.e. starting at maxMemstoreTS + 1 (only edits with a larger sequence id are re-applied).
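The replay decision boils down to comparing each WAL edit's sequence id with the largest sequence id already persisted in the HFiles. The following hypothetical sketch illustrates only that filtering step; StoreFileMeta, WalEdit, maxFlushedSeqId and replay are illustrative placeholders, not HBase APIs (the real logic lives in HRegion's open/replay code path):

// Hypothetical sketch of the replay-start decision made when a region opens.
// StoreFileMeta, WalEdit, maxFlushedSeqId and replay are placeholders for illustration.
import java.util.List;

public class ReplayStartSketch {

  static class StoreFileMeta {
    final long maxMemstoreTS; // sequence id recorded in the HFile metadata at flush time
    StoreFileMeta(long maxMemstoreTS) { this.maxMemstoreTS = maxMemstoreTS; }
  }

  static class WalEdit {
    final long sequenceId;    // sequence id stamped when the edit was appended to the WAL
    WalEdit(long sequenceId) { this.sequenceId = sequenceId; }
  }

  /** The largest sequence id that is already durable in the HFiles of this store. */
  static long maxFlushedSeqId(List<StoreFileMeta> storeFiles) {
    return storeFiles.stream().mapToLong(f -> f.maxMemstoreTS).max().orElse(0L);
  }

  /** Re-apply only edits that were never flushed, i.e. those with sequenceId > maxSeqId. */
  static void replay(List<WalEdit> walEdits, long maxSeqId) {
    for (WalEdit edit : walEdits) {
      if (edit.sequenceId <= maxSeqId) {
        continue; // already persisted in an HFile, skip it
      }
      // ... re-apply the edit to the memstore here (starting at maxSeqId + 1) ...
    }
  }
}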
Version management during the write path
HRegion.doMiniBatchMutate
// STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
// locked rows
miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);

// We've now grabbed as many mutations off the list as we can
// Ensure we acquire at least one.
if (miniBatchOp.getReadyToWriteCount() <= 0) {
  // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
  return;
}

lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
locked = true;

// STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
long now = EnvironmentEdgeManager.currentTime();
batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);

// STEP 3. Build WAL edit
List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

// STEP 4. Append the WALEdits to WAL and sync.
for (Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
  Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
  walEdit = nonceKeyWALEditPair.getSecond();
  NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
  if (walEdit != null && !walEdit.isEmpty()) {
    writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
      nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
  }
  // Complete mvcc for all but last writeEntry (for replay case)
  if (it.hasNext() && writeEntry != null) {
    mvcc.complete(writeEntry);
    writeEntry = null;
  }
}

// STEP 5. Write back to memStore
// NOTE: writeEntry can be null here
writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

// STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
// complete mvcc for last writeEntry
batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
writeEntry = null;
success = true;
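Stripped of the batching details, the MVCC bookkeeping above follows a begin / stamp / append / apply / complete lifecycle. The sketch below shows that lifecycle against the MultiVersionConcurrencyControl API quoted further down; appendToWal and writeToMemstore are hypothetical placeholders for the real WAL and memstore steps:

// A minimal sketch of the MVCC lifecycle used by the write path above.
// appendToWal() and writeToMemstore() are hypothetical placeholders.
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;

public class WritePathMvccSketch {

  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  public void write(byte[] row, byte[] value) {
    // 1. begin(): bump writePoint and enqueue a WriteEntry (still invisible to readers)
    WriteEntry writeEntry = mvcc.begin();
    boolean succeeded = false;
    try {
      long seqId = writeEntry.getWriteNumber();
      appendToWal(row, value, seqId);     // 2. make the edit durable, stamped with seqId
      writeToMemstore(row, value, seqId); // 3. apply to the memstore, cells carry seqId
      succeeded = true;
    } finally {
      if (succeeded) {
        // 4. block until readPoint >= seqId; the edit then becomes visible to new scanners
        mvcc.completeAndWait(writeEntry);
      } else {
        // on failure still complete the entry so later writes are not blocked forever
        mvcc.complete(writeEntry);
      }
    }
  }

  private void appendToWal(byte[] row, byte[] value, long seqId) { /* placeholder */ }

  private void writeToMemstore(byte[] row, byte[] value, long seqId) { /* placeholder */ }
}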
HRegion.doWALAppend
// Using default cluster id, as this can only happen in the originating cluster.
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
// here instead of WALKeyImpl directly to support legacy coprocessors.
WALKeyImpl walKey = walEdit.isReplay()
    ? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
        this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
        nonceGroup, nonce, mvcc)
    : new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
        this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
        nonceGroup, nonce, mvcc, this.getReplicationScope());
if (walEdit.isReplay()) {
  walKey.setOrigLogSeqNum(origLogSeqNum);
}
WriteEntry writeEntry = null;
try {
  long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
  // Call sync on our edit.
  if (txid != 0) {
    sync(txid, durability);
  }
  writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
  if (walKey != null && walKey.getWriteEntry() != null) {
    mvcc.complete(walKey.getWriteEntry());
  }
  throw ioe;
}
return writeEntry;
FSHLog
@Override
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
    final boolean inMemstore) throws IOException {
  return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
    disruptor.getRingBuffer());
}
AbstractFSWAL
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
    throws IOException {
  if (this.closed) {
    throw new IOException(
      "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
  }
  MutableLong txidHolder = new MutableLong();
  MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
    txidHolder.setValue(ringBuffer.next());
  });
  long txid = txidHolder.longValue();
  ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
    .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
  try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
    FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
    entry.stampRegionSequenceId(we);
    ringBuffer.get(txid).load(entry);
  } finally {
    ringBuffer.publish(txid);
  }
  return txid;
}
Key points:
- mvcc's begin method increments writePoint and creates a WriteEntry carrying the incremented writePoint, which is appended to the writeQueue.
- A new FSWALEntry (the WAL log entry) is created and published to the Disruptor ring buffer; the claim/load/publish pattern is shown in the sketch after this list.
- The sequenceId of every Cell in this write is set to the incremented writePoint.
- The Disruptor sequence id is used as the transaction id (txid) of this append.
- A SyncFuture is then published to the Disruptor queue, and the caller waits on it until the WAL write completes.
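The "claim a sequence, load the slot, publish" pattern used by stampSequenceIdAndPublishToRingBuffer is the standard LMAX Disruptor producer pattern. Below is a standalone sketch of that pattern, assuming the com.lmax.disruptor 3.x API; LogEvent and the printing consumer are made up for illustration (HBase's real event type is RingBufferTruck):

// Standalone sketch of the Disruptor claim/load/publish pattern, assuming disruptor 3.x.
// LogEvent and the consumer below are hypothetical stand-ins for RingBufferTruck and the WAL writer.
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class WalRingBufferSketch {

  // Mutable event slot reused by the ring buffer.
  static class LogEvent {
    long txid;
    String payload;
  }

  public static void main(String[] args) {
    Disruptor<LogEvent> disruptor =
      new Disruptor<>(LogEvent::new, 1024, DaemonThreadFactory.INSTANCE);

    // Single consumer thread, analogous to the WAL writer/syncer side.
    disruptor.handleEventsWith((EventHandler<LogEvent>) (event, sequence, endOfBatch) ->
      System.out.println("append txid=" + event.txid + " payload=" + event.payload));
    disruptor.start();

    RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

    // Producer side: claim a sequence (this becomes the txid), load the slot, then publish.
    long txid = ringBuffer.next();
    try {
      LogEvent slot = ringBuffer.get(txid);
      slot.txid = txid;
      slot.payload = "walEdit";
    } finally {
      ringBuffer.publish(txid); // always publish, even on failure, so the sequence is not lost
    }

    disruptor.shutdown(); // waits until published events have been handled
  }
}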
FSWALEntry
long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
  long regionSequenceId = we.getWriteNumber();
  if (!this.getEdit().isReplay() && inMemstore) {
    for (Cell c : getEdit().getCells()) {
      PrivateCellUtil.setSequenceId(c, regionSequenceId);
    }
  }
  getKey().setWriteEntry(we);
  return regionSequenceId;
}
MultiVersionConcurrencyControl
final AtomicLong readPoint = new AtomicLong(0);  // compared with a Cell's sequenceId to decide whether that Cell is visible
final AtomicLong writePoint = new AtomicLong(0); // incremented by 1 for each write transaction
private final Object readWaiters = new Object();

public WriteEntry begin(Runnable action) {
  synchronized (writeQueue) {
    long nextWriteNumber = writePoint.incrementAndGet();
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    action.run();
    return e;
  }
}

public void completeAndWait(WriteEntry e) {
  if (!complete(e)) {
    waitForRead(e);
  }
}

public boolean complete(WriteEntry writeEntry) {
  synchronized (writeQueue) {
    writeEntry.markCompleted();
    long nextReadValue = NONE;
    boolean ranOnce = false;
    while (!writeQueue.isEmpty()) {
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();
      if (nextReadValue > 0) {
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("Invariant in complete violated, nextReadValue="
            + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
        }
      }
      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        break;
      }
    }
    if (!ranOnce) {
      throw new RuntimeException("There is no first!");
    }
    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        readPoint.set(nextReadValue);
        readWaiters.notifyAll();
      }
    }
    return readPoint.get() >= writeEntry.getWriteNumber();
  }
}

/**
 * Wait for the global readPoint to advance up to the passed in write entry number.
 */
void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  int count = 0;
  synchronized (readWaiters) {
    while (readPoint.get() < e.getWriteNumber()) {
      if (count % 100 == 0 && count > 0) {
        LOG.warn("STUCK: " + this);
      }
      count++;
      try {
        readWaiters.wait(10);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup -- and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) {
    Thread.currentThread().interrupt();
  }
}
The MultiVersionConcurrencyControl class defines the two member variables readPoint and writePoint:
- For every write transaction, writePoint is incremented by 1 and a WriteEntry is created and appended to the writeQueue (a LinkedList<WriteEntry>).
- complete(WriteEntry writeEntry) marks the given writeEntry as completed, repeatedly removes completed WriteEntry items from the head of the queue, updates readPoint to the writeNumber of the last completed entry it removed, and returns whether the current readPoint has caught up with (or passed) the given writeEntry.
- waitForRead(WriteEntry e) spin-waits until write entry e has become visible, i.e. until readPoint >= e.getWriteNumber().
A self-contained toy version of this scheme follows.
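To make the readPoint/writePoint interplay concrete, here is a simplified toy re-implementation of the same queue-based scheme (a sketch for illustration, not the real HBase class). Because entries are only released from the head of the queue in order, a later write that finishes early cannot advance readPoint past an earlier write that is still pending.

// Toy re-implementation of the readPoint/writePoint scheme, for illustration only.
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

public class ToyMvcc {

  static final class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final AtomicLong readPoint = new AtomicLong(0);
  private final AtomicLong writePoint = new AtomicLong(0);
  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

  /** Start a write: bump writePoint and enqueue an (incomplete) entry. */
  public synchronized WriteEntry begin() {
    WriteEntry e = new WriteEntry(writePoint.incrementAndGet());
    writeQueue.add(e);
    return e;
  }

  /** Finish a write: pop completed entries from the head and advance readPoint. */
  public synchronized void complete(WriteEntry entry) {
    entry.completed = true;
    long newReadPoint = readPoint.get();
    while (!writeQueue.isEmpty() && writeQueue.getFirst().completed) {
      newReadPoint = writeQueue.removeFirst().writeNumber;
    }
    readPoint.set(newReadPoint);
    notifyAll(); // wake up anyone waiting in a waitForRead-style loop
  }

  public long getReadPoint() {
    return readPoint.get();
  }

  public static void main(String[] args) {
    ToyMvcc mvcc = new ToyMvcc();
    WriteEntry w1 = mvcc.begin(); // writeNumber 1
    WriteEntry w2 = mvcc.begin(); // writeNumber 2
    mvcc.complete(w2);            // w2 finishes first, but w1 is still pending
    System.out.println(mvcc.getReadPoint()); // prints 0: w2 is not yet visible
    mvcc.complete(w1);
    System.out.println(mvcc.getReadPoint()); // prints 2: both writes are now visible
  }
}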
Version control during the read path
The Scan class lets the client set the isolation level for a read:
@Override
public Scan setIsolationLevel(IsolationLevel level) {
  return (Scan) super.setIsolationLevel(level);
}

public enum IsolationLevel {
  READ_COMMITTED(1),
  READ_UNCOMMITTED(2);

  IsolationLevel(int value) {}
}
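For example, a client can opt a single scan into reading data whose MVCC write entry has not completed yet. A usage sketch, assuming a standard HBase 2.x client setup (the table name is a placeholder):

// Usage sketch: choose the isolation level per Scan (HBase 2.x client API).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class IsolationLevelExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("test_table"))) { // placeholder table
      Scan scan = new Scan()
        // READ_UNCOMMITTED => the scanner read point becomes Long.MAX_VALUE, so even
        // cells whose MVCC write entry has not completed yet are returned.
        .setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println(result);
        }
      }
    }
  }
}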
StoreFileScanner
@Override
public boolean seek(Cell key) throws IOException {
  if (seekCount != null) seekCount.increment();
  try {
    try {
      if (!seekAtOrAfter(hfs, key)) {
        this.cur = null;
        return false;
      }
      setCurrentCell(hfs.getCell());
      if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
        return skipKVsNewerThanReadpoint();
      } else {
        return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
      }
    } finally {
      realSeekDone = true;
    }
  } catch (FileNotFoundException e) {
    throw e;
  } catch (IOException ioe) {
    throw new IOException("Could not seek " + this + " to key " + key, ioe);
  }
}
SegmentScanner
protected void updateCurrent() {
  Cell next = null;
  try {
    while (iter.hasNext()) {
      next = iter.next();
      if (next.getSequenceId() <= this.readPoint) {
        current = next;
        return; // skip irrelevant versions
      }
      // for backwardSeek() stay in the boundaries of a single row
      if (stopSkippingKVsIfNextRow &&
          segment.compareRows(next, stopSkippingKVsRow) > 0) {
        current = null;
        return;
      }
    } // end of while

    current = null; // nothing found
  } finally {
    if (next != null) {
      // in all cases, remember the last KV we iterated to, needed for reseek()
      last = next;
    }
  }
}
RegionScannerImpl
IsolationLevel isolationLevel = scan.getIsolationLevel();
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
synchronized (scannerReadPoints) {
  if (mvccReadPoint > 0) {
    this.readPt = mvccReadPoint;
  } else if (nonce == HConstants.NO_NONCE || rsServices == null
      || rsServices.getNonceManager() == null) {
    this.readPt = getReadPoint(isolationLevel);
  } else {
    this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
  }
  scannerReadPoints.put(this, this.readPt);
}
HRegion
public long getReadPoint(IsolationLevel isolationLevel) {
  if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
    // This scan can read even uncommitted transactions
    return Long.MAX_VALUE;
  }
  return mvcc.getReadPoint();
}
During a seek, both StoreFileScanner and SegmentScanner compare each Cell's sequenceId against the mvcc readPoint to decide whether the Cell must be skipped: a Cell whose sequenceId is greater than the readPoint is not yet visible to this scanner.
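The visibility rule itself is just a comparison. Below is a minimal, self-contained sketch of the filter the scanners effectively apply (CellStub and the sample data are made up for illustration):

// Minimal sketch of the MVCC visibility filter applied while scanning.
// A cell is visible to a scanner only if its sequenceId <= the scanner's readPoint.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReadPointFilterSketch {

  static class CellStub {
    final String row;
    final String value;
    final long sequenceId;
    CellStub(String row, String value, long sequenceId) {
      this.row = row;
      this.value = value;
      this.sequenceId = sequenceId;
    }
    @Override
    public String toString() {
      return row + "/" + value + "@" + sequenceId;
    }
  }

  /** Keep only cells whose sequenceId is at or below the scanner's read point. */
  static List<CellStub> visibleCells(List<CellStub> cells, long readPoint) {
    List<CellStub> visible = new ArrayList<>();
    for (CellStub c : cells) {
      if (c.sequenceId <= readPoint) {
        visible.add(c); // cells with a larger sequenceId are skipped: not yet visible
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    List<CellStub> cells = Arrays.asList(
      new CellStub("r1", "v1", 10),
      new CellStub("r1", "v2", 12), // written after this scanner's read point was taken
      new CellStub("r2", "v1", 8));
    long readPoint = 10; // snapshot captured when the scanner was created
    System.out.println(visibleCells(cells, readPoint)); // [r1/v1@10, r2/v1@8]
  }
}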
Notes
HBase version: 2.1.7