zoukankan      html  css  js  c++  java
  • HBase行锁原理及实现

            请带着例如以下问题阅读本文。

           1、什么是行锁?

           2、HBase行锁的原理是什么?

           3、HBase行锁是怎样实现的?

           4、HBase行锁是怎样应用的?


            一、什么是行锁?

            我们知道。数据库中存在事务的概念。事务是作为单个逻辑工作单元运行的一系列操作,要么全然地运行,要么全然的不运行。

    而事务的四大特点即原子性、一致性、分离性和持久性。当中,原子性首当其冲。那么在HBase内部实现其原子性的重要保证是什么呢?答案就是行锁。

            什么是行锁呢?顾名思义。它就是加在行上的一把锁。在它未释放该行前。最起码其它訪问者是无法对该行做改动的,即要改动的话,必须得获得该行的锁才干拥有改动改行数据的权限,这就是行锁的含义。

            二、HBase行锁实现原理

            HBase行锁是利用Java并发包concurrent里的CountDownLatch(1)来实现的。

    它的主要思想就是在server端每一个訪问者单独一个数据处理线程,每一个处理线程针对特定行数据改动时必须获得该行的行锁,而其它client线程想要改动数据的话,必须等待前面的线程释放锁后才被同意,这就利用了Java并发包中的CountDownLatch。CountDownLatch为Java中的一个同步辅助类。在完毕一组正在其它线程中进行的操作之前,它同意一个或多个线程一直等待。

    这里,将线程数设置为1,十分巧妙的实现了独占锁的概念。

            三、HBase行锁的实现

            HBase的行锁主要是通过HRegion的两个内部类实现的,当中一个是RowLock,另外一个是RowLockContext。

            我们首先看RowLock这个类。其定义例如以下:

    /**
       * Row lock held by a given thread.
       * One thread may acquire multiple locks on the same row simultaneously.
       * The locks must be released by calling release() from the same thread.
       * 
       * 给定线程持有的行锁。
       * 一个线程能够同一时候获得同一行上的多个锁。
       * 锁必须被同样线程。通过调用release()释放。

    */ public static class RowLock { // 行锁上下文,持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容 @VisibleForTesting final RowLockContext context; // 行锁是否被释放 private boolean released = false; // 构造函数 @VisibleForTesting RowLock(RowLockContext context) { this.context = context; } /** * Release the given lock. If there are no remaining locks held by the current thread * then unlock the row and allow other threads to acquire the lock. * 释放给定的锁。

    假设当前线程不再持有不论什么锁,那么对该行解锁并同意其它线程获得锁。 * @throws IllegalArgumentException if called by a different thread than the lock owning thread */ public void release() { if (!released) { context.releaseLock(); released = true; } } }

            通过上述源代码我们能够看到。行锁RowLock有两个成员变量。RowLockContext类型的行锁上下文context和布尔类型的行锁是否释放released。当中,行锁上下文context持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容。而且,利用java的concurrent并发包里的CountDownLatch(1)实现了线程对对象的独占锁。

            RowLockContext的源代码例如以下:

      // 行锁上下文,包含指定的行row,同步计数器latch,锁的数目lockCount和线程thread
      @VisibleForTesting class RowLockContext {
        private final HashedBytes row;// 行
        private final CountDownLatch latch = new CountDownLatch(1);// 
        private final Thread thread;
        private int lockCount = 0;
    
        // 构造方法
        RowLockContext(HashedBytes row) {
          this.row = row;
          this.thread = Thread.currentThread();
        }
    
        // 推断是否为当前线程相应的行锁上下文
        boolean ownedByCurrentThread() {
          return thread == Thread.currentThread();
        }
    
        RowLock newLock() {
          lockCount++;
          return new RowLock(this);
        }
    
        void releaseLock() {
          if (!ownedByCurrentThread()) {
            throw new IllegalArgumentException("Lock held by thread: " + thread
              + " cannot be released by different thread: " + Thread.currentThread());
          }
          lockCount--;
          if (lockCount == 0) {
            // no remaining locks by the thread, unlock and allow other threads to access
            RowLockContext existingContext = lockedRows.remove(row);
            if (existingContext != this) {
              throw new RuntimeException(
                  "Internal row lock state inconsistent, should not happen, row: " + row);
            }
            
            // 同步计数器减1
            latch.countDown();
          }
        }
      }

            通过源代码我们能够看到,行锁的上下文信息,主要包含行锁相应的行row以及占用该行锁的线程thread。构造RowContext时,仅仅需传入行row就可以,占用的线程则通过Thread.currentThread()获得当前线程。

           新加锁时,通过调用newLock()方法就可以实现。首先锁的计数器lockCount加1,然后返回由当前RowContext构造RowLock实例就可以。

           释放锁时,通过调用releaseLock()方法就可以实现。首先通过ownedByCurrentThread()方法确保调用releaseLock()方法的当前线程是否和RowContext持有的线程一致,然后。锁的计数器lockCount减1,而且,假设lockCount为0的话。说明不再有操作占用该行,将row相应的行锁从数据结构lockedRows中删除,同意其它线程获得该行的行锁,最后,最重要的一步。latch.countDown(),就可完毕行锁的释放了。

            四、HBase行锁的使用

            以下,我们看下HBase行锁的使用。在涉及数据变更的操作。比方Put、Delete等中,在对一行数据操作之前。都会调用getRowLockInternal()方法,获得该行数据的行锁。代码例如以下:

    /**
       * A version of getRowLock(byte[], boolean) to use when a region operation has already been
       * started (the calling thread has already acquired the region-close-guard lock).
       */
      protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
        
    	// 构造HashedBytes类型表示的行,rowKey
    	HashedBytes rowKey = new HashedBytes(row);
    	// 创建行锁上下文实例,并制定为行rowkey和当前线程拥有
        RowLockContext rowLockContext = new RowLockContext(rowKey);
    
        // loop until we acquire the row lock (unless !waitForLock)
        while (true) {
          // 将rowkey与行锁上下文的相应关系加入到Region的数据结构lockedRows中
          RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
          if (existingContext == null) {
            // Row is not already locked by any thread, use newly created context.
        	// 该行已经没有被不论什么线程锁住,使用这个新创建的上下文
            break;
          } else if (existingContext.ownedByCurrentThread()) {
            // Row is already locked by current thread, reuse existing context instead.
        	// 该行已经被当前线程锁住。复用当前线程之前创建的行锁上下文实例
            rowLockContext = existingContext;
            break;
          } else {
        	// 该行被其它线程锁住,假设不须要等待锁。直接返回null
            if (!waitForLock) {
              return null;
            }
            
            TraceScope traceScope = null;
            try {
              if (Trace.isTracing()) {
                traceScope = Trace.startSpan("HRegion.getRowLockInternal");
              }
              // Row is already locked by some other thread, give up or wait for it
              // 行已经被其它线程锁住,放弃或者等待
              // 等待rowLockWaitDuration时间后。假设还未获得行锁。直接抛出异常
              if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
                if(traceScope != null) {
                  traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
                }
                throw new IOException("Timed out waiting for lock for row: " + rowKey);
              }
              if (traceScope != null) traceScope.close();
              traceScope = null;
            } catch (InterruptedException ie) {
              LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
              InterruptedIOException iie = new InterruptedIOException();
              iie.initCause(ie);
              throw iie;
            } finally {
              if (traceScope != null) traceScope.close();
            }
          }
        }
    
        // allocate new lock for this thread
        return rowLockContext.newLock();
      }

            详细流程整理例如以下:

            1、利用byte[]类型的入參row构造HashedBytes类型表示的行,即rowKey;
            2、利用rowKey创建行锁上下文实例,并指定为行rowKey和当前线程拥有。
            3、循环:
                  3.1、将rowKey与行锁上下文的相应关系加入到Region的数据结构lockedRows中,可能出现下面几种情况:
                       3.1.1、假设lockedRows中之前不存在相应行rowKey的数据。说明该行当前没有被不论什么线程锁住,使用这个新创建的上下文rowLockContext,跳出循环并返回,说   明当前行可用。
                       3.1.2、假设该行已经被当前线程锁住,复用当前线程之前创建的行锁上下文实例,并赋值给rowLockContext,跳出循环并返回,说明当前行可用;
                       3.1.3、假设该行被其它线程锁住,假设入參确定不须要等待锁的获取,直接返回null,否则反复循环,直到等待rowLockWaitDuration时间后。假设还未获得行锁。   直接抛出异常。

            至于都是哪些地方须要获取行锁,在以后各种数据读写流程中再做分析吧~

  • 相关阅读:
    ACwing(基础)--- 树状数组
    ACwing(基础)--- 快速幂
    Oracle for loop 循环
    Oracle 为表增加时间戳字段
    Oracle Materialized View 物化视图
    Splunk DBConnect使用
    Splunk 过滤接入数据
    Python 协程库 asyncio 的简单理解和使用
    Python 正则使用 备查
    Splunk 数据接入 创建索引接收数据
  • 原文地址:https://www.cnblogs.com/wzjhoutai/p/7118318.html
Copyright © 2011-2022 走看看