一、入队操作
- 当一个线程获取锁失败之后会被转换为Node节点,然后会使用enq方法,将该节点插入到AQS的阻塞队列,下面看一下这个方法如何实现
private Node enq(final Node node) {
for(;;) {
Node t = tail;
if( t == null) {
if(compareAndSetHead(new Node())) {
tail = head;
}
}else {
node.prev = t;
if(compareAndSetTail(t,node)) {
t.next = node;
}
}
}
}
- 解析:进入到第一次循环,tail和head此时都是null,如图default,然后让t指向tail,即null,如图I,此时都是null,然后随便产生一个实例,也就是II所示,让head指向它,然后执行语句,让tail也指向它;
- 接下来进行第二次循环,让t也指向这个实例,如图III,然后进入到else语句,先让node的前向指向哨兵,如图IV;然后,把tail设置为node,在把哨兵的next指向tail,如图VI,也就结束了。
二、AQS-条件变量的支持
- notify和wait方法是配合synchronized内置锁实现线程同步的,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁),实现线程间同步的基础设施
- 它们的不同在于,synchronized同时只能与一个共享变量的notify或者wait方法实现同步,而AQS的一个锁可以对应多个条件变量,我们先看下面的例子
package com.ruigege.LockSourceAnalysis6;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLock {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
}catch(Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
lock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
}catch(Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
- 其实这里的Lock对象就相当于synchronized加上了共享变量,调用lock.lock()就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unlock()方法就相当于推出了synchronized块,调用条件变量的signal方法就相当于调用共享变量的notify方法,signalAll()相当于notifyAll()方法
- 在上面的代码中,lock.newCondition()的作用其实就是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以方法AQS内部的变量(例如状态变量state)和方法,在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程,注意这个条件队列和AQS队列不是一回事。
- 下面便是await方法源码,当线程调用条件变量的await()方法时(必须先调用锁的lock方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入到条件队列末尾,之后当前线程会释放获取得锁(也就是会操作锁对应的state变量的值),并被阻塞挂起,这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到的锁的线程调用了条件变量的await方法,则该线程会被放入到条件变量的阻塞队列中,然后释放获取到的锁,在await方法处阻塞。
public final void await() throws InterruptedException {
if(Thread.interrupted()) {
throw new InterruptedException();
}
Node node = addContionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while(!isOnSyncQueue(node)) {
LockSupport.park(this);
if((interruprMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
}
三、源码:
- 所在包:com.ruigege.ConcurrentListSouceCodeAnalysis5
https://github.com/ruigege66/ConcurrentJava
- 欢迎关注微信公众号:傅里叶变换,个人账号,仅用于技术交流