一 AQS介绍与简单应用
1-1 概述
定义:AbstractQueuedSynchronizer(抽象队列同步器) ,是阻塞式锁和相关同步器工具的框架(其他的同步器都是基于AQS,调用AQS的方法)
特点:
1)用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState | setState | compareAndSetState | |
---|---|---|---|
获取 state 状态 | 设置 state 状态 | 设置 state 状态(利用CAS机制保证多线程情况下,state变量操作的原子性) |
2)提供了基于 FIFO 的等待队列
3)条件变量来实现等待、唤醒机制,支持多个条件变量。
AQS子类需要实现的方法
tryAcquire // 获取锁
tryRelease // 释放所
tryAcquireShared // 共享锁模式下获取锁
tryReleaseShared // 共享锁模式下释放锁
isHeldExclusively // Returns true if synchronization is held exclusively with respect to the current (calling) thread.(判断同步器是否被调用线程独占即判断锁是否被当前线程独占)
使用方式
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
1-2 利用AQS实现自定义的独占不可重入锁
代码
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
// 自定义锁,不可重入锁
class MyLock implements Lock {
// 独占锁
// 借助AQS定义用于定义锁的同步器
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 利用CAS以及volatile的state
if(compareAndSetState(0,1)){
// 加上锁,并设置锁的ownner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
/*值得注意的是这里将setState放置于setExclusiveOwnerThread后面保证线程拥有者修改的
* 可见性,由于state是volatile修饰,使用写屏障之前写入的可见性。
* */
setExclusiveOwnerThread(null); /*设置线程的owner为null*/
setState(0); /*设置状态为0,即解锁*/
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition(){
return new ConditionObject();
}
}
// 借助提供的AQS重写下面的方法
private MySync sync = new MySync();
@Override // 加锁,失败会进入阻塞队列等待
public void lock() {
sync.acquire(1);
}
@Override // 加锁,可以被打断的锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override // 尝试加锁(1次)
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override // 带超时的加锁
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() { // 调用tryrelease后,并唤醒阻塞队列上的线程
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
@Slf4j(topic = "c.test12")
public class test16 {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(()->{
lock.lock();
try{
log.warn("t1 locking");
}finally {
log.warn("t1 unlocking");
lock.unlock();
}
},"t1").start();
new Thread(()->{
lock.lock();
try{
log.warn("t2 locking");
}finally {
log.warn("t2 unlocking");
lock.unlock();
}
},"t2").start();
}
}
1-2-1 锁的独占性测试
[t1] WARN c.test12 - t1 locking
[t1] WARN c.test12 - t1 unlocking
[t2] WARN c.test12 - t2 locking
[t2] WARN c.test12 - t2 unlocking
总结:可以看到线程t2必须在线程t1解锁后才能获得锁。
1-2-1 锁的不可重入性测试
@Slf4j(topic = "c.test12")
public class test16 {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(()->{
lock.lock();
log.warn("first lock");
lock.lock(); // 第二次进入锁
log.warn("second lock");
try{
log.warn("t1 locking");
}finally {
log.warn("t1 unlocking");
lock.unlock();
}
},"t1").start();
}
}
执行结果
[t1] WARN c.test12 - first lock
总结:上面代码中第二次加锁导致线程被阻塞,所以该锁是不可重入的。值得注意的是synchronized与Reentrantlock都是可重入锁。在上面的锁的实现过程中AQS提供了锁定义所必须的属性以及方法。
1-3 AQS的设计思想
AQS 要实现的功能目标
- 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
- 获取锁超时机制
- 通过打断取消机制
- 独占机制及共享机制
- 条件不满足时的等待机制
- 性能好:这里的性能是指
the primary performance goal here is scalability: to predictably maintain efficiency even, orespecially, when synchronizers are contended.
具体的逻辑实现
获取锁的逻辑
while(state 状态不允许获取) {
if(队列中还没有此线程) {
入队并阻塞
}
}
当前线程出队
释放锁的逻辑
if(state 状态允许了) {
恢复阻塞的线程(s)
}
设计要点
1)原子维护 state 状态。 2)阻塞及恢复线程。 3)维护队列
state设计
state 使用 volatile 配合 cas 保证其修改时的原子性
state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
// 可以看到JDK8.0中AQS的state确实是int,采用CAS机制确保变量修改的原子性
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this,xia stateOffset, expect, update);
}
2)阻塞恢复设计
早期的调用次序的问题:早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到。
解决方法:使用 park & unpark 来实现线程的暂停和恢复先 unpark 再 park 也没
问题,此外park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细park 线程还可以通过 interrupt 打断。
04 JAVA中park/unpark的原理以及JAVA在API层面线程状态总结
3)队列设计
等待队列的节点定义(是CLH队列的variant)源码
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
volatile Node prev; // AQS阻塞链表的前驱节点
volatile Node next; // AQS阻塞链表的后驱节点
volatile Thread thread; // 线程对象
Node nextWaiter; // 当Node被放入到条件对象的waitset需要用到这个属性
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
使用了 FIFO 先入先出队列,并不支持优先级队列,AQS等待队列设计时借鉴了 CLH 队列,它是一种单向无锁队列
队列的维护:队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas修改
使用CLH队列的优点:无锁,使用自旋,快速,无阻塞
总结: 源码定义的Node包含了线程对象Thread,Node的状态,双向链表所需的前向节点指针,后向节点指针,
1-4 AQS的相关的JUC类
ReentrantReadWriteLock
ReentrantLock
Semaphore
二 ReentrantLock的原理
2-1 概述
从上面的类图中可以看到
1)ReentrantLock实现了lock接口。
2)ReentrantLock内部属性类继承了AQS来实现同步。
- 注意:Sync是抽象类,在此基础上实现了NonfairSync与FairSync同步器,分别对应了ReentrantLock的公平锁与非公平锁。
2-2 Reentrantlock的非公平锁(NonfairSyn类)的实现原理
源码分析
package chapter8;
import java.util.concurrent.locks.ReentrantLock;
public class test17 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
}
}
ReentrantLock()构造函数源码
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSyn类源码
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
/*
lock方法中加锁会做二件事情:1)利用CAS设置锁的状态变量 2)将锁的拥有者设为自己。
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
Reentrant类的节点类状态相关的重要属性
无论是AQS等待队列中的节点与Condition对象的条件等待队列中的节点对象是同一节点对象(见1-3中Node的源码)。其中waitStatus用于区分节点的状态。
- waitStatus = 1:代表该节点所关联的线程已经被取消锁的竞争(超时或者打断的原因),这个节点不应该存在于AQS等待队列中的节点或者Condition对象的条件等待队列,应该被释放。
- waitStatus = -1:该节点位于AQS等待队列中,并且不是尾节点,该节点在退出锁的时候有义务唤醒其后续节点的关联进程
- waitStatus = -2:该节点位于Condition对象的条件等待队列中
volatile int waitStatus; /
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
2-2-1 非公平锁的lock()原理
final void lock() {
if (compareAndSetState(0, 1)) /*情况1:加锁成功*/
setExclusiveOwnerThread(Thread.currentThread());
else /*情况2:加锁失败*/
acquire(1);
}
情况1:当加锁的时候没有其他线程竞争时
- 利用CAS设置锁的状态变量为1,表示这个锁已经被该线程占有
- 设置线程的拥有者属性为该线程。
情况2:当加锁的时发现其他线程竞争,加锁失败。
此时会调用acquire(1),其源码如下:
/*
tryAcquire:尝试arg次加锁,如果加锁失败返回false,如果是非公平锁,调用的实际上是nonfairTryAcquire
acquireQueued(addWaiter(Node.EXCLUSIVE):创建节点对象并将其加入到等待队列中
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/*
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上图的流程如下:
step1: CAS 尝试将 state 由 0 改为 1,结果失败
step2:调用acquire(1)进入tryAcquire 逻辑
-
state 仍然时1,则加锁失败
-
staet为0,则加锁成功。
step3:再次尝试加锁失败进入 addWaiter 逻辑,构造 Node并加入wait 队列,这个队列采用双向链表实现并由NonfairSyn类进行维护。注意此时虽然将线程加入到队列,单线程还没有停止运行。
addWaiter源码(该方法由AQS提供)
private Node addWaiter(Node mode) {
// 为当前线程创建新的节点并将节点插入到队列,最后返回创建节点。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
step4: 然后进入 acquireQueued 逻辑,其基本逻辑:死循环中不断尝试获得锁,失败后park让线程停止运行。
死循环具体流程:
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
- 再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回
true - 进入 parkAndCheckInterrupt。线程调用JUC的park停止运行(图中灰色表示停止运行)
- 前驱节点-1表明该节点有责任唤醒其后驱节点。
总结:这里的死循环保证了线程直到获得锁才能够退出循环,由于死循环是对CPU资源的浪费,因此这里的负责逻辑总体上就是让线程在获取锁失败后再挣扎几下,如果还是失败了,那么则主动park,等待队列中前一个节点唤醒后再竞争锁。
acquireQueued源码(死循环中尝试获取锁,获得锁后退出循环,AQS父类的继承过来的)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 用于获取锁的死循环
/*获取当前节点的先驱节点*/
final Node p = node.predecessor();
/*如果先驱节点时头节点,那么意味着当前线程是第二个加锁的线程
因此这个if里面会让该线程再次尝试加锁,
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire:判断当前线程是否应该阻塞
// 调用JUC的park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // 最终确定获取锁失败
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
lock方法的总结:
1)竞争成功,直接修改state并设置锁的线程拥有者为自己。
2)竞争失败,则会创建Node关联该线程,将Node加入到AQS的等待队列尾部,然后会进入到一个死循环中,在这个循环可能再次尝试加锁仍然失败的话,主动park,等待其前驱节点唤醒再去竞争锁。
2-2-2 非公平锁的unlock()原理
多个线程加锁失败等待情况下锁被释放。
上图中Thread0长期霸占锁,后续的Thread1-3都进入队列中等待,当线程0释放锁后:
step1:进入 tryRelease 流程,如果成功, 设置 exclusiveOwnerThread 为 null state = 0
/*位于ReentrantLock.java中*/
public void unlock() {
sync.release(1);
}
/*位于AbstractQueuedSynchronizer.java中*/
public final boolean release(int arg) {
// 尝试解锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
/*位于ReentrantLock.java的Sync extends AbstractQueuedSynchronizer*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
step2:当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程找到AQS等待队列中离 head 最近的一个 Node(关联线程需要竞争锁),unpark 恢复其运行,本例中即为 Thread-1
step3: 回到 Thread-1 的 acquireQueued 流程 (见2-2-1 step4)
- 加锁成功(没有竞争),会设置 exclusiveOwnerThread 为 Thread-1,state = 1,head 指向刚刚 Thread-1 所在的 Node,原本的 head 因为从链表断开,而可被垃圾回收
非公平性如何体现(如下图)?
- 如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来,如果被 Thread-4 抢先获得锁,那么
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
- 如果是公平锁的话,对于新来的竞争线程,会先查看AQS等待队列是否为空,如果不为空则不会直接去竞争锁。
unlock总结:主要就是修改锁的state为0并将锁的拥有者设置为null,然后从AQS等待队列中唤醒(unpark)一个新的线程去竞争锁。
2-2-3 非公平锁的加锁与解锁的源码汇总
- 涉及2个文件:ReentrantLock.java和AbstractQueuedSynchronizer.java
加锁源码
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 这个是从AQS类中继承过来的方法,其中tryAcquire(arg)需要我们子类自己实现,在
// 这里调用nonfairTryAcquire(acquires)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
/*===============================下面方法都是继承AQS的源码==========================*/
private Node addWaiter(Node mode) {
// 为当前线程创建新的节点并将节点插入到队列,最后返回创建节点。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 用于获取锁的死循环
/*获取当前节点的先驱节点*/
final Node p = node.predecessor();
/*如果先驱节点时头节点,那么意味着当前线程是第二个加锁的线程
因此这个if里面会让该线程再次尝试加锁,
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire:判断当前线程是否应该阻塞
// 调用JUC的park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // 最终确定获取锁失败
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
解锁源码
public void unlock() {
sync.release(1);
}
/*======================下面都是AQS父类继承的代码========================*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2-3 锁的可重入原理(reentrantlock可重入锁)
本质:通过state实现
- 不可重入锁state仅需要0和1两种状态表示即可,0表示没有加锁,1表示已经加锁
- 可重入锁则是0表示没有加锁,>0表示加锁的次数。
实例:以非公平锁为例
可重入锁的的加锁源码
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// acquires每次都是1
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 还没有加锁,设置锁的状态为1,并设置锁的拥有者为当前线程
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} // 已经加锁,且调用线程就是锁的拥有者,代表锁重入,此时state++
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可重入锁的解锁源码
protected final boolean tryRelease(int releases) {
/*releases每次也是1,此时就是减去*/
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 当state等于0的时候,设置锁的拥有者为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 到有锁重入时,state--
setState(c);
return free;
}
2-4 锁的可打断原理
常识:ReentrantLock分为不可打断模式与可打断模式,默认是不可打断模式。
不可打断模式(lock()方法)
- 这个模式下,即使线程被打断,线程仍然会留在AQS阻塞队列中,等获得锁后方能继续运行。
- 线程只有获得锁后才能响应打断,。
1)acquireQueued中设置interrupted为false然后进入到死循环中再次申请锁。
2)在shouldParkAfterFailedAcquire(p, node)为true的情况下
--如果当前线程没有被打断,则parkAndCheckInterrupt()调用park,线程阻塞。
--如果该线程已经被打断,则parkAndCheckInterrupt()内部的park调用失效,线程无法停止执行。函数返回true,
此时interrupted为true。
然而在不可打断模式下,interrupted为true 没有被处理。只有满足 if (p == head && tryAcquire(arg)) 这个条件即当前线程获得锁才能返回interrupted = true。其他情况下线程依旧最终会陷入阻塞状态。
3)acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的返回true,则会执行selfInterrupt()函数,对当前线程进行一次打断,打断标记设为true。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果该线程被其他线程interrupt,那么中断标记就为true
// 此时parkAndCheckInterrupt()返回的就是true,并且park方法失效
//
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// park方法会在打断 标记为true的情况下失效。
LockSupport.park(this);
// interrupted() 方法会查看中断状态,将中断标记重置为假,保证下一次park不受之前的打断标记影响。
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
可打断模式(lockInterruptibly())
从下面的源码可以看到,采用可打断的方式进行加锁,源码中在位置1与位置2的地方会检查打断标记,并抛出异常。
源码
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 位置1
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 区别于不可中断模式下的实现,打断标记让park失效后,这里会抛出异常,
// 不会再执行for循环即在AQS队列中等待
throw new InterruptedException(); // 位置2
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结:可打断锁与不可打断锁的实现关键在于对于打断的处理。
- 不可打断锁循环中会调用Thread.interrupted()方法清除打断标记,从而使得线程继续在AQS阻塞队列中等待。
- 可打断锁则会通过抛出异常的方式立刻响应打断,从而让线程停止在阻塞队列中等待。结合实例来理解。
2-5 公平锁的原理
概念强调:公平性体现在当多个线程进行锁竞争时,确保AQS阻塞队列中唤醒的线程与没有进入阻塞队列的线程竞争时优先获得锁。
非公平锁加锁时不会检查阻塞队列
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 非公平锁加锁的时候,线程直接尝试修改state变量
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁竞争锁时会检查阻塞队列
- 检查阻塞队列中是否还有线程
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 可以看到公平锁在线程竞争锁之前会先去检查AQS的阻塞队列是否为空
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 为 true代表队列中还有节点
// ((s = h.next) == null 为true代表头节点的后继节点是否为空
// s.thread != Thread.currentThread()为true代表当前线程不是阻塞队列中线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
2-6 条件变量设置原理
基本思想:每个条件变量其实就对应着一个等待队列(waitset),用于存放因不满条件需要等待的线程,其实现类是 ConditionObject
await的流程(与synchronized的wait()方法对比学习)
情景:Thread-0 持有锁,但是由于执行条件不满足,调用对应的ConditionObject 的await方法。
在await方法内部有以下几个步骤
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// step1:创建等待队列节点,并加入到等待队列尾部
Node node = addConditionWaiter();
// step2:释放锁,并唤醒阻塞队列中的下一个线程去竞争锁。
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// step3:当前线程主动park,变成等待状态。
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
step1: 首先addConditionWaiter 流程:该流程创建新的 Node(该节点关联当前线程),然后加入对应的ConditionObject对象的等待队列尾部。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建新的节点,关联当前线程,并设置节点状态为-2,用于标识是等待队列中的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 将新的节点插入到链表尾部(队列尾部)
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
step2: 将节点加入等待队列尾部后,则进入 AQS 的 fullyRelease 流程, 该流程用于释放同步器上的锁(当前线程拥有的锁)(释放拥有的锁给其他需要的线程使用)
- fullyRelease考虑到锁重入的情况的锁的释放。
下面图中Thread0进入等待队列后唤醒阻塞队列中的下一个线程Thread1去竞争锁。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) { // release过程中会huan'xi
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
step3: 当前线程调用park方法使得线程陷入到等待状态。
图中已经在等待队列中的线程0调用park进入等待状态(变为灰色)
signal的流程(与synchronized的signal()方法对比学习)
情景:Thread0在等待队列中,当前拥有锁的Thread1调用signal()去唤醒Thread0。
在signal方法内部有以下几个步骤
step1:判断调用线程是否为锁的持有者,不是抛出异常
step2:等待队列中出队一个节点,并加入到阻塞队列的尾部,如果转移失败,则再出队一个节点,直到转移成功或者等待队列为空。
step3:
public final void signal() {
// step1:先判断调用线程是否为锁的持有者,只有锁的持有者线程才能唤醒等待队列中的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
// step2:
do {
// firstWaiter是队列头部,改变队列队列头部指向,让First节点出队
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//出队的节点与其他节点断开链接
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/*
transferForSignal(first)将等待队列中出队的first节点转移到AQS的阻塞队列尾部,
转移成功则返回true,转移失败返回false,这种情况下会再从等待队列中选择一个节点出队。
转移失败是由于当前线程由于等待或者超时放弃了对锁的竞争,因此不需要放入阻塞队列
*/
final boolean transferForSignal(Node node) {
// AQS等待队列中的尾节点waitStatus必须为0,如果无法修改,说明该线程已经被取消锁的竞争
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 将node加入到AQS等待队列尾部,成功返回前驱节点
// 由于p节点不再是AQS等待队列的尾部节点,因此其waitset需要从0修改为-1,
// -1代表该节点有义务唤醒其后驱节点线程
int ws = p.waitStatus;
// 如果前驱节点被取消或者设置失败,进入到if
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
step2: 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0(新的尾部节点) 的 waitStatus 改为 0,Thread-3 的waitStatus(非尾部节点,有义务唤醒后驱节点的线程) 改为 -1
step3: Thread-1 释放锁,进入 unlock 流程(与2-2中unlock方法一致)
ConditionObject源码的部分方法
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
核心:AQS抽象类实现了两个内部类ConditionObject类与Node类
该抽象类中三个关键属性以及数据结构:
1)Node类,类的每个实例都关联一个竞争锁的线程。
2)ConditionObject类,每个condition类的实例都是维护了一个等待队列。该等待队列的节点就是Node类的实例。
3)AQS类中维护的等待队列。
AQS类中维护的等待队列与每个condition类的实例都是维护的等待队列的关系?
概述:二者的节点都是Node类实例,其关系类似于synchronized的Monitor对象头的EntrySet与WaitSet
1)AQS类中维护的等待队列中每个节点关联的线程都可以直接参与锁的竞争
2)每个condition类的实例都是维护的等待队列中的节点需要被当前拥有锁的线程signal之后才能进入到AQS类中维护的等待队列中。