zoukankan      html  css  js  c++  java
  • HBase MVCC 代码阅读(一)

    MultiVersionConcurrencyControl.java,版本 0.94.1

    MultiVersionConsistencyControl 管理 memstore 中的读写一致性。该类实现了一种机制,达到如下的目的:

    • 提供接口让 reader 知道可以忽略哪些元素项
    • 提供一个新的 WriteNumber 给 writer
    • 将写更新提供给 reader(通过原子事务)

    1 变量

    主要包含两个变量:

    • memstoreRead 前面文章中提到的 ReadPoint
    • memstoreWrite 前面文章中提到的 WriteNumber

    变量描述private volatile long,私有,长整形,volatile 标记,线程间可见。

    readWriters 私有 object 常量,加锁时候用。

    writeQueue 写操作队列,默认构造函数初始化的LinkedList,元素类型为 WriterEntry。注意,LinkedList 这个双向链表数据结构,适合随机增、删,但不是线程安全的。

    perThreadReadPoint 线程局部变量,保存当前线程获取的 readPoint,static final,ThreadLocal,用 Long.MAX_VALUE初始化。

    FIXED_SIZE 一个 MVCC 对象占用内存空间的大小,包括:一个锁对象 readWriters,两个 long 型变量 memstoreRead 和 memstoreWrite,两个应用对象的地址:writeQueue 和 perThreadReadPoint。

    2 内嵌类

    WriteEntry 每个 writer 持有一个,保存 writeNumber,并用一个 completed 标记是否写完。

    实际上就是对理论中 writeNumber 的包装:通过 writeNumber 对应到一个writer,通过 completed 标记 writer 是否写完。

    3 方法

    3.1 构造方法

    • public MultiVersionConsistencyControl();

    唯一的一个构造方法,无参数,将 memstoreRead 和 memstoreWrite 初始化为0。

    两个变量的值一般不会为0,后续会用初始化方法,将两个变量的值设置为一个正常值。

    3.2 初始化方法

    • public void initialize(long startPoint);

    初始化 memstoreRead 和 memstoreWrite 。

    在初始化的时候,会用 synchronized 对 writeQueue 加锁。

    判断 memstoreWrite 是否和 memstoreRead 相同,如果两者不同,认为这个是已经用过的 MVCC 对象,不需要再做初始化;如果相同,则将 memstoreRead 和 memstoreWriet 设置为输入参数。

    initialize 方法唯一一处引用在 org.apache.hadoop.hbase.regionserver.HRegion.initializeRegionInternals方法中。
    HRegion 包含一个私有的常量 mvcc ,对象声明的时候用 MVCC 的缺省构造方法初始化。HRegion 的 org.apache.hadoop.hbase.regionserver.HRegion.initialize 对 HRegion 初始化的时候,调用了 initialiazeRegionInterals,初始化 Region 的内部对象,把 mvcc 初始化为一个合理的值。

    所以,上面 MVCC 初始化方法的判断逻辑是 memstoreRead == memstoreWrite 时候才有必要进行初始化。

    3.3 memstoreRead相关

    在实现的时候,ReadPoint 包含两类,一个是全局的,由 memstoreRead 记录的,这个是所有线程均可见、可改的,一个是局部的, ThreadLocal 的 readPoint,每个线程有一个自己的副本,且这个不保证与全局的 readPoint 一样。在代码实现的时候,需要对这两类 readPoint 小心处理。

    3.3.1 memstoreReadPoint

    public long memstoreReadPoint();

    返回当前 MVCC 记录的 memstoreRead。在两处被用到:

    一处是 MVCC 的 resetThreadReadPoint,用 mvcc 的全局 readpoint 更新线程局部的 readPoint。

    一处是在 HRegion 类中,getSmallestReadPoint 方法,比较该 Region 上所有 scanner 持有的 readPoint,以及 Region MVCC 对象的 readPoint,在这所有中找到一个最小的。

    3.3.2 ThreadReadPoint

    ThreadReadPoint 相关4个方法,均标记为 static

    • public static long getThreadReadPoint();

    获取线程局部 readPoint,主要由 memstore scanner 调用,以确认跳过哪些值。

    分别在 org.apahce.hadoop.hbase.regionserver.MemStore.MemStoreScanner.getNext方法和 org.apahce.hadoop.hbase.regionserver.StoreFileScanner.skipKVsNewerThanReadpoint方法中被调用。

    getNext方法中,获取线程当前的 readPoint,scanner 在扫描的时候,所有比 readPonit 老的值才可以被读到。

    skipKVsNewerThanReadpoint方法中则想法,获取线程当前的 readPoint 后,把所有比 readPoint 新的 KV 对象会被跳过。

    • public static void setThreadReadPoint(long readPoint);

    设置线程局部 readPoint。

    主要在两个地方用到:

    org.apahce.hadoop.hbase.regionserver.HRegion.RegionScannerImpl

    配合对象自己的 readPt 使用。

    构造方法,根据隔离级别参数,如果为 READ_UNCOMMITED,将线程局部 readPoint 设置为 long.MAX_VALUE。

    next()方法, 带 synchronized 标记,在开始取数据之前,先用 readPt 更新线程局部 readPoint,然后再取值。

    reseek()方法,带 synchronized 标记,重新定位之前,用 readPt 更新线程局部的 readPoint 标记。

    org.apahce.hadoop.hbase.regionserver.Store

    compactStore方法。如前面理论描述中提到的,先获取当前 region 所有读者中持有的最老的 readPoint,用 setThreadReadPoint 方法更新线程局部 readPoint。后续做 compact,以这个smallestReadPoint 为基准。

    由上可见,线程局部维护的 ThreadReadPoint 不自动更新,可以随时被读取,但是在所有写操作之前,需要对线程局部的 ThreadReadPoint 更新,写入一个确定的值,以确认最新。

    • public static void resetThreadReadPoint();

    将线程局部 readPoint 重置为0。Find Usages 工具发现该方法实际未被使用。

    • public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc);

    用一个 MVCC 对象的 memstoreReadPoint 更新当前线程的局部 readPoint,并返回更新后的局部 readPoint。

    3.4 memstoreWrite相关

    涉及4个方法,入口是 beginMemstoreInsert()completeMemstoreInsert()completeMemstoreInsert()在实现的时候,先调用 advanceMemstore() 后调用 waitForRead(),以下依次说明。

    • public WriteEntry beginMemstoreInsert();

    获取一个最新的 writerNumber,核心就是干这一件事。

    对 writeQueue 加锁,获取一个新的 memstoreWrite,利用新的 memstore 新建个 WriteEntry 对象,并添加到 writeQueue 链表中,最后将新建的 WriteEntry 对象返回。

    该方法的调用都在 HRegion 类中:

    1. applyFamilyMapToMemstore 传入参数包含 WriteEntry 对象,如果该对象为 null,则调用 beginMemstoreInsert() 会创建一个 WriterEntry 且已加入 writeQueue。

    2. doMiniBatchMutation 批量修改,put 或者 delete。通过 beginMemstoreInsert() 获取一个包含最新的 WriteNumber 的 WriteEntry

    3. internalFlushcache flush 的实现方法,获取最新的 writeNumber

    4. mutateRowsWithLocks region 内的原子修改,获取最新的 writeNumber

    • public void completeMemstoreInsert(WriteEntry e);

    按序调用后面两个方法,没有其他。

    • boolean advanceMemstore(WriteEntry e);

    "从头开始遍历writeQueue,移除所有已完成的WriteEntry对象,最后将memstoreRead更新为最新已完成的memstoreWrite;"

      boolean advanceMemstore(WriteEntry e) {
        synchronized (writeQueue) {
          e.markCompleted();
    
          long nextReadValue = -1;
          boolean ranOnce=false;
          while (!writeQueue.isEmpty()) {
            ranOnce=true;
            WriteEntry queueFirst = writeQueue.getFirst();
    
            if (nextReadValue > 0) {
              if (nextReadValue+1 != queueFirst.getWriteNumber()) {
                throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                    + nextReadValue + " next: " + queueFirst.getWriteNumber());
              }
            }
    
            if (queueFirst.isCompleted()) {
              nextReadValue = queueFirst.getWriteNumber();
              writeQueue.removeFirst();
            } else {
              break;
            }
          }
    
          if (!ranOnce) {
            throw new RuntimeException("never was a first");
          }
    
          if (nextReadValue > 0) {
            synchronized (readWaiters) {
              memstoreRead = nextReadValue;
              readWaiters.notifyAll();
            }
          }
          if (memstoreRead >= e.getWriteNumber()) {
            return true;
          }
          return false;
        }
      }
    
    • public void waitForRead(WriteEntry e);

    等待全局的 readPoint 更新到当前 writer 的事务号。"阻塞当前线程,直到memstoreRead等于当前WriteEntry的memstoreWrite,至此表明当前WriteEntry之前的所有更新事务都已经完成"

    正如之前 解释 HBase MVCC 实现原理的时候提到的,所有事务号小于 readPoint 的事务,被认为是"确定安全"了,相关的线程也就可以被释放了。

      public void waitForRead(WriteEntry e) {
        boolean interrupted = false;
        synchronized (readWaiters) {
          while (memstoreRead < e.getWriteNumber()) {
            try {
              readWaiters.wait(0);
            } catch (InterruptedException ie) {
              // We were interrupted... finish the loop -- i.e. cleanup --and then
              // on our way out, reset the interrupt flag.
              interrupted = true;
            }
          }
        }
        if (interrupted) Thread.currentThread().interrupt();
      }
    

    4 最新版本

    MVCC 最新版本的代码参见:MultiVersionConcurrencyControl.java

    从变量到方法都有了明显的变化,下一篇专门比较下。

    参考

    1. HBase中MVCC的实现机制及应用情况
  • 相关阅读:
    算法学习算法复杂度
    算法学习冒泡排序和快速排序
    焦点管理
    数据结构学习链表、双向链表、循环链表
    数据结构学习数组、栈和队列
    数据结构学习集合
    数据结构学习字典和散列表
    (转)一个webservice的小demo
    堆栈详解
    JS实现大小写转换
  • 原文地址:https://www.cnblogs.com/YFYkuner/p/5206482.html
Copyright © 2011-2022 走看看