背景
在自学DelayQueue
源码的时候遇到一个问题,咨询附近人无果后,在sf上提问,结果就是又长了一个知识点。
原文链接 https://segmentfault.com/q/1010000007602886
问题如下
DelayQueue 学习中遇到的疑问
环境:jdk1.8.0_73
在学习DelayQueue
的take()
方法时,关于源码有多处不理解,特求助。
先贴源码
public E take() throws InterruptedException {
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//无限尝试
for (;;) {
//获取 priorityQueue 的第一个元素,不会移除
E first = q.peek();
//如果对应的 priorityQueue 中没有元素,也就是first为空,那么就等待(阻塞等待唤醒)
if (first == null)
available.await();
else {
//priorityQueue中有元素,获取first的过期时间
long delay = first.getDelay(NANOSECONDS);
//如果已经过期直接取出
if (delay <= 0)
return q.poll();
//1111111111111111111 此处疑问
first = null; // don't retain ref while waiting
//222222222222 此处疑问
if (leader != null)
available.await();
else {
//如果 leader 为空,也就是没有线程等待获取队列头元素,获取当前线程,并且将leader设置为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//333333333333
//等待第一个元素过期
available.awaitNanos(delay);
} finally {
//如果leader是当前线程,leader置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//for 循环结束后 leader 为空 并且priorityQueue中有元素 唤醒等待队列。
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}
顺便贴上部分变量的说明
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
首先说说我的理解:
首先leader是准备获取头元素的线程,但是不稳定有可能会被质空,如果leader不为空则说明有线程正在获取头元素,所以其他线程会阻塞,如果leader为空,那么就是无线程在尝试获取头元素,之所以不稳定是因为在添加元素的时候(add()
),有可能导致头元素变动,所以如果添加的时候导致头元素变动,那么leader会被置空。(如果理解不正确希望指正)
我的问题
我有两个问题:
1、第一个问题
先说数字2处的问题,我不懂为什么这个地方需要判断leader是否为空,在我的理解里,这个地方leader是一定为空的。
理由如下:
我查找了一下所有leader变量被更改的地方总计有5处,offer()
方法一处,也就是之前说的头元素变动会把leader置空,take()
方法两处,poll(long timeout, TimeUnit unit)
方法两处。但是仔细观察的话会发现不管是take()
还是poll()
方法都是先获得锁,再for循环,而且整个方法结束后leader
变量会被置空,也就是说不管是take
还是poll
方法只要是执行过leader
就会被置空,也许有人说万一在leader
赋值后抛异常怎么办?从代码来看只能从代码3的地方抛异常,为什么呢,因为如果3不抛异常,那么leader
立马就被置空。综上所述,既然take()
方法先获取到了锁,那么为什么leader
还会为非空!也就是为什么要在2的地方进行判断非空。
2、第二个问题
数字1处的问题,为什么此处first
置空。
我查询了相关资料,发现在jdk1.7中是没有这段代码的,资料表述的意思是防止内存泄漏,first
变量被gc回收。
资料地址 http://www.jianshu.com/p/e0bcc9eae0ae
很抱歉没找到官方说明。
引用自资料中相关说明
take方法中为什么释放first元素
first = null; // don't retain ref while waiting
我们可以看到doug lea后面写的注释,那么这段代码有什么用呢?想想假设现在延迟队列里面有三个对象。
- 线程A进来获取first,然后进入 else 的else ,设置了leader为当前线程A
- 线程B进来获取first,进入else的阻塞操作,然后无限期等待
- 这时在JDK 1.7下面他是持有first引用的
- 如果线程A阻塞完毕,获取对象成功,出队,这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first.
- 假设还有线程C 、D、E.. 持有对象1引用,那么无限期的不能回收该对象1引用了,那么就会造成内存泄露.
从第二条我就看不懂了
线程B进来获取first,进入else的阻塞操作,然后无限期等待
在我理解,线程进来的时候,线程A已经释放锁了,因为ReentranceLock
是排他锁,并非共享锁,所以之前的first
变量指向的是线程A取走的那个元素,而且方法出栈后,相关变量应该是会被gc的。那么线程B获取first的时候,应该是指向不同的元素了吧,所以为啥线程B还会持有之前的first变量。
求大神详解!我是哪里理解的不对,还是有什么基础概念想错了,还是说忽略了某些部分。
最终结果
自问自答
缺失的知识点是:
Condition.await()
会自行释放锁。
参考资料
https://segmentfault.com/q/1010000002390706
http://stackoverflow.com/questions/27058828/why-await-of-condition-releases-the-lock-but-signal-does-not
以下摘取部分内容
jdk1.8 await方法的说明
The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:
- Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or
- Some other thread invokes the signalAll() method for this Condition; or
- Some other thread interrupts the current thread, and interruption of thread suspension is supported; or
- A "spurious wakeup" occurs.
In all cases, before this method can return the current thread must re-acquire the lock associated with this condition. When the thread returns it is guaranteed to hold this lock.
评论
the API would become confusing: there would be more than one method releasing the lock
自行的测试代码
public class ConditionReleaseTest {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
final Condition con1 = lock.newCondition();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
System.out.println("线程1获取锁,condition wait 10s");
con1.await(10, TimeUnit.SECONDS);
System.out.println("线程1获取锁,condition wait 10s 结束 假定拥有锁?");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("线程1释放锁");
lock.unlock();
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
Thread.sleep(100L);
System.out.println("线程2 获得锁 睡眠20秒 拥有锁");
Thread.sleep(20000L);
} catch (Exception e) {
} finally {
System.out.println("线程2释放锁");
lock.unlock();
}
}
});
thread1.start();
thread2.start();
}
}
运行结果:
线程1获取锁,condition wait 10s
线程2 获得锁 睡眠20秒 拥有锁
线程2释放锁
线程1获取锁,condition wait 10s 结束 假定拥有锁?
线程1释放锁
在有以上知识点的基础上,我所有的疑问都可以解释的通了。
first
变量会引起内存泄漏
感谢所有回答的人。
@iMouseWu[imousewu] @kevinz[kevinz] @scort[scort]
ps: peek()
只是查询第一个元素,不会从队列里取出。
最终感想
其实这个问题的答案到最后真的很简单,简单到什么程度呢,可以总结成一个常见的面试题。
wait()和sleep()有什么区别?
也许大家对这个答案非常熟悉,本人也是,但是当遇到最初提问的问题的时候却没有第一时间想通。这个真的是理论很简单,一到真正的实践部分,就完全没概念了。所以在学习的过程中理论很重要,实践也非常重要,与大家共勉!