zoukankan      html  css  js  c++  java
  • AQS框架源码分析-AbstractQueuedSynchronizer

    前言:AQS框架在J.U.C中的地位不言而喻,可以说没有AQS就没有J.U.C包,可见其重要性,因此有必要对其原理进行详细深入的理解。

    1.AQS是什么

    在深入AQS之前,首先我们要搞清楚什么是AQS。AQS全称是AbstractQueuedSynchronizer,我们直接查看AQS源码的注释。

    大致意思就是说:AQS提供了实现阻塞锁和相关同步器并依赖先进先出(FIFO)等待队列的框架。

    AQS依赖一个原子数值作为锁的状态,子类可以有多个状态值,只能通过原子方法区操作该值,从而保证同步。

    通过第一段的注释大致总结下AQS是什么:

    ①AQS是一个同步的基础框架,基于一个先进先出的队列

    ②锁机制依赖一个原子值的状态。

    ③AQS的子类负责定义与操作这个状态值,但必须通过AQS提供的原子操作。

    ④AQS剩余的方法就是围绕队列,与线程阻塞唤醒等功能。

    2.重要成员变量

    AQS中有两个重要的成员变量:Node和ConditionObject。

    ①Node的作用是存储获取锁失败的线程,并且维护一个CLH FIFO队列,该队列是会被多线程操作的,所以Node中大部分变量都是被volatile修饰,并且通过自旋和CAS进行原子性的操作。CLH的数据结构如下:

    Node有一个模式的属性:独占模式共享模式,独占模式下资源是线程独占的,共享模式下,资源是可以被多个线程占用的。

    Node源码如下:

      1 static final class Node {
      2         /** Marker to indicate a node is waiting in shared mode */
      3         static final Node SHARED = new Node();  // 共享模式
      4         /** Marker to indicate a node is waiting in exclusive mode */
      5         static final Node EXCLUSIVE = null;  // 独占模式
      6 
      7         /** waitStatus value to indicate thread has cancelled */
      8         static final int CANCELLED =  1;  // 表明线程已处于结束状态(被取消)
      9         /** waitStatus value to indicate successor's thread needs unparking */
     10         static final int SIGNAL    = -1; // 表明线程需要被唤醒
     11         /** waitStatus value to indicate thread is waiting on condition */
     12         static final int CONDITION = -2; // 表明线程正处于条件队列上,等待某一条件
     13         /**
     14          * waitStatus value to indicate the next acquireShared should
     15          * unconditionally propagate
     16          */
     17         static final int PROPAGATE = -3; // 共享模式下同步状态会被传播
     18 
     19         /**
     20          * Status field, taking on only the values:
     21          *   SIGNAL:     The successor of this node is (or will soon be)
     22          *               blocked (via park), so the current node must
     23          *               unpark its successor when it releases or
     24          *               cancels. To avoid races, acquire methods must
     25          *               first indicate they need a signal,
     26          *               then retry the atomic acquire, and then,
     27          *               on failure, block.
     28          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     29          *               Nodes never leave this state. In particular,
     30          *               a thread with cancelled node never again blocks.
     31          *   CONDITION:  This node is currently on a condition queue.
     32          *               It will not be used as a sync queue node
     33          *               until transferred, at which time the status
     34          *               will be set to 0. (Use of this value here has
     35          *               nothing to do with the other uses of the
     36          *               field, but simplifies mechanics.)
     37          *   PROPAGATE:  A releaseShared should be propagated to other
     38          *               nodes. This is set (for head node only) in
     39          *               doReleaseShared to ensure propagation
     40          *               continues, even if other operations have
     41          *               since intervened.
     42          *   0:          None of the above
     43          *
     44          * The values are arranged numerically to simplify use.
     45          * Non-negative values mean that a node doesn't need to
     46          * signal. So, most code doesn't need to check for particular
     47          * values, just for sign.
     48          *
     49          * The field is initialized to 0 for normal sync nodes, and
     50          * CONDITION for condition nodes.  It is modified using CAS
     51          * (or when possible, unconditional volatile writes).
     52          */
     53         volatile int waitStatus;
     54 
     55         /**
     56          * Link to predecessor node that current node/thread relies on
     57          * for checking waitStatus. Assigned during enqueuing, and nulled
     58          * out (for sake of GC) only upon dequeuing.  Also, upon
     59          * cancellation of a predecessor, we short-circuit while
     60          * finding a non-cancelled one, which will always exist
     61          * because the head node is never cancelled: A node becomes
     62          * head only as a result of successful acquire. A
     63          * cancelled thread never succeeds in acquiring, and a thread only
     64          * cancels itself, not any other node.
     65          */
     66         volatile Node prev;
     67 
     68         /**
     69          * Link to the successor node that the current node/thread
     70          * unparks upon release. Assigned during enqueuing, adjusted
     71          * when bypassing cancelled predecessors, and nulled out (for
     72          * sake of GC) when dequeued.  The enq operation does not
     73          * assign next field of a predecessor until after attachment,
     74          * so seeing a null next field does not necessarily mean that
     75          * node is at end of queue. However, if a next field appears
     76          * to be null, we can scan prev's from the tail to
     77          * double-check.  The next field of cancelled nodes is set to
     78          * point to the node itself instead of null, to make life
     79          * easier for isOnSyncQueue.
     80          */
     81         volatile Node next;
     82 
     83         /**
     84          * The thread that enqueued this node.  Initialized on
     85          * construction and nulled out after use.
     86          */
     87         volatile Thread thread;
     88 
     89         /**
     90          * Link to next node waiting on condition, or the special
     91          * value SHARED.  Because condition queues are accessed only
     92          * when holding in exclusive mode, we just need a simple
     93          * linked queue to hold nodes while they are waiting on
     94          * conditions. They are then transferred to the queue to
     95          * re-acquire. And because conditions can only be exclusive,
     96          * we save a field by using special value to indicate shared
     97          * mode.
     98          */
     99         Node nextWaiter;
    100 
    101         /**
    102          * Returns true if node is waiting in shared mode.
    103          */
    104         final boolean isShared() {
    105             return nextWaiter == SHARED;
    106         }
    107 
    108         /**
    109          * Returns previous node, or throws NullPointerException if null.
    110          * Use when predecessor cannot be null.  The null check could
    111          * be elided, but is present to help the VM.
    112          *
    113          * @return the predecessor of this node
    114          */
    115         final Node predecessor() throws NullPointerException {
    116             Node p = prev;
    117             if (p == null)
    118                 throw new NullPointerException();
    119             else
    120                 return p;
    121         }
    122 
    123         Node() {    // Used to establish initial head or SHARED marker
    124         }
    125         // 线程加入等待结点
    126         Node(Thread thread, Node mode) {     // Used by addWaiter
    127             this.nextWaiter = mode;
    128             this.thread = thread;
    129         }
    130         // 线程加入条件对列,会带上线程的状态值waitStatus
    131         Node(Thread thread, int waitStatus) { // Used by Condition
    132             this.waitStatus = waitStatus;
    133             this.thread = thread;
    134         }
    135     }

    ②ConditionObject:条件队列,这个类的作用从AQS的注释上可知。

    该类主要是为了让子类实现独占模式。AQS框架下独占模式的获取资源、释放等操作到最后都是基于这个类实现的。只有在独占模式下才会去使用该类。

    ConditionObject源码如下(对主要代码进行了注释):

      1 public class ConditionObject implements Condition, java.io.Serializable {
      2         private static final long serialVersionUID = 1173984872572414699L;
      3         /** First node of condition queue. */
      4         private transient Node firstWaiter;  // 存储条件对列中第一个节点
      5         /** Last node of condition queue. */
      6         private transient Node lastWaiter; // 存储条件对列中最后一个节点
      7 
      8         /**
      9          * Creates a new {@code ConditionObject} instance.
     10          */
     11         public ConditionObject() { }
     12 
     13         // Internal methods
     14 
     15         /**
     16          * Adds a new waiter to wait queue.  // 增加一个新的节点到等待队列中
     17          * @return its new wait node
     18          */
     19         private Node addConditionWaiter() {
     20             Node t = lastWaiter;
     21             // 如果最后一个节点的状态已经结束,则直接清理掉
     22             // If lastWaiter is cancelled, clean out.
     23             if (t != null && t.waitStatus != Node.CONDITION) {
     24                // 拆分已经处于结束状态的节点 也就是清除掉这类节点
     25                unlinkCancelledWaiters();
     26                t = lastWaiter;
     27             }
     28             // 创建一个新的节点,带上结点状态,表明结点处于条件对列上
     29             Node node = new Node(Thread.currentThread(), Node.CONDITION);
     30             /**
     31              条件队列中加入节点都是从队尾加入,并且从下面代码可知,每次都会存储最后一个节点的值。
     32              当最后一个节点为空时,说明队列中不存在节点,所以将node赋值给第一个节点,否则将节点加入对列尾
     33              */
     34             if (t == null)
     35                 firstWaiter = node;
     36             else
     37                 t.nextWaiter = node;
     38             lastWaiter = node;  // 存储最后一个节点的值
     39             return node;
     40         }
     41 
     42         /**
     43          * 唤醒节点
     44          * 移除和转换节点直到节点状态处于未结束或者为空 (节点移除相当于唤醒)
     45          * Removes and transfers nodes until hit non-cancelled one or
     46          * null. Split out from signal in part to encourage compilers
     47          * to inline the case of no waiters.
     48          * @param first (non-null) the first node on condition queue
     49          */
     50         private void doSignal(Node first) {
     51             do {
     52                 // 当next节点为null时,则将lastWaiter赋值为null
     53                 if ( (firstWaiter = first.nextWaiter) == null)
     54                     lastWaiter = null;
     55                 first.nextWaiter = null; // 切断当前节点
     56             } while (!transferForSignal(first) &&
     57                      (first = firstWaiter) != null);
     58         }
     59 
     60         /**
     61          * 唤醒所有节点
     62          * Removes and transfers all nodes.
     63          * @param first (non-null) the first node on condition queue
     64          */
     65         private void doSignalAll(Node first) {
     66             lastWaiter = firstWaiter = null;
     67             do {
     68                 // 循环唤醒所有节点,代码还是比较容易理解
     69                 // 将每个节点直接截断即可
     70                 Node next = first.nextWaiter;
     71                 first.nextWaiter = null;
     72                 transferForSignal(first);
     73                 first = next;
     74             } while (first != null);
     75         }
     76 
     77         /**
     78          * Unlinks cancelled waiter nodes from condition queue.
     79          * Called only while holding lock. This is called when
     80          * cancellation occurred during condition wait, and upon
     81          * insertion of a new waiter when lastWaiter is seen to have
     82          * been cancelled. This method is needed to avoid garbage
     83          * retention in the absence of signals. So even though it may
     84          * require a full traversal, it comes into play only when
     85          * timeouts or cancellations occur in the absence of
     86          * signals. It traverses all nodes rather than stopping at a
     87          * particular target to unlink all pointers to garbage nodes
     88          * without requiring many re-traversals during cancellation
     89          * storms.
     90          */
     91         private void unlinkCancelledWaiters() { // 删除处于结束状态的节点
     92             Node t = firstWaiter;
     93             Node trail = null;
     94             // 第一个节点为空,直接返回
     95             // 这里会遍历所有节点
     96             while (t != null) {
     97                 Node next = t.nextWaiter; // 记录下一个节点的值
     98                 // 当节点状态不为CONDITION
     99                 if (t.waitStatus != Node.CONDITION) {
    100                     // 首先将当前节点的下一个节点赋值为空,切断当前节点链路
    101                     t.nextWaiter = null;
    102                     // 如果追踪节点为空的时候,则存储第一个节点的值为next,因为当前节点状态不为CONDITION需要清理
    103                     if (trail == null)
    104                         firstWaiter = next;
    105                     else  // 在追踪节点串联下一个节点,主要是为了存储最后一个节点的值
    106                         trail.nextWaiter = next;
    107                     if (next == null)  // 当next为空时,则存储trail为最后一个节点,将最后一个节点值存储下来
    108                         lastWaiter = trail;
    109                 }
    110                 else  // 当节点状态为CONDITION时,将该节点赋值给trail
    111                     trail = t;
    112                 t = next; // 将next赋值给t,继续遍历
    113             }
    114         }
    115 
    116         // public methods
    117 
    118         /**
    119          * 唤醒等待时间最长的节点,使其拥有锁
    120          * Moves the longest-waiting thread, if one exists, from the
    121          * wait queue for this condition to the wait queue for the
    122          * owning lock.
    123          *
    124          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    125          *         returns {@code false}
    126          */
    127         public final void signal() {
    128             // 如果线程不是独占资源,则抛出异常,从这里也说明ConditionObject只能用在独占模式中
    129             if (!isHeldExclusively())
    130                 throw new IllegalMonitorStateException();
    131             Node first = firstWaiter;
    132             if (first != null)
    133                 doSignal(first);
    134         }
    135 
    136         /**
    137          * 唤醒所有等待节点
    138          * Moves all threads from the wait queue for this condition to
    139          * the wait queue for the owning lock.
    140          *
    141          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    142          *         returns {@code false}
    143          */
    144         public final void signalAll() {
    145             if (!isHeldExclusively())
    146                 throw new IllegalMonitorStateException();
    147             Node first = firstWaiter;
    148             if (first != null)
    149                 doSignalAll(first);
    150         }
    151 
    152         /**
    153          * 节点不间断等待
    154          * Implements uninterruptible condition wait.
    155          * <ol>
    156          * <li> Save lock state returned by {@link #getState}.
    157          * <li> Invoke {@link #release} with saved state as argument,
    158          *      throwing IllegalMonitorStateException if it fails.
    159          * <li> Block until signalled.
    160          * <li> Reacquire by invoking specialized version of
    161          *      {@link #acquire} with saved state as argument.
    162          * </ol>
    163          */
    164         public final void awaitUninterruptibly() {
    165             Node node = addConditionWaiter();
    166             int savedState = fullyRelease(node);
    167             boolean interrupted = false;
    168             while (!isOnSyncQueue(node)) {
    169                 LockSupport.park(this);
    170                 if (Thread.interrupted())
    171                     interrupted = true;
    172             }
    173             if (acquireQueued(node, savedState) || interrupted)
    174                 selfInterrupt();
    175         }
    176         
    177         /*
    178          * For interruptible waits, we need to track whether to throw
    179          * InterruptedException, if interrupted while blocked on
    180          * condition, versus reinterrupt current thread, if
    181          * interrupted while blocked waiting to re-acquire.
    182          */
    183 
    184         /** Mode meaning to reinterrupt on exit from wait */
    185         private static final int REINTERRUPT =  1;
    186         /** Mode meaning to throw InterruptedException on exit from wait */
    187         private static final int THROW_IE    = -1;
    188 
    189         /**
    190          * Checks for interrupt, returning THROW_IE if interrupted
    191          * before signalled, REINTERRUPT if after signalled, or
    192          * 0 if not interrupted.
    193          */
    194         private int checkInterruptWhileWaiting(Node node) {
    195             return Thread.interrupted() ?
    196                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    197                 0;
    198         }
    199 
    200         /**
    201          * Throws InterruptedException, reinterrupts current thread, or
    202          * does nothing, depending on mode.
    203          */
    204         private void reportInterruptAfterWait(int interruptMode)
    205             throws InterruptedException {
    206             if (interruptMode == THROW_IE)
    207                 throw new InterruptedException();
    208             else if (interruptMode == REINTERRUPT)
    209                 selfInterrupt();
    210         }
    211 
    212         /**
    213          * Implements interruptible condition wait.
    214          * <ol>
    215          * <li> If current thread is interrupted, throw InterruptedException.
    216          * <li> Save lock state returned by {@link #getState}.
    217          * <li> Invoke {@link #release} with saved state as argument,
    218          *      throwing IllegalMonitorStateException if it fails.
    219          * <li> Block until signalled or interrupted.
    220          * <li> Reacquire by invoking specialized version of
    221          *      {@link #acquire} with saved state as argument.
    222          * <li> If interrupted while blocked in step 4, throw InterruptedException.
    223          * </ol>
    224          */
    225         public final void await() throws InterruptedException {
    226             if (Thread.interrupted())
    227                 throw new InterruptedException();
    228             Node node = addConditionWaiter();
    229             int savedState = fullyRelease(node);
    230             int interruptMode = 0;
    231             while (!isOnSyncQueue(node)) {
    232                 LockSupport.park(this);
    233                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    234                     break;
    235             }
    236             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    237                 interruptMode = REINTERRUPT;
    238             if (node.nextWaiter != null) // clean up if cancelled
    239                 unlinkCancelledWaiters();
    240             if (interruptMode != 0)
    241                 reportInterruptAfterWait(interruptMode);
    242         }
    243 
    244         /**
    245          * Implements timed condition wait.
    246          * <ol>
    247          * <li> If current thread is interrupted, throw InterruptedException.
    248          * <li> Save lock state returned by {@link #getState}.
    249          * <li> Invoke {@link #release} with saved state as argument,
    250          *      throwing IllegalMonitorStateException if it fails.
    251          * <li> Block until signalled, interrupted, or timed out.
    252          * <li> Reacquire by invoking specialized version of
    253          *      {@link #acquire} with saved state as argument.
    254          * <li> If interrupted while blocked in step 4, throw InterruptedException.
    255          * </ol>
    256          */
    257         public final long awaitNanos(long nanosTimeout)
    258                 throws InterruptedException {
    259             if (Thread.interrupted())
    260                 throw new InterruptedException();
    261             Node node = addConditionWaiter();
    262             int savedState = fullyRelease(node);
    263             final long deadline = System.nanoTime() + nanosTimeout;
    264             int interruptMode = 0;
    265             while (!isOnSyncQueue(node)) {
    266                 if (nanosTimeout <= 0L) {
    267                     transferAfterCancelledWait(node);
    268                     break;
    269                 }
    270                 if (nanosTimeout >= spinForTimeoutThreshold)
    271                     LockSupport.parkNanos(this, nanosTimeout);
    272                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    273                     break;
    274                 nanosTimeout = deadline - System.nanoTime();
    275             }
    276             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    277                 interruptMode = REINTERRUPT;
    278             if (node.nextWaiter != null)
    279                 unlinkCancelledWaiters();
    280             if (interruptMode != 0)
    281                 reportInterruptAfterWait(interruptMode);
    282             return deadline - System.nanoTime();
    283         }
    284 
    285         /**
    286          * Implements absolute timed condition wait.
    287          * <ol>
    288          * <li> If current thread is interrupted, throw InterruptedException.
    289          * <li> Save lock state returned by {@link #getState}.
    290          * <li> Invoke {@link #release} with saved state as argument,
    291          *      throwing IllegalMonitorStateException if it fails.
    292          * <li> Block until signalled, interrupted, or timed out.
    293          * <li> Reacquire by invoking specialized version of
    294          *      {@link #acquire} with saved state as argument.
    295          * <li> If interrupted while blocked in step 4, throw InterruptedException.
    296          * <li> If timed out while blocked in step 4, return false, else true.
    297          * </ol>
    298          */
    299         public final boolean awaitUntil(Date deadline)
    300                 throws InterruptedException {
    301             long abstime = deadline.getTime();
    302             if (Thread.interrupted())
    303                 throw new InterruptedException();
    304             Node node = addConditionWaiter();
    305             int savedState = fullyRelease(node);
    306             boolean timedout = false;
    307             int interruptMode = 0;
    308             while (!isOnSyncQueue(node)) {
    309                 if (System.currentTimeMillis() > abstime) {
    310                     timedout = transferAfterCancelledWait(node);
    311                     break;
    312                 }
    313                 LockSupport.parkUntil(this, abstime);
    314                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    315                     break;
    316             }
    317             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    318                 interruptMode = REINTERRUPT;
    319             if (node.nextWaiter != null)
    320                 unlinkCancelledWaiters();
    321             if (interruptMode != 0)
    322                 reportInterruptAfterWait(interruptMode);
    323             return !timedout;
    324         }
    325 
    326         /**
    327          * Implements timed condition wait.
    328          * <ol>
    329          * <li> If current thread is interrupted, throw InterruptedException.
    330          * <li> Save lock state returned by {@link #getState}.
    331          * <li> Invoke {@link #release} with saved state as argument,
    332          *      throwing IllegalMonitorStateException if it fails.
    333          * <li> Block until signalled, interrupted, or timed out.
    334          * <li> Reacquire by invoking specialized version of
    335          *      {@link #acquire} with saved state as argument.
    336          * <li> If interrupted while blocked in step 4, throw InterruptedException.
    337          * <li> If timed out while blocked in step 4, return false, else true.
    338          * </ol>
    339          */
    340         public final boolean await(long time, TimeUnit unit)
    341                 throws InterruptedException {
    342             long nanosTimeout = unit.toNanos(time);
    343             if (Thread.interrupted())
    344                 throw new InterruptedException();
    345             Node node = addConditionWaiter();
    346             int savedState = fullyRelease(node);
    347             final long deadline = System.nanoTime() + nanosTimeout;
    348             boolean timedout = false;
    349             int interruptMode = 0;
    350             while (!isOnSyncQueue(node)) {
    351                 if (nanosTimeout <= 0L) {
    352                     timedout = transferAfterCancelledWait(node);
    353                     break;
    354                 }
    355                 if (nanosTimeout >= spinForTimeoutThreshold)
    356                     LockSupport.parkNanos(this, nanosTimeout);
    357                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    358                     break;
    359                 nanosTimeout = deadline - System.nanoTime();
    360             }
    361             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    362                 interruptMode = REINTERRUPT;
    363             if (node.nextWaiter != null)
    364                 unlinkCancelledWaiters();
    365             if (interruptMode != 0)
    366                 reportInterruptAfterWait(interruptMode);
    367             return !timedout;
    368         }
    369 
    370         //  support for instrumentation
    371 
    372         /**
    373          * Returns true if this condition was created by the given
    374          * synchronization object.
    375          *
    376          * @return {@code true} if owned
    377          */
    378         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
    379             return sync == AbstractQueuedSynchronizer.this;
    380         }
    381 
    382         /**
    383          * Queries whether any threads are waiting on this condition.
    384          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
    385          *
    386          * @return {@code true} if there are any waiting threads
    387          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    388          *         returns {@code false}
    389          */
    390         protected final boolean hasWaiters() {
    391             if (!isHeldExclusively())
    392                 throw new IllegalMonitorStateException();
    393             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
    394                 if (w.waitStatus == Node.CONDITION)
    395                     return true;
    396             }
    397             return false;
    398         }
    399 
    400         /**
    401          * Returns an estimate of the number of threads waiting on
    402          * this condition.
    403          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
    404          *
    405          * @return the estimated number of waiting threads
    406          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    407          *         returns {@code false}
    408          */
    409         protected final int getWaitQueueLength() {
    410             if (!isHeldExclusively())
    411                 throw new IllegalMonitorStateException();
    412             int n = 0;
    413             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
    414                 if (w.waitStatus == Node.CONDITION)
    415                     ++n;
    416             }
    417             return n;
    418         }
    419 
    420         /**
    421          * Returns a collection containing those threads that may be
    422          * waiting on this Condition.
    423          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
    424          *
    425          * @return the collection of threads
    426          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    427          *         returns {@code false}
    428          */
    429         protected final Collection<Thread> getWaitingThreads() {
    430             if (!isHeldExclusively())
    431                 throw new IllegalMonitorStateException();
    432             ArrayList<Thread> list = new ArrayList<Thread>();
    433             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
    434                 if (w.waitStatus == Node.CONDITION) {
    435                     Thread t = w.thread;
    436                     if (t != null)
    437                         list.add(t);
    438                 }
    439             }
    440             return list;
    441         }
    442     }
    View Code

    3.AQS成员函数

    由于AQS分独占模式和共享模式,因此这里按独占、共享模式的顺序对AQS的成员函数进行分析。

    ①acquire(int arg)

    独占模式下获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,整个过程忽略中断。源码如下:

     1  /**
     2      * Acquires in exclusive mode, ignoring interrupts.  Implemented
     3      * by invoking at least once {@link #tryAcquire},
     4      * returning on success.  Otherwise the thread is queued, possibly
     5      * repeatedly blocking and unblocking, invoking {@link
     6      * #tryAcquire} until success.  This method can be used
     7      * to implement method {@link Lock#lock}.
     8      *
     9      * @param arg the acquire argument.  This value is conveyed to
    10      *        {@link #tryAcquire} but is otherwise uninterpreted and
    11      *        can represent anything you like.
    12      */
    13     public final void acquire(int arg) {
    14         if (!tryAcquire(arg) &&
    15             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    16             selfInterrupt();
    17     }

    该函数执行流程:

    A.如果tryAcquire()成功获取资源,则直接返回。

    B.直接获取资源失败,则通过addWaiter()将线程加入队列尾,并标记为独占模式。

    C.通过acquireQueued()让线程在等待队列中获取资源,通过自旋方式,一直获取到后才返回。如果在等待过程中被中断过,则返回true,否则返回false。

    D.如果线程在等待获取资源的过程中被中断,只有在获取到资源后才会去响应,执行selfInterrupt进行自我中断。

    #1.tryAcquire(int)

    该方法是在独占模式下获取资源,成功-ture,失败-false。

    1  protected boolean tryAcquire(int arg) {
    2         throw new UnsupportedOperationException();
    3     }

    直接调用该方法会抛出异常,因为AQS只是一个框架,只是定义该接口,具体实现需在子类中实现。

    #2.addWaiter(Node mode)

    将当前线程加入等待队列的队尾,并返回当前线程所在的节点。

     1 private Node addWaiter(Node mode) {
     2        // 创建节点,以独占模式       
     3        Node node = new Node(Thread.currentThread(), mode);
     4         // Try the fast path of enq; backup to full enq on failure
     5         // 尝试将节点快速放入队尾
     6         Node pred = tail;
     7         if (pred != null) {
     8             node.prev = pred;
     9             // 主要通过CAS入队尾
    10             if (compareAndSetTail(pred, node)) {
    11                 pred.next = node;
    12                 return node;
    13             }
    14         }
    15         // 如果快速入队尾失败,则通过enq方式入对尾
    16         enq(node);
    17         return node;
    18     }

    CAS操作后面讨论,这里先看enq(final Node node)入队尾操作。

     1 private Node enq(final Node node) {
     2         // 这里是CAS的“自旋”操作,直到将节点成功加入队尾
     3         for (;;) {
     4             Node t = tail;
     5             // 因为每次入队都是从队尾加入,当队尾为null,则表明队列为null,则需初始化头结点
     6             // 并将尾节点也指向头节点
     7             if (t == null) { // Must initialize   
     8                 if (compareAndSetHead(new Node()))
     9                     tail = head;
    10             } else {  // 通过CAS入队尾,自旋操作
    11                 node.prev = t;
    12                 if (compareAndSetTail(t, node)) {
    13                     t.next = node;
    14                     return t;
    15                 }
    16             }
    17         }
    18     }

    在线程入队尾后,就需要acquireQueued函数了,该函数的作用是让线程拿到资源,当然还是通过自旋的方式来拿资源,也是就是一个排队的过程。

     1 final boolean acquireQueued(final Node node, int arg) {
     2         boolean failed = true; // 标记是否成功拿到资源
     3         try {
     4             boolean interrupted = false; // 标记在等待过程中是否被中断过
     5             // 自旋操作
     6             for (;;) {
     7                 final Node p = node.predecessor(); // 拿到当前节点的前向节点
     8                 // 如果前向节点为head,则表明当前节点排在第二位了,已经得到获取资源的资格
     9                 if (p == head && tryAcquire(arg)) {
    10                     // 成功拿到资源后,将head节点指向当前节点
    11                     // 从这里可以看出,head节点就是当前获取到锁的节点
    12                     setHead(node); 
    13                     // 将原来head节点的next设置为null,方便GC回收以前的head节点,也就意味着之前拿到锁的节点出队列了
    14                     p.next = null; // help GC
    15                     failed = false;
    16                     return interrupted;  // 返回在排队过程中线程是否被中断过
    17                 }
    18                 // 到这里,表明线程处于等待状态,自旋直到被unpark
    19                 if (shouldParkAfterFailedAcquire(p, node) &&
    20                     parkAndCheckInterrupt())
    21                     interrupted = true;
    22             }
    23         } finally {
    24             if (failed) // 获取资源失败,则将节点标记为结束状态
    25                 cancelAcquire(node);
    26         }
    27     }

    在线程排队等待的过程中,有两个关键函数shouldParkAfterFailedAcquire(Node pred, Node node)和parkAndCheckInterrupt()。

     1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2         int ws = pred.waitStatus; // 前驱节点的状态
     3         if (ws == Node.SIGNAL)
     4            // 如果前驱节点正处于被唤醒的状态,则正常排队等待即可
     5             /*
     6              * This node has already set status asking a release
     7              * to signal it, so it can safely park.
     8              */
     9             return true;
    10         if (ws > 0) {  // 前驱节点处于结束状态
    11             /*
    12              * Predecessor was cancelled. Skip over predecessors and
    13              * indicate retry.
    14              */
    15             /*
    16              *继续向下找,一直找到处于正常等待状态的节点,将当前节点插入其后,其他
    17              *无用节点形成一个链,会被GC
    18              */
    19             do {
    20                 node.prev = pred = pred.prev;
    21             } while (pred.waitStatus > 0);
    22             pred.next = node;
    23         } else {
    24             /*
    25              * waitStatus must be 0 or PROPAGATE.  Indicate that we
    26              * need a signal, but don't park yet.  Caller will need to
    27              * retry to make sure it cannot acquire before parking.
    28              */
    29             // 前驱节点状态正常,则把前驱节点的状态设置为SIGNAL,这样前驱节点拿到资源后,可通知下当前节点
    30             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    31         }
    32         return false;
    33     }

    分析以上源码可知:只有当前驱节点的状态为SIGNAL时,当前节点才能正常排队等待,否则需找到一个合适的节点next位置来进行排队等待。

    1   private final boolean parkAndCheckInterrupt() {
    2         // 使线程进入waitting状态
    3         LockSupport.park(this);
    4         return Thread.interrupted(); // 返回线程是否被中断过
    5     }

    该函数作用:当节点正常进入排队后,让线程进入等待状态。

    至此acquireQueued()函数总结完成,该函数的具体执行流程:

    #1.首先检查节点是否可以立即获取资源。

    #2.如果不能立即获取资源,则进行排队,这里需要找到正确的排队点,直到unpark或interrupt唤醒自己。

    #3.唤醒后,判断自己是否有资格获取资源,如果拿到资源,则将head指向当前节点,并返回在等待过程是否被中断过,如果没拿到资源,则继续流程2。

    acquire小结

    到这里acquire(int)函数分析结束,这个函数非常重要,这里再贴上源码:

    1   public final void acquire(int arg) {
    2         if (!tryAcquire(arg) &&
    3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4             selfInterrupt();
    5     }

    #1.调用子类的tryAcquire直接获取资源,如果成功则返回。

    #2.如果流程1失败,则将线程加入等待队列的队尾(独占模式)。

    #3.在acquireQueued中排队,通过自旋获取资源,直到获取资源才返回。如果在排队过程中线程被中断过返回true,否则返回false。

    #4.在排队过程中被中断是不响应的,只有获取到资源后,才进行自我中断,补上中断标记。

    整个过程的流程图如下:

    ②release(int)独占模式释放资源。

     1  public final boolean release(int arg) {
     2         // 尝试释放资源
     3         if (tryRelease(arg)) {
     4             Node h = head; 
     5             if (h != null && h.waitStatus != 0)
     6                 unparkSuccessor(h); // 唤醒队列中下一个线程
     7             return true;
     8         }
     9         return false;
    10     }

    释放锁的函数很简单,通过tryRelease尝试释放资源,然后唤醒队列中的其他线程。

    tryRelease(int):

    1    protected boolean tryRelease(int arg) {
    2         throw new UnsupportedOperationException();
    3     }

    与tryAcquire函数一样,该方法需要子类去实现,如果直接调用会抛异常。

    unparkSuccessor(Node node):

    唤醒等待队列中的下一个线程,这里唤醒的是等待队列中最前边那个未放弃的线程,注意看代码注释。

     1  private void unparkSuccessor(Node node) {
     2         /*
     3          * If status is negative (i.e., possibly needing signal) try
     4          * to clear in anticipation of signalling.  It is OK if this
     5          * fails or if status is changed by waiting thread.
     6          */
     7         int ws = node.waitStatus; // 获取当前线程的状态
     8         if (ws < 0) // 如果当前线程状态处于可用状态,则直接将状态值置0
     9             compareAndSetWaitStatus(node, ws, 0);
    10 
    11         /*
    12          * Thread to unpark is held in successor, which is normally
    13          * just the next node.  But if cancelled or apparently null,
    14          * traverse backwards from tail to find the actual
    15          * non-cancelled successor.
    16          */
    17         Node s = node.next; // 下一个节点
    18         if (s == null || s.waitStatus > 0) {  // 如果节点为null或节点已处于结束状态
    19             s = null;
    20             // 从队列尾向前遍历,找到next可用的节点,状态小于0就可用,这里的节点是队列中最前边的可用节点
    21             for (Node t = tail; t != null && t != node; t = t.prev)
    22                 if (t.waitStatus <= 0)  
    23                     s = t;
    24         }
    25         if (s != null)
    26             LockSupport.unpark(s.thread);// 唤醒next线程
    27     }

    独占模式的主要函数分析完毕,接下来看共享模式。

    acquireShared(int)

    共享模式下获取资源,如果成功则直接返回,否则进入等待队列,通过自旋直到获取资源为止。

    1 public final void acquireShared(int arg) {
    2         // 共享模式下获取资源,如果获取失败,则进入等待队列
    3         // 同样该函数需要子类去实现
    4         if (tryAcquireShared(arg) < 0)
    5             doAcquireShared(arg);  // 进入等待队列直到锁获取到为止
    6     }

    tryAcquireShared(int)函数返回值,需要注意下:

    负数:表示获取失败;

    0:获取成功,但没有剩余资源;

    正数:获取成功,且有剩余资源;

    #1.doAcquireShared(int)

    将线程加入队列尾,然后通过自旋获取资源,直到得到资源才返回。

     1  private void doAcquireShared(int arg) {
     2         final Node node = addWaiter(Node.SHARED); // 将线程加入队尾,通过共享模式
     3         boolean failed = true;// 是否成功
     4         try {
     5             boolean interrupted = false; // 在自旋过程中是否被中断过
     6             for (;;) {
     7                 final Node p = node.predecessor(); // 前驱节点
     8                 if (p == head) {  // 这里表明当前节点处于head的next位,此时node被唤醒,很可能是head用完来唤醒
     9                     int r = tryAcquireShared(arg); // 获取资源
    10                     if (r >= 0) { // 成功
    11                         setHeadAndPropagate(node, r);// 将head指向自己,还有剩余资源可用的话再唤醒之后的线程
    12                         p.next = null; // help GC 无用链,帮助GC
    13                         if (interrupted)  // 如果等待过程中被中断过,将中断补上
    14                             selfInterrupt();
    15                         failed = false;
    16                         return;
    17                     }
    18                 }
    19                 // 线程未排在head之后,继续排队,进入waiting状态,等着unpark
    20                 if (shouldParkAfterFailedAcquire(p, node) &&
    21                     parkAndCheckInterrupt())
    22                     interrupted = true;  // 中断标记
    23             }
    24         } finally {
    25             if (failed)
    26                 cancelAcquire(node);
    27         }
    28     }

    整个流程与独占模式的acquireQueued很相似,只是共享模式下,在唤醒自己后,如果还有剩余资源,需要唤醒后续节点。

    setHeadAndPropagate(node, int)

    将head节点设置为当前节点,如果还有剩余资源,则唤醒下一个线程。

     1  private void setHeadAndPropagate(Node node, int propagate) {
     2         Node h = head; // Record old head for check below
     3         setHead(node); // 将队列中的head执行当前节点
     4         /*
     5          * Try to signal next queued node if:
     6          *   Propagation was indicated by caller,
     7          *     or was recorded (as h.waitStatus either before
     8          *     or after setHead) by a previous operation
     9          *     (note: this uses sign-check of waitStatus because
    10          *      PROPAGATE status may transition to SIGNAL.)
    11          * and
    12          *   The next node is waiting in shared mode,
    13          *     or we don't know, because it appears null
    14          *
    15          * The conservatism in both of these checks may cause
    16          * unnecessary wake-ups, but only when there are multiple
    17          * racing acquires/releases, so most need signals now or soon
    18          * anyway.
    19          */
    20          // 如果还有剩余资源,则唤醒后续线程
    21         if (propagate > 0 || h == null || h.waitStatus < 0 ||
    22             (h = head) == null || h.waitStatus < 0) {
    23             Node s = node.next;
    24             if (s == null || s.isShared())
    25                 doReleaseShared();
    26         }
    27     }

    这里除了将head设置成当前线程,如果有剩余资源,需要唤醒后续节点。

    doReleaseShared()

     1  private void doReleaseShared() {
     2         /*
     3          * Ensure that a release propagates, even if there are other
     4          * in-progress acquires/releases.  This proceeds in the usual
     5          * way of trying to unparkSuccessor of head if it needs
     6          * signal. But if it does not, status is set to PROPAGATE to
     7          * ensure that upon release, propagation continues.
     8          * Additionally, we must loop in case a new node is added
     9          * while we are doing this. Also, unlike other uses of
    10          * unparkSuccessor, we need to know if CAS to reset status
    11          * fails, if so rechecking.
    12          */
    13         // 自旋操作
    14         for (;;) {
    15             Node h = head;
    16             if (h != null && h != tail) {
    17                 int ws = h.waitStatus;
    18                 if (ws == Node.SIGNAL) { // 如果head状态为SIGNAL,则需唤醒后续节点
    19                     // CAS一下当前节点的状态,判断是否为SIGNAL,如果是则置为0,否则继续循环
    20                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    21                         continue;            // loop to recheck cases
    22                     unparkSuccessor(h); // 唤醒后继节点
    23                 }
    24                 // 如果head节点状态为0,且CAS置为传播状态失败,则继续循环,因为if操作中会改变节点的状态
    25                 else if (ws == 0 &&
    26                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    27                     continue;                // loop on failed CAS
    28             }
    29             if (h == head) // 如果head节点发生了改变,则继续自旋操作,防止上述操作过程中添加了节点的情况                  // loop if head changed
    30                 break;
    31         }
    32     }

    该方法的作用主要是用于唤醒后续节点。

    共享模式获取锁操作与独占模式基本相同:先直接获取资源,如果成功,直接返回;如果失败,则将线程加入等待队列尾,直到获取到资源才返回,整个过程忽略中断。不同点在于共享模式下自己拿到资源后,还需要唤醒后续节点。

    #2.releaseShared(int)

    同享模式下释放资源

    1 public final boolean releaseShared(int arg) {
    2         if (tryReleaseShared(arg)) { // 尝试释放资源
    3             doReleaseShared(); // 唤醒后续节点,前面已经分析
    4             return true;
    5         }
    6         return false;
    7     }

    共享模式释放资源与独占模式类似,但是独占模式下需要完全释放资源后,才会返回true,而共享模式没有这种要求。

    总结

    这里只是对AQS的顶层框架进行了简要的分析,具体需要深入其子类中去,AQS的子类按模式分类可聚合成以下几类:

    #1.独占模式:

    ReentrantLock:可重入锁。state=0独占锁,或者同一线程可多次获取锁(获取+1,释放-1)。
    Worker(java.util.concurrent.ThreadPoolExecutor类中的内部类)线程池类。shutdown关闭空闲工作线程,中断worker工作线程是独占的,互斥的。

    #2.共享模式:
    Semaphore:信号量。 控制同时有多少个线程可以进入代码段。(互斥锁的拓展)
    CountDownLatch:倒计时器。  初始化一个值,多线程减少这个值,直到为0,倒计时完毕,执行后续代码。

    #3.独占+共享模式:
    ReentrantReadWriteLock:可重入读写锁。独占写+共享读,即并发读,互斥写。

    后续对这些类进行详细分析。


    by Shawn Chen,2019.1.29日,下午。

  • 相关阅读:
    物体也能正常移动
    同时按住两个键
    连续子数组的最大和Java实现
    Entity Framework基础01
    MVC知识进阶01
    面向对象基础进阶03
    面向对象基础进阶02
    面向对象基础进阶01
    little skill---ping
    SqlServer------范式小结
  • 原文地址:https://www.cnblogs.com/developer_chan/p/10324196.html
Copyright © 2011-2022 走看看