zoukankan      html  css  js  c++  java
  • java并发:阻塞队列之DelayQueue

    延时队列

    DelayQueue是一个支持延时获取元素的使用优先级队列实现的无界的阻塞队列。

    在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

    类图如下:

    DelayQueue的定义以及构造函数如下:

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
        private final transient ReentrantLock lock = new ReentrantLock();
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    
        /**
         * Thread designated to wait for the element at the head of
         * the queue.  This variant of the Leader-Follower pattern
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
         * minimize unnecessary timed waiting.  When a thread becomes
         * the leader, it waits only for the next delay to elapse, but
         * other threads await indefinitely.  The leader thread must
         * signal some other thread before returning from take() or
         * poll(...), unless some other thread becomes leader in the
         * interim.  Whenever the head of the queue is replaced with
         * an element with an earlier expiration time, the leader
         * field is invalidated by being reset to null, and some
         * waiting thread, but not necessarily the current leader, is
         * signalled.  So waiting threads must be prepared to acquire
         * and lose leadership while waiting.
         */
        private Thread leader;
    
        /**
         * Condition signalled when a newer element becomes available
         * at the head of the queue or a new thread may need to
         * become leader.
         */
        private final Condition available = lock.newCondition();
    
        /**
         * Creates a new {@code DelayQueue} that is initially empty.
         */
        public DelayQueue() {}
    
        /**
         * Creates a {@code DelayQueue} initially containing the elements of the
         * given collection of {@link Delayed} instances.
         *
         * @param c the collection of elements to initially contain
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }

    解读:

    DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步。

    重点解释一下变量 leader:

    其使用基于 Leader-Follower模式的变体,用于减少不必要的线程等待。

    当一个线程调用队列的 take 方法变为 leader 线程后,它会调用 available. awaitNanos(delay) 等待 delay 时间;其他线程(follwer线程) 则调用 available. await()进行无限等待。

    leader 线程延迟时间过期后会退出 take 方法,并通过调用 available.signal()方法唤醒一个 follwer线程,被唤醒的 follwer线程被选举为新的 leader线程。 

    Note:

    队列中的元素必须实现Delayed接口和Comparable接口,也就是说DelayQueue里面的元素必须有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在。

    原因:由于每个元素都有一个过期时间,所以要实现获取当前元素还剩下多少时间就过期了的接口;由于DelayQueue底层使用优先级队列来实现,所以要实现元素之间相互比较的接口。

    Delayed接口的定义如下:

    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }

    解读:

    此接口继承自Comparable接口。

    Comparable接口的定义如下:

    public interface Comparable<T> {
        public int compareTo(T o);
    }

    添加元素

    offer方法的代码如下:

        /**
         * Inserts the specified element into this delay queue.
         *
         * @param e the element to add
         * @return {@code true}
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

    解读:

    该方法在获取独占锁之后调用优先级队列的offer方法实现入队

        /**
         * Inserts the specified element into this priority queue.
         *
         * @return {@code true} (as specified by {@link Queue#offer})
         * @throws ClassCastException if the specified element cannot be
         *         compared with elements currently in this priority queue
         *         according to the priority queue's ordering
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            modCount++;
            int i = size;
            if (i >= queue.length)
                grow(i + 1);
            siftUp(i, e);
            size = i + 1;
            return true;
        }

    解读:

    如果待插入元素 e 为 null,则抛出 NullPointerException 异常。

    由于DelayQueue是无界队列,所以方法一直返回 true。

    Note:

    前述offer中 q.peek()方法返回的并不一定是当前添加的元素。

    如果 q.peek()方法返回的是 e,则说明当前元素 e将是最先过期的,于是重置 leader线程为 null,进而激活 avaliable变量对应的条件队列里的一个线程,告诉它队列里面有元素了。

    put方法的代码如下:

        /**
         * Inserts the specified element into this delay queue. As the queue is
         * unbounded this method will never block.
         *
         * @param e the element to add
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) {
            offer(e);
        }

    解读:

    此方法直接调用offer方法来实现。

    获取元素

    poll方法的代码如下:

        /**
         * Retrieves and removes the head of this queue, or returns {@code null}
         * if this queue has no elements with an expired delay.
         *
         * @return the head of this queue, or {@code null} if this
         *         queue has no elements with an expired delay
         */
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                return (first == null || first.getDelay(NANOSECONDS) > 0)
                    ? null
                    : q.poll();
            } finally {
                lock.unlock();
            }
        }

    解读:

    如果队列里面没有过期元素,则返回null;否则返回队首元素。

    take方法的代码如下:

        /**
         * Retrieves and removes the head of this queue, waiting if necessary
         * until an element with an expired delay is available on this queue.
         *
         * @return the head of this queue
         * @throws InterruptedException {@inheritDoc}
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0L)
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }

    解读:

    如果队列里面没有过期元素,则等待。

    示例

    可以将延时队列DelayQueue运用在以下场景中:

      (1)缓存系统的设计:可以用DelayQueue保存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素,则表示缓存有效期到了。

      (2)定时任务调度:使用DelayQueue保存当天将要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行任务,比如TimerQueue就是使用DelayQueue实现的。

    具体示例如下:

    (1)Student对象作为DelayQueue的元素,其必须实现Delayed接口的两个方法

    package com.test;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Student implements Delayed {//必须实现Delayed接口
        
        private String name;
        private long submitTime;// 交卷时间
        private long workTime;// 考试时间
    
        public Student(String name, long submitTime) {
            this.name = name;
            this.workTime = submitTime;
            this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
            System.out.println(this.name + " 交卷,用时" + workTime);
        }
    
        public String getName() {
            return this.name + " 交卷,用时" + workTime;
        }
        
        //必须实现getDelay方法
        public long getDelay(TimeUnit unit) {
            //返回一个延迟时间
            return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
        }
    
        //必须实现compareTo方法
        public int compareTo(Delayed o) {
            Student that = (Student) o;
            return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
        }
    
    }

    (2)主线程程序

    package com.test;
    
    import java.util.concurrent.DelayQueue;

    public class DelayQueueTest { public static void main(String[] args) throws Exception { // 新建一个等待队列 final DelayQueue<Student> bq = new DelayQueue<Student>(); for (int i = 0; i < 5; i++) { Student student = new Student("学生"+i,Math.round((Math.random()*10+i))); bq.put(student); // 将数据存到队列里! } //获取但不移除此队列的头部;如果此队列为空,则返回 null。 System.out.println(bq.peek().getName()); } }

    小结:

  • 相关阅读:
    POJ 2236 Wireless Network(并查集)
    POJ 2010 Moo University
    POJ 3614 Sunscreen(贪心,区间单点匹配)
    POJ 2184 Cow Exhibition(背包)
    POJ 1631 Bridging signals(LIS的等价表述)
    POJ 3181 Dollar Dayz(递推,两个long long)
    POJ 3046 Ant Counting(递推,和号优化)
    POJ 3280 Cheapest Palindrome(区间dp)
    POJ 3616 Milking Time(dp)
    POJ 2385 Apple Catching(01背包)
  • 原文地址:https://www.cnblogs.com/studyLog-share/p/15140333.html
Copyright © 2011-2022 走看看