zoukankan      html  css  js  c++  java
  • JUC源码分析-集合篇(七)PriorityBlockingQueue

    JUC源码分析-集合篇(七)PriorityBlockingQueue

    PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现。 PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock。

    1. 基本属性

    // 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是因为有的 VM 实现在数组头有些内容
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    // 默认队列容量11,这里不是 HashMap,不需要 hash 取余,因此不必是 2^n
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    // 数组结构,是二叉树最小堆的实现
    private transient Object[] queue;
    private transient int size;
    

    PriorityBlockingQueue 使用 ReentrantLock 保证数据安全性,数据结构使用的是数组。PriorityBlockingQueue 数组的结构和 PriorityQueue 一致,是基于平衡二叉堆实现,父节点下标是 n,左节点则是 2n + 1,右节点是 2n + 2。queue[0] 永远都是最小的元素。

    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
    

    2. 入队 offer

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 1. 扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // 2. 将节点 e 插入数据 array 的第 n 个位置
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    

    代码很简单,添加元素时 offer 做了两件事:一是判断是否需要扩容(tryGrow),二是将元素 e 插入到数组中(siftUpComparable)。先看一下如何进行扩容的,至于元素添加在 poll 时再一起分析。

    // 集合中元素个数 size>=queue.length 则进行扩容
    while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
    
    // tryGrow 最终只有一个线程能扩容成功,其它线程通过 while 自旋检查当前扩容是否完毕
    private void tryGrow(Object[] array, int oldCap) {
        // 1. 释放锁,这样在数组扩容期间其它线程可以正常出队
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // 2. allocationSpinLock 是数组扩容的独占锁
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // 3.1 如果 oldCap=MAX_ARRAY_SIZE 则 newCap 就会变成负数
                // 3.2 如果 queue 已经改变,则有其它线程已经完成扩容 ok
                //     线程1已经完成扩容,线程2执行到这里时 queue=newArray
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // 4. 线程1 cas 成功后,线程2会进入这个地方,然后线程2让出 cpu
        //    尽量让线程1执行下面代码获取锁,但是这得不到肯定的保证。
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
    
        // 5. 重新获取锁,只有一个线程可以最终完成数组的扩容。
        //    cas 只进行了数组的初始化,即 newArray=new Object[newCap],可能有多个线程都成功了
        lock.lock();
        // 6. 数组元素拷贝到新数组中,完成扩容。可能有多个线程都初始化了 newArray
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
    

    tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功呢?

    其实这里不先释放锁也是可以的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容的时候还占用锁,那么其他线程在这个时候是不能进行出队和入队操作的,这大大降低了并发性。

    所以在扩容前释放锁,这允许其他出队线程可以进行出队操作,但是由于释放了锁,所以也允许在扩容时候进行入队操作,这就会导致多个线程进行扩容会出现问题,所以这里使用了一个 spinlock 用 cas 控 制只有一个线程可以进行扩容,失败的线程调用 Thread.yield() 让出 cpu,目的意在让扩容线程扩容后优先调用 lock.lock 重新获取锁,但是这得不到一定的保证,有可能调用 Thread.yield() 的线程先获取了锁。如果这时候扩容线程还没扩容完毕,其他线程是通过自旋检查当前扩容是否完毕。

    那 copy 元素数据到新数组为啥放到获取锁后面那?原因应该是因为可见性问题,因为 queue 并没有被 volatile 修饰。另外有可能在扩容时候进行了出队操作,如果直接拷贝可能看到的数组元素不是最新的。而通过调用 Lock 后,获取的数组则是最新的,并且在释放锁前 数组内容不会变化。

    3. 出队 poll

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    // 出队
    private E dequeue() {
        int n = size - 1;
        // 1. 没有元素直接返回 null
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 2. array[0] 永远都是最小的元素
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            // 3. 因为 array[0] 已经出队,现在需要将元素 array[n] 插入到 0 这个位置
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            // 4. 返回 array[0]
            return result;
        }
    }
    

    4. 数据结构

    // 数组结构,是二叉树最小堆的实现,array[0] 永远是优先级最高的元素
    private transient Object[] queue;
    
    // offer 时将元素 e 插入到节点 n 位置
    siftUpComparable(n, e, array);
    // poll 时将最后一个元素 array[n] 插入到 0 位置
    siftDownComparable(0, x, array, n);
    

    (1) siftUpComparable

    // 将元素 x 插入数据 array 的第 k 个位置
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    

    这个排序看过去有些奇怪,怎么有 parent,并且下标是 (k-1)>>>1 呢?其实这里的操作就反应了优先队列的真正数据结构,其实际上是一个二叉树,将二叉树存储在数组之中而已。根节点就是数组的 0 位。下图给出其具体结构:

    最小堆二叉树

    PriorityQueue 是一个完全二叉树,且不允许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的小顶堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。完全二叉树可以和数组中的位置一一对应:

    • 左叶子节点 = 父节点下标 * 2 + 1
    • 右叶子节点 = 父节点下标 * 2 + 2
    • 父节点 = (叶子节点 - 1) / 2

    现在在看 siftUpComparable 代码就轻松多了,实际上就是将要插入的元素 x 和它的父节点做对比,如果比父节点大就一直向上移动。因为比较后元素是在向上移动,所以叫 siftUpComparable

    (2) siftDownComparable

    // 将元素 x 插入数据 array 的第 k 个位置,n 表示当前数组的最后一个位置
    private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
    

    siftDownComparable(0, x, array, n) 将元素 x 和第 0 位的左右子节点进行比较,如果 x 大于这两个子节点则向下移动,小的子节点则上移。这样 array[0] 又变成最小的值了。

    参考:

    1. 并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究
    2. Java之集合(六)PriorityQueue

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    TWaver网元动态转动效果
    替换TWaver中Tree展开合并图标
    MOSS 2010:Visual Studio 2010开发体验(10)——列表开发之内容类型
    MOSS 2010:Visual Studio 2010开发体验(8)——Silverlight应用
    MOSS 2010:Visual Studio 2010开发体验(13)——列表开发之列表实例
    MOSS 2010:Visual Studio 2010开发体验(5)——Mapped Folder
    MOSS 2010:Visual Studio 2010开发体验(6)——开发WebPart
    MOSS 2010:Visual Studio 2010开发体验(7)——AJAX Web Part
    MOSS 2010:Visual Studio 2010开发体验(11)——扩展SharePoint Explorer
    MOSS 2010:Visual Studio 2010开发体验(12)——列表开发之列表定义
  • 原文地址:https://www.cnblogs.com/binarylei/p/10926062.html
Copyright © 2011-2022 走看看