zoukankan      html  css  js  c++  java
  • 【延时任务】-DelayQueue详解

     

     

    DelayQueue

    1. 使用方法

    DelayQueue中的元素要实现Delayed接口,Delayed接口又实现了 Comparable接口

    public interface Delayed extends Comparable<Delayed> {
        //用于返回还有多久到期
        long getDelay(TimeUnit unit);
    }

    public interface Comparable<T> {
        //用于入队元素的排序,对于延时队列一般让过期时间短的放在前面
        public int compareTo(T o);
    }
    class Work implements Delayed{

        String name;
        //过期时间
        long time;

        public Work(String name, long time, TimeUnit timeUnit) {
            this.name = name;
            //计算到期时间
            this.time = timeUnit.toMillis(time)+System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            //根据给的时间单位返回剩余到期时间,注意time的保存单位,根据time选择合适的TimeUnit
            return unit.convert(time-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            //根据到期时间从小到大排序
            return Long.compare(this.time,o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public class TestMain {
        public static void main(String[] args) throws InterruptedException {
        
            DelayQueue<Work> q=new DelayQueue<>();
            q.add(new Work("001",10,TimeUnit.SECONDS));
            q.add(new Work("002",15,TimeUnit.SECONDS));
            q.add(new Work("003",20,TimeUnit.SECONDS));

            System.out.println("begin time:" + new Date()); 
            for (int i = 0; !q.isEmpty(); i++) {
                Work take = q.take();
                if(take == null){ 
                    continue;
                }
                System.out.format("name:{%s}, time:{%s} ",take.name, new Date());
            } 
        }
    }

    2. 源码解析

    我们根据上面的调用示例来看看源码,先看看入队操作

    入队

    通过源码我们可以看到DelayQueue一共有4个入队方法,add(E),offer(E),put(E),offer(T,long,TimeUnit),但其实都是调用offer(E)的实现。

        public boolean add(E e) {
            return offer(e);
        }

        public void put(E e) {
            offer(e);
        }

        public boolean offer(E e, long timeout, TimeUnit unit) {
            return offer(e);
        }

        public boolean offer(E e) {
            /**
             * this.lock 源码为
             * private final transient ReentrantLock lock = new ReentrantLock();
             * 可重入锁
             */
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lock();
            try {
                /**
                * q 的源码为
                 * private final PriorityQueue<E> q = new PriorityQueue<E>();
                 * 一个基于优先级的无界优先级队列
                 */
                // 把元素入队并保持顺序
                q.offer(e);
       
                //如果新入元素在队头
                if (q.peek() == e) {
                    /**
                     * leader 源码为
                     * private Thread leader = null;
                     * leader为当前正在等待队首元素的线程
                     */
                    // 把leader设为空,因为队首有更先出队的元素,之前的leader等待的就不是最先出队的元素了,
                    leader = null;
                    /**
                     * avaiable 源码为
                     * private final Condition available = lock.newCondition();
                     * availale表示某种条件,可以唤醒或阻塞线程
                     */
                    //唤醒正在等待此条件线程中的一个,这个线程必须重新再获得锁
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

    出队

    出队也有4个方法,poll(),take(),poll(long,TimeUnit),peek() ,我这边主要写一下poll和take

    poll()

        /**
         * 返回并移除队首到期元素,如果队里没有到期元素则返回 null 
         * 非阻塞
         */
        public E poll() {
            
            final ReentrantLock lock = this.lock;
            //获得锁
            lock.lock();
            try {
                //获取队里第一个元素
                E first = q.peek();
                //如果第一个元素为空或者未过期则返回null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();
            } finally {
                lock.unlock();
            }
        }

    take()

        /**
         * 返回队首到期元素,如果到期元素则等待到有一个到期元素返回
         * 阻塞
      */
     public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            //获取锁
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        //队首元素为空则等待,会释放锁
                        available.await();
                    else {
                        //获取队首元素剩余的到期时间
                        long delay = first.getDelay(NANOSECONDS);
                        //如果过期则返回队首元素
                        if (delay <= 0)
                            return q.poll();
                        //释放first的引用,
                        // 因为在线程等待期间不释放引用,
                        // 多线程情况下这个元素可能会被其他线程弹出,而本线程还未释放引用造成内存泄露
                        first = null; // don't retain ref while waiting
                        // 如果已经有线程在等待头元素了,则本线程等待
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            // 没有线程在等待头元素,则把本线程设为等待头元素线程
                            leader = thisThread;
                            try {
           // 本地线程等待到 头元素 到期时间自动唤醒
                                available.awaitNanos(delay);
                            } finally { 
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }

    3. 总结

    • DelayQueue通过全局可重入锁来实现同步
    • DelayQueue常用于定时任务
    • DelayQueue内部使用优先级队列来存储
  • 相关阅读:
    【Spring-MVC】
    【多线程】线程池关闭
    【DDD】基于事件驱动EDA -- 待完成
    【DDD】编码实战
    【Elastic Search】01- 原理
    【DDD】基于DDD的分层设计
    【DDD】Thoughtworks笔记(编码样例) -- 未完成
    【DDD】Thoughtworks笔记(目录划分、异常设计)
    平方和求余
    Factoring a Polynomial
  • 原文地址:https://www.cnblogs.com/A-yes/p/13397788.html
Copyright © 2011-2022 走看看