zoukankan      html  css  js  c++  java
  • java 并发(五)---AbstractQueuedSynchronizer(2)

           文章部分代码和照片来自参考资料

    问题 :

    •    ConditionObject  的 await 和 signal 方法是如何实现的

    ConditonObject

             ConditionObject 继承 Condition 这个接口, 看一下这个接口的注解说明 :

    Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

           如果Lock替换了synchronized方法和语句的使用,则Condition将替换Object监视方法的使用。

             Condition 经常可以用在生产者-消费者的场景中,ArrayBlockingQueue 采用这种方式实现了生产者-消费者.

             ConditionObject 是 AQS里的一个对象,继承Condition 接口,上一节我们提到AQS 通过同步队列(sync queue )和 等待队列(wait queue )还有 状态变量(statue)进行并发控制。这节我们要讲的就是在等待队列的操作。

             下面是 wait queue 和 sync queue 的图例。

    wait_queue

    await 和 signal 方法

          代码分析来自于 一行一行源码分析清楚 AbstractQueuedSynchronizer (二)

             

            这两个方法可以用下面两种两张图来描述。其中await 是释放所有的锁,然后将节点加入到 wait queue ,然后等待唤醒。 signal 方法是从wait queue 移动到 sync queue 中,然后唤醒。

           

            wait_queue_await      

                                                                                图一. await 方法

    wait_queue_signal

                                                                                图二. signal 方法

                  Conditon 的方法实现 基于 ReetranLock 。下面源码分析会涉及到。

                  Condition 的await 方法 包括的操作有 :

    • If current thread is interrupted, throw InterruptedException.
    • Save lock state returned by getState.
    • Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
    • Block until signalled or interrupted.
    • Reacquire by invoking specialized version of acquire with saved state as argument.
      If interrupted while blocked in step 4, throw InterruptedException.
      1 // 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
      2 // 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
      3 public final void await() throws InterruptedException {
      4     if (Thread.interrupted())
      5         throw new InterruptedException();
      6     // 添加到 condition 的条件队列中
      7     Node node = addConditionWaiter();
      8     // 释放锁,返回值是释放锁之前的 state 值
      9     int savedState = fullyRelease(node);
     10     int interruptMode = 0;
     11     // 这里退出循环有两种情况,之后再仔细分析
     12     // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
     13     // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
     14     while (!isOnSyncQueue(node)) {
     15         LockSupport.park(this);
     16         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
     17             break;
     18     }
     19     // 被唤醒后,抢锁,抢不到将进入阻塞队列,等待获取锁
     20     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
     21         interruptMode = REINTERRUPT;
     22     if (node.nextWaiter != null) // clean up if cancelled
     23         unlinkCancelledWaiters();
     24     if (interruptMode != 0)
     25         reportInterruptAfterWait(interruptMode);
     26 }

           

      1 // 将当前线程对应的节点入队,插入队尾
      2 private Node addConditionWaiter() {
      3     Node t = lastWaiter;
      4     // 如果条件队列的最后一个节点取消了,将其清除出去
      5     if (t != null && t.waitStatus != Node.CONDITION) {
      6         // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
      7         unlinkCancelledWaiters();
      8         t = lastWaiter;
      9     }
     10     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     11     // 如果队列为空
     12     if (t == null)
     13         firstWaiter = node;
     14     else
     15         t.nextWaiter = node;
     16     lastWaiter = node;
     17     return node;
     18 }
     19 在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。
     20 
     21 当 await 的时候如果发生了取消操作(这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法。
     22 
     23 // 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
     24 // 纯属链表操作,很好理解,看不懂多看几遍就可以了
     25 private void unlinkCancelledWaiters() {
     26     Node t = firstWaiter;
     27     Node trail = null;
     28     while (t != null) {
     29         Node next = t.nextWaiter;
     30         // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
     31         if (t.waitStatus != Node.CONDITION) {
     32             t.nextWaiter = null;
     33             if (trail == null)
     34                 firstWaiter = next;
     35             else
     36                 trail.nextWaiter = next;
     37             if (next == null)
     38                 lastWaiter = trail;
     39         }
     40         else
     41             trail = t;
     42         t = next;
     43     }
     44 }

              ReentranLock 是可重入的,所以释放所有的锁。

      1 // 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
      2 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
      3 //         那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
      4 //         相应的,如果 lock 重入了 n 次,savedState == n
      5 // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
      6 final int fullyRelease(Node node) {
      7     boolean failed = true;
      8     try {
      9         int savedState = getState();
     10         // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
     11         if (release(savedState)) {
     12             failed = false;
     13             return savedState;
     14         } else {
     15             throw new IllegalMonitorStateException();
     16         }
     17     } finally {
     18         if (failed)
     19             node.waitStatus = Node.CANCELLED;
     20     }
     21 }
      1 // 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
      2 // 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
      3 // 这个方法就是判断 node 是否已经移动到阻塞队列了
      4 final boolean isOnSyncQueue(Node node) {
      5     // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
      6     // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
      7     // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列
      8     if (node.waitStatus == Node.CONDITION || node.prev == null)
      9         return false;
     10     // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
     11     if (node.next != null)
     12         return true;
     13 
     14     // 这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列
     15 
     16     // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
     17     // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
     18     // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。
     19 
     20     // 调用这个方法的时候,往往我们需要的就在队尾的部分,所以一般都不需要完全遍历整个队列的
     21     return findNodeFromTail(node);
     22 }
     23 
     24 // 从同步队列的队尾往前遍历,如果找到,返回 true
     25 private boolean findNodeFromTail(Node node) {
     26     Node t = tail;
     27     for (;;) {
     28         if (t == node)
     29             return true;
     30         if (t == null)
     31             return false;
     32         t = t.prev;
     33     }
     34 }
     35 }
              
      1 int interruptMode = 0;
      2 while (!isOnSyncQueue(node)) {
      3     // 线程挂起
      4     LockSupport.park(this);
      5 
      6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      7         break;
      8 }

             回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起。

             接下来就是 signal 移动元素到同步队列,释放锁的时候唤醒线程,转移到阻塞队列为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this); 把线程挂起了,等待唤醒。唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 移动元素到同步队列,释放锁的时候唤醒线程来消费。

      1 
      2 // 唤醒等待了最久的线程
      3 // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
      4 public final void signal() {
      5     // 调用 signal 方法的线程必须持有当前的独占锁
      6     if (!isHeldExclusively())
      7         throw new IllegalMonitorStateException();
      8     Node first = firstWaiter;
      9     if (first != null)
     10         doSignal(first);
     11 }
     12 
     13 // 从条件队列队头往后遍历,找出第一个需要转移的 node
     14 // 因为前面我们说过,有些线程会取消排队,但是还在队列中
     15 private void doSignal(Node first) {
     16     do {
     17           // 将 firstWaiter 指向 first 节点后面的第一个
     18         // 如果将队头移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
     19         if ( (firstWaiter = first.nextWaiter) == null)
     20             lastWaiter = null;
     21         // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
     22         first.nextWaiter = null;
     23     } while (!transferForSignal(first) &&
     24              (first = firstWaiter) != null);
     25       // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
     26 }
     27 
     28 // 将节点从条件队列转移到阻塞队列
     29 // true 代表成功转移
     30 // false 代表在 signal 之前,节点已经取消了
     31 final boolean transferForSignal(Node node) {
     32 
     33     // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
     34     // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
     35     // 否则,将 waitStatus 置为 0
     36     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     37         return false;
     38 
     39     // enq(node): 自旋进入阻塞队列的队尾
     40     // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
     41     Node p = enq(node);
     42     int ws = p.waitStatus;
     43     // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
     44     // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
     45     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
     46         // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节
     47         LockSupport.unpark(node.thread);
     48     return true;
     49 

            注意 :

    • 调用 signal 前要先获得锁
    • 正常情况下,signal 方法只是对元素进行了移动(从wait queue 到 sync queue),真正的唤醒操作在释放锁的代码里。

            那么阻塞的线程现在继续执行,我们可以看到,即使被唤醒依旧要继续抢。

      1     /**
      2      * Acquires in exclusive uninterruptible mode for thread already in
      3      * queue. Used by condition wait methods as well as acquire.
      4      *
      5      * @param node the node
      6      * @param arg the acquire argument
      7      * @return {@code true} if interrupted while waiting
      8      */
      9     final boolean acquireQueued(final Node node, int arg) {
     10         boolean failed = true;
     11         try {
     12             boolean interrupted = false;
     13             for (;;) {
     14                 final Node p = node.predecessor();
     15                 if (p == head && tryAcquire(arg)) {
     16                     setHead(node);
     17                     p.next = null; // help GC
     18                     failed = false;
     19                     return interrupted;
     20                 }
     21                 if (shouldParkAfterFailedAcquire(p, node) &&
     22                     parkAndCheckInterrupt())
     23                     interrupted = true;
     24             }
     25         } finally {
     26             if (failed)
     27                 cancelAcquire(node);
     28         }
     29     }

           注意 :

    • await 方法被唤醒后依旧要抢,要是抢不到就继续阻塞。

           要是CAS 成功,检查中断状态

      1 int interruptMode = 0;
      2 while (!isOnSyncQueue(node)) {
      3     // 线程挂起
      4     LockSupport.park(this);
      5 
      6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      7         break;
      8 }
      1 // 1. 如果在 signal 之前已经中断,返回 THROW_IE
      2 // 2. 如果是 signal 之后中断,返回 REINTERRUPT
      3 // 3. 没有发生中断,返回 0
      4 private int checkInterruptWhileWaiting(Node node) {
      5     return Thread.interrupted() ?
      6         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
      7         0;
      8 }
      1 // 只有线程处于中断状态,才会调用此方法
      2 // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
      3 // 返回 true:如果此线程在 signal 之前被取消,
      4 final boolean transferAfterCancelledWait(Node node) {
      5     // 用 CAS 将节点状态设置为 0 
      6     // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
      7     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
      8         // 将节点放入阻塞队列
      9         // 这里我们看到,即使中断了,依然会转移到阻塞队列
     10         enq(node);
     11         return true;
     12     }
     13 
     14     // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
     15     // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
     16     // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
     17     while (!isOnSyncQueue(node))
     18         Thread.yield();
     19     return false;
     20 }

    小提示

             在AQS 的子类实现中,Conditon 常常会配合 ReentranLock  使用,例如 LinkBlockingQueue 。 要注意的是ConditionObject的 await 方法会释放掉所有的锁。show me code !

      1 public class LinkBlockingQueueTest {
      2     private ReentrantLock mLock = new ReentrantLock();
      3     private Condition mConditionObject = mLock.newCondition();
      4 
      5     public void testMethod() throws InterruptedException {
      6 
      7         final ReentrantLock methodLock = this.mLock;
      8         methodLock.lockInterruptibly();
      9         try {
     10             mConditionObject.await();
     11             System.out.println("sss");
     12         } finally {
     13             methodLock.unlock();
     14         }
     15     }
     16 
     17     public void testMethod1() throws InterruptedException {
     18 
     19         final ReentrantLock methodLock = this.mLock;
     20         methodLock.lockInterruptibly();
     21         try {
     22             System.out.println("s1");
     23         } finally {
     24             methodLock.unlock();
     25         }
     26     }
     27 
     28 }
      1   public static void main(String[] args) throws InterruptedException {
      2 
      3         LinkBlockingQueueTest obj = new LinkBlockingQueueTest();
      4 
      5         new Thread(() -> {
      6             try {
      7                 obj.testMethod();
      8             } catch (InterruptedException e) {
      9                 e.printStackTrace();
     10             }
     11         }).start();
     12 
     13         Thread.sleep(1000L * 2);
     14         new Thread(() -> {
     15             try {
     16                 obj.testMethod1();
     17             } catch (InterruptedException e) {
     18                 e.printStackTrace();
     19             }
     20         }).start();
     21 
     22     }

             会输出 s1 。

    为什么有了sync queue 还需要 wait queue ?

              ConditionObject  里的await方法会(假如这个节点已经存在sync queue)释放锁移入 wait queue , signal 方法则是重新移入到 sync queue ,那么我们就可以知道了 wait queue 的作用是临时存放节点,移除在 sync queue 的节点(假如存在),再插入到 sync queue 的队尾。它的作用我们可以在阅读ArrayBlockingQueue源码时就可以感受到了。

    参考资料

  • 相关阅读:
    当import的模块内容发生变化时,对此模块进行重新加载(刷新)
    使用python的ctypes库实现内存的动态申请和释放
    【转载】实现博客园图片的可放大功能
    使用tqdm实现下载文件进度条
    pytest参数化的两种方式
    Jmeter之Bean shell使用-常用内置变量
    JMeter之Ramp-up Period(in seconds)说明
    Jmeter性能测试基础
    接口测试基础
    JMeter做http接口功能测试
  • 原文地址:https://www.cnblogs.com/Benjious/p/10161633.html
Copyright © 2011-2022 走看看