Support for these operations requires the coordination of three basic components:
* Atomically managing synchronization state
* Blocking and unblocking threads
* Maintaining queues
A synchronizer therefore needs three basic components. Here is how the AbstractQueuedSynchronizer source declares them:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Head of the wait queue, lazily initialized. Except for initialization, it
* is modified only via method setHead. Note: If head exists, its waitStatus
* is guaranteed not to be CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via method enq
* to add new wait node.
*/
private transient volatile Node tail;
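The code above declares the state and the wait queue (a FIFO queue). AbstractQueuedSynchronizer relies on LockSupport for blocking: LockSupport.park() blocks the current thread, and LockSupport.unpark(Thread thread) unblocks the given thread.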
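To make the three components concrete, here is a minimal sketch of a first-in-first-out mutex in the spirit of the example in the LockSupport Javadoc. It is not how AbstractQueuedSynchronizer is implemented: the class name SimpleFifoMutex and the demo method are made up for illustration, and an AtomicBoolean plus a ConcurrentLinkedQueue stand in for the state field and the CLH-style queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration only; AQS uses an int state and its own linked queue.
class SimpleFifoMutex {
    // Component 1: atomically managed synchronization state.
    private final AtomicBoolean locked = new AtomicBoolean(false);
    // Component 3: a queue of waiting threads.
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Component 2: block while not first in the queue or while the CAS fails.
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                wasInterrupted = true; // remember the interrupt, keep waiting
            }
        }
        waiters.remove();
        if (wasInterrupted) {
            current.interrupt(); // restore the interrupt status on exit
        }
    }

    void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // unpark(null) has no effect
    }

    // Runs several threads that increment a shared counter under the mutex.
    static int demo(int threads, int perThread) {
        SimpleFifoMutex mutex = new SimpleFifoMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return counter[0];
    }
}
```

Calling SimpleFifoMutex.demo(4, 1000) should return 4000 if mutual exclusion holds. The shape of lock, with its loop of "check, park, re-check", is the same pattern that shows up later in acquireQueued.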
Next, look at the structure of a Node in the wait queue, starting with the description of the Node class in the source:
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend.
So the currently released contender thread may need to rewait.
To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.
<pre>
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
</pre>
Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts.
The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor.
For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/
We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.)
Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor.
CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.
Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.
This description is long, and the passages originally set in bold are the key ones. Now for the source of the Node class:
static final class Node {
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
The constants above define the possible wait status values.
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt. Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue. It will not be used as a sync queue node until transferred. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.)
* 0: None of the above
*
* The values are arranged numerically to simplify use. Non-negative
* values mean that a node doesn't need to signal. So, most code doesn't
* need to check for particular values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and CONDITION
* for condition nodes. It is modified only using CAS.
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on for
* checking waitStatus. Assigned during enqueing, and nulled out (for
* sake of GC) only upon dequeuing. Also, upon cancellation of a
* predecessor, we short-circuit while finding a non-cancelled one,
* which will always exist because the head node is never cancelled: A
* node becomes head only as a result of successful acquire. A cancelled
* thread never succeeds in acquiring, and a thread only cancels itself,
* not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread unparks upon
* release. Assigned once during enqueuing, and nulled out (for sake of
* GC) when no longer needed. Upon cancellation, we cannot adjust this
* field, but can notice status and bypass the node if cancelled. The
* enq operation does not assign next field of a predecessor until after
* attachment, so seeing a null next field does not necessarily mean
* that node is at end of queue. However, if a next field appears to be
* null, we can scan prev's from the tail to double-check.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on construction and
* nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special value SHARED.
* Because condition queues are accessed only when holding in exclusive
* mode, we just need a simple linked queue to hold nodes while they are
* waiting on conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive, we save a
* field by using special value to indicate shared mode.
*/
Node nextWaiter;
These fields are all critical; the rest of the source only makes sense once you understand what each one represents.
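For comparison, it helps to see the original CLH spin lock that the Node comment refers to. The sketch below follows the textbook form of that algorithm and is not taken from the JDK; the names ClhSpinLock, QNode and demo are made up for illustration. Each thread spins on a flag stored in its predecessor's node, which is the "control information held in the predecessor" idea that AbstractQueuedSynchronizer keeps, while replacing the spinning with park and adding the prev and next links.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical textbook-style CLH spin lock; AQS blocks instead of spinning.
class ClhSpinLock {
    static final class QNode {
        volatile boolean locked; // true while the owning thread holds or wants the lock
    }

    // The tail starts as a released dummy node, so the first locker does not wait.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        // Enqueue with one atomic operation on tail.
        QNode pred = tail.getAndSet(node);
        myPred.set(pred);
        // Spin on the predecessor's flag, not on our own node.
        while (pred.locked) {
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;      // lets the successor stop spinning
        myNode.set(myPred.get()); // recycle the predecessor's node for the next lock()
    }

    // Runs several threads that increment a shared counter under the lock.
    static int demo(int threads, int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return counter[0];
    }
}
```

Note that this spin lock has no way to cancel a waiter: a node cannot be skipped because nobody holds a link back to an earlier node. That gap is what the prev field and the CANCELLED status in Node address.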
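The synchronizers follow the Template Method pattern. AbstractQueuedSynchronizer is an abstract class that declares no abstract methods; instead it has five protected methods that concrete subclasses override as needed: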
/**
* Attempts to acquire in exclusive mode. This method should query if the
* state of the object permits it to be acquired in the exclusive mode, and
* if so to acquire it.
*
* This method is always invoked by the thread performing acquire. If this
* method reports failure, the acquire method may queue the thread, if it is
* not already queued, until it is signalled by a release from some other
* thread. This can be used to implement method {@link Lock#tryLock()}.
*
* The default implementation throws {@link UnsupportedOperationException}.
*
* @param arg
* the acquire argument. This value is always the one passed to
* an acquire method, or is the value saved on entry to a
* condition wait. The value is otherwise uninterpreted and can
* represent anything you like.
* @return {@code true} if successful. Upon success, this object has been
* acquired.
* @throws IllegalMonitorStateException
* if acquiring would place this synchronizer in an illegal
* state. This exception must be thrown in a consistent fashion
* for synchronization to work correctly.
* @throws UnsupportedOperationException
* if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to set the state to reflect a release in exclusive mode.
*
* This method is always invoked by the thread performing release.
*
* The default implementation throws {@link UnsupportedOperationException}.
*
* @param arg
* the release argument. This value is always the one passed to a
* release method, or the current state value upon entry to a
* condition wait. The value is otherwise uninterpreted and can
* represent anything you like.
* @return {@code true} if this object is now in a fully released state, so
* that any waiting threads may attempt to acquire; and {@code
* false} otherwise.
* @throws IllegalMonitorStateException
* if releasing would place this synchronizer in an illegal
* state. This exception must be thrown in a consistent fashion
* for synchronization to work correctly.
* @throws UnsupportedOperationException
* if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to acquire in shared mode. This method should query if the state
* of the object permits it to be acquired in the shared mode, and if so to
* acquire it.
*
* This method is always invoked by the thread performing acquire. If this
* method reports failure, the acquire method may queue the thread, if it is
* not already queued, until it is signalled by a release from some other
* thread.
*
* The default implementation throws {@link UnsupportedOperationException}.
*
* @param arg
* the acquire argument. This value is always the one passed to
* an acquire method, or is the value saved on entry to a
* condition wait. The value is otherwise uninterpreted and can
* represent anything you like.
* @return a negative value on failure; zero if acquisition in shared mode
* succeeded but no subsequent shared-mode acquire can succeed; and
* a positive value if acquisition in shared mode succeeded and
* subsequent shared-mode acquires might also succeed, in which case
* a subsequent waiting thread must check availability. (Support for
* three different return values enables this method to be used in
* contexts where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException
* if acquiring would place this synchronizer in an illegal
* state. This exception must be thrown in a consistent fashion
* for synchronization to work correctly.
* @throws UnsupportedOperationException
* if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to set the state to reflect a release in shared mode.
*
* This method is always invoked by the thread performing release.
*
* The default implementation throws {@link UnsupportedOperationException}.
*
* @param arg
* the release argument. This value is always the one passed to a
* release method, or the current state value upon entry to a
* condition wait. The value is otherwise uninterpreted and can
* represent anything you like.
* @return {@code true} if this release of shared mode may permit a waiting
* acquire (shared or exclusive) to succeed; and {@code false}
* otherwise
* @throws IllegalMonitorStateException
* if releasing would place this synchronizer in an illegal
* state. This exception must be thrown in a consistent fashion
* for synchronization to work correctly.
* @throws UnsupportedOperationException
* if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Returns {@code true} if synchronization is held exclusively with respect
* to the current (calling) thread. This method is invoked upon each call to
* a non-waiting {@link ConditionObject} method. (Waiting methods instead
* invoke {@link #release}.)
*
* <p>
* The default implementation throws {@link UnsupportedOperationException}.
* This method is invoked internally only within {@link ConditionObject}
* methods, so need not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively; {@code
* false} otherwise
* @throws UnsupportedOperationException
* if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
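As an illustration of how a subclass fills in these hooks, here is a minimal non-reentrant mutex, modeled on the Mutex example in the AbstractQueuedSynchronizer Javadoc. The class name SimpleMutex is made up, and only the exclusive-mode methods are overridden; state 0 means unlocked and 1 means locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal mutex; a sketch, not a replacement for ReentrantLock.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if state moves from 0 (free) to 1 (held).
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // fully released, waiters may retry
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }             // template method, calls tryAcquire
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }           // template method, calls tryRelease
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
}
```

Because tryAcquire here never checks the owner, a second tryLock by the same thread fails. The queueing, parking and waking all come from acquire and release in the base class.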
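The AbstractQueuedSynchronizer source is long, and reading this part alone does not make the overall structure clear. It helps to start from the higher-level view of a synchronizer. The paper "The java.util.concurrent Synchronizer Framework" says: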
the general form of the resulting implementation of the basic acquire operation (exclusive, noninterruptible, untimed case only) is:
if (!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor;
while (pred is not head node || !tryAcquire(arg)) {
if (pred's signal bit is set)
park();
else
compareAndSet pred's signal bit to true;
pred = node's effective predecessor;
}
head = node;
}
And the release operation is:
if (tryRelease(arg) && head node's signal bit is set) {
compareAndSet head's signal bit to false;
unpark head's successor, if one exists
}
Let us walk through this step by step, using FairSync in ReentrantLock:
/**
* Sync object for fair locks
*/
final static class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
.........................
}
FairSync extends the Sync class and overrides the lock and tryAcquire methods. The lock method calls acquire(1).
The acquire method is defined in AbstractQueuedSynchronizer:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented by invoking
* at least once {@link #tryAcquire}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success. This method can be used to implement
* method {@link Lock#lock}.
*
* @param arg
* the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and can
* represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
The tryAcquire method is defined in FairSync:
/**
* Fair version of tryAcquire. Don't grant access unless recursive call
* or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // read the current state
if (c == 0) {
if (isFirst(current) && compareAndSetState(0, acquires)) { // CAS state from 0 to acquires, which is 1 here
setExclusiveOwnerThread(current); // record the current thread as the exclusive owner of the synchronizer
return true;
}
} else if (current == getExclusiveOwnerThread()) { // the current thread already owns the synchronizer
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // the owner re-acquires by adding acquires to state, which is what makes ReentrantLock reentrant
return true;
}
return false;
}
The addWaiter and acquireQueued methods are defined in AbstractQueuedSynchronizer:
/**
* Creates and enqueues node for given thread and mode.
*
* @param current
* the thread
* @param mode
* Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS tail from pred to node
pred.next = node; // complete the doubly linked list
return node;
}
}
enq(node); // tail was null or the CAS lost a race: fall back to the full enq loop
return node;
}
The enq method is also defined in AbstractQueuedSynchronizer:
/**
* Inserts node into queue, initializing if necessary. See picture above.
*
* @param node
* the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize: the queue has not been created yet
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h; // once initialized, the queue holds two nodes: the dummy head and node as tail
if (compareAndSetHead(h)) {
tail = node; // head is in place, so publish node as tail
return h;
}
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // move tail to node first, then set the next link
t.next = node;
return t;
}
}
}
}
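The enqueue logic can be tried in isolation. The following sketch reproduces the idea of enq with AtomicReference fields: the dummy head is created lazily on first use, a node becomes queued at the moment the CAS on tail succeeds, and the next link is filled in afterwards. It is a simplified variant that installs the dummy node on its own before appending, and the names LazyTailQueue, size and demo are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone version of the enq idea; not the JDK code.
class LazyTailQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First contention: install the dummy head, then retry the loop.
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t; // prev is valid before the node becomes visible
                if (tail.compareAndSet(t, node)) {
                    t.next = node; // next may lag behind; prev never does
                    return t;
                }
            }
        }
    }

    // Counts queued nodes by walking prev links from tail, excluding the dummy head.
    int size() {
        int n = 0;
        Node h = head.get();
        for (Node t = tail.get(); t != null && t != h; t = t.prev) {
            n++;
        }
        return n;
    }

    // Enqueues nodes from several threads at once and returns the final size.
    static int demo(int threads, int perThread) {
        LazyTailQueue queue = new LazyTailQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return queue.size();
    }
}
```

The size method walks backwards on purpose. Since prev is assigned before the CAS and next only after it, a backward scan from tail always sees every queued node, which is the same reason unparkSuccessor falls back to scanning from tail.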
Next comes acquireQueued:
/**
* Acquires in exclusive uninterruptible mode for thread already in queue.
* Used by condition wait methods as well as acquire.
*
* @param node
* the node
* @param arg
* the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // fetch the predecessor
if (p == head && tryAcquire(arg)) { // node is first in the queue and the acquire succeeded
setHead(node); // make the current node the new head
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// the current thread was parked and found itself interrupted on wake-up
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
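I found this code puzzling at first, in particular the case where the loop keeps taking the second if, and I considered emailing Doug Lea in case I had misread it. The method is blocking: it returns only once the node's predecessor is head and tryAcquire succeeds for the current thread. When the first if fails, shouldParkAfterFailedAcquire usually starts by marking the predecessor with SIGNAL and returning false, so the loop retries the acquire once more. On a later pass it returns true and parkAndCheckInterrupt parks the thread until another thread's release unparks it or the thread is interrupted. An interrupt does not end the wait; it is only recorded in the interrupted flag, and the loop goes back to retrying the acquire.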
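The effect of the interrupted flag can be observed from the outside with ReentrantLock. In the sketch below a thread is interrupted while it waits inside lock(). The call still waits until the lock is free, and afterwards the thread's interrupt status is set again, which matches the selfInterrupt call in acquire. The class and method names are made up for illustration; hasQueuedThread is a real ReentrantLock method, used here only to wait until the second thread is in the queue.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of lock() ignoring, then restoring, an interrupt.
class LockInterruptDemo {
    // Returns the waiter's interrupt status as seen right after lock() returned.
    static boolean interruptStatusAfterLock() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] status = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock(); // not interruptible: keeps waiting despite the interrupt
            try {
                status[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter has been enqueued behind us.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt(); // interrupt it while it is queued
        } finally {
            lock.unlock();      // only now can the waiter acquire
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return status[0];
    }
}
```

A caller that wants the wait itself to be abandoned on interrupt would use lockInterruptibly() instead of lock().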
Next, look at the shouldParkAfterFailedAcquire and parkAndCheckInterrupt methods:
/**
* Checks and updates status for a node that failed to acquire. Returns true
* if thread should block. This is the main signal control in all acquire
* loops. Requires that pred == node.prev
*
* @param pred
* node's predecessor holding status
* @param node
* the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0)
/*
* This node has already set status asking a release to signal it, so it can safely park
*/
return true;
if (s > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// walk backwards until a non-cancelled node is found and make it node's predecessor
} else
/*
* Indicate that we need a signal, but don't park yet. Caller will
* need to retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
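Two properties of LockSupport.park explain why the "set SIGNAL, retry, then park" sequence is safe and why parkAndCheckInterrupt checks the interrupt status. The sketch below shows both on a single thread: an unpark that arrives before park is not lost, and park returns promptly when the thread's interrupt status is set, without clearing it. The class and method names are made up for illustration.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-threaded demo of park/unpark semantics.
class ParkDemo {
    // An early unpark leaves a permit, so the following park does not block.
    static boolean parkAfterEarlyUnpark() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park(); // consumes the permit and returns
        return true;
    }

    // With the interrupt status set, park returns without blocking.
    // Thread.interrupted() then reports the interrupt and clears the status,
    // which is the same check parkAndCheckInterrupt performs.
    static boolean parkWhenInterrupted() {
        Thread.currentThread().interrupt();
        LockSupport.park();
        return Thread.interrupted();
    }
}
```

Because the permit is remembered, a release that calls unpark between the waiter's last failed tryAcquire and its call to park does not leave the waiter asleep.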
Finally, look at how exceptions are handled in cancelAcquire(node):
/**
* Cancels an ongoing attempt to acquire.
*
* @param node
* the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness
Node predNext = pred.next;
// Can use unconditional write instead of CAS here
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); // pred is now the tail, so clear pred.next
} else {
// If "active" predecessor found...
if (pred != head && (pred.waitStatus == Node.SIGNAL || compareAndSetWaitStatus(pred, 0, Node.SIGNAL)) && pred.thread != null) {
// If successor is active, set predecessor's next link
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);//pred.next=node.next
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
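To recap, here is what the lock method of FairSync does:
1. Call tryAcquire to take exclusive ownership of the synchronizer. On success, return immediately; on failure, go to step 2.
2. Call addWaiter (passing Node.EXCLUSIVE) to append the current thread to the tail of the queue in exclusive mode.
3. Call the blocking method acquireQueued, which returns once the node's predecessor is head and the exclusive acquire succeeds. The node is dequeued by becoming the new head. If the thread was interrupted while waiting, acquire re-interrupts it through selfInterrupt.
Next, look at the unlock method of the ReentrantLock class: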
public void unlock() {
sync.release(1);
}
The release method is defined in AbstractQueuedSynchronizer:
/**
* Releases in exclusive mode. Implemented by unblocking one or more threads
* if {@link #tryRelease} returns true. This method can be used to implement
* method {@link Lock#unlock}.
*
* @param arg
* the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and can
* represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // unpark the next node
return true;
}
return false;
}
The tryRelease method is defined in Sync:
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // subtract releases from state
if (Thread.currentThread() != getExclusiveOwnerThread())
// only the thread that exclusively owns the synchronizer may release it
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
// fully released: no thread exclusively owns the synchronizer now
}
setState(c);
return free;
}
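The counting done by tryAcquire and tryRelease is visible through the public ReentrantLock API. The short sketch below locks twice and unlocks twice on one thread and records getHoldCount() after each step; the lock only becomes free when the count returns to 0, which corresponds to the c == 0 branch above. The class and method names are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of the reentrant hold count.
class ReentrancyDemo {
    // Returns the hold count after lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquire
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentrant acquire by the owner
        lock.unlock();
        counts[2] = lock.getHoldCount(); // still held once
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }

    // True if the lock is still held after one of two unlocks.
    static boolean stillLockedAfterOneUnlock() {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        lock.lock();
        lock.unlock();
        boolean locked = lock.isLocked();
        lock.unlock();
        return locked;
    }
}
```

This is why every lock() call needs a matching unlock(): waiters are only woken by the release that brings the state back to 0.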
Next, look at the unparkSuccessor method:
/**
* Wakes up node's successor, if one exists.
*
* @param node
* the node
*/
private void unparkSuccessor(Node node) {
/*
* Try to clear status in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
// if node's waitStatus is Node.SIGNAL, reset it to the initial value 0
/*
* Thread to unpark is held in successor, which is normally just the
* next node. But if cancelled or apparently null, traverse backwards
* from tail to find the actual non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // unpark the thread held by node's successor
}
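To recap, unlock performs these steps:
1. Call tryRelease to give up exclusive ownership of the synchronizer; ownership is only dropped once state reaches 0.
2. Unpark the successor of the head node, skipping any cancelled nodes.
That covers the lock method of FairSync; next comes the NonfairSync class.
NonfairSync likewise overrides the lock and tryAcquire methods: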
/**
* Performs lock. Try immediate barge, backing up to normal acquire on
* failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
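NonfairSync uses a different locking policy from FairSync: lock first tries directly to make the current thread the exclusive owner of the synchronizer, and falls back to acquire(1) only if that attempt fails. Now compare the tryAcquire methods of the two classes: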
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in subclasses,
* but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
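Compared with the tryAcquire of FairSync, the two methods are nearly identical; they differ only at if (isFirst(current) && compareAndSetState(0, acquires)). FairSync lets the current thread take exclusive ownership only when isFirst(current) holds, that is, when there are no waiters or the thread is first in the queue, whereas NonfairSync has no such requirement. So the essential difference is that NonfairSync does its best to let the calling thread acquire the synchronizer right away, which can starve threads already waiting in the queue, while FairSync gives priority to the thread that has waited longest (the thread in the first node of the queue). The fair policy may be less efficient, but it does not starve any waiting thread.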
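The FIFO hand-off of the fair policy can be checked with a small experiment. In the sketch below the main thread holds a fair ReentrantLock while it starts several waiters one at a time, making sure each is queued before starting the next. After the lock is released, the waiters should acquire it in the order they were queued. The names FairOrderDemo and acquisitionOrder are made up for illustration; the fairness flag is chosen with the ReentrantLock(boolean fair) constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of FIFO acquisition order under a fair lock.
class FairOrderDemo {
    // Returns the ids of the waiters in the order they acquired the lock.
    static List<Integer> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair policy
        List<Integer> order = new ArrayList<>();      // only touched while holding lock
        Thread[] threads = new Thread[waiters];

        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                threads[i].start();
                // Make sure this waiter is queued before starting the next one.
                while (!lock.hasQueuedThread(threads[i])) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock(); // hands the lock to the first queued waiter
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return order;
    }
}
```

With new ReentrantLock() or new ReentrantLock(false) the queue itself is still FIFO, but a thread that calls lock() at the moment of release may take the lock ahead of the queued waiters, which is the barging that the first compareAndSetState in NonfairSync.lock allows.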