AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
同步器,是JDK同步工具的基础框架,实现核心为对state
属性进行自旋、CAS原子更新,内部存在两个队列——同步队列、条件队列(·Condition·)。同步队列:所有尝试获取锁的线程会在该队列上进行排队,排队时线程通过LockSupport.park()
方法阻塞,通过LockSupport.unpark()
唤醒;条件队列:当线程需要的某个条件不满足时,通过调用LockSupport.park()
方法阻塞进入条件队列,当条件满足时,通过调用LockSupport.unpark()
方法唤醒线程,并从条件队列移除,进入到同步队列。AQS提供了排他、共享两种模式,默认排他模式。在排他模式下,系统可以获取较高吞吐量,但是可能出现较早进来的线程一直无法获得锁,造成线程饥饿;共享模式下,系统可以保证不出现线程饥饿,但是吞吐量会下降。
队列节点内部类Node
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 排他模式
static final Node EXCLUSIVE = null;
// 节点已被取消(因为超时或者打断)
static final int CANCELLED = 1;
// 等待被唤醒(当活动节点被释放或者被取消,需要唤醒它的后续节点)
static final int SIGNAL = -1;
// 在等待条件出现(节点在条件队列上等待)
static final int CONDITION = -2;
// 下一个对 acquireShared 的调用应该无条件传播
static final int PROPAGATE = -3;
// 同步队列节点状态,可取值(CANCELLED/SIGNAL/CONDITION/PROPAGATE),初始值=0
volatile int waitStatus;
// 队列前驱
volatile Node prev;
// 队列后驱
volatile Node next;
// 当前节点线程
volatile Thread thread;
// 条件队列的下一个界定、或者共享模式下值为SHARED
Node nextWaiter;
/**
* 创建同步节点
* @param thread 当前线程
* @param mode 共享模式 or 排他模式
*/
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
/**
* 创建条件队列节点
* @param thread 当前线程
* @param waitStatus 等待状态
*/
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
关键属性
// 同步队列的头节点,通过setHead方法更新
private transient volatile Node head;
// 同步队列的尾节点,当节点入队时被更新
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 自旋时间控制
static final long spinForTimeoutThreshold = 1000L;
// 获取Unsafe对象CAS操作关键属性,保证原子性
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
获取锁或释放锁
该类为同步中获取锁和释放锁提供了模板方法,子类只需要根据自己的需要重写部分方法,即可实现同步机制。
/**
* 排他模式下尝试获取锁。
* 该方法一般被acquire方法调用,如果获取失败,返回false,当前线程将执行入队操作,如果当前线程不在同 * 步队列。进入同步队列后需要等待其他线程执行release操作后将其唤醒才能出队。
*
* @param arg 通常传递1,如果节点是刚从条件队列唤醒,那么传递阻塞之前的state值(如重入次数)
* @return 获取成功,返回true,否则返回false
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 排他模式下释放锁
*
* @param arg 通常传递1,如果节点即将进入条件队列,那么传递入队时的state(如重入次数)
* @return 如果state已经被完全释放(state = 0),返回true,否则返回false
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享模式下尝试获取锁
* 该方法一般被acquire方法调用,如果获取失败,返回false,当前线程将执行入队操作,如果当前线程不在同 * 步队列。进入同步队列后需要等待其他线程执行release操作后将其唤醒才能出队。
* @param arg 通常传递1,如果节点是刚从条件队列唤醒,那么传递阻塞之前的state值(如重入次数)
* @return 如果获取失败,返回负数,共享下返回0,成功返回整数
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享模式下释放锁
*
* @param arg 通常传递1,如果节点即将进入条件队列,那么传递入队时的state(如重入次数)
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 排他模式返回true,共享模式返回false
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
获取锁
排他模式
/**
* 如果获取失败,当前线程进入同步队列
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败,入队
selfInterrupt();
}
/**
* 如果获取失败,当前线程进入同步队列,可中断
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 中断检查
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) // 尝试获取
doAcquireInterruptibly(arg); // 获取失败,入队
}
/**
* 如果获取失败,当前线程进入同步队列,可中断,可超时
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 中断检测
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || // 尝试获取
doAcquireNanos(arg, nanosTimeout); // 获取失败,调用该方法,入队
}
/**
* 创建同步节点,并加入到同步队列
* @param mode 共享模式 or 排他莫斯
*/
private Node addWaiter(Node mode) {
// 创建同步节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列已初始化
if (pred != null) {
node.prev = pred;
// CAS原子修改同步队列尾节点为新建节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队列未初始化、或者第一次入队失败入队新建节点会走到这里
enq(node);
return node;
}
/**
* 自旋+CAS入队节点
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果队列未初始化,初始队列的头尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// CAS入队节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 排他模式下,入队节点成功后会阻塞当前线程,并从尾部寻找到头节点,并唤醒尝试获取锁
*
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 从尾节点向头节点遍历
final Node p = node.predecessor();
// 第一次执行该方法,或者第二次循环,由于第一次执行去除中间已取消节点,或者中间有节点成功获 // 取锁,执行成功退出了队列,导致当前节点成为了第二个节点,那么这里if成立
// 如果遍历到头节点,则让当前节点尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取成功,设置头节点为当前
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 第一次先更新前驱节点状态为 SIGNAL,第二次循环进入会执行 // parkAndCheckInterrupt阻塞当前线程,直到被唤醒执行循环,
if (shouldParkAfterFailedAcquire(p, node) && // 前驱p状态为SIGNAL则返回true
parkAndCheckInterrupt()) // 阻塞当前线程,如果线程已被中断,返回true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 是否阻塞当前活动线程,从当前节点向前找到第一个未取消到的节点,移除遍历过程中已被取消的节点,
* 并更新第一个未取消节点的状态为 SIGNAL.
* 当前驱节点状态为 SIGNAL 返回true,否则返回false
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点等待唤醒
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// 前驱节点已经被取消,向前寻找一个未取消的节点
if (ws > 0) {
// 移除队列中已被取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
// 到这里说明前驱节点的状态为0或者-3 waitStatus = 0 or waitStatus = PROPAGETE
else {
// 更新前驱节点状态为 等待唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 阻塞当前活动线程,返回当前线程是否已被打断
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
/**
* 可中断,
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 可中断则检测到中断立即抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 超时时间已到
if (nanosTimeout <= 0L)
return false;
// 最长等待到这个时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 当前节点是头节点的后驱节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 超时剩余多少
nanosTimeout = deadline - System.nanoTime();
// 超时时间已到
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // 如果超时时间大于最大自旋时间,那么阻塞当前线程,否则自旋
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
// 检测到中断,抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享模式
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* 获取共享锁
*/
private void doAcquireShared(int arg) {
// 创建同步节点,并加入同步节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当前节点的前驱
final Node p = node.predecessor();
// 当前节点的前驱节点是头节点,则当前节点线程尝试获取
if (p == head) {
// 尝试获取
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放锁
排他模式
public final boolean release(int arg) {
// 尝试释放
if (tryRelease(arg)) {
// 从头节点开始唤起节点
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒排队的下一个节点
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 唤醒后驱节点
*
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 当前节点的后驱已被取消,则从尾节点开始查找最早一个未取消的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒下一个后驱节点
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
// 自旋,唤醒等待的第一个线程(其他线程将由第一个线程向后传递唤醒)
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒第一个等待线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
内部类条件队列ConditionObject
关键属性
// 条件队列头节点
private transient Node firstWaiter;
// 条件队列尾节点
private transient Node lastWaiter;
进入条件队列
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建条件队列节点,移除队列中已取消节点,并入队
Node node = addConditionWaiter();
// 完全释放锁,调用该方法的线程必须是持锁线程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当前节点不在同步队列,阻塞当前线程
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 向上:线程完全释放锁(重入次数将会被保存,之后唤醒线程后,再获取等次数重入)并进入条件队列,排队阻塞,等待条件出现
//////////////////////////////////////////////
// 向下:条件出现,sigal方法中线程从条件队列移除,线程被唤醒,继续执行
// 当节点收到signal被唤醒后,重新获取阻塞前的持锁次数,然后向下执行
// 进入自旋获取锁 如果线程被打断,处理中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除条件队列中已被取消的线程
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 中断处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 创建条件队列等待节点
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 从头节点开始清除已取消节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建条件队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 当前新增节点为尾节点
lastWaiter = node;
return node;
}
/**
* 从头部开始移除已取消节点
* 条件队列是单链表,所以从头部开始遍历移除
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 节点已取消
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/**
* 完全释放锁
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 完全释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
await(long nanosTimeout)
可超时
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果超时时间大于自旋时间,那么阻塞线程;否则,线程自旋
while (!isOnSyncQueue(node)) {
// 已超时,节点从条件队列进入同步队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 超时时间大于自旋时间,直接阻塞
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 剩余超时时间
nanosTimeout = deadline - System.nanoTime();
}
// 自旋获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* 将节点从条件队列进入同步队列
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
等待条件出现,唤醒条件队列中线程signal
/**
* 条件出现,唤醒条件队列头节点
* 如果当前线程不是持锁线程,抛出异常
* 如果条件队列头节点不为null,唤醒头节点,为null则什么也不错
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒条件队列头节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 第二个等待节点是null,则清空条件队列,lastWaiter、firstWaiter都为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 唤醒节点,节点由条件队列转入同步队列
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒
LockSupport.unpark(node.thread);
return true;
}
/**
* 唤醒全部条件队列上的节点
*/
public final void signalAll() {
// 持锁线程不是当前线程,立即抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 从头节点开始逐个遍历唤醒
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}