前言 AQS(Abstract Queued Synchronizer)是JUC并发包中的核心基础组件,作者是大名鼎鼎的Doug Lea。通过AQS可以实现大部分的同步需求。
宏观架构 AQS包括一个state和一个FIFO的CLH队列,如下图所示: CLH队列中的每个节点Node就可以对应与争用该资源的线程,Node的数据结构如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 static final class { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } } ``` ### 获取锁的过程 以使用默认构造函数的reentrantLock为例子: ```java ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock();
lock()代码如下:
1 2 3 4 5 6 7 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); }
如果当前的state值为0,当前线程获得lock,将state的值通过cas的方式设置为1。如果不是0,则添加到队列中。通过acquire方法去申请资源。
1 2 3 4 5 6 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
再次获取锁尝试失败后,调用addWaiter将线程封装成节点信息,加入到等待队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { 大专栏 AQS总结 pred.next = node; return node; } } enq(node); return node; }
addWaiter首先会通过cas的方式快速的去添加到队列的尾部,如果添加不成功,调enq(node)再次入队;enq(node)是一个死循环,不断的通过cas去添加到节点,直到成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
线程节点进入队列后,调用acquireQueued,acquireQueuedxiang
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
最关键的应该是shouldParkAfterFailedAcquire方法,如果每个线程都在这么自旋的去拿锁,cpu肯定炸了。所以,当当前的前一个节点处于SIGNAL状态的时候,可以挂起当前线程。这个操作就好比在排队的时候和前一个人说:我出去买点吃的,你轮到的时候叫我一下。当前面的节点轮到的时候,会唤醒当前线程,然后又开始自旋,判断自己能否拿到同步状态,如果拿到,就获取到了锁,这就是一个完整的获得同步状态的过程。 至于如何挂起当前线程,使用的是LockSupport的park()挂起当前线程。park可以精确的进行挂起,精确到thread。
释放锁的过程
释放的过程正好相反,通过release来释放锁。
1 2 3 4 5 6 7 8 9 10 11 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
tryRelease的内容主要是:获取state的值,减去要释放的值,如果state已经是0,把当前的线程设置为null。要注意的是,这里完全没有使用cas,因为当前线程还持有锁,绝对的线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
最为关键的 unparkSuccessor(h),这个时候头节点已经处于了获取同步的状态,通过unparkSuccessor(h)来唤醒头节点的后一个节点。从而后一个节点就可以自旋的去获取同步状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
总结 个人认为AQS在很多地方使用cas和自旋的方式,一定程度上提升吞吐率,之前看到过测试ReentrantLock的吞吐比synchronized要高很多,不对synchronized一直在优化,估计现在性能也差不多了,以后做个测试。本文只是总结了AQS的独占式的获取同步状态,还有共享式的获取同步状态,还支持很多的特性,将在后面进行总结。