zoukankan      html  css  js  c++  java
  • 深入浅出多线程——ReentrantLock (一)

      ReentrantLock是一个排它重入锁,与synchronized关键字语意类似,但比其功能更为强大。该类位于java.util.concurrent.locks包下,是Lock接口的实现类。基本用法如下:

    class X {
        private final ReentrantLock lock = new ReentrantLock();
        // ...
     
        public void m() {
          lock.lock();  // block until condition holds
          try {
            // ... method body
          } finally {
            lock.unlock()
          }
        }
      }

      本文章会围绕核心方法lock(),unlock()进行分析。在开始之前,对部分概念进行阐述:

      1,RenntrantLock是一个排它重入锁,重入次数为Integer.MAX_VALUE,其中通过构造实现两大核心(公平锁,非公平锁)。在默认情况下是非公平锁。

      2,RenntrantLock的公平锁和非公平锁基于抽象类AbstractQueuedSynchornizer,简称AQS。在源码分析阶段,也会涉及该类相关的原理分析。更加详细的会在后续文章中单独说明。

      3,AQS中涉及到了大量的Compare and swap操作,简称CAS。CAS利用的是cpu级别原子指令无锁的去修改目标值,在并发场景下只会有一个成功。在java中有大量的应用,其中最经典的为java.util.concurrent.atomic包下的相关类。更加详细的阐述会在后续文章中单独说明。

    原理分析

      我们按照默认的不公平锁为例子进行深入。

    lock()方法分析

    NonfairSync.lock()
    1 final void lock() {
    2     if (compareAndSetState(0, 1))
    3         setExclusiveOwnerThread(Thread.currentThread());
    4     else
    5         acquire(1);
    6 }

      首先去尝试将state值从0改为1,如果修改成功,把该线程设置为Owner。因为用CAS的方式去修改这个值,在并发环境下只会有一个成功。不成功的则进入acquire(1)方法。

    AbstractQueuedSynchronizer.acquire(int)
    1     public final void acquire(int arg) {
    2         if (!tryAcquire(arg) &&
    3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4             selfInterrupt();
    5     }

      该方法,首先在此尝试修改state的值,尽量用最小的代价设置成功。

    具体方法代码如下:

    NonfairSync.tryAcquire(int)
    1         protected final boolean tryAcquire(int acquires) {
    2             return nonfairTryAcquire(acquires);
    3         }

    直接调用了nonfairTryAcquire(acquires)。如下:

    Sync.nonfairTryAcquire(int)
     1         final boolean nonfairTryAcquire(int acquires) {
     2             final Thread current = Thread.currentThread();
     3             int c = getState();
     4             if (c == 0) {
     5                 if (compareAndSetState(0, acquires)) {
     6                     setExclusiveOwnerThread(current);
     7                     return true;
     8                 }
     9             }
    10             else if (current == getExclusiveOwnerThread()) {
    11                 int nextc = c + acquires;
    12                 if (nextc < 0) // overflow
    13                     throw new Error("Maximum lock count exceeded");
    14                 setState(nextc);
    15                 return true;
    16             }
    17             return false;
    18         }

      首先判断state的值是否为0。如果为0,则尝试修改state为1,如果设置成功,则将执行线程Owner为当前线程。如果state不为0,则判断当前线程是否与执行线程Owner一致。如果一致则只对state加1,这个地方实现了类似偏向锁。

      如果条件都不满足,返回false,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。从里往外看,

      首先调用的是addWaiter(Node.EXCLUSIVE)方法,该方法的参数为Node.EXCLUSIVE,表示为队列为独占模式。

    AbstractQueuedSynchronizer.addWaiter(Node)
     1     private Node addWaiter(Node mode) {
     2         Node node = new Node(Thread.currentThread(), mode);
     3         // Try the fast path of enq; backup to full enq on failure
     4         Node pred = tail;
     5         if (pred != null) {
     6             node.prev = pred;
     7             if (compareAndSetTail(pred, node)) {
     8                 pred.next = node;
     9                 return node;
    10             }
    11         }
    12         enq(node);
    13         return node;
    14     }

      第2行代码是 创建了一个独占模式的队列节点node,通过node实现可以看出是双向链表数据结构。

      判断列队pred是否为空,如果不为空,则node的节点prev变量设置为pred。尝试去修改列队tail的值为node,如果成功则直接返回。

      因为第一次进入,tail肯定为空,直接执行enq(node)方法。

    AbstractQueuedSynchronizer.enq(Node)
     1     private Node enq(final Node node) {
     2         for (;;) {
     3             Node t = tail;
     4             if (t == null) { // Must initialize
     5                 if (compareAndSetHead(new Node()))
     6                     tail = head;
     7             } else {
     8                 node.prev = t;
     9                 if (compareAndSetTail(t, node)) {
    10                     t.next = node;
    11                     return t;
    12                 }
    13             }
    14         }
    15     }

       enq(node)方法进来是一个自旋操作,一段很经典的代码。

      首先判断tail是否为空,因为第一次进入肯定为空。那么实例化一个空节点,将队列head,tail指向该空节点。完成该动作后再次自旋,此时tail肯定是不为空的,则直接执行else内容。

      首先将node节点的上游指向tail后利用cas将队列tail设置为node,然后将原先的tail(t)的next指向node,此时node节点成功加入列队中。

      再次回到acquire(1)方法,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。

    AbstractQueuedSynchronizer.acquireQueued(Node,int)
     1     final boolean acquireQueued(final Node node, int arg) {
     2         boolean failed = true;
     3         try {
     4             boolean interrupted = false;
     5             for (;;) {
     6                 final Node p = node.predecessor();
     7                 if (p == head && tryAcquire(arg)) {
     8                     setHead(node);
     9                     p.next = null; // help GC
    10                     failed = false;
    11                     return interrupted;
    12                 }
    13                 if (shouldParkAfterFailedAcquire(p, node) &&
    14                     parkAndCheckInterrupt())
    15                     interrupted = true;
    16             }
    17         } finally {
    18             if (failed)
    19                 cancelAcquire(node);
    20         }
    21     }

       该方法进来也是一个自旋操作,与enq方法类似。

      第6行,node的上游此时指向的是空节点,虽然和head相等,但是由于是空线程,那么在执行tryAcquire(arg)方法肯定返回false。

      代码直接来到了第13行,shouldParkAfterFailedAcquire(p,node)方法先执行,如下:

    AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node,Node)
     1     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2         int ws = pred.waitStatus;
     3         if (ws == Node.SIGNAL)
     4             return true;
     5         if (ws > 0) {
     6             do {
     7                 node.prev = pred = pred.prev;
     8             } while (pred.waitStatus > 0);
     9             pred.next = node;
    10         } else {
    11             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    12         }
    13         return false;
    14     }

      首先判断node的上游节点的等待状态是否为-1,因为node的上游节点是空对象,waitStatus为初始值0。该方法会直接返回运行else里面内容,将waitStatus修改为-1。

      通过acquireQueued自旋会再次来到该方法,此时waitStatus的值为-1,返回true。然后执行第二个方法parkAndCheckInterrupt()方法。如下:

    AbstractQueuedSynchronizer.parkAndCheckInterrupt()
    1     private final boolean parkAndCheckInterrupt() {
    2         LockSupport.park(this);
    3         return Thread.interrupted();
    4     }

       进入该方法直接调用LockSupport.park(this)方法,意思是将该线程直接暂停,其线程状态在Runnable变为WAITING。等待调用LockSupport.unpark(this)将其唤醒,再次进入acquireQueued的自旋当中,直至能成功的把state的值从0变为1为止,当修改成功后,将队列的head设置为当前node。

    unlock()方法分析

    ReentrantLock.lock()
    1     public void unlock() {
    2         sync.release(1);
    3     }

      这个没啥好说的,直接调用了AQS里面的release方法了。

    AbstractQueuedSynchronizer.release(int)
    1     public final boolean release(int arg) {
    2         if (tryRelease(arg)) {
    3             Node h = head;
    4             if (h != null && h.waitStatus != 0)
    5                 unparkSuccessor(h);
    6             return true;
    7         }
    8         return false;
    9     }

       首先调用tryRelease(int)方法,尝试去释放该锁。如果释放成功,则进入if方法体,首先判断队列的head不为空,在判断head.waitStatus不为0(当前实际值为-1),则调用unparkSuccessor(Node)。

    Sync.tryRelease(int)
     1         protected final boolean tryRelease(int releases) {
     2             int c = getState() - releases;
     3             if (Thread.currentThread() != getExclusiveOwnerThread())
     4                 throw new IllegalMonitorStateException();
     5             boolean free = false;
     6             if (c == 0) {
     7                 free = true;
     8                 setExclusiveOwnerThread(null);
     9             }
    10             setState(c);
    11             return free;
    12         }

      首先将state减1后,判断是否为0,如果为0,则正式释放,如果不为0,仅仅将state的值更新。在这个方法可以反映出 lock()调用几次,必须有相应的unlock()调用次数,否则造成死锁。

    AbstractQueuedSynchronizer.unparkSuccessor(int)
     1     private void unparkSuccessor(Node node) {
     2 
     3         int ws = node.waitStatus;
     4         if (ws < 0)
     5             compareAndSetWaitStatus(node, ws, 0);
     6 
     7         Node s = node.next;
     8         if (s == null || s.waitStatus > 0) {
     9             s = null;
    10             for (Node t = tail; t != null && t != node; t = t.prev)
    11                 if (t.waitStatus <= 0)
    12                     s = t;
    13         }
    14         if (s != null)
    15             LockSupport.unpark(s.thread);
    16     }

      首先判断node.waitStatus是否小于0,如果小于0,则将state变量修改为0。其次获取到node的下游节点,如果下游节点s 为空或者被取消,则从队列尾部向前查找符合条件的节点。如果不为空或者未被取消,则调用LockSupport.unpark(s.thread)将其唤醒。

     总结

      通过分析lock()和unlock(),我们得知AQS内部实现了一个基于双向链表的队列。发成资源竞争时,因为CAS的特性只会有一个成功,其他的均进入该队列,有点类似于synchronized的临界区。

      执行中的线程再次调用lock()时,并不会进入等待列队,而是将state加1继续执行,基于偏向锁的思想去实现的。

      在线程释放时,也要对应着将state进行每次减1。直到state值为0,才认为当前线程真正的释放。释放后调用当前线程的下游节点去执行,此时,因为是非公平锁的缘故,可能新加入的线程在当前线程释放时征用成功,state值又变为1。那当前线程的下游节点再次陷入WAITING状态。

    喜欢技术、崇尚技术,更敬畏技术!
  • 相关阅读:
    常见的eclipse和真机出现的问题
    volley+okhttp封装,一行代码就可访问网络
    android异步任务处理(网络等耗时操作)
    android手机短信获取
    Android从启动到程序运行整个过程的整理
    android中的广播
    图片旋转问题
    Android Satudio的使用记录
    百度地图初学者
    简单的图片上传和下载
  • 原文地址:https://www.cnblogs.com/itunic/p/java-reentrant-lock1.html
Copyright © 2011-2022 走看看