zoukankan      html  css  js  c++  java
  • 聊聊 ReentrantLock是如何实现互斥访问临界资源的

    java中要实现临界资源的互斥访问,大体来说常用的就是就是关键字synchronized以及ReentrantLock。相比于synchronized,ReentrantLock可以实现尝试加锁,以及限时加锁。

    1. 首先基于非公平锁看下,如何锁住临界代码块:

      1.1 NonfairSync.lock()

    1 final void lock() {
    2             if (compareAndSetState(0, 1))
    3                 setExclusiveOwnerThread(Thread.currentThread());
    4             else
    5                 acquire(1);
    6         }

      第2-3行是具体的非公平性的一个体现。第5行是加锁的代码(当然,在第2行判断成功了,就在第3行完成了加锁,不会走到第五行);

      第2行就是对state这个字段尝试一次cas操作(这个字段加锁的次数),成功就设置当前线程线程为当前锁的独占线程,加锁成功。

      接着第5行看:

      AbstractQueuedSynchronizer.acquire(int args)

    1  public final void acquire(int arg) {
    2         if (!tryAcquire(arg) &&
    3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4             selfInterrupt();
    5     }

        这个方法在第2行尝试获取锁,获取失败进入第三行完成入队操作,加锁阶段暂时不会走到第4行代码来(实际上是在第三行的方法内的某一行阻塞了)。接着第2的代码看:

    1 protected final boolean tryAcquire(int acquires) {
    2             return nonfairTryAcquire(acquires);
    3         }

       这个就是一个简单的方法调用,继续往里面看:

      ReentrantLock.Sync.nonfairTryAcquire(int acquires)

     1 final boolean nonfairTryAcquire(int acquires) {
     2             final Thread current = Thread.currentThread();
     3             int c = getState();
     4             if (c == 0) {
     5                 if (compareAndSetState(0, acquires)) {
     6                     setExclusiveOwnerThread(current);
     7                     return true;
     8                 }
     9             }
    10             else if (current == getExclusiveOwnerThread()) {
    11                 int nextc = c + acquires;
    12                 if (nextc < 0) // overflow
    13                     throw new Error("Maximum lock count exceeded");
    14                 setState(nextc);
    15                 return true;
    16             }
    17             return false;
    18         }

      第4-9行代码和之前尝试加锁的方法类似,此处忽略。重点看下10-17行,第10-16行是可重入锁的代码的实现,就是先获取当前持有锁的线程,和请求锁的线程,如果是同一线程,那么直接对state累加,然后更新state即可。如果是其他线程请求加锁,

    直接在第17行返回false。接着看AbstractQueuedSynchronizer.acquire(int args)方法的第2行

    1   acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

      先看里面这个方法 AbstractQueuedSynchronizer.addWaiter(Node model);

     1 private Node addWaiter(Node mode) {
     2         Node node = new Node(Thread.currentThread(), mode);
     3         // Try the fast path of enq; backup to full enq on failure
     4         Node pred = tail;
     5         if (pred != null) {
     6             node.prev = pred;
     7             if (compareAndSetTail(pred, node)) {
     8                 pred.next = node;
     9                 return node;
    10             }
    11         }
    12         enq(node);
    13         return node;
    14     }

      这个方法就是完成AQS中原子入队的功能。要么在5-11行完成入队操作,要么在12行完成入队操作。

      首先,第2行完成node节点的创建(注意, 此处传入的mode为 Node.EXCLUSIVE), 表明当前线程请求以独占模式持有锁。

      先看第5-11行,这段代码完成入队操作有两个先决条件,1)当前等待队列已经有线程在等待锁了;2)当前线程获取tail节点的引用之后,完成入队操作之前,没有其他线程更新tail;

      如果这段代码首先获取tail并且保存到pred中,然后再比较当前的tail和pred是否一样,如果一样,则更新tail为node(这一段比较交换的原子性是由底层操作系统提供的),然后将之前tail的next指向当前的node;

      再看第12行,如果刚才的两个先决条件有一个不满足,将会执行第12行代码:

    AbstractQueuedSynchronizer.enq(Node node)

     1 private Node enq(final Node node) {
     2         for (;;) {
     3             Node t = tail;
     4             if (t == null) { // Must initialize
     5                 if (compareAndSetHead(new Node()))
     6                     tail = head;
     7             } else {
     8                 node.prev = t;
     9                 if (compareAndSetTail(t, node)) {
    10                     t.next = node;
    11                     return t;
    12                 }
    13             }
    14         }
    15     }

      很明显可以看到,这段代码是个死循环,只有在第11行可以结束方法。第4-7行是完成队列的初始化(也就是给head和tail初始化),8-11是完成入队操作的(和上面方法的6-10行思想是一样的)。

      无论是第4-7行还是第8-11行都是基于CAS的思想,上面已经描述过一次了。

      这段代码执行完,也就标志了节点入队成功了。方法会返回刚才成功入队的节点,接下来就需要改变节点的状态了,以及决定是否挂起节点。

      返回到AbstractQueuedSynchronizer.acquire(int args)的第二行,接着看方法

      AbstractQueuedSynchronizer.acquiredQueue(Node node)

     1 final boolean acquireQueued(final Node node, int arg) {
     2         boolean failed = true;
     3         try {
     4             boolean interrupted = false;
     5             for (;;) {
     6                 final Node p = node.predecessor();
     7                 if (p == head && tryAcquire(arg)) {
     8                     setHead(node);
     9                     p.next = null; // help GC
    10                     failed = false;
    11                     return interrupted;
    12                 }
    13                 if (shouldParkAfterFailedAcquire(p, node) &&
    14                     parkAndCheckInterrupt())
    15                     interrupted = true;
    16             }
    17         } finally {
    18             if (failed)
    19                 cancelAcquire(node);
    20         }
    21     }

      这段代码是功能点有改变节点状态,挂起节点中的线程,唤醒接待中的线程,出队操作。传入的node就是刚刚完成入队的node, args = 1;

      这段代码正常情况下只能在第11行结束方法,获取在try语句块中抛出异常结束循环,然后执行玩finallly语句块中的方法结束方法。

      加锁的逻辑只有在第13行和第14行方法的第1句。

    AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node pred, Node node)

     1  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2         int ws = pred.waitStatus;
     3         if (ws == Node.SIGNAL)
     4             /*
     5              * This node has already set status asking a release
     6              * to signal it, so it can safely park.
     7              */
     8             return true;
     9         if (ws > 0) {
    10             /*
    11              * Predecessor was cancelled. Skip over predecessors and
    12              * indicate retry.
    13              */
    14             do {
    15                 node.prev = pred = pred.prev;
    16             } while (pred.waitStatus > 0);
    17             pred.next = node;
    18         } else {
    19             /*
    20              * waitStatus must be 0 or PROPAGATE.  Indicate that we
    21              * need a signal, but don't park yet.  Caller will need to
    22              * retry to make sure it cannot acquire before parking.
    23              */
    24             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    25         }
    26         return false;
    27     }

      这段代码就是改变node的前一个节点的ws,以及决定是否挂起node中的线程。

      此处的pred就是node的前一个节点。

      首先获取前一个节点的ws, 如果ws = SIGNAL(实际上就是前一个节点也是被挂起的),直接返回true;

      如果ws > 0,也就是说ws = CANCELLED,那么会从pred往head节点方向找,找到第一个ws <= 0的节点A,然后将node挂在node后面,返回false;

      如果ws是其他值, 直接将ws的值更改为SIGNAL,然后返回false.

      结合前一个方法知道,如果这个方法返回false, 实际上是重新进入下一个循环。

      以第一个入队的节点为例:第一次执行该方法时候,pred就是head节点,假设尝试获取锁失败,第一次先会将head节点的ws 由0--->-1,返回false,然后再尝试获取锁,假设还是失败,直接返回true。

      接着看下一个方法

      AbstractQueuedSychronizer.parkAndChechInterrupt()

    1 private final boolean parkAndCheckInterrupt() {
    2         LockSupport.park(this);
    3         return Thread.interrupted();
    4     }

      这个方法很简单,直接就在第二行挂起当前线程。同样,加锁方法也就到此为止了,请求锁的线程到此也就被挂起了。

    2 解锁

      解锁的过程,入口方法就是 

      ReenTrantLock.unlock()  

    1 public void unlock() {
    2         sync.release(1);
    3     }

     此处就是一个简单的方法调用,具体看下个方法:

     AbstractQueuedSynchronizer.release(int args)

    1 public final boolean release(int arg) {
    2         if (tryRelease(arg)) {
    3             Node h = head;
    4             if (h != null && h.waitStatus != 0)
    5                 unparkSuccessor(h);
    6             return true;
    7         }
    8         return false;
    9     }

      这个方法先看第二行,调用tryRelease(int args)来决定当前线程是否释放锁

    ReentrantLock.tryRelease(int releases)

     1 protected final boolean tryRelease(int releases) {
     2             int c = getState() - releases;
     3             if (Thread.currentThread() != getExclusiveOwnerThread())
     4                 throw new IllegalMonitorStateException();
     5             boolean free = false;
     6             if (c == 0) {
     7                 free = true;
     8                 setExclusiveOwnerThread(null);
     9             }
    10             setState(c);
    11             return free;
    12         }

      这个方法是非线程安全的,同时也无需保证线程线程安全,因为之后持有锁的线程调用该方法才能改变state,以及决定是否释放锁,而ReentrantLock是独占锁(就是只有一个线程能否持有锁),所以只有在任何时刻,只有一个线程可以调用该方法并且

    改变state的值。同样,由于每次传入的releases = 1, 所以无需考虑c < 0 的情况。这个代码表示只有state的时候,将free赋值为true,也就是说释放锁,同时将锁的独占线程设置为null。

      再回头看上个方法AbstractQueuedSynchronizer.release(int args),当ReentrantLock.tryRelease(int releases)方法返回true的时候,重点看第三行if语句:

      h != null && h.waitStatus != 0
      首先 h != null 代表当前可能有节点在等待锁,然后 h.waitStatus != 0 代表一定有节点在等待锁。
      因为对于head节点,其ws只有两个值, 0和-1,0的情况是此刻锁被其他线程持有,正好有两一个线程又在请求锁,并且请求锁的线程还没有被挂起之前(没被挂起,当然不需要唤醒);而 -1 代表,刚刚请求锁的线程已经被挂起了。
      所以说,只有在 h != null && h.waitStatus !=0 的时候才去调用 unparkSuccessor(h)方法;
    AbstractQueuedSynchronizer.unaprkSuccessor(NOde node)
     1 private void unparkSuccessor(Node node) {
     2         /*
     3          * If status is negative (i.e., possibly needing signal) try
     4          * to clear in anticipation of signalling.  It is OK if this
     5          * fails or if status is changed by waiting thread.
     6          */
     7         int ws = node.waitStatus;
     8         if (ws < 0)
     9             compareAndSetWaitStatus(node, ws, 0);
    10 
    11         /*
    12          * Thread to unpark is held in successor, which is normally
    13          * just the next node.  But if cancelled or apparently null,
    14          * traverse backwards from tail to find the actual
    15          * non-cancelled successor.
    16          */
    17         Node s = node.next;
    18         if (s == null || s.waitStatus > 0) {
    19             s = null;
    20             for (Node t = tail; t != null && t != node; t = t.prev)
    21                 if (t.waitStatus <= 0)
    22                     s = t;
    23         }
    24         if (s != null)
    25             LockSupport.unpark(s.thread);
    26     }  

      此处传入的node 就是head, 首先获取head的ws, 当ws  < 0 的时候,cas修改为 0;接下来的动作就是寻找下一个被唤醒的节点。

      首先查看头节点的下一个节点s == null, 如果不成立并且s.ws <= 0 (s节点不是被取消的) 直接唤醒 s, 否则 从tail节点往head方向找,找到最靠近head的节点,然后唤醒它。

      唤醒之后就又回到之前的方法

      AbstractQueuedSynchronized.parkANdCheckInterrupt()方法

    1  private final boolean parkAndCheckInterrupt() {
    2         LockSupport.park(this);
    3         return Thread.interrupted();
    4     }

      直接在第3行返回当前线程的中断标志位;

      继续回到方法 AbstractQueuedSynchronized.acquireQueued(final Node node, int args)

     1 final boolean acquireQueued(final Node node, int arg) {
     2         boolean failed = true;
     3         try {
     4             boolean interrupted = false;
     5             for (;;) {
     6                 final Node p = node.predecessor();
     7                 if (p == head && tryAcquire(arg)) {
     8                     setHead(node);
     9                     p.next = null; // help GC
    10                     failed = false;
    11                     return interrupted;
    12                 }
    13                 if (shouldParkAfterFailedAcquire(p, node) &&
    14                     parkAndCheckInterrupt())
    15                     interrupted = true;
    16             }
    17         } finally {
    18             if (failed)
    19                 cancelAcquire(node);
    20         }
    21     }

      由之前的unparkSuccessor(Node node)我们知道,能够被唤醒的一定是距离head节点最近的一个节点并且其ws <= 0。

      先看第一种情况,head和node之间还存在若干个节点,但是这若干个节点的ws都是 -1(CANCELLED),那么方法会进入到第13行,执行 shouldParkAfterFailedAcquire(p, node)将这若干个节点清除掉,然后继续下次循环;  

      这个时候的node一定是head的下一个节点了,假设当前线程成功获取锁(也就是trtAcquire(int args)返回true),方法执行8-11行。

      第8行完成head引用的更改,以及将节点绑定的线程置空。然后断开当前节点与队列中其他节点的引用(好让GC完成对当前节点的回收), 然后将失败标志位置为false, 最后返回当前线程的中断标志位。

      然后回到方法:

      AbstrctQueuedSynchronized,acquire()方法:

    1 public final void acquire(int arg) {
    2         if (!tryAcquire(arg) &&
    3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4             selfInterrupt();
    5     }

       回到方法的第3行,如果返回true,那么执行方法的第4行,中断线程。

    总结一下:

      1. ReentrantLock中的state就是记录当前锁被某一线程加锁的次数,任何时刻满足 state >= 0;

      2. 从ReentrantLock的加锁与解锁来看,加锁的时候在非公平锁的情况下有一个抢锁的机会,抢锁失败会加入队列的尾部,公平所直接加入队列的尾部,而解锁的时候,是从头节点依次往后唤醒线程的,

        因此公平锁肯定满足先进入队列的线程先获取锁,非公平所的大部分线程也是如此。

      3. 整个加锁解锁的操作过程中,加锁解锁的次数以及节点的入队都是依靠cas来完成原子操作的。其中加锁解锁直接以来于两次state的值,节点入队操作依赖于tail节点。
      4. ReentrantLock能够实现临界代码的互斥访问依赖于LockSupport.park()与LockSupport.unpark()方法实现线程的挂起与唤醒。

      5.ReentrantLock 依赖于队列完成对线程进行有效的挂起与唤醒(这个是与Synchronize关键字相比)。

      

      

    ---恢复内容结束---

  • 相关阅读:
    Alpha阶段项目复审
    复审与事后分析
    测试与发布(Alpha版本)
    第七天
    第六天
    团队作业第4周——项目冲刺
    第一天
    第二天
    第四天
    第五天
  • 原文地址:https://www.cnblogs.com/bedlimate/p/9129390.html
Copyright © 2011-2022 走看看