zoukankan      html  css  js  c++  java
  • HBase行锁原理及实现

            请带着例如以下问题阅读本文。

           1、什么是行锁?

           2、HBase行锁的原理是什么?

           3、HBase行锁是怎样实现的?

           4、HBase行锁是怎样应用的?


            一、什么是行锁?

            我们知道。数据库中存在事务的概念。事务是作为单个逻辑工作单元运行的一系列操作,要么全然地运行,要么全然的不运行。

    而事务的四大特点即原子性、一致性、分离性和持久性。当中,原子性首当其冲。那么在HBase内部实现其原子性的重要保证是什么呢?答案就是行锁。

            什么是行锁呢?顾名思义。它就是加在行上的一把锁。在它未释放该行前。最起码其它訪问者是无法对该行做改动的,即要改动的话,必须得获得该行的锁才干拥有改动改行数据的权限,这就是行锁的含义。

            二、HBase行锁实现原理

            HBase行锁是利用Java并发包concurrent里的CountDownLatch(1)来实现的。

    它的主要思想就是在server端每一个訪问者单独一个数据处理线程,每一个处理线程针对特定行数据改动时必须获得该行的行锁,而其它client线程想要改动数据的话,必须等待前面的线程释放锁后才被同意,这就利用了Java并发包中的CountDownLatch。CountDownLatch为Java中的一个同步辅助类。在完毕一组正在其它线程中进行的操作之前,它同意一个或多个线程一直等待。

    这里,将线程数设置为1,十分巧妙的实现了独占锁的概念。

            三、HBase行锁的实现

            HBase的行锁主要是通过HRegion的两个内部类实现的,当中一个是RowLock,另外一个是RowLockContext。

            我们首先看RowLock这个类。其定义例如以下:

    /**
       * Row lock held by a given thread.
       * One thread may acquire multiple locks on the same row simultaneously.
       * The locks must be released by calling release() from the same thread.
       * 
       * 给定线程持有的行锁。
       * 一个线程能够同一时候获得同一行上的多个锁。
       * 锁必须被同样线程。通过调用release()释放。

    */ public static class RowLock { // 行锁上下文,持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容 @VisibleForTesting final RowLockContext context; // 行锁是否被释放 private boolean released = false; // 构造函数 @VisibleForTesting RowLock(RowLockContext context) { this.context = context; } /** * Release the given lock. If there are no remaining locks held by the current thread * then unlock the row and allow other threads to acquire the lock. * 释放给定的锁。

    假设当前线程不再持有不论什么锁,那么对该行解锁并同意其它线程获得锁。 * @throws IllegalArgumentException if called by a different thread than the lock owning thread */ public void release() { if (!released) { context.releaseLock(); released = true; } } }

            通过上述源代码我们能够看到。行锁RowLock有两个成员变量。RowLockContext类型的行锁上下文context和布尔类型的行锁是否释放released。当中,行锁上下文context持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容。而且,利用java的concurrent并发包里的CountDownLatch(1)实现了线程对对象的独占锁。

            RowLockContext的源代码例如以下:

      // 行锁上下文,包含指定的行row,同步计数器latch,锁的数目lockCount和线程thread
      @VisibleForTesting class RowLockContext {
        private final HashedBytes row;// 行
        private final CountDownLatch latch = new CountDownLatch(1);// 
        private final Thread thread;
        private int lockCount = 0;
    
        // 构造方法
        RowLockContext(HashedBytes row) {
          this.row = row;
          this.thread = Thread.currentThread();
        }
    
        // 推断是否为当前线程相应的行锁上下文
        boolean ownedByCurrentThread() {
          return thread == Thread.currentThread();
        }
    
        RowLock newLock() {
          lockCount++;
          return new RowLock(this);
        }
    
        void releaseLock() {
          if (!ownedByCurrentThread()) {
            throw new IllegalArgumentException("Lock held by thread: " + thread
              + " cannot be released by different thread: " + Thread.currentThread());
          }
          lockCount--;
          if (lockCount == 0) {
            // no remaining locks by the thread, unlock and allow other threads to access
            RowLockContext existingContext = lockedRows.remove(row);
            if (existingContext != this) {
              throw new RuntimeException(
                  "Internal row lock state inconsistent, should not happen, row: " + row);
            }
            
            // 同步计数器减1
            latch.countDown();
          }
        }
      }

            通过源代码我们能够看到,行锁的上下文信息,主要包含行锁相应的行row以及占用该行锁的线程thread。构造RowContext时,仅仅需传入行row就可以,占用的线程则通过Thread.currentThread()获得当前线程。

           新加锁时,通过调用newLock()方法就可以实现。首先锁的计数器lockCount加1,然后返回由当前RowContext构造RowLock实例就可以。

           释放锁时,通过调用releaseLock()方法就可以实现。首先通过ownedByCurrentThread()方法确保调用releaseLock()方法的当前线程是否和RowContext持有的线程一致,然后。锁的计数器lockCount减1,而且,假设lockCount为0的话。说明不再有操作占用该行,将row相应的行锁从数据结构lockedRows中删除,同意其它线程获得该行的行锁,最后,最重要的一步。latch.countDown(),就可完毕行锁的释放了。

            四、HBase行锁的使用

            以下,我们看下HBase行锁的使用。在涉及数据变更的操作。比方Put、Delete等中,在对一行数据操作之前。都会调用getRowLockInternal()方法,获得该行数据的行锁。代码例如以下:

    /**
       * A version of getRowLock(byte[], boolean) to use when a region operation has already been
       * started (the calling thread has already acquired the region-close-guard lock).
       */
      protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
        
    	// 构造HashedBytes类型表示的行,rowKey
    	HashedBytes rowKey = new HashedBytes(row);
    	// 创建行锁上下文实例,并制定为行rowkey和当前线程拥有
        RowLockContext rowLockContext = new RowLockContext(rowKey);
    
        // loop until we acquire the row lock (unless !waitForLock)
        while (true) {
          // 将rowkey与行锁上下文的相应关系加入到Region的数据结构lockedRows中
          RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
          if (existingContext == null) {
            // Row is not already locked by any thread, use newly created context.
        	// 该行已经没有被不论什么线程锁住,使用这个新创建的上下文
            break;
          } else if (existingContext.ownedByCurrentThread()) {
            // Row is already locked by current thread, reuse existing context instead.
        	// 该行已经被当前线程锁住。复用当前线程之前创建的行锁上下文实例
            rowLockContext = existingContext;
            break;
          } else {
        	// 该行被其它线程锁住,假设不须要等待锁。直接返回null
            if (!waitForLock) {
              return null;
            }
            
            TraceScope traceScope = null;
            try {
              if (Trace.isTracing()) {
                traceScope = Trace.startSpan("HRegion.getRowLockInternal");
              }
              // Row is already locked by some other thread, give up or wait for it
              // 行已经被其它线程锁住,放弃或者等待
              // 等待rowLockWaitDuration时间后。假设还未获得行锁。直接抛出异常
              if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
                if(traceScope != null) {
                  traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
                }
                throw new IOException("Timed out waiting for lock for row: " + rowKey);
              }
              if (traceScope != null) traceScope.close();
              traceScope = null;
            } catch (InterruptedException ie) {
              LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
              InterruptedIOException iie = new InterruptedIOException();
              iie.initCause(ie);
              throw iie;
            } finally {
              if (traceScope != null) traceScope.close();
            }
          }
        }
    
        // allocate new lock for this thread
        return rowLockContext.newLock();
      }

            详细流程整理例如以下:

            1、利用byte[]类型的入參row构造HashedBytes类型表示的行,即rowKey;
            2、利用rowKey创建行锁上下文实例,并指定为行rowKey和当前线程拥有。
            3、循环:
                  3.1、将rowKey与行锁上下文的相应关系加入到Region的数据结构lockedRows中,可能出现下面几种情况:
                       3.1.1、假设lockedRows中之前不存在相应行rowKey的数据。说明该行当前没有被不论什么线程锁住,使用这个新创建的上下文rowLockContext,跳出循环并返回,说   明当前行可用。
                       3.1.2、假设该行已经被当前线程锁住,复用当前线程之前创建的行锁上下文实例,并赋值给rowLockContext,跳出循环并返回,说明当前行可用;
                       3.1.3、假设该行被其它线程锁住,假设入參确定不须要等待锁的获取,直接返回null,否则反复循环,直到等待rowLockWaitDuration时间后。假设还未获得行锁。   直接抛出异常。

            至于都是哪些地方须要获取行锁,在以后各种数据读写流程中再做分析吧~

  • 相关阅读:
    QFramework 使用指南 2020(二):下载与版本介绍
    QFramework 使用指南 2020 (一): 概述
    Unity 游戏框架搭建 2018 (二) 单例的模板与最佳实践
    Unity 游戏框架搭建 2018 (一) 架构、框架与 QFramework 简介
    Unity 游戏框架搭建 2017 (二十三) 重构小工具 Platform
    Unity 游戏框架搭建 2017 (二十二) 简易引用计数器
    Unity 游戏框架搭建 2017 (二十一) 使用对象池时的一些细节
    你确定你会写 Dockerfile 吗?
    小白学 Python 爬虫(8):网页基础
    老司机大型车祸现场
  • 原文地址:https://www.cnblogs.com/wzjhoutai/p/7118318.html
Copyright © 2011-2022 走看看