zoukankan      html  css  js  c++  java
  • Jdk1.6 JUC源码解析(6)-locks-AbstractQueuedSynchronizer

    功能简介:
    • AbstractQueuedSynchronizer(以下简称AQS)是Java并发包提供的一个同步基础机制,是并发包中实现Lock和其他同步机制(如:Semaphore、CountDownLatch和FutureTask等)的基础。
    • AQS内部包含一个FIFO的同步等待队列,简单的说,没有成功获取控制权的线程会在这个队列中等待。
    • AQS内部管理了一个原子的int域作为内部状态信息,并提供了一些方法来访问该域,基于AQS实现的同步机制可以按自己的需要来灵活使用这个 int域,比如:ReentrantLock用它记录锁重入次数;CountDownLatch用它表示内部的count;FutureTask用它表示任务运行状态(Running,Ran和Cancelled);Semaphore用它表示许可数量。
    • AQS提供了独占和共享两种模式。在独占模式下,当一个线程获取了AQS的控制权,其他线程获取控制权的操作就会失败;但在共享模式下,其他线程的获取控制权操作就可能成功。并发包中的同步机制如ReentrantLock就是典型的独占模式,Semaphore是共享模式;也有同时使用两种模式的同步机制,如ReentrantReadWriteLock。  
    • AQS内部提供了一个ConditionObject类来支持独占模式下的(锁)条件,这个条件的功能与Object的wait和notify/notifyAll的功能类似,但更加明确和易用。
    • AQS一般的使用方式为定义一个实现AQS的非公有的内部帮助类作为内部代理,来实现具体同步机制的方法,如Lock的lock和unlock;AQS中也提供一些检测和监控内部队列和条件对象的方法,具体同步机制可以按需使用这些方法;AQS内部只有一个状态,即原子int域,如果基于AQS实现的类需要做序列化/反序列化,注意这一点。    
    源码分析:
    • 内部等待队列:
           首先我们先做一个简单的概览,内部的同步等待队列是由一系列节点组成的一个链表。如果要将一个线程入队(竞争失败,进入队列等待),只需将这个线程及相关信息组成一个节点,拼接到队列链表尾部(尾节点)即可;如果要将一个线程出队(竞争成功),只需重新设置新的队列首部(头节点)即可。
           接下来先看一下组成同步等待队列的节点的类:
        static final class Node {
            /** 表示节点在共享模式下等待的常量 */
            static final Node SHARED = new Node();
            /** 表示节点在独占模式下等待的常量 */
            static final Node EXCLUSIVE = null;
            /** 表示当前节点的线程被取消 */
            static final int CANCELLED =  1;
            /** 表示后继节点的线程需要被唤醒 */
            static final int SIGNAL    = -1;
            /** 表示当前节点的线程正在等待某个条件 */
            static final int CONDITION = -2;
            /**
             * 表示接下来的一个共享模式请求(acquireShared)要无条件的传递(往后继节点方向)下去
             */
            static final int PROPAGATE = -3;
            /**
             * 等待状态域, 取以下值:
             *   SIGNAL:     当前节点的后继节点已经(或即将)被阻塞,所以如果当前节点释放(控制权) 
             *               或者被取消时,必须唤醒其后继节点。为了避免竞争,请求方法必须首先 
             *               声明它们需要一个信号,然后(原子的)调用请求方法,如果失败,当前线程
             *               进入阻塞状态。
             *   CANCELLED:  表示当前节点已经被取消(由于超时或中断),节点一旦进入被取消状态,就
             *               不会再变成其他状态了。具体来说,一个被取消节点的线程永远不会再次被
             *               阻塞
             *   CONDITION:  表示当前节点正处在一个条件队列中。当前节点直到转移时才会被作为一个
             *               同步队列的节点使用。转移时状态域会被设置为0。(使用0值和其他定义值 
             *               并没有关系,只是为了简化操作)
             *   PROPAGATE:  表示一个共享的释放操作(releaseShared)应该被传递到其他节点。该状态
             *               值在doReleaseShared过程中进行设置(仅在头节点),从而保证持续传递,
             *               即使其他操作已经开始。
             *   0:          None of the above
             *
             * 这些状态值之所以用数值来表示,目的是为了方便使用,非负的值意味着节点不需要信号(被唤醒)。
             * 所以,一些代码中不需要针对特殊值去做检测,只需要检查符号(正负)即可。
             * 
             * 针对普通的同步节点,这个域被初始化为0;针对条件(condition)节点,初始化为CONDITION(-2)
             * 需要通过CAS操作来修改这个域(如果可能的话,可以使用volatile写操作)。
             */
            volatile int waitStatus;
            /**
             * 指向当前节点的前驱节点,用于检测等待状态。这个域在入队时赋值,出队时置空。
             * 而且,在取消前驱节点的过程中,可以缩短寻找非取消状态节点的过程。由于头节点 
             * 永远不会取消(一个节点只有请求成功才会变成头节点,一个被取消的节点永远不可 
             * 能请求成功,而且一个线程只能取消自己所在的节点),所以总是存在一个非取消状态节点。
             */
            volatile Node prev;
            /**
             * 指向当前节点的后继节点,释放(控制权)时会唤醒该节点。这个域在入队时赋值,在跳过
             * 取消状态节点时进行调整,在出队时置空。入队操作在完成之前并不会对一个前驱节点的
             * next域赋值,所以一个节点的next域为null并不能说明这个节点在队列尾部。然而,如果
             * next域为null,我们可以从尾节点通过前驱节点往前扫描来做双重检测。取消状态节点的
             * next域指向自身,这样可以简化isOnSyncQueue的实现。
             */
            volatile Node next;
            /**
             * 使当前节点入队的线程。在构造构造的时候初始化,使用后置为null。
             */
            volatile Thread thread;
            /**
             * 指向下一个条件等待状态节点或者为特殊值(SHARED)。由于条件队列只有在独占模式下才
             * 能访问,所以我们只需要一个普通的链表队列来保存处于等待状态的节点。它们在重新请
             * 求的时候会转移到同步队列。由于条件只存在于独占模式下,所以如果是共享模式,就将
             * 这域保存为一个特殊值(SHARED)。
             */
            Node nextWaiter;
            /**
             * Returns true if node is waiting in shared mode
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
            Node() {    // Used to establish initial head or SHARED marker
            }
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
           说明:节点类Node内部定义了一些常量,如节点模式、等待状态;Node 内部有指向其前驱和后继节点的引用(类似双向链表);Node内部有保存当前线程的引用;Node内部的nextWaiter域在共享模式下指向一个常量 SHARED,在独占模式下为null或者是一个普通的等待条件队列(只有独占模式下才存在等待条件)。
     
           再看一下AQS中同步等待队列相关的域:
    Java代码 
    1. /** 
    2.  * 同步等待队列的头节点,延迟初始化。除了初始化之外,只能通过setHead方法来改变 
    3.  * 这个域。注:如果头结点存在,那么它的waitStatus可以保证一定不是CANCELLED。 
    4.  */  
    5. private transient volatile Node head;  
    6. /** 
    7.  * 同步等待队列的尾节点,延迟初始化。只有通过enq方法添加一个新的等待节点的时候 
    8.  * 才会改变这个域。 
    9.  */  
    10. private transient volatile Node tail;  
     
     
    • 内部状态值:
    Java代码  收藏代码
    1. /** 
    2.  * The synchronization state. 
    3.  */  
    4. private volatile int state;  
    5. /** 
    6.  * Returns the current value of synchronization state. 
    7.  * This operation has memory semantics of a <tt>volatile</tt> read. 
    8.  * @return current state value 
    9.  */  
    10. protected final int getState() {  
    11.     return state;  
    12. }  
    13. /** 
    14.  * Sets the value of synchronization state. 
    15.  * This operation has memory semantics of a <tt>volatile</tt> write. 
    16.  * @param newState the new state value 
    17.  */  
    18. protected final void setState(int newState) {  
    19.     state = newState;  
    20. }  
    21. /** 
    22.  * Atomically sets synchronization state to the given updated 
    23.  * value if the current state value equals the expected value. 
    24.  * This operation has memory semantics of a <tt>volatile</tt> read 
    25.  * and write. 
    26.  * 
    27.  * @param expect the expected value 
    28.  * @param update the new value 
    29.  * @return true if successful. False return indicates that the actual 
    30.  *         value was not equal to the expected value. 
    31.  */  
    32. protected final boolean compareAndSetState(int expect, int update) {  
    33.     // See below for intrinsics setup to support this  
    34.     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  
    35. }  
    • 上面已经看到AQS内部的整体数据结构,一个同步等待队列+一个(原子的)int域。下面来从请求和释放两条主线来进行相关代码分析。
           首先看一下独占模式下,忽略中断的请求方法:
    Java代码  收藏代码
    1. /** 
    2.  * 独占模式下进行请求,忽略中断。方法实现中至少会调用一次tryAcquire方法, 
    3.  * 请求成功后方法返回。否则当前线程会排队,可能会重复的阻塞和解除阻塞, 
    4.  * 执行tryAcquire方法,直到成功。这个方法可以用来实现Lock的lock方法。 
    5.  * 
    6.  * @param arg the acquire argument.  这个值被传递给tryAcquire方法,值在 
    7.  *        这里并没有实际意义,如果基于AQS实现自己的同步机制(可能要实现 
    8.  *        tryAcquire方法),可以灵活利用这个值。 
    9.  */  
    10. public final void acquire(int arg) {  
    11.     if (!tryAcquire(arg) &&  
    12.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
    13.         selfInterrupt();  
    14. }  
           acquire方法中首先调用tryAcquire方法,如果tryAcquire返回true,说明请求成功,直接返回;否则,继续调用 acquireQueued方法,如果acquireQueued方法返回true,还需要调用一下selfInterrupt方法。
    首先看一下tryAcquire方法,该方法在AQS中并没有具体实现,而是开放出来,交由子类去实现。
    Java代码  收藏代码
    1. /** 
    2.  * 在独占模式下尝试请求(控制权)。这个方法(实现)应该查看一下对象的 
    3.  * 状态是否允许在独占模式下请求,如果允许再进行请求。 
    4.  * 
    5.  * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到 
    6.  * 同步等待队列中(如果当前线程还不在同步等待队列中),直到被其他线程的释放 
    7.  * 操作唤醒。可以用来实现Lock的tryLock方法。 
    8.  * 
    9.  * 该方法默认抛出UnsupportedOperationException异常。 
    10.  * 
    11.  * @param arg the acquire argument. This value is always the one 
    12.  *        passed to an acquire method, or is the value saved on entry 
    13.  *        to a condition wait.  The value is otherwise uninterpreted 
    14.  *        and can represent anything you like. 
    15.  * @return {@code true} if successful. Upon success, this object has 
    16.  *         been acquired. 
    17.  * @throws IllegalMonitorStateException if acquiring would place this 
    18.  *         synchronizer in an illegal state. This exception must be 
    19.  *         thrown in a consistent fashion for synchronization to work 
    20.  *         correctly. 
    21.  * @throws UnsupportedOperationException if exclusive mode is not supported 
    22.  */  
    23. protected boolean tryAcquire(int arg) {  
    24.     throw new UnsupportedOperationException();  
    25. }  
           接下来调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先看下其中的addWaiter方法。
    Java代码  收藏代码
    1. /** 
    2.  * Creates and enqueues node for current thread and given mode. 
    3.  * 
    4.  * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 
    5.  * @return the new node 
    6.  */  
    7. private Node addWaiter(Node mode) {  
    8.     //根据当前线程和模式创建一个Node。  
    9.     Node node = new Node(Thread.currentThread(), mode);  
    10.     //尝试快速入队,失败的话再执行正常的入队过程  
    11.     Node pred = tail;  
    12.     if (pred != null) {  
    13.         //如果同步等待队列尾节点不为null,将当前(线程的)Node链接到尾节点。  
    14.         node.prev = pred;  
    15.         //尝试将当前Node设置(原子操作)为同步等待队列的尾节点。  
    16.         if (compareAndSetTail(pred, node)) {  
    17.             //如果设置成功,完成链接(pred的next指向当前节点)。  
    18.             pred.next = node;  
    19.             //返回当前节点。  
    20.             return node;  
    21.         }  
    22.     }  
    23.     //如果同步等待队列尾节点为null,或者快速入队过程中设置尾节点失败,  
    24.     //进行正常的入队过程,调用enq方法。  
    25.     enq(node);  
    26.     //返回当前节点。  
    27.     return node;  
    28. }  
           看一下入队方法。
    Java代码  收藏代码
    1. /** 
    2.  * Inserts node into queue, initializing if necessary. See picture above. 
    3.  * @param node the node to insert 
    4.  * @return node's predecessor 
    5.  */  
    6. private Node enq(final Node node) {  
    7.     for (;;) {  
    8.         Node t = tail;  
    9.         if (t == null) { // Must initialize  
    10.             /*  
    11.              * 如果同步等待队列尾节点为null,说明还没有任何线程进入同步等待队列, 
    12.              * 这时要初始化同步等待队列:创建一个(dummy)节点,然后尝试将这个 
    13.              * 节点设置(CAS)为头节点,如果设置成功,将尾节点指向头节点 
    14.              * 也就是说,第一次有线程进入同步等待队列时,要进行初始化,初始化 
    15.              * 的结果就是头尾节点都指向一个哑(dummy)节点。 
    16.              */  
    17.             if (compareAndSetHead(new Node()))  
    18.                 tail = head;  
    19.         } else {  
    20.             //将当前(线程)节点的前驱节点指向同步等待队列的尾节点。  
    21.             node.prev = t;    
    22.             //注意节点拼接到同步等待队列总是分为3个步骤:1.将其prev引用指向尾节点 2.尝试将其设置为尾节点 3.将其prev节点(第2步之前的尾节点)的next指向其本身。  
    23.             //所以一个节点为尾节点,可以保证prev一定不为null,但无法保证其prev的next不为null。所以后续的一些方法内会看到很多对同步等待队列的反向遍历。   
    24.   
    25.             //尝试将当前节点设置为同步等待队列的尾节点。  
    26.             if (compareAndSetTail(t, node)) {  
    27.                 //如果成功,将之前尾节点的后继节点指向当前节点(现在的尾节点),完成节点拼接。  
    28.                 t.next = node;  
    29.                 //返回之前的尾节点。  
    30.                 return t;  
    31.             }  
    32.         }  
    33.     }  
    34. }  
           现在可以看acquireQueued方法。
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in exclusive uninterruptible mode for thread already in 
    3.  * queue. Used by condition wait methods as well as acquire. 
    4.  * 
    5.  * @param node the node 
    6.  * @param arg the acquire argument 
    7.  * @return {@code true} if interrupted while waiting 
    8.  */  
    9. final boolean acquireQueued(final Node node, int arg) {  
    10.     boolean failed = true;  
    11.     try {  
    12.         boolean interrupted = false;  
    13.         for (;;) {  
    14.             //找到当前节点的前驱节点p  
    15.             final Node p = node.predecessor();  
    16.             /* 
    17.              * 检测p是否为头节点,如果是,再次调用tryAcquire方法 
    18.              * (这里可以体现出acquire方法执行过程中tryAcquire方法 
    19.              * 至少被调用一次)。 
    20.              */  
    21.             if (p == head && tryAcquire(arg)) {  
    22.                 //如果p节点是头节点且tryAcquire方法返回true。那么将  
    23.                 //当前节点设置为头节点。  
    24.                 //从这里可以看出,请求成功且已经存在队列中的节点会被设置成头节点。  
    25.                 setHead(node);  
    26.                 //将p的next引用置空,帮助GC,现在p已经不再是头节点了。  
    27.                 p.next = null; // help GC  
    28.                 //设置请求标记为成功  
    29.                 failed = false;  
    30.                 //传递中断状态,并返回。  
    31.                 return interrupted;  
    32.             }  
    33.             //如果p节点不是头节点,或者tryAcquire返回false,说明请求失败。  
    34.             //那么首先需要判断请求失败后node节点是否应该被阻塞,如果应该  
    35.             //被阻塞,那么阻塞node节点,并检测中断状态。  
    36.             if (shouldParkAfterFailedAcquire(p, node) &&  
    37.                 parkAndCheckInterrupt())  
    38.                 //如果有中断,设置中断状态。  
    39.                 interrupted = true;  
    40.         }  
    41.     } finally {  
    42.         if (failed) //最后检测一下如果请求失败(异常退出),取消请求。  
    43.             cancelAcquire(node);  
    44.     }  
    45. }  
           上面方法中如果请求成功,会将当前节点设置为同步等待队列的头节点。看一下设置为头节点的方法。
    Java代码  收藏代码
    1. /** 
    2.  * Sets head of queue to be node, thus dequeuing. Called only by 
    3.  * acquire methods.  Also nulls out unused fields for sake of GC 
    4.  * and to suppress unnecessary signals and traversals. 
    5.  * 
    6.  * @param node the node 
    7.  */  
    8. private void setHead(Node node) {  
    9.     head = node;  
    10.     //请求成功,当前线程获取控制权,当前节点会取代之前(dummy)头节点的位置。所以置空thread和prev这些没用的域。  
    11.     node.thread = null;  
    12.     node.prev = null;  
    13. }  
           继续看shouldParkAfterFailedAcquire方法。
    Java代码  收藏代码
    1. /** 
    2.  * 在一个节点请求失败时,检测并更新改节点的(等待)状态。如果当前 
    3.  * 节点的线程应该被阻塞,那么返回true。这里是整个请求(循环)中主 
    4.  * 要信号控制部分。方法的条件:pred == node.prev              
    5.  * 
    6.  * @param pred node's predecessor holding status 
    7.  * @param node the node 
    8.  * @return {@code true} if thread should block 
    9.  */  
    10. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    11.     //获取当前节点的前驱节点的等待状态。  
    12.     int ws = pred.waitStatus;  
    13.     if (ws == Node.SIGNAL)  
    14.         /* 
    15.          * 如果当前节点的前驱节点的状态为SIGNAL,说明当前节点已经声明了需要唤醒, 
    16.          * 所以可以阻塞当前节点了,直接返回true。 
    17.          * 一个节点在其被阻塞之前需要线程"声明"一下其需要唤醒(就是将其前驱节点 
    18.          * 的等待状态设置为SIGNAL,注意其前驱节点不能是取消状态,如果是,要跳过) 
    19.          */  
    20.         return true;  
    21.     if (ws > 0) {  
    22.         /* 
    23.          * 如果当前节点的前驱节点是取消状态,那么需要跳过这些(取消状态)前驱节点 
    24.          * 然后重试。 
    25.          */  
    26.         do {  
    27.             node.prev = pred = pred.prev;  
    28.         } while (pred.waitStatus > 0);  
    29.         pred.next = node;  
    30.     } else {  
    31.         /* 
    32.          * 这里等待状态一定是0或者PROPAGATE。这里将当前节点的前驱节点(非取消状态)的 
    33.          * 等待状态设置为SIGNAL。来声明需要一个(唤醒)信号。接下来方法会返回false, 
    34.          * 还会继续尝试一下请求,以确保在阻塞之前确实无法请求成功。 
    35.          */  
    36.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
    37.     }  
    38.     return false;  
    39. }  
           再看一下进行实际阻塞操作的parkAndCheckInterrupt方法。
    Java代码  收藏代码
    1. /** 
    2.  * Convenience method to park and then check if interrupted 
    3.  * 
    4.  * @return {@code true} if interrupted 
    5.  */  
    6. private final boolean parkAndCheckInterrupt() {  
    7.     //阻塞当前线程。  
    8.     LockSupport.park(this);  
    9.     //线程被唤醒,方法返回当前线程的中断状态,并重置当前线程的中断状态(置为false)。  
    10.     return Thread.interrupted();  
    11. }  
           看一下acquireQueued最后finally块中的cancelAcquire方法。
    Java代码  收藏代码
    1. /** 
    2.  * Cancels an ongoing attempt to acquire. 
    3.  * 
    4.  * @param node the node 
    5.  */  
    6. private void cancelAcquire(Node node) {  
    7.     // Ignore if node doesn't exist  
    8.     if (node == null)  
    9.         return;  
    10.     //跳过首先将要取消的节点的thread域置空。  
    11.     node.thread = null;  
    12.     //跳过状态为"取消"的前驱节点。  
    13.     Node pred = node.prev;  
    14.     //node前面总是会存在一个非"取消"状态的节点,所以这里不需要null检测。  
    15.     while (pred.waitStatus > 0)  
    16.         node.prev = pred = pred.prev;  
    17.     // predNext节点(node节点前面的第一个非取消状态节点的后继节点)是需要"断开"的节点。   
    18.     // 下面的CAS操作会达到"断开"效果,但(CAS操作)也可能会失败,因为可能存在其他"cancel"   
    19.     // 或者"singal"的竞争  
    20.     Node predNext = pred.next;  
    21.     // Can use unconditional write instead of CAS here.  
    22.     // After this atomic step, other Nodes can skip past us.  
    23.     // Before, we are free of interference from other threads.  
    24.     node.waitStatus = Node.CANCELLED;  
    25.     // 如果当前节点是尾节点,那么删除当前节点(将当前节点的前驱节点设置为尾节点)。  
    26.     if (node == tail && compareAndSetTail(node, pred)) {  
    27.         //将前驱节点(已经设置为尾节点)的next置空。  
    28.         compareAndSetNext(pred, predNext, null);  
    29.     } else {  
    30.         //如果当前节点不是尾节点,说明后面有其他等待线程,需要做一些唤醒工作。  
    31.   
    32.         // 如果当前节点不是头节点,那么尝试将当前节点的前驱节点  
    33.         // 的等待状态改成SIGNAL,并尝试将前驱节点的next引用指向  
    34.         // 其后继节点。否则,唤醒后继节点。  
    35.         int ws;  
    36.         if (pred != head &&  
    37.             ( (ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) )  
    38.             && pred.thread != null) {  
    39.             //如果当前节点的前驱节点不是头节点,那么需要给当前节点的后继节点一个"等待唤醒"的标记,  
    40.             //即 将当前节点的前驱节点等待状态设置为SIGNAL,然后将其设置为当前节点的后继节点的前驱节点....(真绕!)  
    41.             Node next = node.next;  
    42.             if (next != null && next.waitStatus <= 0)  
    43.                 compareAndSetNext(pred, predNext, next);  
    44.         } else {  
    45.             //否则,唤醒当前节点的后继节点。  
    46.             unparkSuccessor(node);  
    47.         }  
    48.         //前面提到过,取消节点的next引用会指向自己。  
    49.         node.next = node; // help GC  
    50.     }  
    51. }  
           最后来看一下unparkSuccessor方法。
    Java代码  收藏代码
    1. /** 
    2.  * 如果node存在后继节点,唤醒后继节点。 
    3.  * 
    4.  * @param node the node 
    5.  */  
    6. private void unparkSuccessor(Node node) {  
    7.     /* 
    8.      * 如果node的等待状态为负数(比如:可能需要一个信号),尝试去清空 
    9.      * "等待唤醒"的状态(将状态置为0),即使设置失败,或者该状态已经  
    10.      * 被正在等待的线程修改,也没有任何影响。  
    11.      */  
    12.     int ws = node.waitStatus;  
    13.     if (ws < 0) //如果当前节点的状态小于0,尝试设置为0。  
    14.         compareAndSetWaitStatus(node, ws, 0);  
    15.     /* 
    16.      * 需要唤醒的线程在node的后继节点,一般来说就是node的next引用指向的节点。  
    17.      * 但如果next指向的节点被取消或者为null,那么就同步等待队列的队尾反向查找离  
    18.      * 当前节点最近的且状态不是"取消"的节点。  
    19.      */  
    20.     Node s = node.next;  
    21.     if (s == null || s.waitStatus > 0) {  
    22.         s = null;  
    23.         for (Node t = tail; t != null && t != node; t = t.prev)  
    24.             if (t.waitStatus <= 0)  
    25.                 s = t;  
    26.     }  
    27.     if (s != null) //如果存在(需要唤醒的节点),将该节点的线程唤醒。  
    28.         LockSupport.unpark(s.thread);  
    29. }  
           回到acquire方法,最后如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回true,说明当前线程被中断,会继续调用selfInterrupt方法。
    Java代码  收藏代码
    1. /** 
    2.  * Convenience method to interrupt current thread. 
    3.  */  
    4. private static void selfInterrupt() {  
    5.     //中断当前线程。  
    6.     Thread.currentThread().interrupt();  
    7. }  
           OK,现在来总结一下acquire方法中的逻辑:
           1.调用tryAcquire方法进行(控制权)请求,如果请求成功,方法直接返回。
           2.如果请求失败,那么会使用当前线程建立一个独占模式的节点,然后将节点放到同步等待队列的队尾。然后进入一个无限循环。(这个过程中会帮助完成同步 等待队列的初始化,初始化过程中也可以看到,同步等待队列初始化后头尾节点都指向同一个哑节点。请求失败的线程(节点)进入队列时会链接到队列的尾部,如 果同步等待队列内的线程(节点)请求成功,会将其设置为新的头节点。)
           3.无限循环中会判断当前同步等待队列中是否有其他线程。
           4. 如果没有,再次调用tryAcquire进行请求。
           5.如果请求成功,将当前节点设置为同步等待队列头节点,向上传递中断状态,然后主循环退出。
           6.如果同步等待队列中有其他线程(在当前线程前面),或者前面第4步请求失败,那么首先需要检查当前节点是否已经设置"等待唤醒"标记,即将其非取消状态前驱节点的等待状态设置为SIGNAL。
           7.如果未设置"等待唤醒"标记,进行标记设置,然后继续进行无限循环,进入第3步。
           8.如果已经设置"等待唤醒"标记,那么阻塞当前线程(节点)。
           9.当前节点(线程)被唤醒后,设置(传递)中断标记,然后继续进行无限循环,进入第3步。
           10.最后在无限循环退出后,要判断请求是否失败(由于一些原因,循环退出,但请求失败),如果失败,取消当前节点。
     
           接下来看一下独占模式下,响应中断的请求方法,这个方法会抛出中断异常:
    Java代码  收藏代码
    1. /** 
    2.  * 独占模式下进行请求,如果当前线程被中断,放弃方法执行(抛出异常),  
    3.  * 方法实现中,首先会检查当前线程的中断状态,然后会执行至少一次 
    4.  * tryAcquire方法,如果请求成功,方法返回;如果失败,当前线程会。 
    5.  * 在同步等待队列中排队,可能会重复的被阻塞和被唤醒,并执行tryAcquire 
    6.  * 方法直到成功或者当前线程被中断。可以用来实现Lock的lockInterruptibly。 
    7.  * 
    8.  * @param arg the acquire argument.  This value is conveyed to 
    9.  *        {@link #tryAcquire} but is otherwise uninterpreted and 
    10.  *        can represent anything you like. 
    11.  * @throws InterruptedException if the current thread is interrupted 
    12.  */  
    13. public final void acquireInterruptibly(int arg) throws InterruptedException {  
    14.     if (Thread.interrupted())  
    15.         throw new InterruptedException();  
    16.     if (!tryAcquire(arg)) //如果请求不成功,执行doAcquireInterruptibly方法。  
    17.         doAcquireInterruptibly(arg);  
    18. }  
           继续看一下doAcquireInterruptibly方法。
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in exclusive interruptible mode. 
    3.  * @param arg the acquire argument 
    4.  */  
    5. private void doAcquireInterruptibly(int arg)  
    6.     throws InterruptedException {  
    7.     final Node node = addWaiter(Node.EXCLUSIVE);  
    8.     boolean failed = true;  
    9.     try {  
    10.         for (;;) {  
    11.             final Node p = node.predecessor();  
    12.             if (p == head && tryAcquire(arg)) {  
    13.                 setHead(node);  
    14.                 p.next = null; // help GC  
    15.                 failed = false;  
    16.                 return;  
    17.             }  
    18.             if (shouldParkAfterFailedAcquire(p, node) &&  
    19.                 parkAndCheckInterrupt())  
    20.                 throw new InterruptedException(); //区别  
    21.         }  
    22.     } finally {  
    23.         if (failed)  
    24.             cancelAcquire(node);  
    25.     }  
    26. }  
           和前面的acquireQueued方法类似,区别基本上只是对中断状态的处理,这里没有将中断状态传递给上层,而是直接抛出InterruptedException异常,方法实现里其他方法的分析可以参考前面。
     
           最后看一下独占模式下,响应中断并且支持超时的请求方法:
    Java代码  收藏代码
    1. /** 
    2.  * 独占模式下进行请求,如果当前线程被中断,放弃方法执行(抛出异常), 
    3.  * 如果给定的超时时间耗尽,方法失败。方法实现中,首先会检查当前线程 
    4.  * 的中断状态,然后会执行至少一次tryAcquire方法,如果请求成功,方法 
    5.  * 返回;如果失败,当前线程会在同步等待队列中排队,可能会重复的被阻塞和 
    6.  * 被唤醒,并执行tryAcquire方法直到成功或者当前线程被中断或者超时时  
    7.  * 间耗尽。可以用来实现Lock的tryLock(long, TimeUnit)。 
    8.  * 
    9.  * @param arg the acquire argument.  This value is conveyed to 
    10.  *        {@link #tryAcquire} but is otherwise uninterpreted and 
    11.  *        can represent anything you like. 
    12.  * @param nanosTimeout the maximum number of nanoseconds to wait 
    13.  * @return {@code true} if acquired; {@code false} if timed out 
    14.  * @throws InterruptedException if the current thread is interrupted 
    15.  */  
    16. public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {  
    17.     if (Thread.interrupted())  
    18.         throw new InterruptedException();  
    19.     return tryAcquire(arg) || //如果请求失败,调用doAcquireNanos方法。  
    20.         doAcquireNanos(arg, nanosTimeout);  
    21. }  
           继续看一下doAcquireNanos方法。
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in exclusive timed mode. 
    3.  * 
    4.  * @param arg the acquire argument 
    5.  * @param nanosTimeout max wait time 
    6.  * @return {@code true} if acquired 
    7.  */  
    8. private boolean doAcquireNanos(int arg, long nanosTimeout)  
    9.     throws InterruptedException {  
    10.     long lastTime = System.nanoTime();  
    11.     final Node node = addWaiter(Node.EXCLUSIVE);  
    12.     boolean failed = true;  
    13.     try {  
    14.         for (;;) {  
    15.             final Node p = node.predecessor();  
    16.             if (p == head && tryAcquire(arg)) {  
    17.                 setHead(node);  
    18.                 p.next = null; // help GC  
    19.                 failed = false;  
    20.                 return true;  
    21.             }  
    22.             if (nanosTimeout <= 0)  
    23.                 return false;  
    24.             if (shouldParkAfterFailedAcquire(p, node) &&  
    25.                 nanosTimeout > spinForTimeoutThreshold) //区别  
    26.                 LockSupport.parkNanos(this, nanosTimeout); //区别  
    27.             long now = System.nanoTime();  
    28.             nanosTimeout -= now - lastTime;  
    29.             lastTime = now;  
    30.             if (Thread.interrupted())  
    31.                 throw new InterruptedException();  
    32.         }  
    33.     } finally {  
    34.         if (failed)  
    35.             cancelAcquire(node);  
    36.     }  
    37. }  
    38. /** 
    39.  * The number of nanoseconds for which it is faster to spin 
    40.  * rather than to use timed park. A rough estimate suffices 
    41.  * to improve responsiveness with very short timeouts. 
    42.  */  
    43. static final long spinForTimeoutThreshold = 1000L;  
           和前面的doAcquireInterruptibly方法类似,区别在于方法实现里面加入了超时时间的检测,如果超时方法返回false。阻塞部分较之 前也有区别,如果剩余的超时时间小于1000纳秒,方法自旋;否则当前线程阻塞一段时间(剩余超时时间时长)。方法实现里其他方法的分析可以参考前面。
     
           看完了独占模式下的请求方法,继续分析共享模式下的请求方法。首先看下忽略中断的请求方法:
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in shared mode, ignoring interrupts.  Implemented by 
    3.  * first invoking at least once {@link #tryAcquireShared}, 
    4.  * returning on success.  Otherwise the thread is queued, possibly 
    5.  * repeatedly blocking and unblocking, invoking {@link 
    6.  * #tryAcquireShared} until success. 
    7.  * 
    8.  * @param arg the acquire argument.  This value is conveyed to 
    9.  *        {@link #tryAcquireShared} but is otherwise uninterpreted 
    10.  *        and can represent anything you like. 
    11.  */  
    12. public final void acquireShared(int arg) {  
    13.     //首先调用tryAcquireShared方法  
    14.     if (tryAcquireShared(arg) < 0)  
    15.         //如果tryAcquireShared方法返回结果小于0,继续调用doAcquireShared方法。  
    16.         doAcquireShared(arg);  
    17. }  
           acquireShared方法中首先调用tryAcquireShared方法,如果tryAcquireShared返回值大于等于0,说明请求成 功,直接返回;否则,继续调用doAcquireShared方法。先看一下tryAcquireShared方法,该方法在AQS中并没有具体实现,同 样开放出来,交由子类去实现。
    Java代码  收藏代码
    1. /** 
    2.  * 在共享模式下尝试请求(控制权)。这个方法(实现)应该查看一下对象的 
    3.  * 状态是否允许在共享模式下请求,如果允许再进行请求。 
    4.  * 
    5.  * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到 
    6.  * 同步等待队列中(如果当前线程还不在同步等待队列中),直到被其他线程的释放 
    7.  * 操作唤醒。 
    8.  * 
    9.  * <p>The default implementation throws {@link 
    10.  * UnsupportedOperationException}. 
    11.  * 
    12.  * @param arg the acquire argument. This value is always the one 
    13.  *        passed to an acquire method, or is the value saved on entry 
    14.  *        to a condition wait.  The value is otherwise uninterpreted 
    15.  *        and can represent anything you like. 
    16.  * @return 返回负数表示失败;返回0表示共享模式下的请求成功,但是接下来 
    17.  *         的共享模式请求不会成功;返回正数表示共享模式请求成功,接下来 
    18.  *         的共享模式请求也可以成功,当然前提是接下来的等待线程必须检测 
    19.  *         对象的状态是否允许请求。(Support for three different 
    20.  *         return values enables this method to be used in contexts 
    21.  *         where acquires only sometimes act exclusively.)  Upon 
    22.  *         success, this object has been acquired. 
    23.  * @throws IllegalMonitorStateException if acquiring would place this 
    24.  *         synchronizer in an illegal state. This exception must be 
    25.  *         thrown in a consistent fashion for synchronization to work 
    26.  *         correctly. 
    27.  * @throws UnsupportedOperationException if shared mode is not supported 
    28.  */  
    29. protected int tryAcquireShared(int arg) {  
    30.     throw new UnsupportedOperationException();  
    31. }  
           接下来看一下doAcquireShared方法。
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in shared uninterruptible mode. 
    3.  * @param arg the acquire argument 
    4.  */  
    5. private void doAcquireShared(int arg) {  
    6.     //将当前线程以共享模式加入同步等待队列。  
    7.     final Node node = addWaiter(Node.SHARED);  
    8.     boolean failed = true;  
    9.     try {  
    10.         boolean interrupted = false;  
    11.         //请求主循环  
    12.         for (;;) {  
    13.             //获取当前节点的前驱节点p  
    14.             final Node p = node.predecessor();  
    15.             if (p == head) {  
    16.                 //如果p是头节点。再次调用tryAcquireShared方法。  
    17.                 int r = tryAcquireShared(arg);  
    18.                 if (r >= 0) {  
    19.                     //如果tryAcquireShared方法执行成功,执行setHeadAndPropagate  
    20.                     setHeadAndPropagate(node, r);  
    21.                     //p节点被移除,置空next引用,帮助GC。  
    22.                     p.next = null; // help GC  
    23.                     if (interrupted)//检测中断状态,传递中断状态。  
    24.                         selfInterrupt();  
    25.                     //标记方法请求成功。  
    26.                     failed = false;  
    27.                     return;  
    28.                 }  
    29.             }  
    30.             //如果当前节点的前驱节点不是头节点,判断当前节点  
    31.             //请求失败后是否要被阻塞,如果是,阻塞并保存当前线程中断状态。  
    32.             if (shouldParkAfterFailedAcquire(p, node) &&  
    33.                 parkAndCheckInterrupt())  
    34.                 interrupted = true;  
    35.         }  
    36.     } finally {  
    37.         if (failed)//如果请求失败,取消当前节点。  
    38.             cancelAcquire(node);  
    39.     }  
    40. }  
          上面的方法实现里,如果请求成功,会调用setHeadAndPropagate方法,看下这个方法的实现。
    Java代码  收藏代码
    1. /** 
    2.  * 将node设置为同步等待队列的头节点,并且检测一下node的后继节点是 
    3.  * 否在共享模式下等待,如果是,并且propagate > 0 或者之前头节 
    4.  * 点的等待状态是PROPAGATE,唤醒后续节点。 
    5.  * 
    6.  * @param node the node 
    7.  * @param propagate the return value from a tryAcquireShared 
    8.  */  
    9. private void setHeadAndPropagate(Node node, int propagate) {  
    10.     Node h = head; // Record old head for check below  
    11.     setHead(node);  
    12.     /* 
    13.      * 尝试去唤醒队列中的下一个节点,如果满足如下条件: 
    14.      *   调用者明确表示"传递"(propagate > 0), 
    15.      *     或者h.waitStatus为PROPAGATE(被上一个操作设置) 
    16.      *     (注:这里使用符号检测是因为PROPAGATE状态可能会变成SIGNAL状态) 
    17.      * 并且 
    18.      *   下一个节点处于共享模式或者为null。 
    19.      * 
    20.      * The conservatism in both of these checks may cause 
    21.      * unnecessary wake-ups, but only when there are multiple 
    22.      * racing acquires/releases, so most need signals now or soon 
    23.      * anyway. 
    24.      */  
    25.     if (propagate > 0 || h == null || h.waitStatus < 0) {  
    26.         Node s = node.next;  
    27.         if (s == null || s.isShared())  
    28.             doReleaseShared();  
    29.     }  
    30. }  
           继续看下doReleaseShared方法。
    Java代码  收藏代码
    1. /** 
    2.  * 共享模式下的释放(控制权)动作 -- 唤醒后继节点并保证传递。  
    3.  * 注:在独占模式下,释放仅仅意味着如果有必要,唤醒头节点的  
    4.  * 后继节点。 
    5.  */  
    6. private void doReleaseShared() {  
    7.     /* 
    8.      * 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的  
    9.      * 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤  
    10.      * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证   
    11.      * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必  
    12.      * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样  
    13.      * 的是,如果(头结点)等待状态设置失败,重新检测。 
    14.      */  
    15.     for (;;) {  
    16.         Node h = head;  
    17.         //判断同步等待队列是否为空  
    18.         if (h != null && h != tail) {  
    19.             //如果不为空,获取头节点的等待状态。  
    20.             int ws = h.waitStatus;  
    21.             if (ws == Node.SIGNAL) {  
    22.                 //如果等待状态是SIGNAL,说明其后继节点需要唤醒  
    23.                 //尝试修改等待状态  
    24.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
    25.                     continue;            //如果修改失败,重新循环检测。  
    26.                 unparkSuccessor(h);//如果修改成功,唤醒头节点的后继节点。  
    27.             }  
    28.             else if (ws == 0 &&  
    29.                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果等待状态是0,尝试将其(头节点)设置为PROPAGATE  
    30.                 continue;                // 如果设置失败,继续循环检测。  
    31.         }  
    32.         if (h == head)                   // 如果过程中头节点没有发生变化,循环退出;否则需要继续检测。  
    33.             break;  
    34.     }  
    35. }  
           总结一下acquireShared方法中的逻辑:
           1.调用tryAcquireShared方法进行(控制权)请求,如果请求成功,方法直接返回。
           2.如果请求失败,那么会使用当前线程建立一个共享模式的节点,然后将节点放到同步等待队列的队尾。然后进入一个无限循环。
           3.无限循环中会判断当前同步等待队列中是否有其他线程。
           4.如果没有,再次调用tryAcquireShared进行请求。
           5.如果请求成功,将当前节点设置为同步等待队列头节点,同时检查是否需要继续唤醒下一个共享模式的节点,如果需要就继续执行唤醒动作。当然还会向上传递中断状态,然后主循环退出。
           6.如果同步等待队列中有其他线程(在当前线程前面),或者第4步的请求失败,那么首先需要检查当前节点是否已经设置"等待唤醒"标记,即将其非取消状态前驱节点的等待状态设置为SIGNAL。
           7.如果未设置"等待唤醒"标记,进行标记设置,然后继续进行无限循环,进入第3步。
           8.如果已经设置"等待唤醒"标记,那么阻塞当前线程(节点)。
           9.当前节点(线程)被唤醒后,设置(传递)中断标记,然后继续进行无限循环,进入第3步。
           10.最后在无限循环退出后,要判断请求是否失败(由于一些原因,循环退出,但请求失败),如果失败,取消当前节点。      
     
           接下来看一下共享模式下,响应中断的请求方法,这个方法会抛出中断异常:
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in shared mode, aborting if interrupted.  Implemented 
    3.  * by first checking interrupt status, then invoking at least once 
    4.  * {@link #tryAcquireShared}, returning on success.  Otherwise the 
    5.  * thread is queued, possibly repeatedly blocking and unblocking, 
    6.  * invoking {@link #tryAcquireShared} until success or the thread 
    7.  * is interrupted. 
    8.  * @param arg the acquire argument 
    9.  * This value is conveyed to {@link #tryAcquireShared} but is 
    10.  * otherwise uninterpreted and can represent anything 
    11.  * you like. 
    12.  * @throws InterruptedException if the current thread is interrupted 
    13.  */  
    14. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  
    15.     if (Thread.interrupted()) //如果当前线程被中断,抛出中断异常。  
    16.         throw new InterruptedException();  
    17.     if (tryAcquireShared(arg) < 0) //首先调用tryAcquireShared请求方法,请求失败的话,继续调用doAcquireSharedInterruptibly方法。  
    18.         doAcquireSharedInterruptibly(arg);  
    19. }  
           继续看doAcquireSharedInterruptibly方法。
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in shared interruptible mode. 
    3.  * @param arg the acquire argument 
    4.  */  
    5. private void doAcquireSharedInterruptibly(int arg)  
    6.     throws InterruptedException {  
    7.     final Node node = addWaiter(Node.SHARED);  
    8.     boolean failed = true;  
    9.     try {  
    10.         for (;;) {  
    11.             final Node p = node.predecessor();  
    12.             if (p == head) {  
    13.                 int r = tryAcquireShared(arg);  
    14.                 if (r >= 0) {  
    15.                     setHeadAndPropagate(node, r);  
    16.                     p.next = null; // help GC  
    17.                     failed = false;  
    18.                     return;  
    19.                 }  
    20.             }  
    21.             if (shouldParkAfterFailedAcquire(p, node) &&  
    22.                 parkAndCheckInterrupt())  
    23.                 throw new InterruptedException(); //区别  
    24.         }  
    25.     } finally {  
    26.         if (failed)  
    27.             cancelAcquire(node);  
    28.     }  
    29. }  
           和doAcquireShared方法基本一致,唯一区别就是没有传递线程中断状态,而是直接抛出中断异常。
     
           最后看一下共享模式下,响应中断并且支持超时的请求方法:
    Java代码  收藏代码
    1. /** 
    2.  * Attempts to acquire in shared mode, aborting if interrupted, and 
    3.  * failing if the given timeout elapses.  Implemented by first 
    4.  * checking interrupt status, then invoking at least once {@link 
    5.  * #tryAcquireShared}, returning on success.  Otherwise, the 
    6.  * thread is queued, possibly repeatedly blocking and unblocking, 
    7.  * invoking {@link #tryAcquireShared} until success or the thread 
    8.  * is interrupted or the timeout elapses. 
    9.  * 
    10.  * @param arg the acquire argument.  This value is conveyed to 
    11.  *        {@link #tryAcquireShared} but is otherwise uninterpreted 
    12.  *        and can represent anything you like. 
    13.  * @param nanosTimeout the maximum number of nanoseconds to wait 
    14.  * @return {@code true} if acquired; {@code false} if timed out 
    15.  * @throws InterruptedException if the current thread is interrupted 
    16.  */  
    17. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {  
    18.     if (Thread.interrupted()) //如果当前线程被中断,抛出中断异常。  
    19.         throw new InterruptedException();  
    20.     return tryAcquireShared(arg) >= 0 || //首先调用tryAcquireShared请求方法,请求失败的话,继续调用doAcquireSharedNanos方法。  
    21.         doAcquireSharedNanos(arg, nanosTimeout);  
    22. }  
           看下doAcquireSharedNanos方法:
    Java代码  收藏代码
    1. /** 
    2.  * Acquires in shared timed mode. 
    3.  * 
    4.  * @param arg the acquire argument 
    5.  * @param nanosTimeout max wait time 
    6.  * @return {@code true} if acquired 
    7.  */  
    8. private boolean doAcquireSharedNanos(int arg, long nanosTimeout)  
    9.     throws InterruptedException {  
    10.     long lastTime = System.nanoTime();  
    11.     final Node node = addWaiter(Node.SHARED);  
    12.     boolean failed = true;  
    13.     try {  
    14.         for (;;) {  
    15.             final Node p = node.predecessor();  
    16.             if (p == head) {  
    17.                 int r = tryAcquireShared(arg);  
    18.                 if (r >= 0) {  
    19.                     setHeadAndPropagate(node, r);  
    20.                     p.next = null; // help GC  
    21.                     failed = false;  
    22.                     return true;  
    23.                 }  
    24.             }  
    25.             if (nanosTimeout <= 0)  
    26.                 return false;  
    27.             if (shouldParkAfterFailedAcquire(p, node) &&  
    28.                 nanosTimeout > spinForTimeoutThreshold)  
    29.                 LockSupport.parkNanos(this, nanosTimeout);  
    30.             long now = System.nanoTime();  
    31.             nanosTimeout -= now - lastTime;  
    32.             lastTime = now;  
    33.             if (Thread.interrupted())  
    34.                 throw new InterruptedException();  
    35.         }  
    36.     } finally {  
    37.         if (failed)  
    38.             cancelAcquire(node);  
    39.     }  
    40. }  
    41. static final long spinForTimeoutThreshold = 1000L;  
           和前面的doAcquireSharedInterruptibly方法类似,区别在于方法实现里面加入了超时时间的检测,如果超时方法返回false。 阻塞部分较之前也有区别,如果剩余的超时时间小于1000纳秒,方法自旋;否则当前线程阻塞一段时间(剩余超时时间时长)。方法实现里其他方法的分析可以 参考前面。
     
           请求方法都分析完毕,下面开始分析释放方法,首先看下独占模式下的释放方法:
    Java代码  收藏代码
    1. /** 
    2.  * 独占模式下的释放方法。方法实现中,如果tryRelease返回true,会唤醒 
    3.  * 一个或者多个线程。这个方法可以用来实现Lock的unlock方法。 
    4.  * 
    5.  * @param arg the release argument.  This value is conveyed to 
    6.  *        {@link #tryRelease} but is otherwise uninterpreted and 
    7.  *        can represent anything you like. 
    8.  * @return the value returned from {@link #tryRelease} 
    9.  */  
    10. public final boolean release(int arg) {  
    11.     if (tryRelease(arg)) {  
    12.         Node h = head;  
    13.         if (h != null && h.waitStatus != 0)  
    14.             unparkSuccessor(h);  
    15.         return true;  
    16.     }  
    17.     return false;  
    18. }  
           方法中首先调用tryRelease。如果调用成功,继续判断同步等待队列里是否有需要唤醒的线程,如果有,进行唤醒。
           unparkSuccessor方法之前已经分析过,这里看下tryRelease方法,该方法并没有具体实现,而是交给子类去实现:
    Java代码  收藏代码
    1. /** 
    2.  * 尝试设置(AQS的)状态,反映出独占模式下的一个释放动作。 
    3.  * 
    4.  * 这个方法在线程释放(控制权)的时候被调用。 
    5.  * 
    6.  * <p>The default implementation throws 
    7.  * {@link UnsupportedOperationException}. 
    8.  * 
    9.  * @param arg the release argument. This value is always the one 
    10.  *        passed to a release method, or the current state value upon 
    11.  *        entry to a condition wait.  The value is otherwise 
    12.  *        uninterpreted and can represent anything you like. 
    13.  * @return {@code true} if this object is now in a fully released 
    14.  *         state, so that any waiting threads may attempt to acquire; 
    15.  *         and {@code false} otherwise. 
    16.  * @throws IllegalMonitorStateException if releasing would place this 
    17.  *         synchronizer in an illegal state. This exception must be 
    18.  *         thrown in a consistent fashion for synchronization to work 
    19.  *         correctly. 
    20.  * @throws UnsupportedOperationException if exclusive mode is not supported 
    21.  */  
    22. protected boolean tryRelease(int arg) {  
    23.     throw new UnsupportedOperationException();  
    24. }  
     
     

           最后看下共享模式下的释放方法:

    Java代码  收藏代码
    1. /** 
    2.  * 共享模式下的释放方法。方法实现中,如果tryReleaseShared方法 
    3.  * 返回true,那么会唤醒一个或者多个线程。 
    4.  * 
    5.  * @param arg the release argument.  This value is conveyed to 
    6.  *        {@link #tryReleaseShared} but is otherwise uninterpreted 
    7.  *        and can represent anything you like. 
    8.  * @return the value returned from {@link #tryReleaseShared} 
    9.  */  
    10. public final boolean releaseShared(int arg) {  
    11.     if (tryReleaseShared(arg)) {  
    12.         doReleaseShared();  
    13.         return true;  
    14.     }  
    15.     return false;  
    16. }  

           doReleaseShared方法之前已经分析过,这里看下tryReleaseShared方法,该方法并没有具体实现,而是交给子类去实现:

    Java代码  收藏代码
    1. /** 
    2.  * 尝试设置(AQS的)状态,反映出共享模式下的一个释放动作。 
    3.  * 
    4.  * 这个方法在线程释放(控制权)的时候被调用。 
    5.  * 
    6.  * <p>The default implementation throws 
    7.  * {@link UnsupportedOperationException}. 
    8.  * 
    9.  * @param arg the release argument. This value is always the one 
    10.  *        passed to a release method, or the current state value upon 
    11.  *        entry to a condition wait.  The value is otherwise 
    12.  *        uninterpreted and can represent anything you like. 
    13.  * @return {@code true} if this release of shared mode may permit a 
    14.  *         waiting acquire (shared or exclusive) to succeed; and 
    15.  *         {@code false} otherwise 
    16.  * @throws IllegalMonitorStateException if releasing would place this 
    17.  *         synchronizer in an illegal state. This exception must be 
    18.  *         thrown in a consistent fashion for synchronization to work 
    19.  *         correctly. 
    20.  * @throws UnsupportedOperationException if shared mode is not supported 
    21.  */  
    22. protected boolean tryReleaseShared(int arg) {  
    23.     throw new UnsupportedOperationException();  
    24. }  

           注意一下!

           AQS开放了几个方法交由子类实现(本类中抛出UnsupportedOperationException),分别是:
           tryAcquire
           tryRelease
           tryAcquireShared
           tryReleaseShared
           isHeldExclusively
           子类(具体同步器的内部同步机制)一般只需按照具体逻辑实现这几个方法就可以,注意这个方法内部需要考虑线程安全问题。
     
            以上是AQS中最重要的两类流程的方法实现,接下来看一下AQS中提供的一些检查方法:
    Java代码  收藏代码
    1. /** 
    2.  * 查询同步等待队列中是否有线程在等待(请求控制权)。  
    3.  * 注:因为由中断和超时引起的取消随时会发生,所以此方法并不能保证  
    4.  * 结果准确。 
    5.  * 
    6.  * 方法时间复杂度为常数时间。 
    7.  * 
    8.  * @return {@code true} if there may be other threads waiting to acquire 
    9.  */  
    10. public final boolean hasQueuedThreads() {  
    11.     return head != tail;  
    12. }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 查询是否有线程竞争发生,也就是说是否有请求发生过阻塞。 
    3.  * 
    4.  * 方法时间复杂度为常数时间。 
    5.  * 
    6.  * @return {@code true} if there has ever been contention 
    7.  */  
    8. public final boolean hasContended() {  
    9.     return head != null;  
    10. }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 返回同步等待队列中第一个(最前面)线程,如果没有,返回空。 
    3.  * 
    4.  * 正常情况下,方法的时间复杂度为常数时间;如果发生竞争 
    5.  * 会有一些迭代过程。 
    6.  * 
    7.  * @return the first (longest-waiting) thread in the queue, or 
    8.  *         {@code null} if no threads are currently queued 
    9.  */  
    10. public final Thread getFirstQueuedThread() {  
    11.     //先简单判断一下队列中是否有线程,没有的话,直接返回null;否则,调用fullGetFirstQueuedThread方法。  
    12.     return (head == tail) ? null : fullGetFirstQueuedThread();  
    13. }  
    14. /** 
    15.  * Version of getFirstQueuedThread called when fastpath fails 
    16.  */  
    17. private Thread fullGetFirstQueuedThread() {  
    18.     /* 
    19.      * 通常情况下,头结点的next指向的就是队列里第一个节点。 
    20.      * 尝试获取第一个节点的线程域,保证读取的一致性:如果  
    21.      * 线程域为null,或者第一个节点的前驱节点已经不是头节  
    22.      * 点,那么说有其他线程正在调用setHead方法。这里尝试  
    23.      * 获取(比较)两次,如果获取失败,再进行下面的遍历。 
    24.      */  
    25.     Node h, s;  
    26.     Thread st;  
    27.     if (((h = head) != null && (s = h.next) != null &&  
    28.          s.prev == head && (st = s.thread) != null) ||  
    29.         ((h = head) != null && (s = h.next) != null &&  
    30.          s.prev == head && (st = s.thread) != null))  
    31.         return st;  
    32.     /* 
    33.      * 头结点的next域可能还没有设置,或者已经在setHead后被重置。 
    34.      * 所以我们必须验证尾节点是否是真的是第一个节点。如果不是, 
    35.      * 如果不是,从尾节点反向遍历去查找头结点,确保程序退出。 
    36.      */  
    37.     Node t = tail;  
    38.     Thread firstThread = null;  
    39.     while (t != null && t != head) {  
    40.         Thread tt = t.thread;  
    41.         if (tt != null)  
    42.             firstThread = tt;  
    43.         t = t.prev;  
    44.     }  
    45.     return firstThread;  
    46. }  
     
    Java代码  收藏代码
    1. /** 
    2.     * 判断当前线程是否在同步等待队列中。 
    3.     * 
    4.     * <p>This implementation traverses the queue to determine 
    5.     * presence of the given thread. 
    6.     * 
    7.     * @param thread the thread 
    8.     * @return {@code true} if the given thread is on the queue 
    9.     * @throws NullPointerException if the thread is null 
    10.     */  
    11.    public final boolean isQueued(Thread thread) {  
    12.        if (thread == null)  
    13.            throw new NullPointerException();  
    14.        //反向遍历同步等待队列,查找给定线程是否存在。  
    15.        for (Node p = tail; p != null; p = p.prev)  
    16.            if (p.thread == thread)  
    17.                return true;  
    18.        return false;  
    19.    }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 如果同步等待队列中第一个线程是独占模式,返回true。 
    3.  * 如果这个方法返回true,并且当前线程正尝试在共享模式下请求,那么可  
    4.  * 以保证当前线程不是同步等待队列里的第一个线程。 
    5.  */  
    6. final boolean apparentlyFirstQueuedIsExclusive() {  
    7.     Node h, s;  
    8.     return (h = head) != null &&  
    9.         (s = h.next)  != null &&  
    10.         !s.isShared()         &&  
    11.         s.thread != null;  
    12. }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 判断同步等待队列里面是否存在比当前线程更早的线程。 
    3.  * 
    4.  * 相当于调用如下代码: 
    5.  * getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads() 
    6.  * 
    7.  * <p>Note that because cancellations due to interrupts and 
    8.  * timeouts may occur at any time, a {@code true} return does not 
    9.  * guarantee that some other thread will acquire before the current 
    10.  * thread.  Likewise, it is possible for another thread to win a 
    11.  * race to enqueue after this method has returned {@code false}, 
    12.  * due to the queue being empty. 
    13.  * 
    14.  * 这个方法主要用来避免"插队"问题。 
    15.  * @return {@code true} if there is a queued thread preceding the 
    16.  *         current thread, and {@code false} if the current thread 
    17.  *         is at the head of the queue or the queue is empty 
    18.  * @since 1.7 
    19.  */  
    20. final boolean hasQueuedPredecessors() {  
    21.     // The correctness of this depends on head being initialized  
    22.     // before tail and on head.next being accurate if the current  
    23.     // thread is first in queue.  
    24.     Node t = tail; // Read fields in reverse initialization order  
    25.     Node h = head;  
    26.     Node s;  
    27.     return h != t &&  
    28.         ((s = h.next) == null || s.thread != Thread.currentThread());  
    29. }  
     
     

          最后看一下AQS中提供的一些支持监控功能的方法:

    Java代码  收藏代码
    1. /** 
    2.  * 获取当前同步等待队列中线程的(估计)数量。 
    3.  * 
    4.  * @return the estimated number of threads waiting to acquire 
    5.  */  
    6. public final int getQueueLength() {  
    7.     int n = 0;  
    8.     for (Node p = tail; p != null; p = p.prev) {  
    9.         if (p.thread != null)  
    10.             ++n;  
    11.     }  
    12.     return n;  
    13. }  
    Java代码  收藏代码
    1. /** 
    2.  * 获取当前正在同步等待队列中等待的线程(不精确)。 
    3.  * 
    4.  * @return the collection of threads 
    5.  */  
    6. public final Collection<Thread> getQueuedThreads() {  
    7.     ArrayList<Thread> list = new ArrayList<Thread>();  
    8.     for (Node p = tail; p != null; p = p.prev) {  
    9.         Thread t = p.thread;  
    10.         if (t != null)  
    11.             list.add(t);  
    12.     }  
    13.     return list;  
    14. }  
    Java代码  收藏代码
    1. /** 
    2.  * 获取当前正在同步等待队列中以独占模式进行等待的线程(不精确)。 
    3.  * 
    4.  * @return the collection of threads 
    5.  */  
    6. public final Collection<Thread> getExclusiveQueuedThreads() {  
    7.     ArrayList<Thread> list = new ArrayList<Thread>();  
    8.     for (Node p = tail; p != null; p = p.prev) {  
    9.         if (!p.isShared()) {  
    10.             Thread t = p.thread;  
    11.             if (t != null)  
    12.                 list.add(t);  
    13.         }  
    14.     }  
    15.     return list;  
    16. }  
    Java代码  收藏代码
    1. /** 
    2.  * 获取当前正在同步等待队列中以共享模式进行等待的线程(不精确)。 
    3.  * 
    4.  * @return the collection of threads 
    5.  */  
    6. public final Collection<Thread> getSharedQueuedThreads() {  
    7.     ArrayList<Thread> list = new ArrayList<Thread>();  
    8.     for (Node p = tail; p != null; p = p.prev) {  
    9.         if (p.isShared()) {  
    10.             Thread t = p.thread;  
    11.             if (t != null)  
    12.                 list.add(t);  
    13.         }  
    14.     }  
    15.     return list;  
    16. }  
    • 内部类ConditionObject:
           ConditionObject是AQS中提供的一种锁的基础机制,实现了接口Condition。
           Condition是一种类似于Object监视条件的一种机制,相对于Object来说,Condition能让线程在各自条件下的等待队列等待,而不是像Object一样,在同一个等待队列里面等待。
           Condition提供了await/signal/signalAll来支持与Object wait/notify/nofityAll类似的功能。
           Condition由Lock内建支持,使用起来会很方便,直接调用Lock的newCondition方法,便可以获得一个与其相关联的条件对象。
     
           Condition接口的方法定义:
    Java代码  收藏代码
    1. public interface Condition {  
    2.     void await() throws InterruptedException;  
    3.     void awaitUninterruptibly();  
    4.     long awaitNanos(long nanosTimeout) throws InterruptedException;  
    5.     boolean await(long time, TimeUnit unit) throws InterruptedException;  
    6.     boolean awaitUntil(Date deadline) throws InterruptedException;  
    7.     void signal();  
    8.     void signalAll();  
    9. }  
     
     

          接下来分析ConditionObject类中的实现,首先看下内部数据结构:

    Java代码  收藏代码
    1. public class ConditionObject implements Condition, java.io.Serializable {  
    2.     private static final long serialVersionUID = 1173984872572414699L;  
    3.     /** First node of condition queue. */  
    4.     private transient Node firstWaiter;  
    5.     /** Last node of condition queue. */  
    6.     private transient Node lastWaiter;  
    7.     /** 
    8.      * Creates a new <tt>ConditionObject</tt> instance. 
    9.      */  
    10.     public ConditionObject() { }  
    11.     ...  
    12. }  

           内部结构非常简单,也是链表结构,表示一个条件等待队列。(每个条件一个队列)

     
           像AQS一样,从等待和唤醒两条主线开始分析,先看一下支持中断的等待方法,await方法。
    Java代码  收藏代码
    1. /** 
    2.  * 可中断的条件等待方法. 
    3.  * <ol> 
    4.  * <li> If current thread is interrupted, throw InterruptedException. 
    5.  * <li> Save lock state returned by {@link #getState}. 
    6.  * <li> Invoke {@link #release} with 
    7.  *      saved state as argument, throwing 
    8.  *      IllegalMonitorStateException if it fails. 
    9.  * <li> Block until signalled or interrupted. 
    10.  * <li> Reacquire by invoking specialized version of 
    11.  *      {@link #acquire} with saved state as argument. 
    12.  * <li> If interrupted while blocked in step 4, throw InterruptedException. 
    13.  * </ol> 
    14.  */  
    15. public final void await() throws InterruptedException {  
    16.     if (Thread.interrupted()) //如果当前线程被中断,抛出InterruptedException异常。  
    17.         throw new InterruptedException();  
    18.     //将当前线程添加到条件等待队列。  
    19.     Node node = addConditionWaiter();  
    20.     //释放当前线程对AQS的控制权,并返回当前AQS中的state值。  
    21.     int savedState = fullyRelease(node);  
    22.     int interruptMode = 0;  
    23.     while (!isOnSyncQueue(node)) {  
    24.         //如果当前线程不在AQS的同步等待队列中,那么阻塞当前线程。  
    25.         LockSupport.park(this);  
    26.         //其他线程调用相同条件上的signal/signalALl方法时,会将这个节点从条件队列转义到AQS的同步等待队列中。  
    27.         //被唤醒后需要检查是否在等待过程中被中断。  
    28.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)   
    29.             break; //如果发生了中断,退出循环。  
    30.     }  
    31.     //重新请求AQS的控制权。  
    32.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
    33.         interruptMode = REINTERRUPT;  
    34.     if (node.nextWaiter != null) // clean up if cancelled  
    35.         unlinkCancelledWaiters();  
    36.     if (interruptMode != 0) //如果上面发生过中断,这里处理中断。  
    37.         reportInterruptAfterWait(interruptMode);  
    38. }  
           先看下上面方法内部调用的addConditionWaiter方法:
    Java代码  收藏代码
    1. /** 
    2.  * Adds a new waiter to wait queue. 
    3.  * @return its new wait node 
    4.  */  
    5. private Node addConditionWaiter() {  
    6.     Node t = lastWaiter;  
    7.     // If lastWaiter is cancelled, clean out.  
    8.     if (t != null && t.waitStatus != Node.CONDITION) {  
    9.         unlinkCancelledWaiters();  
    10.         t = lastWaiter;  
    11.     }  
    12.     //创建一个当前线程对应的节点。  
    13.     Node node = new Node(Thread.currentThread(), Node.CONDITION);  
    14.     if (t == null) //如果是队列中第一个节点,那么将firstWaiter指向这个节点,后面也会将lastWaiter指向这个节点。  
    15.         firstWaiter = node;  
    16.     else //如果是队列中已经存在其他节点,那么将原本lastWaiter的nextWaiter指向当前节点。  
    17.         t.nextWaiter = node;  
    18.     lastWaiter = node; //最后将lastWaiter指向当前节点。  
    19.     return node; //返回当前节点。  
    20. }  
           看下方法中调用的unlinkCancelledWaiters方法:
    Java代码  收藏代码
    1. /** 
    2.  * 移除条件等待队列中的取消状态节点。这个方法一定是在持有锁  
    3.  * (拥有AQS控制权)的情况下被调用的(所以不存在竞争)。  
    4.  * 当等待条件时被(节点的线程)取消,或者当lastWaiter被取消后   
    5.  * 条件等待队列中进入了一个新节点时会调用这个方法。 
    6.  * 这个方法需要避免由于没有signal而引起的垃圾滞留。所以尽管 
    7.  * 方法内会做一个完全遍历,也只有超时获或取消时(没有signal的 
    8.  * 情况下)才被调用。方法中会遍历所有节点,切断所有指向垃圾节 
    9.  * 点的引用,而不是一次取消切断一个引用。 
    10.  */  
    11. private void unlinkCancelledWaiters() {  
    12.     //获取条件等待队列的头节点t  
    13.     Node t = firstWaiter;  
    14.     Node trail = null;  
    15.     while (t != null) {  
    16.         //如果队列中有等待节点。获取头节点的nextWaiter节点next。  
    17.         Node next = t.nextWaiter;  
    18.         if (t.waitStatus != Node.CONDITION) {  
    19.             //如果t被取消。将t的nextWaiter置空。  
    20.             t.nextWaiter = null;  
    21.             if (trail == null) //将next设置为头节点(移除之前的取消节点)  
    22.                 firstWaiter = next;  
    23.             else //否则说明队列前端有未取消的节点,这里做下拼接(移除中间的取消节点)  
    24.                 trail.nextWaiter = next;  
    25.             if (next == null)  
    26.                 lastWaiter = trail; //最后设置尾节点。  
    27.         }  
    28.         else //如果t没被取消。将trail指向t。  
    29.             trail = t;  
    30.         t = next;  
    31.     }  
    32. }  
           再继续看下await方法中调用的fullyRelease方法:
    Java代码  收藏代码
    1. /** 
    2.  * 调用release方法并传入当前的state。 
    3.  * 调用成功会返回传入release方法之前的state. 
    4.  * 失败会抛出异常,并取消当前节点。 
    5.  * @param node the condition node for this wait 
    6.  * @return previous sync state 
    7.  */  
    8. final int fullyRelease(Node node) {  
    9.     boolean failed = true;  
    10.     try {  
    11.         int savedState = getState();  
    12.         if (release(savedState)) {  
    13.             failed = false;  
    14.             return savedState;  
    15.         } else {  
    16.             throw new IllegalMonitorStateException();  
    17.         }  
    18.     } finally {  
    19.         if (failed)  
    20.             node.waitStatus = Node.CANCELLED;  
    21.     }  
    22. }  
           看下await方法中调用的isOnSyncQueue方法:
    Java代码  收藏代码
    1. /** 
    2.  * 如果一个node最初放在一个条件队列里,而现在正在AQS的同步等待队列里, 
    3.  * 返回true。 
    4.  * @param node the node 
    5.  * @return true if is reacquiring 
    6.  */  
    7. final boolean isOnSyncQueue(Node node) {  
    8.     if (node.waitStatus == Node.CONDITION || node.prev == null)  
    9.         return false;  
    10.     if (node.next != null) //如果有后继节点,说明肯定在AQS同步等待队列里。  
    11.         return true;  
    12.     /* 
    13.      * 之前的代码中分析到过,node.prev不为空并不能说明节点在AQS的  
    14.      * 同步等待队列里面,因为后续的CAS操作可能会失败,所以这里从尾节  
    15.      * 开始反向遍历。  
    16.      */  
    17.     return findNodeFromTail(node);  
    18. }  
    19. /** 
    20.  * Returns true if node is on sync queue by searching backwards from tail. 
    21.  * Called only when needed by isOnSyncQueue. 
    22.  * @return true if present 
    23.  */  
    24. private boolean findNodeFromTail(Node node) {  
    25.     Node t = tail;  
    26.     for (;;) {  
    27.         if (t == node)  
    28.             return true;  
    29.         if (t == null)  
    30.             return false;  
    31.         t = t.prev;  
    32.     }  
    33. }  
           看下await方法中调用的checkInterruptWhileWaiting方法:
    Java代码  收藏代码
    1.     /** 在等待退出时重新中断(传递中断状态) */  
    2.     private static final int REINTERRUPT =  1;  
    3.     /** 在等待退出时抛出异常 */  
    4.     private static final int THROW_IE    = -1;  
    5.     /** 
    6.      * Checks for interrupt, returning THROW_IE if interrupted 
    7.      * before signalled, REINTERRUPT if after signalled, or 
    8.      * 0 if not interrupted. 
    9.      */  
    10.     private int checkInterruptWhileWaiting(Node node) {  
    11.         return Thread.interrupted() ?  
    12.             (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :  
    13.             0;  
    14.     }  
    15. /** 
    16.  * 在取消等待后,将节点转移到同步队列中。如果线程在唤醒钱被 
    17.  * 取消,返回true。 
    18.  * @param current the waiting thread 
    19.  * @param node its node 
    20.  * @return true if cancelled before the node was signalled 
    21.  */  
    22. final boolean transferAfterCancelledWait(Node node) {  
    23.     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {  
    24.         enq(node);  
    25.         return true;  
    26.     }  
    27.     /* 
    28.      * If we lost out to a signal(), then we can't proceed 
    29.      * until it finishes its enq().  Cancelling during an 
    30.      * incomplete transfer is both rare and transient, so just 
    31.      * spin. 
    32.      */  
    33.     while (!isOnSyncQueue(node))  
    34.         Thread.yield();  
    35.     return false;  
    36. }  
           最后看下await方法中调用的reportInterruptAfterWait方法:
    Java代码  收藏代码
    1.     /** 
    2.      * Throws InterruptedException, reinterrupts current thread, or 
    3.      * does nothing, depending on mode. 
    4.      */  
    5.     private void reportInterruptAfterWait(int interruptMode)  
    6.         throws InterruptedException {  
    7.         if (interruptMode == THROW_IE)  
    8.             throw new InterruptedException();  
    9.         else if (interruptMode == REINTERRUPT)  
    10.             selfInterrupt();  
    11.     }  
    12. /** 
    13.  * Convenience method to interrupt current thread. 
    14.  */  
    15. private static void selfInterrupt() {  
    16.     Thread.currentThread().interrupt();  
    17. }  
           总结一下await方法中的逻辑:
           1.如果当前线程有中断状态,抛出InterruptedException异常。
           2.添加当前线程到条件等待队列。
           3.释放当前线程对AQS的控制权,并保存释放前AQS的状态(state域)。
           4.进入条件循环,条件为判断当前线程是否在AQS同步队列中,如果不在那么阻塞当前线程;如果在AQS同步队列中,就到第7步。
           5.当前线程被(其他线程)唤醒后,要检查等待过程中是否被中断或者取消,如果不是,继续循环,到第4步。
           6.如果是,保存中断状态和模式,然后退出条件循环。
           7.请求AQS控制权,然后做一些收尾工作,如果被取消,清理一下条件等待队列;然后按照中断模式处理一下中断。  
           
     
           然后看一下不能中断的等待方法,awaitUninterruptibly方法:
    Java代码  收藏代码
    1. /** 
    2.  * Implements uninterruptible condition wait. 
    3.  * <ol> 
    4.  * <li> Save lock state returned by {@link #getState}. 
    5.  * <li> Invoke {@link #release} with 
    6.  *      saved state as argument, throwing 
    7.  *      IllegalMonitorStateException if it fails. 
    8.  * <li> Block until signalled. 
    9.  * <li> Reacquire by invoking specialized version of 
    10.  *      {@link #acquire} with saved state as argument. 
    11.  * </ol> 
    12.  */  
    13. public final void awaitUninterruptibly() {  
    14.     Node node = addConditionWaiter();  
    15.     int savedState = fullyRelease(node);  
    16.     boolean interrupted = false;  
    17.     while (!isOnSyncQueue(node)) {  
    18.         LockSupport.park(this);  
    19.         if (Thread.interrupted())  
    20.             interrupted = true;  
    21.     }  
    22.     if (acquireQueued(node, savedState) || interrupted)  
    23.         selfInterrupt();  
    24. }  
           awaitUninterruptibly的逻辑相对await来说更加明确,条件循环中如果线程被中断,直接退出。后续只需要传递中断状态即可。
     
           再看一下支持超时和中断的等待方法,awaitNanos和await(long time, TimeUnit unit)方法:
    Java代码  收藏代码
    1. /** 
    2.  * Implements timed condition wait. 
    3.  * <ol> 
    4.  * <li> If current thread is interrupted, throw InterruptedException. 
    5.  * <li> Save lock state returned by {@link #getState}. 
    6.  * <li> Invoke {@link #release} with 
    7.  *      saved state as argument, throwing 
    8.  *      IllegalMonitorStateException if it fails. 
    9.  * <li> Block until signalled, interrupted, or timed out. 
    10.  * <li> Reacquire by invoking specialized version of 
    11.  *      {@link #acquire} with saved state as argument. 
    12.  * <li> If interrupted while blocked in step 4, throw InterruptedException. 
    13.  * </ol> 
    14.  */  
    15. public final long awaitNanos(long nanosTimeout) throws InterruptedException {  
    16.     if (Thread.interrupted())  
    17.         throw new InterruptedException();  
    18.     Node node = addConditionWaiter();  
    19.     int savedState = fullyRelease(node);  
    20.     long lastTime = System.nanoTime();  
    21.     int interruptMode = 0;  
    22.     while (!isOnSyncQueue(node)) {  
    23.         if (nanosTimeout <= 0L) {  
    24.             transferAfterCancelledWait(node);  
    25.             break;  
    26.         }  
    27.         LockSupport.parkNanos(this, nanosTimeout);  
    28.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
    29.             break;  
    30.         long now = System.nanoTime();  
    31.         nanosTimeout -= now - lastTime;  
    32.         lastTime = now;  
    33.     }  
    34.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
    35.         interruptMode = REINTERRUPT;  
    36.     if (node.nextWaiter != null)  
    37.         unlinkCancelledWaiters();  
    38.     if (interruptMode != 0)  
    39.         reportInterruptAfterWait(interruptMode);  
    40.     return nanosTimeout - (System.nanoTime() - lastTime);  
    41. }  
    42. /** 
    43.  * Implements timed condition wait. 
    44.  * <ol> 
    45.  * <li> If current thread is interrupted, throw InterruptedException. 
    46.  * <li> Save lock state returned by {@link #getState}. 
    47.  * <li> Invoke {@link #release} with 
    48.  *      saved state as argument, throwing 
    49.  *      IllegalMonitorStateException if it fails. 
    50.  * <li> Block until signalled, interrupted, or timed out. 
    51.  * <li> Reacquire by invoking specialized version of 
    52.  *      {@link #acquire} with saved state as argument. 
    53.  * <li> If interrupted while blocked in step 4, throw InterruptedException. 
    54.  * <li> If timed out while blocked in step 4, return false, else true. 
    55.  * </ol> 
    56.  */  
    57. public final boolean await(long time, TimeUnit unit) throws InterruptedException {  
    58.     if (unit == null)  
    59.         throw new NullPointerException();  
    60.     long nanosTimeout = unit.toNanos(time);  
    61.     if (Thread.interrupted())  
    62.         throw new InterruptedException();  
    63.     Node node = addConditionWaiter();  
    64.     int savedState = fullyRelease(node);  
    65.     long lastTime = System.nanoTime();  
    66.     boolean timedout = false;  
    67.     int interruptMode = 0;  
    68.     while (!isOnSyncQueue(node)) {  
    69.         if (nanosTimeout <= 0L) {  
    70.             timedout = transferAfterCancelledWait(node);  
    71.             break;  
    72.         }  
    73.         if (nanosTimeout >= spinForTimeoutThreshold)  
    74.             LockSupport.parkNanos(this, nanosTimeout);  
    75.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
    76.             break;  
    77.         long now = System.nanoTime();  
    78.         nanosTimeout -= now - lastTime;  
    79.         lastTime = now;  
    80.     }  
    81.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
    82.         interruptMode = REINTERRUPT;  
    83.     if (node.nextWaiter != null)  
    84.         unlinkCancelledWaiters();  
    85.     if (interruptMode != 0)  
    86.         reportInterruptAfterWait(interruptMode);  
    87.     return !timedout;  
    88. }  
           和await相比,这两个方法只是加入了超时取消的机制。
     
           最后看一下支持限时和中断的等待方法,awaitUntil方法:
    Java代码  收藏代码
    1. /** 
    2.  * Implements absolute timed condition wait. 
    3.  * <ol> 
    4.  * <li> If current thread is interrupted, throw InterruptedException. 
    5.  * <li> Save lock state returned by {@link #getState}. 
    6.  * <li> Invoke {@link #release} with 
    7.  *      saved state as argument, throwing 
    8.  *      IllegalMonitorStateException if it fails. 
    9.  * <li> Block until signalled, interrupted, or timed out. 
    10.  * <li> Reacquire by invoking specialized version of 
    11.  *      {@link #acquire} with saved state as argument. 
    12.  * <li> If interrupted while blocked in step 4, throw InterruptedException. 
    13.  * <li> If timed out while blocked in step 4, return false, else true. 
    14.  * </ol> 
    15.  */  
    16. public final boolean awaitUntil(Date deadline) throws InterruptedException {  
    17.     if (deadline == null)  
    18.         throw new NullPointerException();  
    19.     long abstime = deadline.getTime();  
    20.     if (Thread.interrupted())  
    21.         throw new InterruptedException();  
    22.     Node node = addConditionWaiter();  
    23.     int savedState = fullyRelease(node);  
    24.     boolean timedout = false;  
    25.     int interruptMode = 0;  
    26.     while (!isOnSyncQueue(node)) {  
    27.         if (System.currentTimeMillis() > abstime) {  
    28.             timedout = transferAfterCancelledWait(node);  
    29.             break;  
    30.         }  
    31.         LockSupport.parkUntil(this, abstime);  
    32.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
    33.             break;  
    34.     }  
    35.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
    36.         interruptMode = REINTERRUPT;  
    37.     if (node.nextWaiter != null)  
    38.         unlinkCancelledWaiters();  
    39.     if (interruptMode != 0)  
    40.         reportInterruptAfterWait(interruptMode);  
    41.     return !timedout;  
    42. }  
           和awaitNanos基本一致,只是时间检测变成了和绝对时间相比较,而不是去判断超时时间的剩余量。
     
           分析完了等待方法,再分析下唤醒方法,先看一下signal方法。
    Java代码  收藏代码
    1. /** 
    2.  * 将条件等待队列里面等待时间最长(链表最前面)的线程(如果存在的话)  
    3.  * 移动到AQS同步等待队列里面。 
    4.  * 
    5.  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 
    6.  *         returns {@code false} 
    7.  */  
    8. public final void signal() {  
    9.     //判断AQS的控制权是否被当前线程以独占的方式持有。如果不是,抛出IllegalMonitorStateException异常。  
    10.     if (!isHeldExclusively())  
    11.         throw new IllegalMonitorStateException();  
    12.     Node first = firstWaiter;  
    13.     if (first != null) //如果有线程在条件队列里面等待,那么执行doSignal方法。  
    14.         doSignal(first);  
    15. }  
           看下doSignal方法:
    Java代码  收藏代码
    1. /** 
    2.  * Removes and transfers nodes until hit non-cancelled one or 
    3.  * null. Split out from signal in part to encourage compilers 
    4.  * to inline the case of no waiters. 
    5.  * @param first (non-null) the first node on condition queue 
    6.  */  
    7. private void doSignal(Node first) {  
    8.     do {  
    9.         //移除first  
    10.         if ( (firstWaiter = first.nextWaiter) == null)  
    11.             lastWaiter = null;  
    12.         first.nextWaiter = null;  
    13.         //然后调用transferForSignal,如果调用失败且条件等待队列不为空,继续上面过程;否则方法结束。  
    14.     } while (!transferForSignal(first) &&  
    15.              (first = firstWaiter) != null);  
    16. }  
           看下transferForSignal方法:
    Java代码  收藏代码
    1. /** 
    2.  * 将一个节点从条件等待队列转移到同步等待队列。 
    3.  * 如果成功,返回true。 
    4.  * @param node the node 
    5.  * @return true if successfully transferred (else the node was 
    6.  * cancelled before signal). 
    7.  */  
    8. final boolean transferForSignal(Node node) {  
    9.     /* 
    10.      * 如果设置等待状态失败,说明节点已经被取消了,直接返回false。 
    11.      */  
    12.     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))  
    13.         return false;  
    14.     /* 
    15.      * Splice onto queue and try to set waitStatus of predecessor to 
    16.      * indicate that thread is (probably) waiting. If cancelled or 
    17.      * attempt to set waitStatus fails, wake up to resync (in which 
    18.      * case the waitStatus can be transiently and harmlessly wrong). 
    19.      */  
    20.     //将node加入到AQS同步等待队列中,并返回node的前驱节点。  
    21.     Node p = enq(node);  
    22.     int ws = p.waitStatus;  
    23.     //如果前驱节点被取消,或者尝试设置前驱节点的状态为SIGNAL(表示node节点需要唤醒)失败,那么唤醒node节点上的线程。  
    24.     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  
    25.         LockSupport.unpark(node.thread);  
    26.     return true;  
    27. }  
           再看一下signalAll方法,相对于signal方法,signalAll方法会将条件等待队列中全部线程都移动到AQS的同步等待队列中:
    Java代码  收藏代码
    1. /** 
    2.  * Moves all threads from the wait queue for this condition to 
    3.  * the wait queue for the owning lock. 
    4.  * 
    5.  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 
    6.  *         returns {@code false} 
    7.  */  
    8. public final void signalAll() {  
    9.     if (!isHeldExclusively())  
    10.         throw new IllegalMonitorStateException();  
    11.     Node first = firstWaiter;  
    12.     if (first != null)  
    13.         doSignalAll(first); //与signal唯一区别是这里调用了doSignalAll方法。  
    14. }  
           继续看doSignalAll方法:
    Java代码  收藏代码
    1. /** 
    2.  * Removes and transfers all nodes. 
    3.  * @param first (non-null) the first node on condition queue 
    4.  */  
    5. private void doSignalAll(Node first) {  
    6.     //首先将条件队列的头尾节点置空  
    7.     lastWaiter = firstWaiter = null;  
    8.     do {  
    9.         Node next = first.nextWaiter;  
    10.         first.nextWaiter = null;  
    11.         //移动first指向的节点,然后将first指向下一个节点,直到最后。  
    12.         transferForSignal(first);  
    13.         first = next;  
    14.     } while (first != null);  
    15. }  
           结合之前的await小总结一下:
           await就是把当前线程放到对应条件的等待队列里面,然后阻塞当前线程。
           signal就是把对应条件的等待队里的线程移动到对应AQS的同步等待队列里面,随后线程会被唤醒。 
     
           注:await存在"伪唤醒"问题,所以被唤醒后应该再次检测等待条件:
           while(condition不满足) { conditionObject.await() }
                   
           最后看一下ConditionObject提供的一些支持监测功能的方法:
    Java代码  收藏代码
    1. /** 
    2.  * 判断当前条件是否由给定的同步器(AQS)创建。 
    3.  * 
    4.  * @return {@code true} if owned 
    5.  */  
    6. final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {  
    7.     return sync == AbstractQueuedSynchronizer.this;  
    8. }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 判断当前条件队列中是否存在等待的线程。 
    3.  * 
    4.  * @return {@code true} if there are any waiting threads 
    5.  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 
    6.  *         returns {@code false} 
    7.  */  
    8. protected final boolean hasWaiters() {  
    9.     if (!isHeldExclusively()) //前提必须是当前线程独占的持有控制权。  
    10.         throw new IllegalMonitorStateException();  
    11.     //遍历条件等待队列,查找等待线程(节点)  
    12.     for (Node w = firstWaiter; w != null; w = w.nextWaiter) {  
    13.         if (w.waitStatus == Node.CONDITION)  
    14.             return true;  
    15.     }  
    16.     return false;  
    17. }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 获取当前条件等待队列中等待线程的(估计)数量。 
    3.  * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}. 
    4.  * 
    5.  * @return the estimated number of waiting threads 
    6.  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 
    7.  *         returns {@code false} 
    8.  */  
    9. protected final int getWaitQueueLength() {  
    10.     if (!isHeldExclusively())   
    11.         throw new IllegalMonitorStateException();  
    12.     int n = 0;  
    13.     for (Node w = firstWaiter; w != null; w = w.nextWaiter) {  
    14.         if (w.waitStatus == Node.CONDITION)  
    15.             ++n;  
    16.     }  
    17.     return n;  
    18. }  
     
    Java代码  收藏代码
    1. /** 
    2.  * 获取当前条件等待队列中的等待线程。 
    3.  * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}. 
    4.  * 
    5.  * @return the collection of threads 
    6.  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 
    7.  *         returns {@code false} 
    8.  */  
    9. protected final Collection<Thread> getWaitingThreads() {  
    10.     if (!isHeldExclusively())  
    11.         throw new IllegalMonitorStateException();  
    12.     ArrayList<Thread> list = new ArrayList<Thread>();  
    13.     for (Node w = firstWaiter; w != null; w = w.nextWaiter) {  
    14.         if (w.waitStatus == Node.CONDITION) {  
    15.             Thread t = w.thread;  
    16.             if (t != null)  
    17.                 list.add(t);  
    18.         }  
    19.     }  
    20.     return list;  
    21. }  
     
     
    • AQS继承了类java.util.concurrent.locks.AbstractOwnableSynchronizer,看下这个类的代码:
    Java代码  收藏代码
    1. /** 
    2.  * A synchronizer that may be exclusively owned by a thread.  This 
    3.  * class provides a basis for creating locks and related synchronizers 
    4.  * that may entail a notion of ownership.  The 
    5.  * <tt>AbstractOwnableSynchronizer</tt> class itself does not manage or 
    6.  * use this information. However, subclasses and tools may use 
    7.  * appropriately maintained values to help control and monitor access 
    8.  * and provide diagnostics. 
    9.  * 
    10.  * @since 1.6 
    11.  * @author Doug Lea 
    12.  */  
    13. public abstract class AbstractOwnableSynchronizer  
    14.     implements java.io.Serializable {  
    15.     /** Use serial ID even though all fields transient. */  
    16.     private static final long serialVersionUID = 3737899427754241961L;  
    17.     /** 
    18.      * Empty constructor for use by subclasses. 
    19.      */  
    20.     protected AbstractOwnableSynchronizer() { }  
    21.     /** 
    22.      * The current owner of exclusive mode synchronization. 
    23.      */  
    24.     private transient Thread exclusiveOwnerThread;  
    25.     /** 
    26.      * Sets the thread that currently owns exclusive access. A 
    27.      * <tt>null</tt> argument indicates that no thread owns access. 
    28.      * This method does not otherwise impose any synchronization or 
    29.      * <tt>volatile</tt> field accesses. 
    30.      */  
    31.     protected final void setExclusiveOwnerThread(Thread t) {  
    32.         exclusiveOwnerThread = t;  
    33.     }  
    34.     /** 
    35.      * Returns the thread last set by 
    36.      * <tt>setExclusiveOwnerThread</tt>, or <tt>null</tt> if never 
    37.      * set.  This method does not otherwise impose any synchronization 
    38.      * or <tt>volatile</tt> field accesses. 
    39.      * @return the owner thread 
    40.      */  
    41.     protected final Thread getExclusiveOwnerThread() {  
    42.         return exclusiveOwnerThread;  
    43.     }  
    44. }  

           这个类提供了独占模式下的同步器控制权的信息,比如Lock或者其他相关的同步器。从代码中也可以看到,可以设置和获取拥有独占控制权的线程信息。

     
    • 最后,java.util.concurrent.locks包还提供了一个 AbstractQueuedLongSynchronizer同步基础类,内部代码和AQS基本一致,唯一区别是 AbstractQueuedLongSynchronizer中管理的是一个long型的状态,需要构建使用64bit信息的同步器可以基于这个类进行 构建,用法和AQS一致,这里就不具体说明了。
  • 相关阅读:
    【高软作业4】:Tomcat 观察者模式解析 之 Lifecycle
    Eclipse 导入 Tomcat 源码
    【高软作业3】:原型化系统 DevTools
    Java Obejct
    Java PriorityQueue
    【高软作业2】:Java IDE调研分析
    GitHub fork 合作开发 快速实现版
    用C#实现天气预报(调用WebService)
    hover和点击事件之间的冲突
    BurpSuite2021系列(三)新建扫描
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5806737.html
Copyright © 2011-2022 走看看