AQS(AbstractQueuedSynchronizer)顾名思义,抽象的队列同步器。
它是JUC中大部分同步工具的基础。如:CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore。
它更是一个模板模式的典型应用。接下来就主要阐述这个模板的主要流程。
首先从结构来简单介绍一下AQS,代码如下
class aqs{ // 表示线程对资源的占有状态 int state; // 线程悬停队列的头指针与尾指针 Node head; Node tail; // 双向链表 static class Node{ int waitStatus; /* -1(SIGNAL): 表示下一个节点的线程需要被释放 1(CANCELLED): 当前节点的线程已被标志为退出状态 -2(CONDITION): 用于条件队列 -3(PROPAGATE): 共享状态的中间状态 0(None): None of the above */ Node prev; Node next; Thread thread; } }
刚刚就有提及AQS就是模板模式实现的。而模板模式的核心则是流程的制定。而aqs的主流程:
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;
上述代码就包含了流程中最重要的4个方法:
- acquire 循环调用tryAcquire, 失败则阻塞, 并进入线程队列
- tryAcquire 试图获取资源(通过设定aqs的state属性)
- release 释放资源, 释放成功则唤醒线程队列中的第一个节点线程
- tryRelease 试图释放资源(与tryAcquire相反)
我们通过非公平锁的简单实现来看一下整个流程的细节:
class UnfairLock { private final Sync sync = new Sync(); void lock() { sync.lock(); } void unlock() { sync.release(1); } /** 通过继承aqs实现一个同步工具, 通常只需实现tryAcquire与tryRelease, (在独占模式下, 其实是必须实现. 独占模式等下会再详说) */ static class Sync extends AbstractQueuedSynchronizer { void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /** * 试图获取锁资源(即将sync的state属性设置为非0) */ final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 第一次获取锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 锁重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 获取失败 return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 释放资源成功 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } }
从上面的实现便可以看到AQS的acquire与release方法的重要性了。我们再来看看它们的具体实现。
acquire的实现:
public final void acquire(int arg) { if (!tryAcquire(arg) && // 试图获取资源失败 /** addWaiter方法, 生成一个新的线程节点, 并加入线程队列. acquireQueued, 核心方法, (自旋等待, 阻塞都在这个方法实现) 并返回该线程是否在等待时有被中断. */ acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果该线程在等待时被中断, 传递中断信号 selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { /** 自旋等待, 直到首节点被通知(WaitStatus必须为SIGNAL), 并唤醒第二节点的线程. (这里就可以看到SIGNAL的作用: 表示下一节点的线程可以安全的被park, 因为该节点已经处于等待信号状态了, 这个等待信号指的是unparkSuccessor(node)的调用) 被唤醒后, 该节点则被置为首节点. */ final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } /** shouldParkAfterFailedAcquire 如果前置节点(p)已经被设置为SIGNAL信号, 则表示该线程可以被悬停, 则通过parkAndCheckInterrupt()中的LockSupport.lock()实现悬停 否则设置一次前置节点(p)为SIGNAL, 再次循环验证 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
release的实现:
public final boolean release(int arg) { if (tryRelease(arg)) { // 试图释放资源 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 通知头结点 return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
了解了acquire和release的实现, aqs模板的流程就已经十分清晰了。总结来说: 请求线程通过acquire去获取资源,获取成功则占有资源并返回,失败则进入线程队列进行悬停等待。直到持有资源的线程通过release方法通知线程队列。
然后就是对队列的维护工作了。注意:所有维护工作都是通过CAS(Compare and Swap)实现的。这是JUC的核心。
但是这个其实说的是独占模式下的流程。aqs还有个共享模式,原理是类似的, 区别只有一些细节上的实现。
/** acquire */ public final void acquireShared(int arg) { /** 共享模式资源可能会被几个线程同时占有, aqs的state属性在这表示剩余可被占有的资源的数量 因此如果可被占有的少于0,则会进入悬停等待队列 */ if (tryAcquireShared(arg) < 0) /** 类似之前的acquireQueued的, 循环通过tryAcquireShared去获取资源, 并悬停等待 */ doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); // 以共享模式进入线程队列 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // 再次试探是否有剩余资源 if (r >= 0) { setHeadAndPropagate(node, r); // 设置头结点,并传递通知其他共享对象 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /** release */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
aqs还有一个很巧妙的实现: ConditionObject。它是作为内部类实现的。
/**
最重要的两个方法, 主流程
独占模式与共享模式
Condition
*/