ReentrantLock 简介
ReentrantLock也是一个可重入的互斥锁,跟 synchronized 提供一样的线程同步功能,但是比 synchronized 更加灵活,优化了 synchronized 的不足。
ReentrantLock 基于 AQS 实现,它使用一个内部类来实现 AQS 接口,提供 Sync 同步锁的功能。
ReentrantLock 的几个重要方法:
-
lock:阻塞加锁直到获取锁为止。与 synchronized 语义一样,此方法不支持中断、超时。
-
lockInterruptibly:阻塞加锁,支持中断。
-
tryLock:尝试获得锁,获取锁返回 true,获取不到锁返回 false。该方法不等待,立即返回。
-
tryLock(long time,TimeUnit unit):阻塞可超时,在给定时间内尝试获得锁,若在该指定时间到达之后,还没获取锁,则返回 false。该方法和 tryLock() 方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。
-
unlock:释放锁。
ReentrantLock 源码
AbstractQueuedSynchronizer
属性
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
/**
* Creates a new {@code AbstractQueuedSynchronizer} instance
* with initial synchronization state of zero.
*/
protected AbstractQueuedSynchronizer() { }
/**
* Wait queue node class.
*/
static final class Node {
// Node 类源码见下面
}
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// Queuing utilities
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
}
Node
/**
* Wait queue node class.
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
// 表示当前的线程已被取消(waitStatus > 0)
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
// 表示后继节点在等待当前结点唤醒。
// 后继节点入队时,会将前继节点的状态更新为 SIGNAL(逻辑在 shouldParkAfterFailedAcquire 方法里)
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
// 表示当前节点在等待 condition,也就是在 condition 队列中
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
// 传播状态模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* 节点状态。新节点入队时的默认状态为 0
* waitStatus 是负值表示节点处于有效等待状态,而正值表示节点已被取消
*
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* 该状态节点的后继节点通过 park 被阻塞了,所以当前节点必须在
* 释放锁或者被取消时去调用 unpark 唤醒其后继节点
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* 该状态的节点由于超时或者中断被取消了。然后该节点不会修改状态
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 获取锁的线程
*
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
Node 既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用
在作为同步队列节点时(lock/unlock),nextWaiter 可能有两个值:EXCLUSIVE、SHARED 标识当前节点是独占模式还是共享模式,同步队列的链表是通过上面的 prev 和 next 属性实现的
在作为等待队列节点使用时(await/signal),nextWaiter 保存后继的 await 等待节点(属性名称 nextWaiter 看的出来)
Lock 锁类似于 synchronized 锁,即可以实现阻塞锁功能,也可以实现 wait/notify 进行线程间的消息通信功能
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryRelease
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared
/**
* 尝试获取共享锁
*
* 返回值:
* 1. 返回负数表示获取共享锁失败
* 2. 返回 0 表示获取共享锁成功,但没有剩余可用资源
* 3. 返回正数表示获取共享锁成功,且有剩余资源
*
* 子类方法的不同实现:
* 在 ReentrantReadWriteLock 中的实现,返回值只有两种,1 表示获取锁成功,-1 表示获取锁失败
* 在 Semaphore 中的实现,返回值代表资源剩余量,返回值大于等于 0 表示获取锁成功,小于 0 表示获取锁失败
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
acquire
// 阻塞获取锁
// 获取到锁了,该方法就能执行完成,然后继续执行业务逻辑;否则阻塞执行该方法的线程,等待唤醒
public final void acquire(int arg) {
// 尝试获取到锁直接退出方法,表示已拿到锁,继续执行业务逻辑
if (!tryAcquire(arg) &&
// 没有获取锁,将当前线程插入到等待队列中
// acquireQueued 方法返回了,表示线程被唤醒了
// acquireQueued 返回 true,表示线程被中断了,否则表示被正常唤醒,可以继续执行业务逻辑
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 下面的入队步骤和 enq 方法里的入队步骤相同
// 快速的尝试是否入队成功,成功了就不用走 enq 方法了
Node pred = tail;
// pred 为空,表示链表没有插入过元素节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 上面尝试入队失败,进入 enq 方法,开始自旋入队,直到入队成功
enq(node);
return node;
}
enq
// 将 node 节点插入到双向链表尾部
// 返回 node 的前序节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾结点为空,说明队列为空
if (t == null) { // Must initialize
// 初始化一个头结点,作为哨兵节点
if (compareAndSetHead(new Node()))
// 尾结点指向头结点
tail = head;
// 然后继续循环
} else {
// node 的前序节点先指向尾结点
node.prev = t;
// 尝试将尾结点改为 node
if (compareAndSetTail(t, node)) {
// 修改尾结点为 node 成功后,将原先尾结点的后续节点指向 node
t.next = node;
return t;
}
}
}
}
acquireQueued
// 将入队后的 node 追加到一个没有被取消的线程节点后,并将该线程节点的 waitStatus 设置为 Node.SIGNAL
// 然后 node 线程节点阻塞(park),等待 release 方法唤醒
// 被 park 前或者被 unpark 后,线程都会不停的自旋去获取锁(p == head && tryAcquire(arg))
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 在等待队列中自旋等待加锁
for (;;) {
// 获取 node 的 prev 节点
final Node p = node.predecessor();
// 如果 node 的前序节点是 head 节点,并且尝试获取锁成功
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
// 获取到锁之后会把原 head 给释放
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire:(每添加一个节点的时候)将节点的前序节点 pred 的状态设置为 Node.SIGNAL
// parkAndCheckInterrupt:阻塞新加入节点的线程(并且检查线程是否被中断)
// 如果 shouldParkAfterFailedAcquire 方法返回 false,则会重新执行 for 循环
// parkAndCheckInterrupt 方法中被阻塞的线程如果被唤醒了,可能是其它线程调用了 unpark 方法
// 也可能是调用了 interrupt 方法,所以 parkAndCheckInterrupt 返回 true,表示线程是被其它
// 线程中断了线程,则进入到 if 中将 interrupted 设置为 true
if (shouldParkAfterFailedAcquire(p, node) &&
// park、unpark 方法调用不分先后
// 所以线程先被 release 方法 unpark,然后这里再被 park,该线程不会被阻塞
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHead
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire
// (每添加一个节点的时候)将节点的前序节点 pred 的状态设置为 Node.SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// pred 被取消了,将 node 的前驱节点往前移
// 从尾结点向头结点方向遍历,找到一个没有被取消的前驱节点
do {
// 重置 node 的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 重置 node 前驱节点的后继节点
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 将 pred 的 waitStatus 设置为 Node.SIGNAL
// node 节点初始化和 head 结点唤醒后继节点时,node 和 head 节点的 awaitStatus 都会被设置为 0
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
// 阻塞线程并且检查是否需要中断线程
private final boolean parkAndCheckInterrupt() {
// 中断当前线程
LockSupport.park(this);
// LockSupport.park 阻塞会响应中断,并且中断标记被设置成 true,但不抛出 InterruptedException 异常
// 所以如果有触发过中断请求,那么这个方法会返回当前的中断标识 true
return Thread.interrupted();
}
cancelAcquire
// 删除 node 节点
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// node 的 prev 节点
Node pred = node.prev;
// 跳过被取消的节点
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// node 当前前序节点的后继节点
Node predNext = pred.next;
// 将 node 的 waitStatus 设置为 Node.CANCELLED(不需要使用 cas)
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 如果 node 是 tail 节点,则要将 node 节点的 prev 节点设置为 tail 节点
if (node == tail && compareAndSetTail(node, pred)) {
// 设置 node 的 next 节点为 null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 获取 node 节点的 next 节点
Node next = node.next;
// 找 node 的后继没有被取消的节点
// 将后继节点设置为 node 前序节点的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒 node 节点的后继节点
// 情况1:node 的前序节点是 head 结点
// 情况2:node 的前序节点的 waitStatus 不是 SIGNAL,并且没有成功设置为 SIGNAL
// 情况3:node 的前序节点的线程为空
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
unparkSuccessor
// node 是头结点
// 将 node 节点的 waitStatus 设置为 0
// 唤醒 node 节点后第一个没有被取消的线程节点(从尾结点向头结点方向遍历)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0) {
// 将 node 节点(head 节点)的 awaitStatus 设置为 0
compareAndSetWaitStatus(node, ws, 0);
}
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 唤醒线程是从尾结点遍历的,从头结点遍历会有短链的问题
for (Node t = tail; t != null && t != node; t = t.prev)
// waitStatus <= 0 是正常的状态
if (t.waitStatus <= 0)
s = t;
}
// 从尾结点开始遍历,最终找到一个可唤醒的节点
if (s != null)
LockSupport.unpark(s.thread);
}
selfInterrupt
// 中断当前线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
release
public final boolean release(int arg) {
// 调用子类实现类,如 ReentrantLock.Sync 中的 tryRelease 方法(ReentrantLock 是重入锁,重入几次就要释放几次锁)
// 重入锁时,要将所有重入锁都释放了,tryRelease 才会返回 true
if (tryRelease(arg)) {
Node h = head;
// unparkSuccessor 方法将 h.waitStatus 设置为 0
// 如果等于 0 了,表示线程已被释放
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
acquireInterruptibly
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先判断当前线程是否被中断
if (Thread.interrupted())
// 中断了就抛出异常
// (代码中线程被中断了需要自己来决定是否对线程做一些处理,比抛出异常可以中断线程的继续执行)
throw new InterruptedException();
// 尝试获取锁
if (!tryAcquire(arg))
// 没有获取锁,则加入到可响应异常的阻塞线程队列中
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly
// 和 acquireQueued 方法类似
// acquireQueued 要是获取锁后如果判断被中断了,则返回 boolean
// 而 doAcquireInterruptibly 是抛出 InterruptedException 异常
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果线程被中断了,则抛出异常
throw new InterruptedException();
}
} finally {
// 线程是被抛出异常结束的
if (failed)
// 线程节点 node 中断获取锁
cancelAcquire(node);
}
}
tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos
// 带超时时间的获取锁,超时退出
// 主要逻辑也同前面获取锁逻辑,只是加了超时的判断及处理逻辑
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 超时时间要大于 0
if (nanosTimeout <= 0L)
return false;
// 计算超时的截止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 排它(锁)节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 走到这里会判断是否已过期,过期则返回
// 并且下面代码中的 nanosTimeout > spinForTimeoutThreshold) 判断如果不成立,也会重试 for 循环,再次走到这里判断是否过期
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞线程前判断一下是否过期
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 可抛异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireShared
public final void acquireShared(int arg) {
// tryAcquireShared 获取锁,并返回剩余可用资源
// 如果获取成功(tryAcquireShared 返回值 >= 0),则表示获取锁,直接执行返回
if (tryAcquireShared(arg) < 0)
// tryAcquireShared 返回值 < 0 表示获取锁失败
// 则调用 doAcquireShared 方法阻塞等待(类似于排它锁)
doAcquireShared(arg);
}
doAcquireShared
// 请求共享锁
private void doAcquireShared(int arg) {
// 共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// node 的前序节点
final Node p = node.predecessor();
// 如果前序节点是 head 节点
if (p == head) {
// 尝试获取共享锁
// r >= 0 表示获取共享锁成功
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置 head 节点和唤醒后继的所有共享节点(互斥锁这里是调用 setHead 方法)
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 响应中断
// 将用于中断的 selfInterrupt() 方法放到 doAcquireShared() 里了,而独占模式是放在 acquireQueued() 外面
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 可能会将 PROPAGATE 状态节点变为 SIGNAL 状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
// propagate >= 0 进入该方法
private void setHeadAndPropagate(Node node, int propagate) {
// 获取 head 结点
Node h = head; // Record old head for check below
// 将 node 节点设置为 head 节点
setHead(node);
// propagate > 0 (获得了共享锁后还有剩余资源)才进入该逻辑
// head 头结点为 null,或 waitStatus < 0(共享节点)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
// 重新获取 head 节点,head 头结点为 null,或 waitStatus < 0(共享节点)
(h = head) == null || h.waitStatus < 0) {
// 唤醒 node 的后继节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// head 头结点 waitStatus 是 Node.SIGNAL,则唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// waitStatus 是 0,则改为 Node.PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireSharedNanos
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 表示队列不为空
return h != t &&
// (s = h.next) == null 说明此时有其它线程节点先当前线程入队了(enq 方法),当前正修改了 tail 指针,但还没有设置 head 指针,所以当前节点前有前序节点(此时不会走 s.thread != Thread.currentThread())
// s.thread != Thread.currentThread() 说明当前线程前有前序节点
((s = h.next) == null || s.thread != Thread.currentThread());
}
isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
findNodeFromTail
// 从尾结点遍历查找 node 节点,找到返回 true,否则返回 false
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
// node 是 tail 节点,返回 true
if (t == node)
return true;
// 尾节点为 null,返回 false
if (t == null)
return false;
// 往 head 结点遍历
t = t.prev;
}
}
fullyRelease
// 一次性释放掉当前锁的全部重入次数
// 如果失败了就将传参的 node 节点的 waitStatus 设置为 Node.CANCELLED
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
Sync
主要实现了 nonfairTryAcquire 方法和 tryRelease 方法。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 加阻塞锁
abstract void lock();
// 尝试获取非公平锁
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c == 0 表示没有线程获取锁
if (c == 0) {
// 当前线程尝试加锁,实现非公平
if (compareAndSetState(0, acquires)) {
// 加锁成功,将当前线程设置给锁的 exclusiveOwnerThread 属性,表示当前线程获取到了该锁
setExclusiveOwnerThread(current);
// 返回 true,表示加锁成功
return true;
}
}
// 如果线程已经获取到了锁(重入锁)
else if (current == getExclusiveOwnerThread()) {
// 加锁次数增加 acquires 次(ReentrantLock 实现类中,acquires 是 1)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 解锁
// 解锁的前提是已获取到了锁,所以解锁不用加同步锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 锁只能由加锁的线程来解锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 解锁完成
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 当前线程此时是否独占锁
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
// 获取占有锁的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取当前线程重入锁的次数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 是否加锁
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
NonfairSync
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加阻塞锁(加锁不成功,阻塞加锁的线程)
// acquire 方法里也有 tryAcquire 逻辑,lock 方法可以改成和 FairSync 的 lock 一样的逻辑?
@ReservedStackAccess
final void lock() {
// 先尝试获取锁,实现非公平锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试获取锁失败,阻塞获取锁
acquire(1);
}
// 尝试加阻塞锁(加锁不成功,返回 false,否则返回 true)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
FairSync
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 加阻塞锁
final void lock() {
acquire(1);
}
// 尝试加阻塞锁
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 加锁前,判断当前线程的节点是否是队列的 head 节点,实现公平锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
FairSync 和 NonfairSync 都继承了 Sync 类,都重写了 Sync 类的 lock 方法。
并且都有一个 tryAcquire 方法,NonfairSync 类的 tryAcquire 方法调用的是 Sync 的 nonfairTryAcquire 方法,FairSync 则自己实现了 tryAcquired 逻辑,区别就是 FairSync 的 tryAcquire 方法中 当 c 等于 0 时,有一个 !hasQueuedPredecessors()
与判断,需要先通过 hasQueuedPredecessors 判断当前线程的节点是否是队列的头结点。
Lock 接口
package java.util.concurrent.locks;
public interface Lock {
// 获取锁,若锁不可用,则当前线程将会阻塞,直到获得锁
void lock();
// 获取锁,当被中断或获取到锁才返回;
// 若锁不可用,则当前线程被阻塞,直到获取锁或被中断
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,并立即返回;true:获取锁成功;false:获取锁失败
boolean tryLock();
// 尝试在指定的超时时间获取锁,当获取到锁时返回 true;当超时或被中断时返回 false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 返回一个和锁绑定的条件队列
// (类似于 synchronized 的 waitSet,但是它可以创建多个,不同 condition 队列之间互不干扰)
// 在等待条件之前线程必须先获取当前锁
Condition newCondition();
}
ReentrantLock 属性、构造方法
package java.util.concurrent.locks;
// 实现了 Lock 接口
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
// 默认构造方法是 new 一个非公平锁对象
public ReentrantLock() {
sync = new NonfairSync();
}
// fair 是 true 则 new 公平锁,否则 new 非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
lock
public void lock() {
// sync 是公平锁,则调用公平锁的 lock 方法
// sync 是非公平锁,则调用非公平锁的 lock 方法
sync.lock();
}
lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
tryLock
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
tryLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
unlock
public void unlock() {
sync.release(1);
}
newCondition
public Condition newCondition() {
return sync.newCondition();
}
Condition 源码
ReentrantLock 和 synchronized 功能类似,Condition 就和 Object 的 wait() 方法和 notify() 方法类似。
Condition在Lock体系设计中,用于实现与synchronized中monitor对象所提供的wait,notify,notifyAll相同的语义,对应的方法分别为await,signal,signalAll。一个Lock可以对应多个Condition,每个Condition对应一个条件化线程等待队列,而在synchronized中只能使用monitor这一个Condition。
和 Object 的 wait() 和 notify() 方法一样,当线程使用 Condition.await() 时,要求线程持有相关的重入锁在 Condition.await() 调用前,这个线程会释放这把锁。同理,在 Conditin.signal() 方法调用时,也要求线程先获得相关的锁。在 signal() 方法调用后,系统会从当前 Condition 对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行了。
Condition 接口提供的基本方法如下∶
package java.util.concurrent.locks;
public interface Condition {
// await() 方法会使当前线程等待,同时释放当前锁,当其他线程中使用 signal() 或者 signalAll() 方法时,线程会重新获得锁并继续执行。
// 或者当线程被中断时,也能跳出等待。这和 Object.wait() 方法很相似。
void await() throws InterruptedException;
// 当前线程进入等待状态直到被通知,对中断不响应
void awaitUninterruptibly();
// 当前线程进入等待状态直到被通知、中断或超时。
// 返回值表示剩余时间,如果在 nanosTimeout 纳秒之前被唤醒,那么返回值就是 nanosTimeout - 实际耗时,返回值 <=0 说明超时
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 当前线程进入等待状态直到被通知、中断或超时,如果没有到指定时间被通知返回 true,否则返回 false
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回之前必须获得与 Condition 相关联的锁
void signal();
// 唤醒所有等待在 Condition 上的线程
void signalAll();
}
在 ReentrantLock 中,Condition 是通过 newCondition() 方法创建的:
// 创建一个 condition 对象
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
// condition 队列的第一个节点
private transient Node firstWaiter;
/** Last node of condition queue. */
// condition 队列的最后一个节点
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
}
await
/**
* await对synchronized的wait的语义实现如下:将当前线程放入条件化等待队列,然后释放锁,
在while循环内阻塞休眠,直到被放到AQS同步队列了,这时说明条件满足了,可以去竞争获取锁了,通过调用acquireQueued去竞争获取锁。如果获取锁成功了,则真正从await返回
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将线程封装成 node 节点加入到 condition 列队中去(类似于 lock 锁里的 enq 方法)
Node node = addConditionWaiter();
// 释放当前线程抢占的锁(await 后要释放锁资源)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 自旋将等待节点被放到同步队列中
// 这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能:
// 1. 其他线程调用 signal 进行转移
// 2. 当前线程被中断而进行 Node 的转移(在 checkInterruptWhileWaiting 里面进行转移)
while (!isOnSyncQueue(node)) { // isOnSyncQueue:判断当前线程是否在 Sync Queue 里面
// 阻塞当前线程
LockSupport.park(this);
// 阻塞的线程被唤醒了
// 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 调用 acquireQueued 在 Sync Queue 里面进行独占锁的获取, 返回值表明在获取的过程中有没有被中断过
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话,
// 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// true 表示通过中断的方式唤醒线程
if (interruptMode != 0)
// 根据 interruptMode 的类型决定线程中断方式
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter
// 将当前线程封装成一个 Node 节点,放入 Condition Queue 里面
private Node addConditionWaiter() {
// 获取 condition 列队的尾节点,因为需要从尾节点的位置开始插入
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
unlinkCancelledWaiters();
// 获取最新的 lastWaiter,重新赋值给 t
t = lastWaiter;
}
// 将当前线程封装成一个 CONDITION Node 节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// condition 队列没有元素
if (t == null)
// firstWaiter 指向插入的第一个节点 node
firstWaiter = node;
else
// Condition 队列已有元素,在队列尾部插入节点 node
t.nextWaiter = node;
// lastWaiter 执行新插入的尾结点元素
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
一般的节点都会被 signal 唤醒,从 Condition Queue 转移到 Sync Queue,而若遇到 interrupt 或 等待超时,则直接改变 node 的状态(从 CONDITION 变成 0),并直接放入 Sync 里面,而不清理 Condition Queue 里面的节点。
unlinkCancelledWaiters 方法就是用来清理这些被中断或者超时的节点的。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// trail 是整个 Condition Queue 时遇到的最后一个有效的节点
Node trail = null;
while (t != null) {
// 记录 t 的 next 节点
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
// 将 t 从队列中删除
t.nextWaiter = null;
// next 是 t 后第一个有效节点
if (trail == null)
// 将 firstWaiter 指向 next
firstWaiter = next;
else
trail.nextWaiter = next;
// 遍历完了
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
checkInterruptWhileWaiting
// signal() 方法被调用后被中断返回
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
// signal() 方法被调用前被中断返回(会抛出 InterruptedException 异常)
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
// 判断是否被中断并获取中断模式
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
signal
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 当前线程未持有锁,则抛出 IllegalMonitorStateException 异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 唤醒头结点
doSignal(first);
}
doSignal
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 如果 first 节点的后继节点是 null,说明 condition 队列无元素了,则将 lastWaiter 置为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 释放 first 结点(first 节点的后继节点设置为 null)
first.nextWaiter = null;
}
// 调用 transferForSignal 将 first 节点迁移到 Sync Queue 里面
// 迁移不成功的话, 重新获取 firstWaiter 并赋值给 first,继续迁移 first 节点
while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 将节点的 waitStatus 从 CONDITION 修改为 0,
// 0 是添加到同步队列的节点的初始化状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将该节点迁移到同步队列尾部,并获取 node 的前序节点赋值给 p
Node p = enq(node);
// 获取 node 前序节点的 waitStatus
int ws = p.waitStatus;
// node 的前序节点的线程被取消或者将 node 的前序节点状态设置为 Node.SIGNAL 失败
// 这里为什么需要判断并唤醒线程 ???
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 则马上唤醒(被 await 阻塞的) node 节点的线程
LockSupport.unpark(node.thread);
return true;
}
signalAll
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
doSignalAll
private void doSignalAll(Node first) {
// 清空链表节点
lastWaiter = firstWaiter = null;
// 从 first 节点开始,将遍历到的所有节点迁移到 sync 阻塞队列中
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}