概述
并发编程中,ReentrantLock
的使用是比较多的,包括之前讲的LinkedBlockingQueue
和ArrayBlockQueue
的内部都是使用的ReentrantLock
,谈到它又不能的不说AQS,AQS的全称是AbstractQueuedSynchronizer
,这个类也是在java.util.concurrent.locks
下面,提供了一个FIFO的队列,可以用于构建锁的基础框架,内部通过原子变量state
来表示锁的状态,当state
大于0的时候表示锁被占用,如果state等于0时表示没有占用锁,ReentrantLock
是一个重入锁,表现在state
上,如果持有锁的线程重复获取锁时,它会将state
状态进行递增,也就是获得一个信号量,当释放锁时,同时也是释放了信号量,信号量跟随减少,如果上一个线程还没有完成任务,则会进行入队等待操作。
本文分析内容主要是针对jdk1.8版本
约束:文中图片的ref-xxx代表引用地址
图片中的内容prve更正为prev,由于文章不是一天写的所以有些图片更正了有些没有。
AQS主要字段
/**
* 头节点指针,通过setHead进行修改
*/
private transient volatile Node head;
/**
* 队列的尾指针
*/
private transient volatile Node tail;
/**
* 同步器状态
*/
private volatile int state;
AQS需要子类实现的方法
AQS是提供了并发的框架,它内部提供一种机制,它是基于模板方法的实现,整个类中没有任何一个abstract的抽象方法,取而代之的是,需要子类去实现的那些方法通过一个方法体抛出UnsupportedOperationException异常来让子类知道,告知如果没有实现模板的方法,则直接抛出异常。
方法名 | 方法描述 |
---|---|
tryAcquire | 以独占模式尝试获取锁,独占模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒 |
tryRelease | 尝试独占模式下释放状态 |
tryAcquireShared | 尝试在共享模式获得锁,共享模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒 |
tryReleaseShared | 尝试共享模式下释放状态 |
isHeldExclusively | 是否是独占模式,表示是否被当前线程占用 |
AQS是基于FIFO队列实现的,那么队列的Node节点又是存放的什么呢?
Node字段信息
字段名 | 类型 | 默认值 | 描述 |
---|---|---|---|
SHARED | Node | new Node() | 一个标识,指示节点使用共享模式等待 |
EXCLUSIVE | Nodel | Null | 一个标识,指示节点使用独占模式等待 |
CANCELLED |
int | 1 | 节点因超时或被中断而取消时设置状态为取消状态 |
SIGNAL |
int | -1 | 当前节点的后节点被park,当前节点释放时,必须调用unpark通知后面节点,当后面节点竞争时,会将前面节点更新为SIGNAL |
CONDITION |
int | -2 | 标识当前节点已经处于等待中,通过条件进行等待的状态 |
PROPAGATE |
int | -3 | 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去 |
0 |
int | 不属于上面的任何一种状态 | |
waitStatus | int | 0 | 等待状态,默认初始化为0,表示正常同步等待, |
pre | Node | Null | 队列中上一个节点 |
next | Node | Null | 队列中下一个节点 |
thread | Thread | Null | 当前Node操作的线程 |
nextWaiter | Node | Null | 指向下一个处于阻塞的节点 |
通过上面的内容我们可以看到waitStatus其实是有5个状态的,虽然这里面0并不是什么字段,但是他是waitStatus状态的一种,表示不是任何一种类型的字段,上面也讲解了关于AQS中子类实现的方法,AQS提供了独占模式和共享模式两种,但是ReentrantLock
实现的是独占模式的方式,下面来通过源码的方式解析ReentrantLock
。
ReentrantLock源码分析
首先在源码分析之前我们先来看一下ReentrantLock的类的继承关系,如下图所示:
可以看到ReentrantLock
继承自Lock
接口,它提供了一些获取锁和释放锁的方法,以及条件判断的获取的方法,通过实现它来进行锁的控制,它是显示锁,需要显示指定起始位置和终止位置,Lock
接口的方法介绍:
方法名称 | 方法描述 |
---|---|
lock | 用来获取锁,如果锁已被其他线程获取,则进行等待。 |
tryLock | 表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待 |
tryLock(long time, TimeUnit unit) | 和tryLock()类似,区别在于它在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true |
lockInterruptibly | 获取锁,如果获取锁失败则进行等到,如果等待的线程被中断会相应中断信息。 |
unlock | 释放锁的操作 |
newCondition | 获取Condition对象,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件wait()方法,而调用后,当前线程释放锁。 |
ReentrantLock也实现了上面接口的内容,前面讲解了很多理论行的内容,接下来我们以一个简单的例子来进行探讨
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(1000);
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private void add() {
try {
reentrantLock.lock();
count.getAndIncrement();
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
- 首先声明内部类AddDemo,AddDemo的主要作用是将原子变量count进行递增的操作
- AddDemo内部声明了ReentrantLock对象进行同步操作
- AddDemo的add方法,进行递增操作,细心地同学发现,使用了lock方法获取锁,但是没有释放锁,这里面没有释放锁可以更让我们清晰的分析内部结构的变化。
- 主线程开启了两个线程进行同步进行递增的操作,最后让线程休眠一会输出累加的最后结果。
ReentrantLock
内部提供了两种AQS的实现,一种公平模式,一种是非公平模式,如果没有特别指定在构造器中,默认是非公平的模式,我们可以看一下无参的构造函数。
public ReentrantLock() {
sync = new NonfairSync();
}
当调用有参构造函数时,指定使用哪种模式来进行操作,参数为布尔类型,如果指定为false的话代表非公平模式,如果指定为true的话代表的是公平模式,如下所示:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
我们使用的是非公平模式,后面再来进行分析公平模式,上面也讲到了分为两种模式,这两种模式为FairSync
和NonfairSync
两个内部静态类不可变类,不能被继承和实例化,这两个类是我们今天分析的重点,为什么说是重点呢,这里讲的内容是有关于AQS的,而FairSync
和NonfairSync
实现了抽象内部类Sync
,Sync
实现了AbstractQueuedSynchronizer
这个类,这个类就是我们说的AQS也是主要同步操作的类,下面我们来看一下公平模式和非公平模式下类的继承关系,如下图所示:
非公平模式:
公平模式:
通过上面两个继承关系UML来看其实无差别,差别在于内部实现的原理不一样,回到上面例子中使用的是非公平模式,那先以非公平模式来进行分析,
假设第一个线程启动调用AddDemo的add方法时,首先执行的事reentrantLock.lock()
方法,这个lock方法调用了sync.lock()
,sync就是我们上面提到的两种模式的对象,来看一下源码内容:
public void lock() {
sync.lock();
}
内部调用了sync.lock()
,其实是调用了NonfairSync
对象的lock
方法,也就是下面的方法内容。
/**
* 非公平模式锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 执行锁动作,先进行修改状态,如果锁被占用则进行请求申请锁,申请锁失败则将线程放到队列中
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 继承自AQS的tryAcquire方法,尝试获取锁操作,这个方法会被AQS的acquire调用
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
我们看到lock
方法首先先对state
状态进行修改操作,如果锁没有被占用则获取锁,并设置当前线程独占锁资源,如果尝试获取锁失败了,则进行acqurie
方法的调用,例子中第一个线程当尝试获取锁是内部state
状态为0
,进行修改操作的时候,发现锁并没有被占用,则获得锁,此时我们来看一下内部变化的情况,如下图所示:
此时只是将state
的状态更新为1
,表示锁已经被占用了,独占锁资源的线程是Thread0
,也就是exclusiveOwnerThread
的内容,头节点和尾节点都没有被初始化,当第二个线程尝试去获取锁的时候,发现锁已经被占用了,因为上一个线程并没有释放锁,所以第二线程直接获取锁时获取失败则进入到acquire
方法中,这个方法是AbstractQueuedSynchronizer
中的方法acquire
,先来看一下具体的实现源码如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我个人理解acquire
方法不间断的尝试获取锁,如果锁没有获取到则现将节点加入到队列中,并将当前线程设置为独占锁资源,也就是独占了锁的意思,别的线程不能拥有锁,然后如果当前节点的前节点是头节点话,再去尝试争抢锁,则设置当前节点为头节点,并将原头节点的下一个节点设置为null,帮助GC回收它,如果不是头节点或争抢锁不成功,则会现将前面节点的状态设置直到设置为SIGNAL
为止,代表下面有节点被等待了等待上一个线程发来的信号,然后就挂起当前线程。
我们接下来慢慢一步一步的分析,我们先来看一下NonfairSync
中的tryAcquire
,如下所示:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
它调用的是他的父类方法,也就是ReentrantLock
下Sync
中的nonfairTryAcquire
方法,这个方法主要就是去申请锁的操作,来看一下具体源码:
final boolean nonfairTryAcquire(int acquires) { //首先是一个被final修饰的方法
final Thread current = Thread.currentThread(); //获取当前线程
int c = getState(); //获取state的状态值
if (c == 0) { //如果状态等于0代表线程没有被占用
if (compareAndSetState(0, acquires)) { //cas修改state值
setExclusiveOwnerThread(current); //设置当前线程为独占模式
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果state状态不等于0则先判断是否是当前线程占用锁,如果是则进行下面的流程。
int nextc = c + acquires; //这个地方就说明重入锁的原理,如果拥有锁的是当前线程,则每次获取锁state值都会跟随递增
if (nextc < 0) // overflow //溢出了
throw new Error("Maximum lock count exceeded");
setState(nextc); //直接设置state值就可以不需要CAS
return true;
}
return false; //都不是就返回false
}
通过源码我们可以看到其实他是有三种操作逻辑:
- 如果
state
为0,则代表锁没有被占用,尝试去修改state状态,并且将当前线程设置为独占锁资源,表示获得锁成功 - 如果
state
大于0并且拥有锁的线程和当前申请锁的线程一致,则代表重入了锁,state
值会进行递增,表示获得锁成功 - 如果
state
大于0并且拥有锁的线程和当前申请锁的线程不一致则直接返回false,代表申请锁失败
当第二个线程去争抢锁的时候,state值已经设置为1了也就是已经被第一个线程占用了锁,所以这里它会返回false,而通过acquire
方法内容可以看到if语句中是!tryAcquire(arg)
,也就是!false=ture
,它会进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,这个方法里面又有一个addWaiter
方法,从方法语义上能看到是添加等待队列的操作,方法的参数代表的是模式,Node.EXCLUSIVE
表示的是在独占模式下等待,我们先来看一下addWaiter
里面是如何进行操作,如下所示:
private Node addWaiter(Node mode) {
//首先生成当前线程拥有的节点
Node node = new Node(Thread.currentThread(), mode);
// 下面的内容是尝试快速进行插入末尾的操作,在没有其他线程同时操作的情况
Node pred = tail; //获取尾节点
if (pred != null) { //尾节点不为空,代表队列不为空
node.prev = pred; //尾节点设置为当前节点的前节点
if (compareAndSetTail(pred, node)) {//修改尾节点为当前节点
pred.next = node; //原尾节点的下一个节点设置为当前节点
return node; //返回node节点
}
}
enq(node); //如果前面入队失败,这里进行循环入队操作,直到入队成功
return node;
}
前面代码中可以看到,它有一个快速入队的操作,如果快速入队失败则进行死循环进行入队操作,当然我们上面例子中发现队列其实是为空的,也就是pred==null,不能进行快速入队操作,则进入到enq
进行入队操作,下面看一下enq
方法实现,如下所示:
private Node enq(final Node node) {
for (;;) { //死循环进行入队操作,直到入队成功
Node t = tail; //获取尾节点
if (t == null) { // Must initialize //判断尾节点为空,则必须先进行初始化
if (compareAndSetHead(new Node()))//生成一个Node,并将当前Node作为头节点
tail = head; //head和tail同时指向上面Node节点
} else {
node.prev = t; //设置入队的当前节点的前节点设置为尾节点
if (compareAndSetTail(t, node)) { //将当前节点设置为尾节点
t.next = node; //修改原有尾节点的下一个节点为当前节点
return t; //返回最新的节点
}
}
}
}
通过上面入队操作,可以清晰的了解入队操作其实就是Node节点的prev节点和next节点之前的引用,运行到这里我们应该能看到入队的状态了,如下图所示:
如上图可以清晰的看到,此时拥有锁的线程是Thread0,而当前线程是Threa1,头节点为初始化的节点,Ref-707
引用地址所在的Node节点操作当前操作的节点信息,入队操作后并没有完成,而是继续往下进行,此时则进行acquireQueued
这个方法,这个方法是不间断的去获取已经入队队列中的前节点的状态,如果前节点的状态为大于0,则代表当前节点被取消了,会一直往前面的节点进行查找,如果节点状态小于0并且不等于SIGNAL
则将其设置为SIGNAL
状态,设置成功后将当前线程挂起,挂起线程后也有可能会反复唤醒挂起操作,原因后面会讲到。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //取消节点标志位
try {
boolean interrupted = false; //中断标志位
for (;;) {
final Node p = node.predecessor(); //获取前节点
if (p == head && tryAcquire(arg)) { //这里的逻辑是如果前节点为头结点并且获取到锁则进行头结点变换
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //设置waitStatus状态
parkAndCheckInterrupt()) //挂起线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //取消操作
}
}
前面的源码可以看到它在acquireQueued
中对已经入队的节点进行尝试锁的获取,如果锁获得就修改头节点的指针,如果不是头节点或者争抢锁失败时,此时会进入到shouldParkAfterFailedAcquire
方法,这个方法是获取不到锁时需要停止继续无限期等待锁,其实就是内部的操作逻辑也很简单,就是如果前节点状态为0
时,需要将前节点修改为SIGNAL
,如果前节点大于0
则代表前节点已经被取消了,应该移除队列,并将前前节点作为当前节点的前节点,一直循环直到前节点状态修改为SIGNAL
或者前节点被释放锁,当前节点获取到锁停止循环。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 此节点已经设置了状态,要求对当前节点进行挂起操作
*/
return true;
if (ws > 0) {
/*
* 如果前节点被取消,则将取消节点移除队列操作
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus=0或者PROPAGATE时,表示当前节点还没有被挂起停止,需要等待信号来通知节点停止操作。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面的方法其实很容易理解就是等待挂起信号,如果前节点的状态为0或PROPAGATE则将前节点修改为SIGNAL
,则代表后面前节点释放锁后会通知下一个节点,也就是说唤醒下一个可以唤醒的节点继续争抢所资源,如果前节点被取消了那就继续往前寻找不是被取消的节点,这里不会找到前节点为null的情况,因为它默认会有一个空的头结点,也就是上图内容,此时的队列状态是如何的我们看一下,这里它会进来两次,以为我们上图可以看到当前节点前节点是Ref-724
此时waitStatus=0
,他需要先将状态更改为SIGNAL
也就是运行最有一个else语句,此时又会回到外面的for循环中,由于方法返回的是false则不会运行parkAndCheckInterrupt
方法,而是又循环了一次,此时发现当前节点争抢锁又失败了,然后此时队列的状态如下图所示:
再次进入到方法之后发现前驱节点的waitStatus=-1,表示当前节点需要进行挂起等到,此时返回的结果是true,则会运行parkAndCheckInterrupt
方法,这个方法很简单就是将当前线程进行挂起操作,如下所示:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //挂起线程
return Thread.interrupted(); //判断是否被中断,获取中断标识
}
park
挂起线程并且响应中断信息,其实我们从这里就能发现一个问题,Thread.interrupted方法是用来获取是否被中断的标志,如果被中断则返回true,如果没有被中断则返回false,当当前节点被中断后,其实就会返回true,返回true这里并没有结束,而是跳到调用地方,也就是acquireQueued
方法内部:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
以一个案例来进行分析:
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {
reentrantLock.lock();
count.getAndIncrement();
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
通过上面的例子可以发现,thread1调用中断方法interrupt(),当调用第一次方法的时候,它会进入到parkAndCheckInterrupt
方法,然后线程响应中断,最后返回true,最后返回到acquireQueued
方法内部,整个if语句为true,则开始设置interrupted=true,仅仅是设置了等于true,但是这离还会进入下一轮的循环,假如说上次的线程没有完成任务,则没有获取到锁,还是会进入到shouldParkAfterFailedAcquire
由于已经修改了上一个节点的waitStatus=-1,直接返回true,然后再进入到parkAndCheckInterrupt
又被挂起线程,但是如果上步骤操作他正抢到锁,则会返回ture,外面也会清除中断标志位,从这里可以清楚地看到acquire
方法是一个不间断获得锁的操作,可能重复阻塞和解除阻塞操作。
上面阻塞队列的内容已经讲完了,接下来我们看一下unlock都为我们做了什么工作:
public void unlock() {
sync.release(1);
}
我们可以看到他直接调用了独占模式的release
方法,看一下具体源码:
public final boolean release(int arg) {
if (tryRelease(arg)) { //调用ReentrantLock中的Sync里面的tryRelease方法
Node h = head; //获取头节点
if (h != null && h.waitStatus != 0) //头节点不为空且状态不为0时进行unpark方法
unparkSuccessor(h); //唤醒下一个未被取消的节点
return true;
}
return false;
}
release方法,首先先进行尝试去释放锁,如果释放锁仍然被占用则直接返回false,如果尝试释放锁时,发现锁已经释放,当前线程不在占用锁资源时,则会进入的下面进行一些列操作后返回true,接下来我们先来看一下ReentrantLock
的Sync
下的tryRelease
方法,如下所示:
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //获取state状态,标志信息减少1
if (Thread.currentThread() != getExclusiveOwnerThread()) //线程不一致抛出异常
throw new IllegalMonitorStateException();
boolean free = false; //是否已经释放锁
if (c == 0) { //state=0时表示锁已经释放
free = true; //将标志free设置为true
setExclusiveOwnerThread(null); //取消独占锁信息
}
setState(c); //设置锁标志信息
return free;
}
看上面的源码,表示首先先获取state
状态,如果state
状态减少1之后和0不相等则代表有重入锁,则表示当前线程还在占用所资源,直到线程释放锁返回ture标识,还是以上例子为主(此时AddDemo
中的unlock
不在被注释),分析其现在的队列中的状态
释放锁后,进入到if语句中,判断当前头节点不为空且waitStatus!=0
,通过上图也可以发现头节点为-1,则进入到unparkSuccessor
方法内:
private void unparkSuccessor(Node node) {
/*
* 获取节点的waitStatus状态
*/
int ws = node.waitStatus;
// 如果小于0则设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 唤醒下一个节点,唤醒下一个节点之前需要判断节点是否存在或已经被取消了节点,如果没有节点则不需唤醒操作,如果下一个节点被取消了则一直一个没有被取消的节点。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到它是现将头节点的状态更新为0,然后再唤醒下一个节点,如果下一个节点为空则直接返回不唤醒任何节点,如果下一个节点被取消了,那么它会从尾节点往前进行遍历,遍历与头节点最近的没有被取消的节点进行唤醒操作,在唤醒前看一下队列状态:
然后唤醒节点后他会进入到parkAndCheckInterrupt
方法里面,再次去执行下面的方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //取消节点标志位
try {
boolean interrupted = false; //中断标志位
for (;;) {
final Node p = node.predecessor(); //获取前节点
if (p == head && tryAcquire(arg)) { //这里的逻辑是如果前节点为头结点并且获取到锁则进行头结点变换
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //设置waitStatus状态
parkAndCheckInterrupt()) //挂起线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //取消操作
}
}
此时获取p==head成立,并且可以正抢到所资源,所以它会进入到循环体内,进行设置头结点为当前节点,前节点的下一个节点设置为null,返回中断标志,看一下此时队列情况,如下图所示:
AbstractQueuedSynchronizer
的独占模式其实提供了三种不同的形式进行获取锁操作,看一下下表所示:
方法名称 | 方法描述 | 对应调用的内部方法 |
---|---|---|
acquire | 以独占模式进行不间断的获取锁 | tryAcquire,acquireQueued |
acquireInterruptibly | 以独占模式相应中断的方式获取锁,发生中断抛出异常 | tryAcquire,doAcquireInterruptibly |
tryAcquireNanos | 以独占模式相应中断的方式并且在指定时间内获取锁,会阻塞一段时间,如果还未获得锁直接返回,发生中断抛出异常 | tryAcquire,doAcquireNanos |
通过上面图可以发现,他都会调用图表一中需要用户实现的方法,ReentrantLock
实现了独占模式则内部实现的是tryAcquire
和tryRelease
方法,用来尝试获取锁和尝试释放锁的操作,其实上面内容我们用的是ReentrantLock
中的lock
方法作为同步器,细心的朋友会发现,这个lock
,方法是ReentrantLock实现的,它内部调用了acquire
方法,实现了不间断的获取锁机制,ReentrantLock
中还有一个lockInterruptibly
方法,它内部直接调用的是AbstractQueuedSynchronizer
的acquireInterruptibly
方法,两个之间的区别在于,两者都会相应中断信息,前者不会做任何处理还会进入等待状态,而后者则抛出异常终止操作,
这里为了详细看清楚它内部关系我这里用张图来进行阐述,如下所示:
- 左侧代表的事ReentrantLock,右侧代表的AQS
- 左侧内部黄色区域代表
NonfairSync
- 图中1和2代表AQS调用其他方法的过程
接下来我们来看一下源码信息:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
发现他调用的Sync
类中的acquireInterruptibly
方法,但其实这个方法是AQS中的方法,源码如下所示:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //判断线程是否被中断
throw new InterruptedException(); //中断则抛出异常
if (!tryAcquire(arg)) //尝试获取锁
doAcquireInterruptibly(arg); //进行添加队列,并且修改前置节点状态,且响应中断抛出异常
}
通过上面的源码,它也调用了子类实现的tryAcquire
方法,这个方法和我们上文提到的tryAcquire
是一样,ReentrantLock
下的NonfairSync
下的tryAcquire
方法,这里这个方法就不多说了详细请看上文内容,这里主要讲一下doAcquireInterruptibly
这个方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); //将节点添加到队列尾部
boolean failed = true; //失败匹配机制
try {
for (;;) {
final Node p = node.predecessor(); //获取前节点
if (p == head && tryAcquire(arg)) { //如果前节点为头节点并且获得了锁
setHead(node); //设置当前节点为头节点
p.next = null; // help GC //头节点的下一个节点设置为null
failed = false; //匹配失败变为false
return;
}
if (shouldParkAfterFailedAcquire(p, node) && //将前节点设置为-1,如果前节点为取消节点则往前一直寻找直到修改为-1为止。
parkAndCheckInterrupt()) //挂起线程返回是否中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其实这个方法和acquireQueued
区别在于以下几点:
acquireQueued
是在方法内部添加节点到队列尾部,而doAcquireInterruptibly
是在方法内部进行添加节点到尾部,这个区别点并不是很重要- 重点是
acquireQueued
响应中断,但是他不会抛出异常,而后者会抛出异常throw new InterruptedException()
分析到这里我们来用前面的例子来进行模拟一下中中断的操作,详细代码如下所示:
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread.sleep(500);
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(500);
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(500);
Thread thread3 = new Thread(runnalbeDemo::add);
thread3.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {
reentrantLock.lockInterruptibly();
count.getAndIncrement();
} catch (Exception ex) {
System.out.println("线程被中断了");
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
上面的例子其实和前面提到的例子没有什么太大的差别主要的差别是将lock
替换为lockInterruptibly
,其次就是在三个线程后面讲线程1进行中断操作,这里入队的操作不在多说,因为操作内容和上面大致相同,下面是四个个线程操作完成的状态信息:
如果线程等待的过程中抛出异常,则当前线程进入到finally中的时候failed为true,因为修改该字段只有获取到锁的时候才会修改为false,进来之后它会运行cancelAcquire
来进行取消当前节点,下面我们先来分析下源码内容:
private void cancelAcquire(Node node) {
// 如果节点为空直接返回,节点不存在直接返回
if (node == null)
return;
// 设置节点所在的线程为空,清除线程操作
node.thread = null;
// 获取当前节点的前节点
Node pred = node.prev;
// 如果前节点是取消节点则跳过前节点,一直寻找一个不是取消节点为止
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取头节点下一个节点
Node predNext = pred.next;
// 这里直接设置为取消节点状态,没有使用CAS原因是因为直接设置只有其他线程可以跳过取消的节点
node.waitStatus = Node.CANCELLED;
// 如果当前节点为尾节点,并且设置尾节点为找到的合适的前节点时,修改前节点的下一个节点为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果不是尾节点,则说明是中间节点,则需要通知后续节点,嘿,伙计你被唤醒了。
int ws;
if (pred != head && //前节点不是头结点
((ws = pred.waitStatus) == Node.SIGNAL || // 前节点的状态为SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) //或者前节点状态小于0而且修改前节点状态为SIGNAL成功
&& pred.thread != null) { //前节点线程不为空
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//唤醒下一个不是取消的节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 首先找到当前节点的前节点,如果前节点为取消节点则一直往前寻找一个节点。
- 取消的是尾节点,则直接将前节点的下一个节点设置为null
- 如果取消的是头节点的下一个节点,且不是尾节点的情况时,它是唤醒下一个节点,唤醒之前并没有将其移除队列,而是在唤醒下一个节点的时候,
shouldParkAfterFailedAcquire
里面将取消的节点移除队列,唤醒之后,当前节点的下一个节点也设置成自己,帮助GC回收它。 - 如果取消节点是中间的节点,则直接将其前节点的下一个节点设置为取消节点的下下个节点即可。
第一种情况如果我们取消的节点是前节点是头节点,此时线程1的节点应该是被中断操作,此时进入到cancelAcquire
之后会进入else语句中,然后进去到unparkSuccessor
方法,当进入到这个方法之前我们看一下状态变化:
我们发现线程1的Node节点的waitStatus变为1也就是Node.CANCELLED
节点,然后运行unparkSuccessor
方法,该方法上面就已经讲述了其中的源码,这里就不在贴源码了,就是要唤醒下一个没有被取消的节点,这里是Ref-695
这个线程,当Ref-695
被唤醒之后它会继续运行下面的内容:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //再一次循环发现还是没有争抢到锁
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && //再一次循环之后有运行到这里了
parkAndCheckInterrupt()) //这里被唤醒了,又要进行循环操作了
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
发现再一次循环操作后,还是没有正抢到锁,这时候还是会运行shouldParkAfterFailedAcquire
方法,这个方法内部发现前节点的状态是Node.CANCELLED
这时候它会在内部先将节点给干掉,也就是这个代码:
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
最后还是会被挂起状态,因为没有释放锁操作,最后移除的节点如下所示:
如果取消的事尾节点,也就是线程3被中断操作,这个是比较简单的直接将尾节点删除即可,其中会走如下代码:
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
}
如果取消的节点是中间的节点,通过上例子中则是取消线程2,其实它内部只是将线程取消线程的前节点的下一个节点指向了取消节点的下节点,如下图所示:
结束语
这章节分析的主要是ReentrantLock
的内部原理,本来公平模式和非公平模式想放在一起来写,无奈发现篇幅有点长了,所以就分开进行写,这样读取来不会那么费劲,内部还有条件内容等待下章节分析,如果有分析不到位的请大家指正。