zoukankan      html  css  js  c++  java
  • java并发:阻塞队列之DelayQueue

    延时队列

    DelayQueue是一个支持延时获取元素的使用优先级队列实现的无界的阻塞队列。

    在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

    类图如下:

    DelayQueue的定义以及构造函数如下:

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
        private final transient ReentrantLock lock = new ReentrantLock();
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    
        /**
         * Thread designated to wait for the element at the head of
         * the queue.  This variant of the Leader-Follower pattern
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
         * minimize unnecessary timed waiting.  When a thread becomes
         * the leader, it waits only for the next delay to elapse, but
         * other threads await indefinitely.  The leader thread must
         * signal some other thread before returning from take() or
         * poll(...), unless some other thread becomes leader in the
         * interim.  Whenever the head of the queue is replaced with
         * an element with an earlier expiration time, the leader
         * field is invalidated by being reset to null, and some
         * waiting thread, but not necessarily the current leader, is
         * signalled.  So waiting threads must be prepared to acquire
         * and lose leadership while waiting.
         */
        private Thread leader;
    
        /**
         * Condition signalled when a newer element becomes available
         * at the head of the queue or a new thread may need to
         * become leader.
         */
        private final Condition available = lock.newCondition();
    
        /**
         * Creates a new {@code DelayQueue} that is initially empty.
         */
        public DelayQueue() {}
    
        /**
         * Creates a {@code DelayQueue} initially containing the elements of the
         * given collection of {@link Delayed} instances.
         *
         * @param c the collection of elements to initially contain
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }

    解读:

    DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步。

    重点解释一下变量 leader:

    其使用基于 Leader-Follower模式的变体,用于减少不必要的线程等待。

    当一个线程调用队列的 take 方法变为 leader 线程后,它会调用 available. awaitNanos(delay) 等待 delay 时间;其他线程(follwer线程) 则调用 available. await()进行无限等待。

    leader 线程延迟时间过期后会退出 take 方法,并通过调用 available.signal()方法唤醒一个 follwer线程,被唤醒的 follwer线程被选举为新的 leader线程。 

    Note:

    队列中的元素必须实现Delayed接口和Comparable接口,也就是说DelayQueue里面的元素必须有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在。

    原因:由于每个元素都有一个过期时间,所以要实现获取当前元素还剩下多少时间就过期了的接口;由于DelayQueue底层使用优先级队列来实现,所以要实现元素之间相互比较的接口。

    Delayed接口的定义如下:

    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }

    解读:

    此接口继承自Comparable接口。

    Comparable接口的定义如下:

    public interface Comparable<T> {
        public int compareTo(T o);
    }

    添加元素

    offer方法的代码如下:

        /**
         * Inserts the specified element into this delay queue.
         *
         * @param e the element to add
         * @return {@code true}
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

    解读:

    该方法在获取独占锁之后调用优先级队列的offer方法实现入队

        /**
         * Inserts the specified element into this priority queue.
         *
         * @return {@code true} (as specified by {@link Queue#offer})
         * @throws ClassCastException if the specified element cannot be
         *         compared with elements currently in this priority queue
         *         according to the priority queue's ordering
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            modCount++;
            int i = size;
            if (i >= queue.length)
                grow(i + 1);
            siftUp(i, e);
            size = i + 1;
            return true;
        }

    解读:

    如果待插入元素 e 为 null,则抛出 NullPointerException 异常。

    由于DelayQueue是无界队列,所以方法一直返回 true。

    Note:

    前述offer中 q.peek()方法返回的并不一定是当前添加的元素。

    如果 q.peek()方法返回的是 e,则说明当前元素 e将是最先过期的,于是重置 leader线程为 null,进而激活 avaliable变量对应的条件队列里的一个线程,告诉它队列里面有元素了。

    put方法的代码如下:

        /**
         * Inserts the specified element into this delay queue. As the queue is
         * unbounded this method will never block.
         *
         * @param e the element to add
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) {
            offer(e);
        }

    解读:

    此方法直接调用offer方法来实现。

    获取元素

    poll方法的代码如下:

        /**
         * Retrieves and removes the head of this queue, or returns {@code null}
         * if this queue has no elements with an expired delay.
         *
         * @return the head of this queue, or {@code null} if this
         *         queue has no elements with an expired delay
         */
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                return (first == null || first.getDelay(NANOSECONDS) > 0)
                    ? null
                    : q.poll();
            } finally {
                lock.unlock();
            }
        }

    解读:

    如果队列里面没有过期元素,则返回null;否则返回队首元素。

    take方法的代码如下:

        /**
         * Retrieves and removes the head of this queue, waiting if necessary
         * until an element with an expired delay is available on this queue.
         *
         * @return the head of this queue
         * @throws InterruptedException {@inheritDoc}
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0L)
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }

    解读:

    如果队列里面没有过期元素,则等待。

    示例

    可以将延时队列DelayQueue运用在以下场景中:

      (1)缓存系统的设计:可以用DelayQueue保存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素,则表示缓存有效期到了。

      (2)定时任务调度:使用DelayQueue保存当天将要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行任务,比如TimerQueue就是使用DelayQueue实现的。

    具体示例如下:

    (1)Student对象作为DelayQueue的元素,其必须实现Delayed接口的两个方法

    package com.test;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Student implements Delayed {//必须实现Delayed接口
        
        private String name;
        private long submitTime;// 交卷时间
        private long workTime;// 考试时间
    
        public Student(String name, long submitTime) {
            this.name = name;
            this.workTime = submitTime;
            this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
            System.out.println(this.name + " 交卷,用时" + workTime);
        }
    
        public String getName() {
            return this.name + " 交卷,用时" + workTime;
        }
        
        //必须实现getDelay方法
        public long getDelay(TimeUnit unit) {
            //返回一个延迟时间
            return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
        }
    
        //必须实现compareTo方法
        public int compareTo(Delayed o) {
            Student that = (Student) o;
            return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
        }
    
    }

    (2)主线程程序

    package com.test;
    
    import java.util.concurrent.DelayQueue;

    public class DelayQueueTest { public static void main(String[] args) throws Exception { // 新建一个等待队列 final DelayQueue<Student> bq = new DelayQueue<Student>(); for (int i = 0; i < 5; i++) { Student student = new Student("学生"+i,Math.round((Math.random()*10+i))); bq.put(student); // 将数据存到队列里! } //获取但不移除此队列的头部;如果此队列为空,则返回 null。 System.out.println(bq.peek().getName()); } }

    小结:

  • 相关阅读:
    关于viewports 设备像素比 密度
    脚本检测 media query 分界点
    chrome Web开放 字体格式不能显示问题
    响应式图片
    ECMAScript 6 proxies
    大小不固定 文字图片居中
    prototype
    基于综合服务平台浅谈Sass应用
    Sass浅谈
    JQ怎么获取margin-left的值
  • 原文地址:https://www.cnblogs.com/studyLog-share/p/15140333.html
Copyright © 2011-2022 走看看