zoukankan      html  css  js  c++  java
  • 【Java并发编程】19、DelayQueue源码分析

    DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间)。DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

    领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式。在任意时间点,程序都仅有一个领导者线程,它负责监听IO事件。而其他线程都是追随者,它们休眠在线程池中等待成为新的领导者。当前的领导者如果检测到IO事件,首先要从线程池中推选出新的领导者线程,然后处理IO事件。此时,新的领导者等待新的IO事件,而原来的领导者则处理IO事件,二者实现了并发。

     简单理解,就是最多只有一个线程在处理,其他线程在睡眠。在DelayQueue的实现中,Leader/Followers模式用于等待队首的第一个元素。

    类定义及参数:

    复制代码
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
        /** 重入锁,实现线程安全 */
        private final transient ReentrantLock lock = new ReentrantLock();
        /** 使用优先队列实现 */
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    
        /** Leader/Followers模式 */
        private Thread leader = null;
    
        /** 条件对象,当新元素到达,或新线程可能需要成为leader时被通知 */
        private final Condition available = lock.newCondition();
    复制代码

      构造函数:

    复制代码
        /**
         * 默认构造,得到空的延迟队列
         */
        public DelayQueue() {}
    
        /**
         * 构造延迟队列,初始包含c中的元素
         *
         * @param c 初始包含的元素集合
         * @throws NullPointerException 当集合或集合任一元素为空时抛出空指针错误
         */
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }
    复制代码

      add方法:

    复制代码
        /**
         * 向延迟队列插入元素
         *
         * @param e 要插入的元素
         * @return true
         * @throws NullPointerException 元素为空,抛出空指针错误
         */
        public boolean add(E e) {
            // 直接调用offer并返回
            return offer(e);
        }
    复制代码

      offer方法:

    复制代码
        /**
         * 向延迟队列插入元素
         *
         * @param e 要插入的元素
         * @return true
         * @throws NullPointerException 元素为空,抛出空指针错误
         */
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            // 获得锁
            lock.lock();
            try {
                // 向优先队列插入元素
                q.offer(e);
                // 若在此之前队列为空,则置空leader,并通知条件对象,需要结合take方法看
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    复制代码

      put方法:

    复制代码
        /**
         * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞。
         *
         * @param e 要插入的元素
         * @throws NullPointerException 元素为空,抛出空指针错误
         */
        public void put(E e) {
            offer(e);
        }
    复制代码

      带超时的offer方法:

    复制代码
        /**
         * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞,因此,直接调用offer方法并返回
         *
         * @param e 要插入的元素
         * @param timeout 不会阻塞,忽略
         * @param unit 不会阻塞,忽略
         * @return true
         * @throws NullPointerException 元素为空,抛出空指针错误
         */
        public boolean offer(E e, long timeout, TimeUnit unit) {
            // 直接调用offer方法并返回
            return offer(e);
        }
    复制代码

      poll方法:

    复制代码
        /**
         * 获取并移除队首的元素, 或者返回null(如果队列不包含到达延迟时间的元素)
         *
         * @return 队首的元素, 或者null(如果队列不包含到达延迟时间的元素)
         */
        public E poll() {
            final ReentrantLock lock = this.lock;
            // 获得锁
            lock.lock();
            try {
                // 获取优先队列队首元素
                E first = q.peek();
                // 若优先队列队首元素为空,或者还没达到延迟时间,返回null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                // 否则,返回并移除队首元素
                else
                    return q.poll();
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    复制代码

      take方法(重要):

    复制代码
        /**
         * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素
         *
         * @return 队首元素
         * @throws InterruptedException 阻塞时被打断,抛出打断异常
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // 获得锁,该锁可被打断
            lock.lockInterruptibly();
            try {
                // 循环处理
                for (;;) {
                    // 获取队首元素
                    E first = q.peek();
                    // 若元素为空,等待条件,在offer方法中会调用条件对象的通知方法
                    // 并重新进入循环
                    if (first == null)
                        available.await();
                    // 若元素不为空
                    else {
                        // 获取延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 若达到延迟时间,返回并移除队首元素
                        if (delay <= 0)
                            return q.poll();
                        // 否则,需要进入等待
                        first = null; // 在等待时,不持有引用
                        // 若leader不为空,等待条件
                        if (leader != null)
                            available.await();
                        // 否则,设置leader为当前线程,并超时等待延迟时间
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 通知其他线程条件得到满足
                if (leader == null && q.peek() != null)
                    available.signal();
                 // 释放锁
                lock.unlock();
            }
        }
    复制代码

      带超时的poll方法(重要):

    复制代码
        /**
         * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素或超时
         *
         * @return 队首元素,或者null
         * @throws InterruptedException 阻塞等待时被打断,抛出打断异常*/
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return q.poll();
                        if (nanos <= 0)
                            return null;
                        first = null; // don't retain ref while waiting
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
    复制代码

      peek方法:

    复制代码
        /**
         * 获取但不移除队首元素,或返回null(如果队列为空)。和poll方法不同,
         * 若队列不为空,该方法换回队首元素,不论是否达到延迟时间
         *
         * @return 队首元素,或null(如果队列为空)
         */
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.peek();
            } finally {
                lock.unlock();
            }
        }
    复制代码

    出处:

    https://www.cnblogs.com/enumhack/p/7472873.html

    https://www.cnblogs.com/wanly3643/p/3944661.html

    jdk源码

            

  • 相关阅读:
    使用jquery.js写可增行删行可编辑的table
    当Ext.js中xtype: 'checkboxfield'时,没勾选则向后台发送的数据没有字段的解决方法
    沉迷js不能自拔~
    kubernetes concepts -- Pod Overview
    kubernetes concepts (一)
    所有锁的unlock要放到try{}finally{}里,不然发生异常返回就丢了unlock了
    Java故障定位方法总结
    年度计划
    minikube 设置CPU和内存
    Ubuntu 设置中文输入法
  • 原文地址:https://www.cnblogs.com/wangzhongqiu/p/8529001.html
Copyright © 2011-2022 走看看