1.概述
AQS( AbstractQueuedSynchronizer ) 是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。如: ReentrantLock 和 Semaphore都是基于AQS构建的,还包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。
AQS解决了在实现同步器时涉及的大量细节问题,例如等待线程采用FIFO队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程是应该通过还是需要等待。本文通过介绍AQS相关API以及juc包下相关实现类对其进行介绍
2.原理
使用volatile的整形变量state用来表示当前是否可以获取锁,如在某些非共享锁里,state=1 则代表当前锁已经被占有,此时如果有线程来请求锁时,则会进入AQS里维持的CLH队列(FIFO)里,且使用自旋的方式不断的尝试获取锁。自定义的同步器在实现时,只需要定义state变量获取与释放的规则,其他细节在AQS里已经实现。
3.独占模式
acquire(int arg) 独占锁获取state资源时调用的方法。
- tryAcquire(子类实现)获取资源,如果获取成功则返回true,不成功返回false,进入下一步
- addWaiter(...),实例化一个排他节点,入队,返回该实例的节点
- 调用acqireQueue(..) 方法 自旋方式获取资源,成功则false,失败返回false。false代表中断的响应,如果在队列里是不会响应中断的,获取资源时才响应。
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
tryAcquire(int arg) 需要实现类去实现的一个独占方式获取资源时的方法,成功返回true,失败为false(简而言之,通过state状态,判断当前资源是否可用)
1 protected boolean tryAcquire(int arg) {
2 throw new UnsupportedOperationException();
3 }
addWaiter(Node mode) 获取资源失败后,将当前线程和模式封装到Node节点,并添加到队列里
1 private Node addWaiter(Node mode) {
//实例化Node节点
2 Node node = new Node(Thread.currentThread(), mode);
3 // Try the fast path of enq; backup to full enq on failure
// 尾插法快速插入到链表末尾
4 Node pred = tail;
5 if (pred != null) {
6 node.prev = pred;
7 if (compareAndSetTail(pred, node)) {
8 pred.next = node;
9 return node;
10 }
11 }
// 如果尾部节点为空,则初始化一次,再使用尾插法插入
12 enq(node);
13 return node;
14 }
acquireQueued(final Node node, int arg) 该方法主要功能是通过自旋不断尝试获取资源,在这个过程中,一旦没有获取资源成功则 park 当前线程进入等待唤醒的状态,同时检查当前线程是否中断,如果被设置了中断的标记,则 将interrupted 设置为true,在获取资源成功的时候,去响应中断
1 final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;
3 try {
4 //判断当前节点是否被通知中断
5 boolean interrupted = false;
6 //自旋方式,不断尝试获取资源
7 for (;;) {
8 final Node p = node.predecessor();
9 //如果上一个节点是Head节点,则尝试获取资源
10 if (p == head && tryAcquire(arg)) {
11 //获取资源成功,则将当前节点设为Head节点
12 setHead(node);
13 //当前节点获取资源成功,那么之前的Head节点需要释放,因此将之前Head相关引用设为NULL,帮助GC回收
14 p.next = null; // help GC
15 failed = false;
16 //返回中断过程是否中断过
17 return interrupted;
18 }
19 //递归判断当前节点的上一个节点的waitStatus是否有效
20 //park 当前Node节点的线程(wait当前线程),检查是否中断
21 if (shouldParkAfterFailedAcquire(p, node) &&
22 parkAndCheckInterrupt())
23 interrupted = true;
24 }
25 } finally {
26 if (failed)
27 cancelAcquire(node);
28 }
29 }
shouldParkAfterFailedAcquire(Node pred, Node node) 此方法主要用于检查waitStatus的状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了,则丢弃引用,让GC回收。
Node节点中维持着waitStatus这个字段代表的含义如下:
/** waitStatus value to indicate thread has cancelled */
//值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
//值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,
//就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
//值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()
// 方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
static final int PROPAGATE = -3;
//0 代表为初始状态
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 //判断前一个节点的waitStatus 是否小于0,如果大于0,则该节点已经被取消 3 int ws = pred.waitStatus; 4 //如果为等待通知,则表示等待unpark(等待唤醒) 5 if (ws == Node.SIGNAL) 6 /* 7 * This node has already set status asking a release 8 * to signal it, so it can safely park. 9 */ 10 return true; 11 //如果上一个节点被取消掉(大于0)则跳过,循环的找上一节点为未取消的(小于0) 12 if (ws > 0) { 13 /* 14 * Predecessor was cancelled. Skip over predecessors and 15 * indicate retry. 16 */ 17 do { 18 node.prev = pred = pred.prev; 19 } while (pred.waitStatus > 0); 20 pred.next = node; 21 } else { 22 /* 23 * waitStatus must be 0 or PROPAGATE. Indicate that we 24 * need a signal, but don't park yet. Caller will need to 25 * retry to make sure it cannot acquire before parking. 26 */ 27 //如果当前节点的waitStatus 小于0,但不等于SIGNAL,则设置为SIGNAL 28 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 29 } 30 return false; 31 }
parkAndCheckInterrupt() 让线程去休息,真正进入等待状态。
1 private final boolean parkAndCheckInterrupt() {
2 //调用park()使线程进入waiting状态
3 LockSupport.park(this);
4 //如果被唤醒,查看自己是不是被中断的
5 return Thread.interrupted();
6 }
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
独占模式的大致流程知道了后,用流程图来总结下(贴上源码)
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
release(int arg) 独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:
public final boolean release(int arg) {
//子类实现释放资源,释放成功则返回true,失败则false
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//unpart 下一个等待的节点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int arg) 需要子类实现的获取资源成功的方法,用作父类根据子类释放是否成功作为判断依据
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor(Node node) 唤醒队列中的下一个未取消的线程
1 private void unparkSuccessor(Node node) {
2 int ws = node.waitStatus;
3 if (ws < 0) //设置当前线程的waitStatus为0
4 compareAndSetWaitStatus(node, ws, 0);
5 Node s = node.next; //找到head节点的下一个需要唤醒的节点,如果下一个节点被取消了(>0),则从tail往前找
6 if (s == null || s.waitStatus > 0) {
7 s = null;
8 for (Node t = tail; t != null && t != node; t = t.prev)
9 if (t.waitStatus <= 0)
10 s = t;
11 }
12 //unpart 唤醒线程
13 if (s != null)
14 LockSupport.unpark(s.thread);
15 }
4.共享模式
acquireShared(int) 此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:
public final void acquireShared(int arg) { //子类实现获取共享资源,成功则返回大于0的整数 if (tryAcquireShared(arg) < 0) //获取资源失败,进入队列等待 doAcquireShared(arg); }
doAcquireShared(int) 此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//加入队列尾部 boolean failed = true;//是否成功标志 try { boolean interrupted = false;//等待过程中是否被中断过的标志 for (;;) { final Node p = node.predecessor();//前驱 if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的 int r = tryAcquireShared(arg);//尝试获取资源 if (r >= 0) {//成功 setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程 p.next = null; // help GC if (interrupted)//如果等待过程中被打断过,此时将中断补上。 selfInterrupt(); failed = false; return; } } //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
与acquireQueued大致相同。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外。
setHeadAndPropagate(Node, int) 此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);//head指向自己 //如果还有剩余量,继续唤醒下一个邻居线程 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
小结
OK,至此,acquireShared()也要告一段落了。让我们再梳理一下它的流程:
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)。
releaseShared() 上一小节已经把acquireShared()说完了,这一小节就来讲讲它的反操作releaseShared()吧。此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//尝试释放资源 doReleaseShared();//唤醒后继结点 return true; } return false; }
此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。
doReleaseShared() 此方法主要用于唤醒后继。下面是它的源码:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//唤醒后继 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head发生变化 break; } }