zoukankan      html  css  js  c++  java
  • JUC源码分析-集合篇(八)DelayQueue

    JUC源码分析-集合篇(八)DelayQueue

    DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。 队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

    1. DelayQueue 使用场景

    1.1 DelayQueue 特点

    DelayQueue 也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue 中的所有元素必须实现 Delayed 接口。DelayQueue 队列的元素必须实现 Delayed 接口。

    // 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
    public interface Delayed extends Comparable<Delayed> {
        // 返回与此对象相关的剩余有效时间,以给定的时间单位表示
        long getDelay(TimeUnit unit);
    }
    

    可以看到,Delayed 接口除了自身的 getDelay 方法外,还实现了 Comparable 接口。getDelay 方法用于返回对象的剩余有效时间,实现 Comparable 接口则是为了能够比较两个对象,以便排序。

    也就是说,如果一个类实现了 Delayed 接口,当创建该类的对象并添加到 DelayQueue 中后,只有当该对象的 getDalay 方法返回的剩余时间 ≤0 时才会出队。

    另外,由于 DelayQueue 内部委托了 PriorityQueue 对象来实现所有方法,所以能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间 ≤0 的最小元素。

    DelayQueue 的特点简要概括如下:

    • DelayQueue 是无界阻塞队列;
    • 队列中的元素必须实现 Delayed 接口,元素过期后才会从队列中取走;

    1.2 DelayQueue 使用场景

    DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景。

    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
    2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 javax.swing.TimerQueue 就是使用 DelayQueue 实现的。ScheduledFutureTask

    1.3 DelayQueue 示例

    我们可以参考 ScheduledThreadPoolExecutor#ScheduledFutureTask 类的实现。

    // 模仿网吧上网场景
    public class DelayQueueTest extends Thread {
        DelayQueue queue =  new DelayQueue();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    
        public static void main(String[] args) {
            DelayQueueTest wangba = new DelayQueueTest();
            wangba.start();
    
            wangba.shangji("A", 5);
            wangba.shangji("B", 2);
            wangba.shangji("C", 4);
        }
    
        public void shangji(String name, int money) {
            WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l);
            queue.add(wm);
            System.out.println(name + "开始上网,时间:" + format.format(new Date()) +
                    ",预计下机时间为:" + format.format(new Date(wm.getEndTime())));
        }
    
        public void xiaji(WangMing wm) {
            System.out.println(wm.getName() + "下机,时间:" + format.format(new Date(wm.getEndTime())));
        }
    
        public void run() {
            while (true) {
                try {
                    WangMing wm = (WangMing) queue.take();
                    xiaji(wm);
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
    // 网民,必须实现 Delayed 接口
    class WangMing implements Delayed {
        private String name;
        private long endTime;
        private TimeUnit timeUnit = TimeUnit.SECONDS;
    
        @Override
        public long getDelay(TimeUnit unit) {
            return endTime - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            WangMing wm = (WangMing) o;
            return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 :
                    (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0);
        }
    }
    

    程序执行结果:

    A开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:57
    B开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:54
    C开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:56
    B下机,时间:2017-12-07 09:37:54
    C下机,时间:2017-12-07 09:37:56
    A下机,时间:2017-12-07 09:37:57
    

    2. DelayQueue 源码分析

    介绍完了 DelayQueued 的基本使用,读者应该对该阻塞队列的功能有了基本了解,接下来我们看下 Doug Lea 是如何实现 DelayQueued 的。

    2.1 DelayQueue 属性

    private final transient ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    
    // PriorityQueue 维护队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    

    上述比较特殊的是 leader 字段,我们之前已经说过,DelayQueue 每次只会出队一个过期的元素,如果队首元素没有过期,就会阻塞出队线程,让线程在 available 这个条件队列上无限等待。

    为了提升性能,DelayQueue 并不会让所有出队线程都无限等待,而是用 leader 保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦 leader 线程被唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在 available 条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的 leader 线程。这样,就避免了无效的等待,提升了性能。这其实是一种名为 Leader-Follower pattern 的多线程设计模式。

    2.2 入队 offer

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);             // 调用 PriorityQueue#offer 方法
            if (q.peek() == e) {    // 如果入队元素在队首, 则唤醒一个出队线程
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    

    需要注意的是当首次入队元素时,需要唤醒一个出队线程,因为此时可能已有出队线程在空队列上等待了,如果不唤醒,会导致出队线程永远无法执行。

    2.3 出队 poll

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            // 1. 没有元素或元素还在有效期内则直接返回 null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            // 2. 元素已经失效直接取出来一个
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
    

    不阻塞直接 poll 时很简单,再来看一下阻塞式获取元素 take 方法。

    2.4 阻塞式出队 take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                // 1. 集合为空时所有的线程都处于无限等待的状态。
                //    只要有元素将其中一个线程转为 leader 状态
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    // 2. 元素已经过期,直接取出返回
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // 3. 已经在其它线程设置为 leader,无限期等着
                    if (leader != null)
                        available.await();
                    // 4. 将 leader 设置为当前线程,阻塞当前线程(限时等待剩余有效时间)
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // 4.1 尝试获取过期的元素,重新竞争
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 5. 队列中有元素则唤醒其它无限等待的线程
            //    leader 线程是限期等待,每次 leader 线程获取元素出队,如果队列中有元素
            //    就要唤醒一个无限等待的线程,将其设置为限期等待,也就是总有一个等待线程是 leader 状态
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    

    采用 take 阻塞式出队时,这里要思考下集合中元素时所有的等待线程永远进行 wait 状态不被唤醒,也就是说即使元素过期了也无法正常出队?

    首先,在每次入队 offer 时,如果是第一个元素就会调用 vailable.signal() 唤醒一个等待的线程。
    其次,take 方法自旋结束后如果 leader == null && q.peek() != null,需要唤醒一个等待中的出队线程。
    leader == null && q.peek() != null 的含义就是——没有 leader 线程但队列中存在元素。我们之前说了,leader 线程作用之一就是用来唤醒其它无限等待的线程,所以必须要有这个判断。
    当然,如果集合中没有元素了,所有的等待线程都处理无限等待的状态。

    参考:

    1. J.U.C之collections框架:DelayQueue

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    [kuangbin带你飞]专题十二 基础DP1 E
    hdu 1203 I NEED A OFFER! (01背包)
    hdu 2602 Bone Collector (01背包)
    hdu 4513 吉哥系列故事——完美队形II (manacher)
    hdu 2203 亲和串 (KMP)
    hdu 1686 Oulipo (KMP)
    hdu 1251 统计难题 (字典树)
    hdu 2846 Repository (字典树)
    hdu 1711 Number Sequence (KMP)
    poj 3461 Oulipo(KMP)
  • 原文地址:https://www.cnblogs.com/binarylei/p/10926812.html
Copyright © 2011-2022 走看看