zoukankan      html  css  js  c++  java
  • 再谈AbstractQueuedSynchronizer2:共享模式与基于Condition的等待/通知机制实现

    共享模式acquire实现流程

    上文我们讲解了AbstractQueuedSynchronizer独占模式的acquire实现流程,本文趁热打铁继续看一下AbstractQueuedSynchronizer共享模式acquire的实现流程。连续两篇文章的学习,也可以对比独占模式acquire和共享模式acquire的区别,加深对于AbstractQueuedSynchronizer的理解。

    先看一下共享模式acquire的实现,方法为acquireShared和acquireSharedInterruptibly,两者差别不大,区别就在于后者有中断处理,以acquireShared为例:

     1 public final void acquireShared(int arg) {
     2     if (tryAcquireShared(arg) < 0)
     3         doAcquireShared(arg);
     4 }

    这里就能看出第一个差别来了:独占模式acquire的时候子类重写的方法tryAcquire返回的是boolean,即是否tryAcquire成功;共享模式acquire的时候,返回的是一个int型变量,判断是否<0。doAcquireShared方法的实现为:

     1 private void doAcquireShared(int arg) {
     2     final Node node = addWaiter(Node.SHARED);
     3     boolean failed = true;
     4     try {
     5         boolean interrupted = false;
     6         for (;;) {
     7             final Node p = node.predecessor();
     8             if (p == head) {
     9                 int r = tryAcquireShared(arg);
    10                 if (r >= 0) {
    11                     setHeadAndPropagate(node, r);
    12                     p.next = null; // help GC
    13                     if (interrupted)
    14                         selfInterrupt();
    15                     failed = false;
    16                     return;
    17                 }
    18             }
    19             if (shouldParkAfterFailedAcquire(p, node) &&
    20                 parkAndCheckInterrupt())
    21                 interrupted = true;
    22         }
    23     } finally {
    24         if (failed)
    25             cancelAcquire(node);
    26     }
    27 }

    我们来分析一下这段代码做了什么:

    1. addWaiter,把所有tryAcquireShared<0的线程实例化出一个Node,构建为一个FIFO队列,这和独占锁是一样的
    2. 拿当前节点的前驱节点,只有前驱节点是head的节点才能tryAcquireShared,这和独占锁也是一样的
    3. 前驱节点不是head的,执行"shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()",for(;;)循环,"shouldParkAfterFailedAcquire()"方法执行2次,当前线程阻塞,这和独占锁也是一样的

    确实,共享模式下的acquire和独占模式下的acquire大部分逻辑差不多,最大的差别在于tryAcquireShared成功之后,独占模式的acquire是直接将当前节点设置为head节点即可,共享模式会执行setHeadAndPropagate方法,顾名思义,即在设置head之后多执行了一步propagate操作。setHeadAndPropagate方法源码为:

     1 private void setHeadAndPropagate(Node node, int propagate) {
     2     Node h = head; // Record old head for check below
     3     setHead(node);
     4     /*
     5      * Try to signal next queued node if:
     6      *   Propagation was indicated by caller,
     7      *     or was recorded (as h.waitStatus) by a previous operation
     8      *     (note: this uses sign-check of waitStatus because
     9      *      PROPAGATE status may transition to SIGNAL.)
    10      * and
    11      *   The next node is waiting in shared mode,
    12      *     or we don't know, because it appears null
    13      *
    14      * The conservatism in both of these checks may cause
    15      * unnecessary wake-ups, but only when there are multiple
    16      * racing acquires/releases, so most need signals now or soon
    17      * anyway.
    18      */
    19     if (propagate > 0 || h == null || h.waitStatus < 0) {
    20         Node s = node.next;
    21         if (s == null || s.isShared())
    22             doReleaseShared();
    23     }
    24 }

    第3行的代码设置重设head,第2行的代码由于第3行的代码要重设head,因此先定义一个Node型变量h获得原head的地址,这两行代码很简单。

    第19行~第23行的代码是独占锁和共享锁最不一样的一个地方,我们再看独占锁acquireQueued的代码:

     1 final boolean acquireQueued(final Node node, int arg) {
     2     boolean failed = true;
     3     try {
     4         boolean interrupted = false;
     5         for (;;) {
     6             final Node p = node.predecessor();
     7             if (p == head && tryAcquire(arg)) {
     8                 setHead(node);
     9                 p.next = null; // help GC
    10                 failed = false;
    11                 return interrupted;
    12             }
    13             if (shouldParkAfterFailedAcquire(p, node) &&
    14                 parkAndCheckInterrupt())
    15                 interrupted = true;
    16         }
    17     } finally {
    18         if (failed)
    19             cancelAcquire(node);
    20     }
    21 }

    这意味着独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播

    共享模式release实现流程

    上面讲了共享模式下acquire是如何实现的,下面再看一下release的实现流程,方法为releaseShared:

    1 public final boolean releaseShared(int arg) {
    2     if (tryReleaseShared(arg)) {
    3         doReleaseShared();
    4         return true;
    5     }
    6     return false;
    7 }

    tryReleaseShared方法是子类实现的,如果tryReleaseShared成功,那么执行doReleaseShared()方法:

     1 private void doReleaseShared() {
     2     /*
     3      * Ensure that a release propagates, even if there are other
     4      * in-progress acquires/releases.  This proceeds in the usual
     5      * way of trying to unparkSuccessor of head if it needs
     6      * signal. But if it does not, status is set to PROPAGATE to
     7      * ensure that upon release, propagation continues.
     8      * Additionally, we must loop in case a new node is added
     9      * while we are doing this. Also, unlike other uses of
    10      * unparkSuccessor, we need to know if CAS to reset status
    11      * fails, if so rechecking.
    12      */
    13     for (;;) {
    14         Node h = head;
    15         if (h != null && h != tail) {
    16             int ws = h.waitStatus;
    17             if (ws == Node.SIGNAL) {
    18                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    19                     continue;            // loop to recheck cases
    20                 unparkSuccessor(h);
    21             }
    22             else if (ws == 0 &&
    23                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    24                 continue;                // loop on failed CAS
    25         }
    26         if (h == head)                   // loop if head changed
    27             break;
    28     }
    29 }

    主要是两层逻辑:

    1. 头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点
    2. 头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播

    Condition的await()方法实现原理----构建等待队列

    我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。

    Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:

    1 public class ConditionObject implements Condition, java.io.Serializable {
    2         private static final long serialVersionUID = 1173984872572414699L;
    3         /** First node of condition queue. */
    4         private transient Node firstWaiter;
    5         /** Last node of condition queue. */
    6         private transient Node lastWaiter;
    7         
    8         ...
    9 }

    这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点

    像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:

     1 public final void await() throws InterruptedException {
     2     if (Thread.interrupted())
     3         throw new InterruptedException();
     4     Node node = addConditionWaiter();
     5     int savedState = fullyRelease(node);
     6     int interruptMode = 0;
     7     while (!isOnSyncQueue(node)) {
     8         LockSupport.park(this);
     9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    10             break;
    11     }
    12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    13         interruptMode = REINTERRUPT;
    14     if (node.nextWaiter != null) // clean up if cancelled
    15         unlinkCancelledWaiters();
    16     if (interruptMode != 0)
    17         reportInterruptAfterWait(interruptMode);
    18 }

    第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:

     1 private Node addConditionWaiter() {
     2     Node t = lastWaiter;
     3     // If lastWaiter is cancelled, clean out.
     4     if (t != null && t.waitStatus != Node.CONDITION) {
     5         unlinkCancelledWaiters();
     6         t = lastWaiter;
     7     }
     8     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     9     if (t == null)
    10         firstWaiter = node;
    11     else
    12         t.nextWaiter = node;
    13     lastWaiter = node;
    14     return node;
    15 }

    首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的

    那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

    接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:

    1. 如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node
    2. 如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node
    3. 无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面

    用一张图表示一下构建的数据结构就是:

    对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:

    1. AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点
    2. AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址

    整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么

    Condition的await()方法实现原理----线程等待

    前面我们看了Condition构建等待队列的过程,接下来我们看一下等待的过程,await()方法的代码比较短,再贴一下:

     1 public final void await() throws InterruptedException {
     2     if (Thread.interrupted())
     3         throw new InterruptedException();
     4     Node node = addConditionWaiter();
     5     int savedState = fullyRelease(node);
     6     int interruptMode = 0;
     7     while (!isOnSyncQueue(node)) {
     8         LockSupport.park(this);
     9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    10             break;
    11     }
    12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    13         interruptMode = REINTERRUPT;
    14     if (node.nextWaiter != null) // clean up if cancelled
    15         unlinkCancelledWaiters();
    16     if (interruptMode != 0)
    17         reportInterruptAfterWait(interruptMode);
    18 }

    构建完毕队列之后,执行第5行的fullyRelease方法,顾名思义:fullyRelease方法的作用是完全释放Node的状态。方法实现为:

     1 final int fullyRelease(Node node) {
     2     boolean failed = true;
     3     try {
     4         int savedState = getState();
     5         if (release(savedState)) {
     6             failed = false;
     7             return savedState;
     8         } else {
     9             throw new IllegalMonitorStateException();
    10         }
    11     } finally {
    12         if (failed)
    13             node.waitStatus = Node.CANCELLED;
    14     }
    15 }

    这里第4行获取state,第5行release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了,前面的文章详细讲解过。

    接着看await()方法的第7行判断"while(!isOnSyncQueue(node))":

     1 final boolean isOnSyncQueue(Node node) {
     2     if (node.waitStatus == Node.CONDITION || node.prev == null)
     3         return false;
     4     if (node.next != null) // If has successor, it must be on queue
     5         return true;
     6     /*
     7      * node.prev can be non-null, but not yet on queue because
     8      * the CAS to place it on queue can fail. So we have to
     9      * traverse from tail to make sure it actually made it.  It
    10      * will always be near the tail in calls to this method, and
    11      * unless the CAS failed (which is unlikely), it will be
    12      * there, so we hardly ever traverse much.
    13      */
    14     return findNodeFromTail(node);
    15 }

    注意这里的判断是Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。

    至此调用await()方法的线程构建Condition等待队列--释放锁--等待的过程已经全部分析完毕。

    Condition的signal()实现原理

    上面的代码分析了构建Condition等待队列--释放锁--等待的过程,接着看一下signal()方法通知是如何实现的:

    1 public final void signal() {
    2     if (!isHeldExclusively())
    3         throw new IllegalMonitorStateException();
    4     Node first = firstWaiter;
    5     if (first != null)
    6         doSignal(first);
    7 }

    首先从第2行的代码我们看到,要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。

    那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:

    1 private void doSignal(Node first) {
    2     do {
    3         if ( (firstWaiter = first.nextWaiter) == null)
    4             lastWaiter = null;
    5         first.nextWaiter = null;
    6     } while (!transferForSignal(first) &&
    7              (first = firstWaiter) != null);
    8 }

    第3行~第5行的代码很好理解:

    1. 重新设置firstWaiter,指向第一个waiter的nextWaiter
    2. 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空
    3. 因为firstWaiter是要被signal的,因此它没什么用了,nextWaiter置空

    接着执行第6行和第7行的代码,这里重点就是第6行的transferForSignal方法:

     1 final boolean transferForSignal(Node node) {
     2     /*
     3      * If cannot change waitStatus, the node has been cancelled.
     4      */
     5     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     6         return false;
     7 
     8     /*
     9      * Splice onto queue and try to set waitStatus of predecessor to
    10      * indicate that thread is (probably) waiting. If cancelled or
    11      * attempt to set waitStatus fails, wake up to resync (in which
    12      * case the waitStatus can be transiently and harmlessly wrong).
    13      */
    14     Node p = enq(node);
    15     int ws = p.waitStatus;
    16     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    17         LockSupport.unpark(node.thread);
    18     return true;
    19 }

    方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,总结一下方法的实现:

    1. 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
    2. 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列
    3. 当前节点通过CAS机制将waitStatus置为SIGNAL

    最后上面的步骤全部成功,返回true,返回true唤醒等待节点成功。从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它

    代码分析到这里,我想类似的signalAll方法也没有必要再分析了,显然signalAll方法的作用就是将所有Condition队列中等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。

    代码示例

    可能大家看了我分析半天代码会有点迷糊,这里最后我贴一段我用于验证上面Condition结论的示例代码,首先建立一个Thread,我将之命名为ConditionThread:

     1 /**
     2  * @author 五月的仓颉http://www.cnblogs.com/xrq730/p/7067904.html
     3  */
     4 public class ConditionThread implements Runnable {
     5 
     6     private Lock lock;
     7     
     8     private Condition condition;
     9     
    10     public ConditionThread(Lock lock, Condition condition) {
    11         this.lock = lock;
    12         this.condition = condition;
    13     }
    14     
    15     @Override
    16     public void run() {
    17         
    18         if ("线程0".equals(JdkUtil.getThreadName())) {
    19             thread0Process();
    20         } else if ("线程1".equals(JdkUtil.getThreadName())) {
    21             thread1Process();
    22         } else if ("线程2".equals(JdkUtil.getThreadName())) {
    23             thread2Process();
    24         }
    25         
    26     }
    27     
    28     private void thread0Process() {
    29         try {
    30             lock.lock();
    31             System.out.println("线程0休息5秒");
    32             JdkUtil.sleep(5000);
    33             condition.signal();
    34             System.out.println("线程0唤醒等待线程");
    35         } finally {
    36             lock.unlock();
    37         }
    38     }
    39     
    40     private void thread1Process() {
    41         try {
    42             lock.lock();
    43             System.out.println("线程1阻塞");
    44             condition.await();
    45             System.out.println("线程1被唤醒");
    46         } catch (InterruptedException e) {
    47             
    48         } finally {
    49             lock.unlock();
    50         }
    51     }
    52     
    53     private void thread2Process() {
    54         try {
    55             System.out.println("线程2想要获取锁");
    56             lock.lock();
    57             System.out.println("线程2获取锁成功");
    58         } finally {
    59             lock.unlock();
    60         }
    61     }
    62     
    63 }

    这个类里面的方法就不解释了,反正就三个方法片段,根据线程名判断,每个线层执行的是其中的一个代码片段。写一段测试代码:

     1 /**
     2  * @author 五月的仓颉http://www.cnblogs.com/xrq730/p/7067904.html
     3  */
     4 @Test
     5 public void testCondition() throws Exception {
     6     Lock lock = new ReentrantLock();
     7     Condition condition = lock.newCondition();
     8         
     9     // 线程0的作用是signal
    10     Runnable runnable0 = new ConditionThread(lock, condition);
    11     Thread thread0 = new Thread(runnable0);
    12     thread0.setName("线程0");
    13     // 线程1的作用是await
    14     Runnable runnable1 = new ConditionThread(lock, condition);
    15     Thread thread1 = new Thread(runnable1);
    16     thread1.setName("线程1");
    17     // 线程2的作用是lock
    18     Runnable runnable2 = new ConditionThread(lock, condition);
    19     Thread thread2 = new Thread(runnable2);
    20     thread2.setName("线程2");
    21         
    22     thread1.start();
    23     Thread.sleep(1000);
    24     thread0.start();
    25     Thread.sleep(1000);
    26     thread2.start();
    27         
    28     thread1.join();
    29 }

    测试代码的意思是:

    1. 线程1先启动,获取锁,调用await()方法等待
    2. 线程0后启动,获取锁,休眠5秒准备signal()
    3. 线程2最后启动,获取锁,由于线程0未使用完毕锁,因此线程2排队,可以此时由于线程0还未signal(),因此线程1在线程0执行signal()后,在AbstractQueuedSynchronizer队列中的顺序是在线程2后面的

    代码执行结果为:

     1 线程1阻塞
     2 线程0休息5秒
     3 线程2想要获取锁
     4 线程0唤醒等待线程
     5 线程2获取锁成功
     6 线程1被唤醒

    符合我们的结论:signal()并不意味着被唤醒的线程立即执行。由于线程2先于线程0排队,因此看到第5行打印的内容,线程2先获取锁。

  • 相关阅读:
    day67——前后端传输数据的编码格式、ajax传json数据/传文件、批量插入
    day66——choices参数、MTV/MVC模型、三种创建多对多的方式、AJAX
    day65——聚合函数、分组查询、F与Q查询、django开事务、orm查询优化
    dayⅢ、基本数据类型+运算符作业
    dayⅡ:编程语言+变量+垃圾回收制
    dayⅡ:变量作业
    dayⅠ:计算机基础知识
    ⅩⅥ:无参装饰器
    ⅩⅤ:作业
    ⅩⅤ:名称空间与作用域
  • 原文地址:https://www.cnblogs.com/xrq730/p/7067904.html
Copyright © 2011-2022 走看看