JUC源码分析-集合篇(八)DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。 队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
1. DelayQueue 使用场景
1.1 DelayQueue 特点
DelayQueue 也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue 中的所有元素必须实现 Delayed 接口。DelayQueue 队列的元素必须实现 Delayed 接口。
// 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
public interface Delayed extends Comparable<Delayed> {
// 返回与此对象相关的剩余有效时间,以给定的时间单位表示
long getDelay(TimeUnit unit);
}
可以看到,Delayed 接口除了自身的 getDelay 方法外,还实现了 Comparable 接口。getDelay 方法用于返回对象的剩余有效时间,实现 Comparable 接口则是为了能够比较两个对象,以便排序。
也就是说,如果一个类实现了 Delayed 接口,当创建该类的对象并添加到 DelayQueue 中后,只有当该对象的 getDalay 方法返回的剩余时间 ≤0 时才会出队。
另外,由于 DelayQueue 内部委托了 PriorityQueue 对象来实现所有方法,所以能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间 ≤0 的最小元素。
DelayQueue 的特点简要概括如下:
- DelayQueue 是无界阻塞队列;
- 队列中的元素必须实现 Delayed 接口,元素过期后才会从队列中取走;
1.2 DelayQueue 使用场景
DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景。
- 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
- 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 javax.swing.TimerQueue 就是使用 DelayQueue 实现的。ScheduledFutureTask
1.3 DelayQueue 示例
我们可以参考 ScheduledThreadPoolExecutor#ScheduledFutureTask 类的实现。
// 模仿网吧上网场景
public class DelayQueueTest extends Thread {
DelayQueue queue = new DelayQueue();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
public static void main(String[] args) {
DelayQueueTest wangba = new DelayQueueTest();
wangba.start();
wangba.shangji("A", 5);
wangba.shangji("B", 2);
wangba.shangji("C", 4);
}
public void shangji(String name, int money) {
WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l);
queue.add(wm);
System.out.println(name + "开始上网,时间:" + format.format(new Date()) +
",预计下机时间为:" + format.format(new Date(wm.getEndTime())));
}
public void xiaji(WangMing wm) {
System.out.println(wm.getName() + "下机,时间:" + format.format(new Date(wm.getEndTime())));
}
public void run() {
while (true) {
try {
WangMing wm = (WangMing) queue.take();
xiaji(wm);
} catch (InterruptedException e) {
}
}
}
}
// 网民,必须实现 Delayed 接口
class WangMing implements Delayed {
private String name;
private long endTime;
private TimeUnit timeUnit = TimeUnit.SECONDS;
@Override
public long getDelay(TimeUnit unit) {
return endTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
WangMing wm = (WangMing) o;
return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 :
(this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0);
}
}
程序执行结果:
A开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:57
B开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:54
C开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:56
B下机,时间:2017-12-07 09:37:54
C下机,时间:2017-12-07 09:37:56
A下机,时间:2017-12-07 09:37:57
2. DelayQueue 源码分析
介绍完了 DelayQueued 的基本使用,读者应该对该阻塞队列的功能有了基本了解,接下来我们看下 Doug Lea 是如何实现 DelayQueued 的。
2.1 DelayQueue 属性
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
// PriorityQueue 维护队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
上述比较特殊的是 leader 字段,我们之前已经说过,DelayQueue 每次只会出队一个过期的元素,如果队首元素没有过期,就会阻塞出队线程,让线程在 available 这个条件队列上无限等待。
为了提升性能,DelayQueue 并不会让所有出队线程都无限等待,而是用 leader 保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦 leader 线程被唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在 available 条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的 leader 线程。这样,就避免了无效的等待,提升了性能。这其实是一种名为 Leader-Follower pattern
的多线程设计模式。
2.2 入队 offer
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 调用 PriorityQueue#offer 方法
if (q.peek() == e) { // 如果入队元素在队首, 则唤醒一个出队线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
需要注意的是当首次入队元素时,需要唤醒一个出队线程,因为此时可能已有出队线程在空队列上等待了,如果不唤醒,会导致出队线程永远无法执行。
2.3 出队 poll
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 1. 没有元素或元素还在有效期内则直接返回 null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
// 2. 元素已经失效直接取出来一个
else
return q.poll();
} finally {
lock.unlock();
}
}
不阻塞直接 poll 时很简单,再来看一下阻塞式获取元素 take 方法。
2.4 阻塞式出队 take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 1. 集合为空时所有的线程都处于无限等待的状态。
// 只要有元素将其中一个线程转为 leader 状态
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 2. 元素已经过期,直接取出返回
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
// 3. 已经在其它线程设置为 leader,无限期等着
if (leader != null)
available.await();
// 4. 将 leader 设置为当前线程,阻塞当前线程(限时等待剩余有效时间)
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 4.1 尝试获取过期的元素,重新竞争
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 5. 队列中有元素则唤醒其它无限等待的线程
// leader 线程是限期等待,每次 leader 线程获取元素出队,如果队列中有元素
// 就要唤醒一个无限等待的线程,将其设置为限期等待,也就是总有一个等待线程是 leader 状态
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
采用 take 阻塞式出队时,这里要思考下集合中元素时所有的等待线程永远进行 wait 状态不被唤醒,也就是说即使元素过期了也无法正常出队?
首先,在每次入队 offer 时,如果是第一个元素就会调用 vailable.signal() 唤醒一个等待的线程。
其次,take 方法自旋结束后如果 leader == null && q.peek() != null,需要唤醒一个等待中的出队线程。
leader == null && q.peek() != null 的含义就是——没有 leader 线程但队列中存在元素。我们之前说了,leader 线程作用之一就是用来唤醒其它无限等待的线程,所以必须要有这个判断。
当然,如果集合中没有元素了,所有的等待线程都处理无限等待的状态。
参考:
每天用心记录一点点。内容也许不重要,但习惯很重要!