zoukankan      html  css  js  c++  java
  • HBase行锁原理及实现

      hbase mutation操作,比如delete put等,都需要先获取行锁,然后再进行操作,在获取行锁时,是通过HRegion.getRowLockInternal(byte[] row, boolean waitForLock)进行的,因此,我们先大体浏览一下这个方法的流程,如下。可以看到,该方法中主要涉及到行锁相关的内容为RowLock和RowLockContext两个类。这两个都是HRegion的内部类,下面详细看一下这两个类是咋实现的。

    protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
        HashedBytes rowKey = new HashedBytes(row);
        RowLockContext rowLockContext = new RowLockContext(rowKey);
    
        // loop until we acquire the row lock (unless !waitForLock)
        while (true) {
          RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
          if (existingContext == null) {
            // Row is not already locked by any thread, use newly created context.
            break;
          } else if (existingContext.ownedByCurrentThread()) {
            // Row is already locked by current thread, reuse existing context instead.
            rowLockContext = existingContext;
            break;
          } else {
            if (!waitForLock) {
              return null;
            }
            try {
              // Row is already locked by some other thread, give up or wait for it
              if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
                throw new IOException("Timed out waiting for lock for row: " + rowKey);
              }
            } catch (InterruptedException ie) {
              LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
              InterruptedIOException iie = new InterruptedIOException();
              iie.initCause(ie);
              throw iie;
            }
          }
        }
    
        // allocate new lock for this thread
        return rowLockContext.newLock();
      }
    

      首先看RowLock类,该类主要逻辑是release方法,是用来释放行锁的。同时有一个布尔类型参数release,默认为false,代表该行锁是否被释放掉了。

    public static class RowLock {
        @VisibleForTesting final RowLockContext context;
        private boolean released = false;
    
        @VisibleForTesting RowLock(RowLockContext context) {
          this.context = context;
        }
    
        /**
         * Release the given lock.  If there are no remaining locks held by the current thread
         * then unlock the row and allow other threads to acquire the lock.
         * @throws IllegalArgumentException if called by a different thread than the lock owning thread
         */
        public void release() {
          if (!released) {
            context.releaseLock();
            released = true;
          }
        }
      }
    

      但是在RowLock中,并没有看到实际涉及到锁的信息,这是咋回事呢,别急,细细看下release方法,里面有一个context,是RowLockContext类型。同时其构造方法中也传了一个context对象,因此怀疑是在RowLockContext中new出了一个rowlock,进RowLockContext中看下:

    @VisibleForTesting class RowLockContext {
        private final HashedBytes row;
      //通过计数以及CountDownLatch实现对行锁的condition。这里之所以将countdownlatch设置为一,是因为hbase自己也不知道到底有多少condition来竞争锁,所以加一个计数lockCount,
      //当lockCount为零时,再把latch.coutDown。否则会在getRowLockInternal中await。
    private final CountDownLatch latch = new CountDownLatch(1); private final Thread thread; private int lockCount = 0; RowLockContext(HashedBytes row) { this.row = row; this.thread = Thread.currentThread(); } boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } RowLock newLock() { lockCount++; return new RowLock(this); } void releaseLock() { if (!ownedByCurrentThread()) { throw new IllegalArgumentException("Lock held by thread: " + thread + " cannot be released by different thread: " + Thread.currentThread()); } lockCount--; if (lockCount == 0) { // no remaining locks by the thread, unlock and allow other threads to access RowLockContext existingContext = lockedRows.remove(row); if (existingContext != this) { throw new RuntimeException( "Internal row lock state inconsistent, should not happen, row: " + row); } latch.countDown(); } } }

      通过计数以及CountDownLatch实现对行锁的condition。这里之所以将countdownlatch设置为一,是因为hbase自己也不知道到底有多少condition来竞争锁,所以加一个计数lockCount,
    当lockCount为零时,再把latch.coutDown。否则会在getRowLockInternal中await。
      在HRegion中还有一个关键的成员变量: lockedrows,用来存储当前已经获取了行锁的所有行信息,key为rowkey,value为RowLockContext。
    // map from a locked row to the context for that lock including:
      // - CountDownLatch for threads waiting on that row
      // - the thread that owns the lock (allow reentrancy)
      // - reference count of (reentrant) locks held by the thread
      // - the row itself
      private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
          new ConcurrentHashMap<HashedBytes, RowLockContext>();
      好啦,行锁涉及到的内容,我们都大体浏览了,再从getRowLockInternal中开始通一遍逻辑:
    1. 根据rowkey构建RowLockContext对象
    2. while循环,直到获取到行锁,或者wait超时
      1. 首先判断lockedrows中是否有该rowkey的行锁信息,此处利用的是concurrentMap的putIfAbsent
        1. 如果不存在,以为着这行锁还没有其他线程拿到,将行锁信息加入到lockedrows中,直接break跳出循环,然后now一个行锁。
        2. 如果存在,则以为着该行锁已经被占有了,逻辑如下
          1. 判断持有该行锁的线程是否是自己本身,如果是,则直接覆盖rowLockContext,跳出循环
          2. 判断是否需要wait 行锁,通过参数waitForLock,如果不wait直接return;如果wait,则调用latch.await等待,如果超时则抛出异常。
      2. 如果跳出了循环,则意味着获取成功,则newLock并返回。

       上面是获取行锁的流程,释放行锁呢,是通过HRegion的releaseRowLocks方式实现,我们看下代码:

     /**
       * If the given list of row locks is not null, releases all locks.
       */
      public void releaseRowLocks(List<RowLock> rowLocks) {
        if (rowLocks != null) {
          for (RowLock rowLock : rowLocks) {
            rowLock.release();
          }
          rowLocks.clear();
        }
      }
    

      可见是调用RowLock.release实现,该方法代码在上面有,具体的逻辑如下:

          在lockedrows中将该行锁删除。

          判断release是否为false,如果为false,则调用context.releaseLock,context.releaseLock逻辑如下

            首先判断释放该行锁的线程是否是该行锁的持有者,若不是则抛出异常

            将count--;

            如果count==0了,则直接调用latch.countDown,这个方法会触发其他线程去获取行锁。当count==0了也就是说该线程已经不需要改行锁,已经释放

            将release设置为true。

    注意

    这里在getRowLockInternal中,只要lockedRows.putIfAbsent(rowKey, rowLockContext)成功,其他线程将不会获取成功,由concurrentMap保证。

  • 相关阅读:
    git 项目代码打包
    jira查看字段
    jmeter压力测试报错:java.net.BindException: Address already in use: connect解决办法
    python 破解验证码
    mysql授权远程登录
    豆瓣api
    利用python开发财务工具
    钉钉发送消息通知
    git使用命令行自动登录
    后宫
  • 原文地址:https://www.cnblogs.com/Evil-Rebe/p/11323010.html
Copyright © 2011-2022 走看看