AQS抽象队列同步器详解
学习材料来源于网络
如有侵权,联系删除
同步锁的本质
同步锁的本质就是排队
- 同步的方式:独享锁-单个队列窗口,共享锁-多个队列窗口
- 抢锁的方式:插队抢(不公平锁)、先来后到抢锁(公平锁)
- 没抢到锁的处理方式:快速尝试多次(CAS自旋锁)、阻塞等待
- 唤醒阻塞线程的方式(叫号器):全部通知、通知下一个
示例1
package icu.shaoyayu.multithreading.chapter5;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class MyLock implements Lock {
/**
* 判断一个锁的状态,或者锁的拥有者
*/
volatile AtomicReference<Thread> owner = new AtomicReference<>();
/**
* 存储等待的线程
*/
volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
/**
* 尝试拿锁一次
* @return
*/
@Override
public boolean tryLock() {
return owner.compareAndSet(null,Thread.currentThread());
}
/**
* 一直尝试抢锁
*/
@Override
public void lock() {
boolean addQ = true;
while (!tryLock()){
if (addQ){
//第一次没有拿到锁,加入到等待队列中
waiters.offer(Thread.currentThread());
addQ = false;
}else {
//阻塞当前的线程,不进行往前执行
LockSupport.park();
}
}
waiters.remove(Thread.currentThread());
}
/**
* 释放锁
*/
@Override
public void unlock() {
//判断是否当前线程上的锁
if (owner.compareAndSet(Thread.currentThread(),null)){
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()){
Thread next = iterator.next();
LockSupport.unpark( );
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
测试
package icu.shaoyayu.multithreading.chapter5;
import icu.shaoyayu.multithreading.chapter4.LingFengLock;
import java.util.concurrent.locks.Lock;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class LockDemo3 {
volatile int i = 0;
private MyLock lock = new MyLock();
public void add() {
lock.lock();
try {
// TODO 很多业务操作
i++;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
LockDemo3 ld = new LockDemo3();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
ld.add();
}
}).start();
}
Thread.sleep(2000L);
System.out.println(ld.i);
}
}
运行结果:
20000
Process finished with exit code 0
AQS抽象队列同步器
提供了对资源占用、释放,线程的等待、唤醒等等接口和具体实现。
可以用在各种需要控制资源争用的场景中。(ReentrantLock/CountDownLatch/Semphore)
acquire、acquireShared:定义了资源争用的逻辑,如果没拿到,则等待。
tryAcquire、tryAcquireShared:实际执行占用资源的操作,如何判定一个由使用者具体去实现。
release、releaseShared :定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
tryRelease、tryReleaseShared:实际执行资源释放的操作,具体的AQS使用者去实现。
示例2
自定义AQS规则
package icu.shaoyayu.multithreading.chapter5;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
// 抽象队列同步器
// state, owner, waiters
public class SimpleAqs {
// acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
// tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
// release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
// tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
// 1、 如何判断一个资源的拥有者
public volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 保存 正在等待的线程
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
// 记录资源状态
public volatile AtomicInteger state = new AtomicInteger(0);
// 共享资源占用的逻辑,返回资源的占用情况
public int tryAcquireShared(){
throw new UnsupportedOperationException();
}
public void acquireShared(){
boolean addQ = true;
while(tryAcquireShared() < 0) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前的线程,不要继续往下跑了
LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
}
}
waiters.remove(Thread.currentThread()); // 把线程移除
}
public boolean tryReleaseShared(){
throw new UnsupportedOperationException();
}
public void releaseShared(){
if (tryReleaseShared()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 唤醒
}
}
}
// 独占资源相关的代码
public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
throw new UnsupportedOperationException();
}
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前的线程,不要继续往下跑了
LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
}
}
waiters.remove(Thread.currentThread()); // 把线程移除
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() { // 定义了 释放资源之后要做的操作
if (tryRelease()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 唤醒
}
}
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
}
使用在自定义的lock上面
package icu.shaoyayu.multithreading.chapter5;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
// 自己实现(独享锁) - 常用的
public class SimpleLock implements Lock {
// 抽象工具类AQS
SimpleAqs aqs = new SimpleAqs(){
@Override
public boolean tryAcquire() {
return owner.compareAndSet(null, Thread.currentThread());
}
@Override
public boolean tryRelease() {
// 可重入的情况下,要判断资源的占用情况(state字段保存了资源的占用次数)
return owner.compareAndSet(Thread.currentThread(), null);
}
};
@Override
public boolean tryLock() {
return aqs.tryAcquire();
}
@Override
public void lock() {
aqs.acquire();
}
@Override
public void unlock() {
aqs.release();
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
测试:
package icu.shaoyayu.multithreading.chapter5;
import icu.shaoyayu.multithreading.chapter4.LingFengLock;
import java.util.concurrent.locks.Lock;
/**
* @author shaoyayu
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class LockDemo3 {
volatile int i = 0;
private Lock lock = new SimpleLock();
public void add() {
lock.lock();
try {
// TODO 很多业务操作
i++;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
LockDemo3 ld = new LockDemo3();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
ld.add();
}
}).start();
}
Thread.sleep(2000L);
System.out.println(ld.i);
}
}
运行结果:
20000
Process finished with exit code 0
AQS 关键源码
package com.study.lock.aqs.source;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import sun.misc.Unsafe;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;// 等待超时或被中断
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;// 释放锁之后,是否通知后一个节点
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;// 处于等待队列中,结点的线程等待在Condition上
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;// 共享模式中使用,线程处于可运行状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
static final long spinForTimeoutThreshold = 1000L;
/**将node加入队尾
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { // CAS + 循环 = 自旋
Node t = tail;
if (t == null) { // Must initialize 队列为空
if (compareAndSetHead(new Node()))// 创建一个空的标志结点作为head结点
tail = head; // tail 和 head都是同一个节点
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 放入tail尾部
t.next = node;
return t;
}
}
}
}
/**将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//EXCLUSIVE(独占)和SHARED(共享)
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {// 直接放到队尾。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);// 按正常逻辑入队列
return node;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/** 唤醒等待者
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 正在释放锁的线程节点状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 修改当前节点状态
Node s = node.next; // 找下一个节点
if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,继续寻找合适的下一个节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 如果找到了合适的节点,就唤醒它
LockSupport.unpark(s.thread);
}
/** 共享模式下 - 唤醒当前head节点的后续节点
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 判定是否还有后续节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 如果状态为SIGNAL,代表需要通知后续节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 修改状态为0,通知一次
continue; // loop to recheck cases 修改失败,代表已经通知,继续处理
unparkSuccessor(h); // 唤醒
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 通知过后,修改节点状态为PROPAGATE
continue; // loop on failed CAS
}
if (h == head) // loop if head changed 知道其他的节点,把这个head挤下来,它才跳出循环
break;
}
}
/** 修改head节点,同时传播可以获取资源的信号
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 修改head
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {// 资源剩余的数量如果不大于0,则不需要再进行唤醒
Node s = node.next;
if (s == null || s.isShared()) // 共享模式,通知其他节点
doReleaseShared();
}
}
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/**检查状态,是否需要挂起线程
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 根据 前置节点的状态 执行不同的流程
if (ws == Node.SIGNAL) // 前置节点释放锁之后会通知当前线程,挂起吧
return true;
if (ws > 0) { // 前置节点处于CANCELLED状态,跳过它继续寻找正常的节点,并且甩掉中间那段不正常的节点
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do { // 也可以理解为,这是一次队列检查
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 修改前置的状态为SIGNAL,用意是释放锁之后会通知后续节点
}
return false;
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 返回是否有中断迹象
}
/**进入等待状态,直到其他线程释放资源后唤醒继续执行
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 当前线程释放中断的标志位
for (;;) {// 不断尝试
final Node p = node.predecessor(); // 获取前一个节点
if (p == head && tryAcquire(arg)) { // 如果前一个节点是head,尝试抢一次锁
setHead(node); // 更换head
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&// 检查状态,是否需要挂起线程
parkAndCheckInterrupt())// 如果需要挂起,则通过Park进入停车场挂起
interrupted = true; // 如果出现中断,则修改标记
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**等待..
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 以共享模式加入队列尾部
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
final Node p = node.predecessor(); // 前置节点
if (p == head) { // 如果前置为head
int r = tryAcquireShared(arg); // 尝试获取资源,返回资源剩余的数量
if (r >= 0) { // 拿到资源
setHeadAndPropagate(node, r); // 修改head节点
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**尝试去获取独占资源,由使用者自行实现
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**尝试去释放指定的资源,由具体使用者定义
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**尝试获取资源,由使用者自行实现
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/** 线程释放共享资源
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**独占模式下线程获取共享资源的顶层入口
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 判断是否拿到锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**释放资源的顶层入口
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; // 从头开始找
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒下一个线程
return true;
}
return false;
}
/** 线程获取共享资源的入口
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 判断量够不够
doAcquireShared(arg); // 没拿到资源,需要等待
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/** 线程释放共享资源
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
public final boolean hasQueuedThreads() {
return head != tail;
}
public final boolean hasContended() {
return head != null;
}
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
private Thread fullGetFirstQueuedThread() {
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return super.toString() +
"[State = " + s + ", " + q + "empty queue]";
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
}
ReentrantLock 源码
package com.study.lock.aqs.source;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
// 源码说明版本 - 删除无关紧要的方法
public class ReentrantLockSourceLess {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLockSourceLess() {
sync = new NonfairSync();
}
public ReentrantLockSourceLess(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
// public Condition newCondition();
// public int getHoldCount();
// public boolean isHeldByCurrentThread();
// public boolean isLocked();
// public final boolean isFair();
// protected Thread getOwner();
// public final boolean hasQueuedThreads();
// public final boolean hasQueuedThread(Thread thread);
// public final int getQueueLength();
// protected Collection<Thread> getQueuedThreads();
// public boolean hasWaiters(Condition condition) ;
// public int getWaitQueueLength(Condition condition);
// protected Collection<Thread> getWaitingThreads(Condition condition);
// public String toString();
}