ReentrantLock
ReentrantLock
可重入排他锁,基于AQS同步框架实现锁机制。可重入锁支持公平和非公平模式,默认使用非公平模式。内部通过
AbstractQueuedSynchronizer
中的内部类ConditionObject
可以实现条件等待,调用Condition#await
、Condition#signal
的线程需要先获取到锁,否则将会抛出异常。
AQS实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
/**
* 获取非公平锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state = 0,锁未被任何线程持有,立即尝试获取一次
if (c == 0) {
// 与公平锁的区别在于非公平不论同步队列中是否有线程等待,都立即获取一次锁
// 公平锁在会先检查同步队列中是否有在排队的,如果有立即进入队列排队
if (compareAndSetState(0, acquires)) {
// cas成功,获取锁成功,则设置当前持有锁线程为当前线程
setExclusiveOwnerThread(current);
// 返回true,表示获取锁成功
return true;
}
}
// 锁已被持有,但是锁可能被重入
// 锁被当前线程持有,持有次数+acquires,立即返回获取锁成功
else if (current == getExclusiveOwnerThread()) {
// 增加获取次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 因为锁被当前线程持有,所以不需要加锁,直接更新即可
setState(nextc);
return true;
}
return false;
}
/**
* 释放锁
*/
protected final boolean tryRelease(int releases) {
// 释放后持有锁次数
int c = getState() - releases;
// 锁不是当前线程持有,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 标识锁是否已经全部释放
boolean free = false;
// c=0,表明锁已经完全释放
if (c == 0) {
// 锁完全释放
free = true;
// 置空持有锁线程
setExclusiveOwnerThread(null);
}
// 因为锁被当前线程持有,所以不需要加锁,直接更新即可
setState(c);
return free;
}
/**
* 是否是当前线程持有锁
*/
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
/**
* 当前谁持有锁。如果state = 0,表明没有线程持有锁,否则返回持有线程
*/
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
/**
* 持有锁次数
* 如果是当前线程持有锁,则返回持有次数,否则返回0
*/
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
/**
* 锁是否被持有
*/
final boolean isLocked() {
return getState() != 0;
}
}
/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 非公平锁获取锁
*/
final void lock() {
// 不论前面是否有线程在排队,直接获取一次
if (compareAndSetState(0, 1))
// 获取成功,设置持所线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 第一次获取失败,调用AQS#acquire方法再次获取
acquire(1);
}
/**
* 该方法将被AQS#acquire调用
*/
protected final boolean tryAcquire(int acquires) {
// 调用非公平获取
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁实现
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* 公平锁获取锁
*/
final void lock() {
// 调用AQS#acquire方法再次获取
acquire(1);
}
/**
* 该方法将被AQS#acquire调用
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 锁还未被持有
if (c == 0) {
if (!hasQueuedPredecessors() && // 是否有线程在排队
compareAndSetState(0, acquires)) { // CAS获取锁
// 获取成功,设置持所线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 锁已被持有
// 锁被当前线程持有,持有次数+acquires,立即返回获取锁成功
else if (current == getExclusiveOwnerThread()) {
// 增加获取次数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因为锁被当前线程持有,所以不需要加锁,直接更新即可
setState(nextc);
return true;
}
// 获取锁失败,进入排队
return false;
}
}
获取锁
1、
void lock()
方法1.1、非公平锁:上来直接CAS抢占一次锁,如果获取获取成功返回,失败才调用
acquire
。acquire
中线程直接获取获取锁(CAS或重入),获取失败才进入排队。1.2、公平锁:直接调用
acquire
获取。acquire
中线程先检查是否有线程在排队,如果没有才尝试获取锁(CAS或重入),获取失败进入排队。2、
boolean tryLock()
方法无论非公平锁或者公平锁,都直接同样逻辑,不论是否有线程在排队,都会直接CAS抢占一次锁或者重入,抢占失败,线程返回。即
tryLock()
只会尝试获取一次,获取失败也不会进入排队。3、
void lockInterruptibly()
方法、boolean tryLock(long timeout, TimeUnit unit)
方法如果出现线程中断,立即抛出异常
/**
* 尝试获取锁,获取失败,进入排队
*/
public void lock() {
// 调用Sync#lock --> AQS#acquire
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
// 调用AQS#acquireInterruptibly
sync.acquireInterruptibly(1);
}
/**
* 无论公平或者非公平都只获取一次,获取失败,立即返回
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
* 尝试获取锁,如果超时立即返回,获取失败会进入排队
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 非公平锁lock
非公平锁
lock()方法
/**
* 尝试获取锁,获取失败,进入排队
*/
public void lock() {
// 调用Sync#lock --> AQS#acquire
sync.lock();
}
/**
* NonfairSync#lock
*/
final void lock() {
// 非公平锁不论是否前面是否有线程在等待都会CAS抢占一次锁
if (compareAndSetState(0, 1))
// 抢占成功,更新持锁线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 直接抢占锁失败,进入acquire正常获取锁
acquire(1);
}
/**
* AQS#acquire
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && // CAS抢占锁失败,,会来到这里正常获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* NonfairSync#tryAcquire
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Sync#nonfairTryAcquire
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state = 0,锁未被任何线程持有,立即尝试获取一次
if (c == 0) {
// 与公平锁的区别在于非公平不论同步队列中是否有线程等待,都立即获取一次锁
// 公平锁在会先检查同步队列中是否有在排队的,如果有立即进入队列排队
if (compareAndSetState(0, acquires)) {
// cas成功,获取锁成功,则设置当前持有锁线程为当前线程
setExclusiveOwnerThread(current);
// 返回true,表示获取锁成功
return true;
}
}
// 锁已被持有,但是锁可能被重入
// 锁被当前线程持有,持有次数+acquires,立即返回获取锁成功
else if (current == getExclusiveOwnerThread()) {
// 增加获取次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 因为锁被当前线程持有,所以不需要加锁,直接更新即可
setState(nextc);
return true;
}
return false;
}
公平锁
lock()
public void lock() {
sync.lock();
}
/**
* 公平锁获取锁
* 非公平锁会直接CAS抢占一次锁,抢占失败才会调用acquire()获取一次锁
*/
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 无论锁是否被其他线程持有,都会判断队列中是否有其他线程在等待,如果没有才尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
释放锁
如果调用
release()
方法的线程不是持有锁的线程,将会抛出异常。锁释放完全,Sync#tryRelease
返回true,否则返回false,返回true,意味着当前线程锁完全释放,使用unparkSuccessor
下一个线程,唤醒后的线程将继续在acquireQueued()
方法中继续自旋获取锁。
public void unlock() {
// 调用AQS#release --> Sync#tryRelease
sync.release(1);
}
/**
* 释放锁。如果当前线程已经完全释放锁,则唤醒后驱节点
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 释放锁。
* 1、当前线程不是持锁线程,抛出异常
* 2、锁完全释放,置空持锁线程,返回true
* 3、锁未完全释放,返回false
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* 找到第一个等待唤醒的节点,唤醒线程,唤醒后的线程继续在acquireQueued()方法中自旋获取锁
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 当前节点的后驱节点不存在或者已取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点找到第一个等待唤醒的节点,
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒第一个等待唤醒的后驱节点
if (s != null)
LockSupport.unpark(s.thread);
}
创建Condition
// 直接调用Sync#newCondition()
public Condition newCondition() {
return sync.newCondition();
}
// 创建Condition对象(AQS#ConditionObject)
final ConditionObject newCondition() {
return new ConditionObject();
}
进入条件队列
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建条件队列节点,移除队列中已取消节点,并入队
Node node = addConditionWaiter();
// 完全释放锁,调用该方法的线程必须是持锁线程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当前节点不在同步队列,阻塞当前线程
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 向上:线程完全释放锁(重入次数将会被保存,之后唤醒线程后,再获取等次数重入)并进入条件队列,排队阻塞,等待条件出现
//////////////////////////////////////////////
// 向下:条件出现,sigal方法中线程从条件队列移除,线程被唤醒,继续执行
// 当节点收到signal被唤醒后,重新获取阻塞前的持锁次数,然后向下执行
// 进入自旋获取锁 如果线程被打断,处理中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除条件队列中已被取消的线程
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 中断处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 创建条件队列等待节点
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 从头节点开始清除已取消节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建条件队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 当前新增节点为尾节点
lastWaiter = node;
return node;
}
/**
* 从头部开始移除已取消节点
* 条件队列是单链表,所以从头部开始遍历移除
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 节点已取消
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/**
* 完全释放锁
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 完全释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
await(long nanosTimeout)
可超时
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果超时时间大于自旋时间,那么阻塞线程;否则,线程自旋
while (!isOnSyncQueue(node)) {
// 已超时,节点从条件队列进入同步队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 超时时间大于自旋时间,直接阻塞
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 剩余超时时间
nanosTimeout = deadline - System.nanoTime();
}
// 自旋获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* 将节点从条件队列进入同步队列
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
等待条件出现,唤醒条件队列中线程signal
/**
* 条件出现,唤醒条件队列头节点
* 如果当前线程不是持锁线程,抛出异常
* 如果条件队列头节点不为null,唤醒头节点,为null则什么也不错
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒条件队列头节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 第二个等待节点是null,则清空条件队列,lastWaiter、firstWaiter都为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 唤醒节点,节点由条件队列转入同步队列
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒
LockSupport.unpark(node.thread);
return true;
}
/**
* 唤醒全部条件队列上的节点
*/
public final void signalAll() {
// 持锁线程不是当前线程,立即抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 从头节点开始逐个遍历唤醒
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}