zoukankan      html  css  js  c++  java
  • 【Java多线程】PriorityBlockingQueue源码分析 (二十五)

      阅读本文前,请先了解AQS,阻塞队列,优先级队列,最小堆数据结构

      参考:【Java多线程】队列同步器AQS(十一)【Java多线程】ArrayBlockingQueue阻塞队列原理分析(十六)【Java】PriorityQueue 的实现原理 

    一、PriorityBlockingQueue介绍

      一个无界的blocking queue使用与PriorityQueue类相同的排序规则,并提供阻塞检索操作。 虽然这个队列在逻辑上是无界的,但由于资源耗尽,尝试的添加可能会失败(导致OutOfMemoryError )。 这个类不允许null元素。 根据natural ordering的优先级队列也不允许插入不可比较的对象(这样做在ClassCastException )。

    二、属性

     1 // 默认队列容量
     2 private static final int DEFAULT_INITIAL_CAPACITY = 11;
     3 
     4 // 最大集合容量
     5 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
     6 
     7 // 存储数据的数组
     8 private transient Object[] queue;
     9 
    10 // 集合大小
    11 private transient int size;
    12 
    13 // 比较器
    14 private transient Comparator<? super E> comparator;
    15 
    16 // 全局锁
    17 private final ReentrantLock lock;
    18 
    19 // 非空条件
    20 private final Condition notEmpty;
    21 
    22 // 自旋锁标识
    23 private transient volatile int allocationSpinLock;
    24 
    25 // 优先级队列
    26 private PriorityQueue<E> q;

    三、方法

    1、构造方法

     1 // 创建默认阻塞式优先级队列,已经初始化了数据数组
     2 public PriorityBlockingQueue() {
     3     this(DEFAULT_INITIAL_CAPACITY, null);
     4 }
     5 
     6 // 根据初始值容量创建阻塞式优先级队列
     7 public PriorityBlockingQueue(int initialCapacity) {
     8     this(initialCapacity, null);
     9 }
    10 
    11 // 根据创建比较器,默认容量为11,阻塞式优先级队列
    12 public PriorityBlockingQueue(int initialCapacity,
    13                              Comparator<? super E> comparator) {
    14     if (initialCapacity < 1)
    15         throw new IllegalArgumentException();
    16     this.lock = new ReentrantLock();
    17     this.notEmpty = lock.newCondition();
    18     this.comparator = comparator;
    19     this.queue = new Object[initialCapacity];
    20 }
    21 
    22 // 根据集合创建阻塞式优先级队列
    23 public PriorityBlockingQueue(Collection<? extends E> c) {
    24     this.lock = new ReentrantLock();
    25     this.notEmpty = lock.newCondition();
    26     boolean heapify = true; // true if not known to be in heap order
    27     boolean screen = true;  // true if must screen for nulls
    28     if (c instanceof SortedSet<?>) {
    29         SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
    30         this.comparator = (Comparator<? super E>) ss.comparator();
    31         heapify = false;
    32     }
    33     else if (c instanceof PriorityBlockingQueue<?>) {
    34         PriorityBlockingQueue<? extends E> pq =
    35             (PriorityBlockingQueue<? extends E>) c;
    36         this.comparator = (Comparator<? super E>) pq.comparator();
    37         screen = false;
    38         if (pq.getClass() == PriorityBlockingQueue.class) // exact match
    39             heapify = false;
    40     }
    41     Object[] a = c.toArray();
    42     int n = a.length;
    43     // If c.toArray incorrectly doesn't return Object[], copy it.
    44     if (a.getClass() != Object[].class)
    45         a = Arrays.copyOf(a, n, Object[].class);
    46     if (screen && (n == 1 || this.comparator != null)) {
    47         for (int i = 0; i < n; ++i)
    48             if (a[i] == null)
    49                 throw new NullPointerException();
    50     }
    51     this.queue = a;
    52     this.size = n;
    53     if (heapify)
    54         heapify();
    55 }

    2、offer() 方法

     1 // 放入元素到队列中 
     2 public boolean offer(E e) {
     3     if (e == null)
     4         throw new NullPointerException();
     5     final ReentrantLock lock = this.lock;
     6     // 获取锁
     7     lock.lock();
     8     int n, cap;
     9     Object[] array;
    10     // 集合容量小于等于存储数据数组长度时,进行扩容
    11     while ((n = size) >= (cap = (array = queue).length))
    12         // 扩容
    13         tryGrow(array, cap);
    14 
    15     // PriorityBlockingQueue 的数据结构是最小堆,与PriorityQueue结构相同
    16     // 最小堆:queue[0],总是最小
    17     try {
    18         Comparator<? super E> cmp = comparator;
    19         // 使用自然(自身)排序上浮
    20         if (cmp == null)
    21             siftUpComparable(n, e, array);
    22         // 使用定制比较器排序上浮
    23         else
    24             siftUpUsingComparator(n, e, array, cmp);
    25         // 队列大小+1
    26         size = n + 1;
    27         // 唤醒消费线程
    28         notEmpty.signal();
    29     } finally {
    30         // 释放锁
    31         lock.unlock();
    32     }
    33     return true;
    34 }

    3、tryGrow() 方法

     1 private void tryGrow(Object[] array, int oldCap) {
     2     // 释放锁
     3     lock.unlock(); // must release and then re-acquire main lock
     4     Object[] newArray = null;
     5     
     6     // CAS获取自旋锁
     7     if (allocationSpinLock == 0 &&
     8         UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
     9                                  0, 1)) {
    10         try {
    11             // 情况1、原容量小于 64,新容量为原容量的2倍+2
    12             // 情况2、原容量不小于 64,新容量为原容量的1.5倍
    13             int newCap = oldCap + ((oldCap < 64) ?
    14                                    (oldCap + 2) : // grow faster if small
    15                                    (oldCap >> 1));
    16             if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
    17                 int minCap = oldCap + 1;
    18                 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
    19                     throw new OutOfMemoryError();
    20                 newCap = MAX_ARRAY_SIZE;
    21             }
    22             // 创建新数组
    23             if (newCap > oldCap && queue == array)
    24                 newArray = new Object[newCap];
    25         } finally {
    26             // 释放自旋锁
    27             allocationSpinLock = 0;
    28         }
    29     }
    30 
    31     // 扩容失败,返回,可能是其他线程在扩容
    32     if (newArray == null) // back off if another thread is allocating
    33         Thread.yield();
    34     // 获取锁
    35     lock.lock();
    36 
    37     // 复制数据到新数组
    38     if (newArray != null && queue == array) {
    39         queue = newArray;
    40         System.arraycopy(array, 0, newArray, 0, oldCap);
    41     }
    42 }

    4、take() 方法

     1 public E take() throws InterruptedException {
     2     final ReentrantLock lock = this.lock;
     3     // 获取可以被打断的锁
     4     lock.lockInterruptibly();
     5     E result;
     6     try {
     7         // 获取队列元素
     8         while ( (result = dequeue()) == null)
     9             // 未获取到,进行条件等待
    10             notEmpty.await();
    11     } finally {
    12         // 释放锁
    13         lock.unlock();
    14     }
    15     return result;
    16 }
    17 
    18 // 出队列
    19 private E dequeue() {
    20     
    21     int n = size - 1;
    22     // 判断队列是否存在元素
    23     if (n < 0)
    24         return null;
    25     else {
    26         Object[] array = queue;
    27         // 最小堆,取出array[0],最小值
    28         E result = (E) array[0];
    29         E x = (E) array[n];
    30         array[n] = null;
    31         Comparator<? super E> cmp = comparator;
    32         // 取出顶元素之后,进行下浮
    33         if (cmp == null)
    34             siftDownComparable(0, x, array, n);
    35         else
    36             siftDownUsingComparator(0, x, array, n, cmp);
    37         // 修改队列大小
    38         size = n;
    39         return result;
    40     }
    41 }

     四、示例

     1 public class TestPriorityBlockingQueue {
     2 
     3 
     4     public static void main(String[] args) throws InterruptedException {
     5 
     6         PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
     7 
     8         for (int i = 0; i < 5; i++) {
     9             int finalI = i;
    10             new Thread(() -> {
    11                 queue.offer(finalI);
    12                 System.out.println("生产:" + finalI);
    13             }).start();
    14         }
    15 
    16         Thread.sleep(2000);
    17 
    18         for (int j = 0; j < 5; j++) {
    19             int finalI = j;
    20             new Thread(() -> {
    21                 try {
    22                     System.out.println("消费:" + queue.take());
    23                 } catch (InterruptedException e) {
    24                     e.printStackTrace();
    25                 }
    26             }).start();
    27         }
    28     }
    29 }

    五、总结

      1、PriorityBlockingQueue 采用数组来存储数据的,数据结构是最小堆

      2、PriorityBlockingQueue 扩容时

        - 原容量小于 64,新容量为原容量的2倍+2

        - 原容量不小于 64,新容量为原容量的1.5倍

      3、由于是最小堆的数据结构,queue[0]最小,每次放入数据 和 取出数据,要进行数据调整(上浮,下浮);

      4、PriorityBlockingQueue 是线程安全的

  • 相关阅读:
    hdu 2647 Reward
    hdu 2094 产生冠军
    hdu 3342 Legal or Not
    hdu 1285 确定比赛名次
    hdu 3006 The Number of set
    hdu 1429 胜利大逃亡(续)
    UVA 146 ID Codes
    UVA 131 The Psychic Poker Player
    洛谷 P2491消防 解题报告
    洛谷 P2587 [ZJOI2008]泡泡堂 解题报告
  • 原文地址:https://www.cnblogs.com/h--d/p/14597186.html
Copyright © 2011-2022 走看看