zoukankan      html  css  js  c++  java
  • java延迟队列DelayQueue及底层优先队列PriorityQueue实现原理源码详解

      DelayQueue是基于java中一个非常牛逼的队列PriorityQueue(优先队列),PriorityQueue是java1.5新加入的,当我看到Doug Lea大神的署名之后,我就知道这个队列不简单,那我们先来看一下他的源码吧:

    作为一个队列来说,最基础的就是新增和查询,首先我们看下入队的逻辑:

    1.入队

    PriorityQueue提供了offer方法新增元素(add方法其实也是offer实现的),我们直接看下源码:

    public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            modCount++;
            int i = size;
            if (i >= queue.length)
                grow(i + 1);
            size = i + 1;
            if (i == 0)
                queue[0] = e;
            else
                siftUp(i, e);
            return true;
        }

    offer方法首先判断是否需要扩容,若需要则走grow方法:

    private void grow(int minCapacity) {
            int oldCapacity = queue.length;
            // Double size if small; else grow by 50%
            int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                             (oldCapacity + 2) :
                                             (oldCapacity >> 1));
            // overflow-conscious code
            if (newCapacity - MAX_ARRAY_SIZE > 0)
                newCapacity = hugeCapacity(minCapacity);
            queue = Arrays.copyOf(queue, newCapacity);
        }

    当长度小于64,扩容一倍+2,否则扩容50%。

    再往下看若队列中没有元素,直接复制下标为0的元素,否则调用siftUp方法:

    private void siftUp(int k, E x) {
            if (comparator != null)
                siftUpUsingComparator(k, x);
            else
                siftUpComparable(k, x);
        }

    俩方法差不多一个,具体可搜compare和compareTo的区别如:https://blog.csdn.net/fly910905/article/details/81670353,我们直接看siftUpComparable方法:

    private void siftUpComparable(int k, E x) {
            Comparable<? super E> key = (Comparable<? super E>) x;
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Object e = queue[parent];
                if (key.compareTo((E) e) >= 0)
                    break;
                queue[k] = e;
                k = parent;
            }
            queue[k] = key;
        }

         

     结合上图和代码可以看出每个节点新增时,首先会根据节点下标计算出当前新节点应该属于的节点的父节点,比较当小于父节点则交换,无限循环,知道不存在父节点或者当前节点大于父节点的值,这样可以保证每个节点都比起子节点要小。

    2.出队

    入队的时候基本都差不多,但出队却有好几种,我们首先看peek方法:

    public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.peek();
            } finally {
                lock.unlock();
            }
        }
    public E peek() {
            return (size == 0) ? null : (E) queue[0];
        }

    代码简洁明了,就是查询出第一个,这只能算查询,算不上出队,我觉得应该叫点名。

    再看poll方法:

    public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();  //取第一个节点
                if (first == null || first.getDelay(NANOSECONDS) > 0)  //节点为空或者首节点未到延时时间直接返回null
                    return null;
                else
                    return q.poll();  //PriorityQueue取节点逻辑
            } finally {
                lock.unlock();
            }
        }

    再看PriorityQueue.poll方法:

    public E poll() {
            if (size == 0)
                return null;
            int s = --size;
            modCount++;
            E result = (E) queue[0];
            E x = (E) queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            return result;
        }

    首先取出第一个节点,然后将最后一个节点放替换首节点,并与子节点对比找出最小的并替换直到当前节点为最小为止,具体替换流程见siftDown代码:

    private void siftDown(int k, E x) {
            if (comparator != null)
                siftDownUsingComparator(k, x);
            else
                siftDownComparable(k, x);
        }
    private void siftDownComparable(int k, E x) {
            Comparable<? super E> key = (Comparable<? super E>)x;
            int half = size >>> 1;        // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least  拿到左子节点下标
                Object c = queue[child]; 
                int right = child + 1;  //右子节点下标
                if (right < size &&
                    ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                    c = queue[child = right];  //取出左右节点较小的
                if (key.compareTo((E) c) <= 0)  //当前节点比子节点小,结束流程
                    break;
                queue[k] = c;   //替换子节点至父节点
                k = child;
            }
            queue[k] = key;
        }

    这个代码看起来稍微复杂点,会首先拿到左子节点和右子节点,对比取出较小的节点后与当前节点对比,将小的放在父节点位置,其实这里也是保证替换后的节点依然保持每个父节点最小,符合小顶堆。具体流程如下图所示:

    我们最后看下take方法的实现:

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();  //取出第一个节点
                    if (first == null)  //首节点为空说明队列为空,await等待
                        available.await();
                    else {              //说明队列中有节点
                        long delay = first.getDelay(NANOSECONDS);   //获取首节点延时时间
                        if (delay <= 0)         //延时时间到期,直接取
                            return q.poll();
                        first = null; // don't retain ref while waiting 
                        if (leader != null)    //说明当前有其他线程在操作(一般是其他线程在await)
                            available.await();
                        else {  //这里设置操作线程为自己,并等待延时时间时长
                            Thread thisThread = Thread.currentThread(); 
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }

    这个实现一看就是阻塞式等待,取不到不罢休系列。

    总结:

      这篇写的还是比较简单的,大体介绍了DelayQueue的实现,也从底层了解了小顶堆PriorityQueue的实现,算是补充了之前对延时队列的具体实现,这篇主要是通过一个小顶堆的实现,保证每次取得值都是最小的,而又不用像数组那样每次插入都要重新排序,这里只要排序一部分就可以,也保证了性能,而DelayQueue中,加入了ReentrantLock保证了多线程的线程安全,同时加入Condition实现了延时阻塞式存取的机制,jdk的代码还是牛,这里其实就是我之前写锁的时候介绍的等待通知模式的一种实现,结合起来看还是有一些收获的。

  • 相关阅读:
    修改sqlserver2008中表的schema
    MongoDB学习笔记06
    MongoDB学习笔记05
    MongoDB学习笔记04
    在IIS Express中调试时无法读取配置文件
    Spring 集成Redis
    Java操作Redis(代码演示)
    Redis的一些常用命令操作
    Redis安装步骤
    如何彻底删除电脑安装的软件程序?
  • 原文地址:https://www.cnblogs.com/gmt-hao/p/14417541.html
Copyright © 2011-2022 走看看