1. 概述
AQS(AbstractQueuedSynchronizer)提供了原子式管理同步状态/阻塞和唤醒线程功能以及队列模型的简单框架, 许多同步类实现都依赖于它,
如常用的ReentrantLock/Semaphore/CountDownLatch等.
2. 框架
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列). AQS 中的队列是CLH(Craig, Landin and Hagersten) 单向链表的变体--虚拟双向队列(FIFO).
双向链表中, 第一个节点为虚节点, 其实并不存储任何信息, 只是占位. 真正的第一个有数据的节点, 是在第二个节点开始的.
AQS定义两种资源共享方式: Exclusive(独占, 只有一个线程能执行, 如:ReentrantLock)和Share(共享, 多个线程可同时执行, 如:Semaphore/CountDownLatch). 不同的自定义同步器争用共享资源的方式也不同. 自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可, 至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)AQS已经在顶层实现好了.
以ReentrantLock为例, state初始化为0, 表示未锁定状态. A线程lock()时, 会调用tryAcquire()独占该锁并将state+1. 此后, 其他线程再tryAcquire()时就会失败, 直到 A 线程unlock()到state=0(即释放锁)为止, 其它线程才有机会获取该锁.当然, 释放锁之前, A 线程自己是可以重复获取此锁的(state会累加), 这就是可重入的概念. 但要注意, 获取多少次就要释放多么次,这样才能保证state是能回到零态的.
再以CountDownLatch以例, 任务分为 N 个子线程去执行, state也初始化为 N(注意N要与线程个数一致). 这N个子线程是并行执行的,每个子线程执行完后会countDown()一次, state会CAS减1, 等到所有子线程都执行完后(即state=0), 会unpark()主调用线程, 然后主调用线程就会从await()函数返回, 继续后续动作.
一般来说, 自定义同步器要么是独占方法, 要么是共享方式, 他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可. 但AQS也支持自定义同步器同时实现独占和共享两种方式, 如ReentrantReadWriteLock.
3. 源码详解
3.1 Node
Node是对每一个等待获取资源的线程的封装, 其包含了需要同步的线程本身及其等待状态, 如是否被阻塞、是否等待唤醒、是否已经被取消等.
3.1.1 Node的方法和属性值
- waitStatus: 当前节点在队列中的状态
- thread: 表示处于该节点的线程
- prev: 前驱指针
- predecessor(): 返回前驱节点, 没有的话抛出npe
- nextWaiter: 指向下一个处于 CONDITION 状态的节点
- next: 后继指针
3.1.2 waitStatus
- CANCELLED(1): 表示当前结点已取消调度. 当timeout或被中断(响应中断的情况下), 会触发变更为此状态, 进入该状态后的结点将不会再变化.
- SIGNAL(-1): 表示后继结点在等待当前结点唤醒. 后继结点入队时, 会将前继结点的状态更新为SIGNAL.
- CONDITION(-2): 表示结点等待在Condition上, 当其他线程调用了Condition的signal()方法后, CONDITION状态的结点将从等待队列转移到同步队列中, 等待获取同步锁.
- PROPAGATE(-3): 共享模式下, 前继结点不仅会唤醒其后继结点, 同时也可能会唤醒后继的后继结点.
- 0: 新结点入队时的默认状态.
3.2 获取资源
3.2.1 acquire(int)
此方法是独占模式下线程获取共享资源的顶层入口. 如果获取到资源, 线程直接返回, 否则进入等待队列, 直到获取到资源为止, 且整个过程忽略中断的影响.
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
函数流程如下:
- tryAcquire()尝试直接去获取资源, 如果成功则直接返回(这里体现了非公平锁, 每个线程获取锁时会尝试直接抢占加塞一次, 而CLH队列中可能还有别的线程在等待);
- addWaiter()将该线程加入等待队列的尾部, 并标记为独占模式;
- acquireQueued()使线程阻塞在等待队列中获取资源, 一直获取到资源后才返回. 如果在整个等待过程中被中断过, 则返回true, 否则返回false.
- 如果线程在等待过程中被中断过, 它是不响应的. 只是获取资源后才再进行自我中断selfInterrupt(), 将中断补上.
3.2.2 tryAcquire(int)
此方法尝试去获取独占资源. 如果获取成功, 则直接返回true, 否则直接返回false. AQS只是一个框架, 具体资源的获取/释放方式交由自定义同步器去实现.
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
这里之所以没有定义成abstract, 是因为独占模式下只用实现tryAcquire-tryRelease, 而共享模式下只用实现tryAcquireShared-tryReleaseShared. 如果都定义成abstract, 那么每个模式也要去实现另一模式下的接口. 说到底Doug Lea还是站在开发者的角度, 尽量减少不必要的工作量.
3.2.3 addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾, 并返回当前线程所在的结点.
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试快速加入队尾, 失败则使用 enq 把节点加入队尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
3.2.4 enq(Node)
此方法用于将node加入队尾.
private Node enq(final Node node) { for (;;) { // 自旋, 直到成功加入队尾 Node t = tail; if (t == null) { // 队列为空, 初始化队列 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 把当前节点加入队尾 t.next = node; return t; } } } }
3.2.5 acquireQueued(Node, int)
通过tryAcquire()和addWaiter(), 该线程获取资源失败, 已经被放入等待队列尾部了. 线程下一步进入等待状态, 直到其他线程彻底释放资源后唤醒它.
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; // 标记是否成功获取资源 try { boolean interrupted = false; // 标记等待过程中是否被中断过 for (;;) { // 自旋 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 前驱节点为 head, 则可以尝试获取资源. setHead(node); // 将 head 指向该节点 p.next = null; // 切断之前的 head 与链表的关联, help for GC. failed = false; return interrupted; } // 判断在获取资源失败后是否进入 waiting 状态, 直到被 unpark(). if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
3.2.6 shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于检查前驱节点状态, 从而决定当前线程是否可以进入 waiting 状态.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 拿到前驱节点的状态 if (ws == Node.SIGNAL) // 已经设置了前驱节点释放资源后通知自己一下 return true; if (ws > 0) { // 前驱节点已经放弃获取资源, 那就一直往前找, 直到找到最近一个正常等待的节点, 并排在它的后边. do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前驱节点正常, 那就把前驱的状态设置成 SIGNAL, 让它释放资源后通知自己一下. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
3.2.7 parkAndCheckInterrupt()
此方法就是让线程去休息, 真正进入等待状态. park()会让当前线程进入waiting状态. 在此状态下, 有两种途径可以唤醒该线程: 被unpark()或被interrupt().
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
3.3 释放资源
3.3.1 release(int)
此方法是独占模式释放资源
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
3.3.2 tryRelease(int)
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
3.3.3 unparkSuccessor(Node)
唤醒等待队列中的下一个线程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从队尾开始找一直到队首, 找到队列最靠前需要唤醒的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
参考文档: