AbstractQueuedSynchronizer简称(AQS)是同步组件的基础框架,可以理解为是一个模版类,AQS以模版方法定义获取和释放同步状态的语义,并且通过队列管理获取同步状态失败的线程。本篇通过解析ReentrantLock重入锁解析AbstractQueuedSynchronizer(AQS)。
一、AQS主要内部结构
1. 同步状态变量
/**
* The synchronization state.
*/
private volatile int state;
volatile类型对int值,类似于计数器,在不同的同步组件中有不同含义,在ReentrantLock中,当state为0时说明锁不被任何线程持有,大于0时表示锁被同一个线程重入了state次,因为可能会出现并发的情况,所以使用volatile修饰。
2. 同步等待队列
AQS中的等待队列是一个带有头尾节点的双向链表。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;//指向队首节点的指针
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;//指向队尾节点的指针
static final class Node {
//等待状态
volatile int waitStatus;
//前一个节点
volatile Node prev;
//后一个节点
volatile Node next;
/**
* 当前节点代表的线程
* 0:初始化状态;
* -1(SIGNAL):当前结点表示的线程在释放锁后需要唤醒后续节点的线程;
* 1(CANCELLED):在同步队列中等待的线程等待超时或者被中断,取消继续等待;
*/
volatile Thread thread;
}
同步等待队列特性:
- 先进先出队列,获取锁失败的队列将构造节点并加入队列的尾部
- 队列的首节点可以表示正在获取锁的线程
- 当前线程释放锁之后将尝试唤醒后续处于阻塞状态的线程
3. 持有同步状态的线程的标示
private transient Thread exclusiveOwnerThread;
继承自父类AOS的属性,用来标示锁被哪个线程持有。
二、ReentrantLock重入锁
重入的意思是线程1执行ReentrantLock.lock()方法获取到锁之后再次执行lock()方法获取锁,线程不会阻塞,而是会增加重入次数。
//demo
public class Main {
ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
new Main().m1();
}
void m1(){
lock.lock();
System.out.println("m1 running!");
m2();
lock.unlock();
}
void m2(){
lock.lock();
System.out.println("m2 running!");
lock.unlock();
}
}
执行结果
m1 running!
m2 running!
可以看到同一个线程两次获取同一个锁时没有发生阻塞。
ReentrantLock源码分析
1. ReentrantLock()构造函数
我们一般通过new ReentrantLock()构造一个重入锁,ReentrantLock有一下两种构造方法:
//无参构造函数,默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//bool类型入参,定义是否为公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可以看到关键的两个类是Sync和NonfairSync,并且NonfairSync是Sync的子类,接下来我们深入了解。
1.1 Sync
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
...
}
protected final boolean tryRelease(int releases) {
...
}
}
可以看到sync是ReentrantLock的一个属性,并且继承了AQS。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
}
AQS继承了AbstractOwnableSynchronizer(AOS)。
1.2 Sync子类(FairSync/NonfairSync)
public class ReentrantLock implements Lock, java.io.Serializable {
...
static final class NonfairSync extends Sync {
final void lock() {
...
}
protected final boolean tryAcquire(int acquires) {
...
}
}
NonfairSync是ReentrantLock的内部类,并且继承了Sync接口。
同样继承了Sync接口的还有FairSync:
public class ReentrantLock implements Lock, java.io.Serializable {
static final class FairSync extends Sync {
final void lock() {
...
}
protected final boolean tryAcquire(int acquires) {
...
}
}
}
分析到这里我们可以总结出以下几点:
- FairSync、NonfairSync是Sync的子类
- Sync继承AQS,AQS继承AOS
- Sync、FairSync、NonfairSync是ReentrantLock的内部类
得到以下UML图
2. lock()方法
通过执行lock()方法加锁:
public void lock() {
sync.lock();
}
2.1 非公平锁加锁
以下是NonfairSync的lock()方法源码:
final void lock() {
if (compareAndSetState(0, 1))
//获取锁成功
setExclusiveOwnerThread(Thread.currentThread());
else
//获取锁失败
acquire(1);
}
可以看到首先调用了父类AQS的compareAndSetState方法,也就是我们平时说的CAS,源码如下:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
其中,unsafe是sun.misc包下的原子操作类,stateOffset是state在内存中的偏移量,可以间接拿到state的值,expect是预期的state的值,这个方法的含义是:如果state=expect,将state的值更新为update,并且返回true,否则返回false。
unsafe.compareAndSwapInt保证了原子性,就算有多个线程同时执行这个方法,也只有一个线程能成功,也就是将状态从0置为1,即成功获取锁。
状态更新成功后,执行AOS下的setExclusiveOwnerThread方法,具体实现如下:
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
可以看出获取锁成功后,将当前线程标记为持有锁的线程。
如果compareAndSetState执行结果为false,也就是获取锁失败,接下来执行acquire(1),acquire源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在该方法中,主要的逻辑都在if的判断条件中,接下来,我们要对tryAcquire、addWaiter、acquireQueued这三个方法进行深入解析。
2.1.1 尝试获取锁 tryAcquire()
tryAcquire是AQS中定义的钩子方法,如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
方法默认会抛出异常,强制同步组件通过扩展AQS实现同步功能时必须重写该方法,ReentrantLock在非公平模式下对该方法的实现如下:
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
调用了nonfairTryAcquire()方法,具体实现以及说明如下:
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程的实例
final Thread current = Thread.currentThread();
//获取当前锁被重入的次数
int c = getState();
if (c == 0) {
//如果state为0,说明当前锁未被任何线程持有,尝试以CAS方式获取锁
if (compareAndSetState(0, acquires)) {
//获取锁成功,将当前线程标记为持有锁的线程
setExclusiveOwnerThread(current);
//获取锁成功,并且非重入
return true;
}
}
//如果当前线程是持有锁的线程,说明锁被重入
else if (current == getExclusiveOwnerThread()) {
//计算锁被重入的次数,因为当前线程已经获取了锁,所以这个累加操作不用同步
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//更新state
setState(nextc);
//获取锁成功,重入
return true;
}
//当前锁被其他线程持有,获取锁失败
return false;
}
我们回退到上一步的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果tryAcquire返回true,也就是获取锁成功,方法将直接返回,如果失败将执行后半部分的判断逻辑,我们先看addWaiter(Node.EXCLUSIVE)。
2.1.2 获取锁失败的线程加入同步队列 addWaiter()
具体实现:
private Node addWaiter(Node mode) {
//创造一个新节点,将当前线程实例封装在内部,此处mode为null
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
因为中间部分代码冗余,直接进入enq()方法,具体实现:
private Node enq(final Node node) {
//for循环确保所有获取锁失败的线程都能够加入同步队列
for (;;) {
//t指向队尾节点,如果队列为空,那么t=tail=NULL
Node t = tail;
if (t == null) { // Must initialize //队列为空
//CAS方式设置新节点为队首节点,当队首节点为NULL时设置成功
if (compareAndSetHead(new Node()))
//当前队列只有一个节点,所以队尾节点=队首节点
tail = head;
} else { //队列不为空
//将当前节点放置队尾
node.prev = t;
//CAS方式设置队尾指针指向当前节点,当t(获取到的队尾指针)=tail(实际的队尾指针)时,设置成功
if (compareAndSetTail(t, node)) {
//尾节点的next指针指向当前节点
t.next = node;
return t;
}
}
}
}
外层的for循环确保所有获取锁失败的线程都能够加入同步队列,当同步队列为空时,先插入一个空节点作为队首节点,这是由于当前线程所在的节点不能插入空队列,因为阻塞的线程是由前驱节点唤醒的。整个过程如下图所示:
接下来我们考虑一下线程安全的问题:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // step:1
if (compareAndSetHead(new Node())) // step:2
tail = head; // step:3
} else {
node.prev = t; // step:4
if (compareAndSetTail(t, node)) { // step:5
t.next = node; // step:6
return t;
}
}
}
}
- 当同步队列为空时,step:1通过,假设step:2执行成功,插入了一个节点为队首节点,也就是head不为null,如果在step:3执行前有其他线程进入该方法,compareAndSetHead将会一直返回false,所以step:3不做同步处理也不会有线程安全的问题。
- 当同步队列不为null时,假设step:5执行成功,则可以确认step:4的操作是正确的,也就意味着我们获取到的队尾指针就是实际的队尾指针,此时 step:5已经将队尾指针指向当前节点,其他线程执行step:5将返回false,所以step:6何时执行也不会有线程安全的问题。
2.1.3 线程加入同步队列后 acquireQueued()
源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循环,确保线程只有获取锁才能跳出循环
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
//如果前一个节点是头节点,并且当前线程成功获取锁
//说明:前一个节点是头节点,头节点表示当前正在占有锁的线程,占有锁的线程释放锁之后,会通知后续的阻塞节点,阻塞节点被唤醒后尝试获取锁
if (p == head && tryAcquire(arg)) {
//将当前节点设置为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //判断是否要阻塞当前线程
parkAndCheckInterrupt()) //阻塞当前线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取锁失败后,执行shouldParkAfterFailedAcquire()方法判断是否要阻塞当前线程,源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可以看到针对前驱节点的不同状态会进行不同的处理
- pred状态为SIGNAL时,返回true,表示要阻塞当前线程
- pred状态为CANCELLED时,说明前驱节点取消了等待,这种情况下不应该获取锁,而是向前回溯到一个没有取消等待的节点,然后将自身连接在这个节点后面
- pred的状态为初始化状态,则通过CAS将状态置为SIGNAL
接下来我们看一下实际阻塞线程的方法parkAndCheckInterrupt():
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
通过调用LockSupport.park()阻塞当前线程,此处不过多展开。
线程加入同步队列到成功获取锁的过程如下:
ReentrantLock非公平锁整体加锁流程如下:
2.2 非公平锁解锁
ReentrantLock解锁方法源码:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//释放锁,如果锁没有被重入,则可以被其他线程获取,返回true
if (tryRelease(arg)) {
Node h = head;
//如果头节点不为空(队列不为空)并且头节点不是初始状态
if (h != null && h.waitStatus != 0)
//唤醒队列中被阻塞的线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//重新计算state的值
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果state为0,说明释放锁的线程没有重入,释放锁后其他线程可以获取
if (c == 0) {
free = true;
//清除持有锁的线程标示
setExclusiveOwnerThread(null);
}
//重置state
setState(c);
return free;
}
ReentrantLock整体解锁流程如下:
2.3 公平锁与非公平锁的区别
公平锁的lock方法源码:
final void lock() {
acquire(1);
}
可以看出,公平锁的模式下,所有线程获取锁之前必须先进入同步队列,按照先进先出(FIFO)的顺序获取锁,对比非公平锁,少了非重入式获取锁的方法。
接下来看公平锁的获取锁方法tryAcquire():
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//判断是否有优先级更高的节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到在CAS获取锁之前加了判断是否有优先级更高的节点hasQueuedPredecessors(),实现如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//判断队列中是否有第二个节点,并且第二个节点的线程是不是当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
因为头节点代表当前占有锁的线程,释放锁之后唤醒下一个节点,所以第二个节点是优先级最高的节点。
2.4 总结
公平锁和非公平锁都是基于FIFO的同步队列,获得锁的顺序和加入队列的顺序一致,看起来是一种公平的模式,然而,非公平锁的非公平之处在于,线程并非加入同步队列时才有机会获取锁,回顾一下非公平锁的加锁流程,在进入同步队列前有两次加锁的机会:
- 当前锁未被任何线程加锁,非重入式获取锁
- 进入同步队列前,包含所有情况的获取锁方式
这两次获取锁都失败后,才会进入到阻塞队列,我们设想一种情况,正在占有锁的线程A释放锁,但是还没来得及唤醒后继线程B,此时线程C尝试获取锁,并且获取锁成功,此时线程B被唤醒,尝试获取锁但是获取失败,只能在同步队列里继续等待,在线程竞争激烈的情况下,等待队列中的线程可能迟迟获取不到锁,所以说这是非公平的。
那么既然不公平为什么大多数情况下我们都用非公平锁呢?因为非公平锁相较公平锁性能高,体现在两个方面:
- 不必加入队列就可以获取锁,免去了出队入队、构造节点的操作,同时节省了唤醒线程的开销
- 减少CAS竞争
参考博客: