zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》20 DelayQueue

    DelayQueue有序存储Delayed类型或者子类型的对象,没当从队列中取走元素时,需要等待延迟耗完才会返回该对象。

    所谓Delayed类型,因为需要比较,所以继承了Comparable接口:

    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }

    其实Delayed对象的排序和延迟长短是无关的,因为Comparable的compare方法是用户自己实现的,DelayQueue只是保证返回对象的延迟已经耗尽。

    DelayQueue需要排序存储Delayed类型的对象同时具备阻塞功能,但是阻塞的过程伴有延迟等待类型的阻塞,因此不能直接使用BlockingPriorityQueue来实现,而是用非阻塞的版本的PriorityQueue来实现排序存储。

    private final PriorityQueue<E> q = new PriorityQueue<E>();

    因此DelayQueue需要自己实现阻塞的功能(需要一个Condition):

    private final Condition available = lock.newCondition();

    老规矩还是先来看offer方法:

        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                // 如果原来队列为空,重置leader线程,通知available条件
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

    顺便提一下,因为DelayQueue不限制长度,因此添加元素的时候不会因为队列已满产生阻塞,因此带有超时的offer方法的超时设置是不起作用的:

        public boolean offer(E e, long timeout, TimeUnit unit) {
            // 和不带timeout的offer方法一样
            return offer(e);
        }

    因为DelayQueue需要自己实现阻塞,因此关注的重点应该是两个带有阻塞的方法:没有超时的take方法和带有超时的poll方法。

    普通poll方法很简单,如果延迟时间没有耗尽的话,直接返回null就可以了。

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();
            } finally {
                lock.unlock();
            }
        }

    接下来看take和带timeout的poll方法,在看过DelayedWorkQueue之后这部分还是比较好理解的:

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 如果队列为空,需要等待available条件被通知
                    E first = q.peek();
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        // 如果延迟时间已到,直接返回第一个元素
                        if (delay <= 0)
                            return q.poll();
                        // leader线程存在表示有其他线程在等待,那么当前线程肯定需要等待
                        else if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            // 如果没有leader线程,设置当前线程为leader线程
                            // 尝试等待直到延迟时间耗尽(可能提前返回,那么下次
                            // 循环会继续处理)
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                // 如果leader线程还是当前线程,重置它用于下一次循环。
                                // 等待available条件时,锁可能被其他线程占用从而导致
                                // leader线程被改变,所以要检查
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 如果没有其他线程在等待,并且队列不为空,通知available条件
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }

    再来看带有timeout的poll方法,和DelayedWorkQueue非常相似:

        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            // 尝试等待available条件,记录剩余的时间
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)
                            return q.poll();
                        if (nanos <= 0)
                            return null;
                        // 当leader线程不为空时(此时delay>=nanos),等待的时间
                        // 似乎delay更合理,但是nanos也可以,因为排在当前线程前面的
                        // 其他线程返回时会唤醒available条件从而返回,
                        // 这里使用nanos和nonas<delay合并更加简单
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                // nanos需要更新
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }

    前面理解了DelayedWorkQueue再来看DelayQueue就非常容易理解了。

  • 相关阅读:
    洛谷
    洛谷
    洛谷
    51nod
    洛谷
    洛谷
    51nod
    洛谷
    2019五一训练记录
    2019.5.4备战省赛组队训练赛第十九场
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3944661.html
Copyright © 2011-2022 走看看