zoukankan      html  css  js  c++  java
  • Java并发之LinkedBlockingQueue

    上一篇我们已经学习过了 ArrayBlockingQueue的知识及相关方法的使用,这一篇我们就来再学习一下ArrayBlockingQueue的亲戚 LinkedBlockingQueue。在集合类中 ArrayList与 LinkedList会常常拿来比较,ArrayList内部实现是基于数组的,而 LinkedList内部实现是基于链表,所以他们之间会有很多不同,但是本文不会去重点讨论,感兴趣的朋友可以参考我之前发过的几篇文章,那么有请本节的主角 LinkedBlockingQueue!

            LinkedBlockingQueue

            LinkedBlockingQueue是一个一个基于已链接节点的、范围任意(相对而论)的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。 

            可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。 

            LinkedBlockingQueue及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。 

            我们已经学习过了 ArrayBlockingQueue,所以学习 LinkedBlockingQueue就自然比较轻松,所以本文对于已经明确的相关概念就不做过多介绍了,而是重点放在两者的区别之上。

            1.成员变量

            与ArrayBlockingQueue不同 LinkedBlockingQueue的成员变量有些变化,以下是 LinkedBlockingQueue的成员变量:

    Java代码  收藏代码
    1. /** 容量范围,默认值为 Integer.MAX_VALUE */  
    2. private final int capacity;  
    3.   
    4. /** 当前队列中元素数量 */  
    5. private final AtomicInteger count = new AtomicInteger(0);  
    6.   
    7. /** 头节点 */  
    8. private transient Node<E> head;  
    9.   
    10. /** 尾节点 */  
    11. private transient Node<E> last;  
    12.   
    13. /** take, poll等方法的锁 */  
    14. private final ReentrantLock takeLock = new ReentrantLock();  
    15.   
    16. /** 获取队列的 Condition(条件)实例 */  
    17. private final Condition notEmpty = takeLock.newCondition();  
    18.   
    19. /** put, offer等方法的锁 */  
    20. private final ReentrantLock putLock = new ReentrantLock();  
    21.   
    22. /** 插入队列的 Condition(条件)实例 */  
    23. private final Condition notFull = putLock.newCondition();  

            1)首先 LinkedBlockingQueue明确了容量变量,当为指定容量时,默认容量为Int的最大值Integer.MAX_VALUE。

            2)队列元素数量变量 count采用的是 AtomicInteger ,而不是普通的Int型。CAS相关可参考http://286.iteye.com/blog/2295165

            3)LinkedBlockingQueue内部队列实现使用的是 Node节点类,这与 LinkedList类似。

            4)最后也是最重要的一点,那就是获取与插入操作分成了两个锁:takeLock与 putLock来处理,这点下面还会重点分析。

            2.构造方法

            有三个构造方法,分别为默认,指定容量,指定容量和初始元素。

    Java代码  收藏代码
    1. /** 
    2.  * 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 
    3.  */  
    4. public LinkedBlockingQueue() {  
    5.     this(Integer.MAX_VALUE);  
    6. }  
    7.   
    8. /** 
    9.  * 创建一个具有给定(固定)容量的 LinkedBlockingQueue 
    10.  */  
    11. public LinkedBlockingQueue(int capacity) {  
    12.     if (capacity <= 0)  
    13.         throw new IllegalArgumentException();  
    14.     this.capacity = capacity;  
    15.     last = head = new Node<E>(null);  
    16. }  
    17.   
    18. /** 
    19.  * 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue, 
    20.  * 最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。 
    21.  */  
    22. public LinkedBlockingQueue(Collection<? extends E> c) {  
    23.     this(Integer.MAX_VALUE);  
    24.     for (E e : c)  
    25.         add(e);  
    26. }  

            默认构造方法创建一个容量为 Integer.MAX_VALUE的 LinkedBlockingQueue实例。

            第二种构造方法,指定了队列容量,首先判断指定容量是否大于零,否则抛出异常。然后为 capacity 赋值,最后创建空节点,并指向 head与 last,两者的 item与 next此时均为 null。

     

            最后一种,利用循环向队列中添加指定集合中的元素。

            3.Node类

            LinkedBlockingQueue内部列表实现是使用的 Node内部类,Node类也并不复杂,以下是其源代码:

    Java代码  收藏代码
    1. /** 
    2.  * 节点类 
    3.  */  
    4. static class Node<E> {  
    5.     /** volatile保障读写分离 */  
    6.     volatile E item;  
    7.     Node<E> next;  
    8.   
    9.     Node(E x) {  
    10.         item = x;  
    11.     }  
    12. }  

            item用于表示元素对象,next指向链表的下一个节点。



            LinkedBlockingQueue的大部分方法其实是与  ArrayBlockingQueue类似的,所以本文就只介绍不同于ArrayBlockingQueue的相关方法。

            4.添加元素

            1)add方法

            add方法相同就不介绍了,同样调用的是offer方法。

            2)offer方法

            将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常。 

            与ArrayBlockingQueue不同,LinkedBlockingQueue多了一些容量方面的判断。

    Java代码  收藏代码
    1. /** 
    2.  * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量) 
    3.  * 在成功时返回 true,如果此队列已满,则返回 false。 
    4.  * 当使用有容量限制的队列时,此方法通常要优于 add 方法, 
    5.  * 后者可能无法插入元素,而只是抛出一个异常。  
    6.  */  
    7. public boolean offer(E e) {  
    8.     //判断添加元素是否为null  
    9.     if (e == null)  
    10.         throw new NullPointerException();  
    11.     //第一点不同,使用原子类操作count,因为有两个锁  
    12.     final AtomicInteger count = this.count;  
    13.     //判断容量,队列是否已满  
    14.     if (count.get() == capacity)  
    15.         return false;  
    16.     int c = -1;  
    17.     final ReentrantLock putLock = this.putLock;  
    18.     //获取添加锁  
    19.     putLock.lock();  
    20.     try {  
    21.         //再次判断,如果队列未满  
    22.         if (count.get() < capacity) {  
    23.             //插入元素  
    24.             insert(e);  
    25.             //增加元素数count  
    26.             c = count.getAndIncrement();  
    27.             if (c + 1 < capacity)  
    28.                 //未满则唤醒添加线程  
    29.                 notFull.signal();  
    30.         }  
    31.     } finally {  
    32.         //释放锁  
    33.         putLock.unlock();  
    34.     }  
    35.     //c等于0说明添加成功  
    36.     if (c == 0)  
    37.         //唤醒读取线程  
    38.         signalNotEmpty();  
    39.     return c >= 0;  
    40. }  

            可以看到offer方法的关键在于 insert方法。

            3)insert方法

             insert方法非常简单,但是却不要小看。

    Java代码  收藏代码
    1. /** 
    2.  * 再队尾添加元素 
    3.  */  
    4. private void insert(E x) {  
    5.     last = last.next = new Node<E>(x);  
    6. }  

            首先,根据指定参数x创建一个Node实例。

            然后,将原尾节点的next指向此节点。

            最后,将尾节点设置尾此节点。

            这样新添加的节点就成为了新的尾节点。



     

            当向链表中添加第一个节点时,因为在初始化时

    Java代码  收藏代码
    1. last = head = new Node<E>(null);  

            所以此时 head与 last指向的是同一个对象new Node<E>(null)。

            之后将last.next指向x。

    Java代码  收藏代码
    1. last.next = new Node<E>(x);  

            因为此时 head与 last是同一个对象,所以 head.next也指向x。

            最后将 last指向x。

    Java代码  收藏代码
    1. last =  new Node<E>(x);  

            这样 head的next就指向了 last。此时head中的 item仍为 null。

            4)put方法

            将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。

    Java代码  收藏代码
    1. /** 
    2.  * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用 
    3.  */  
    4. public void put(E e) throws InterruptedException {  
    5.     //判断添加元素是否为null  
    6.     if (e == null)  
    7.         throw new NullPointerException();  
    8.     int c = -1;  
    9.     final ReentrantLock putLock = this.putLock;  
    10.     final AtomicInteger count = this.count;  
    11.     //获取插入的可中断锁  
    12.     putLock.lockInterruptibly();  
    13.     try {  
    14.         try {  
    15.             //判断队列是否已满  
    16.             while (count.get() == capacity)  
    17.                 //如果已满则阻塞添加线程  
    18.                 notFull.await();  
    19.         } catch (InterruptedException ie) {  
    20.             //失败就唤醒添加线程  
    21.             notFull.signal();   
    22.             throw ie;  
    23.         }  
    24.         //添加元素  
    25.         insert(e);  
    26.         //修改c值  
    27.         c = count.getAndIncrement();  
    28.         //根据c值判断队列是否已满  
    29.         if (c + 1 < capacity)  
    30.             //未满则唤醒添加线程  
    31.             notFull.signal();  
    32.     } finally {  
    33.         //释放锁  
    34.         putLock.unlock();  
    35.     }  
    36.     //c等于0代表添加成功  
    37.     if (c == 0)  
    38.         signalNotEmpty();  
    39. }  

            5.获取元素

            1)peek方法

            peek方法获取但不移除此队列的头;如果此队列为空,则返回 null。

    Java代码  收藏代码
    1. /** 
    2.  * 获取但不移除此队列的头;如果此队列为空,则返回 null 
    3.  */  
    4. public E peek() {  
    5.     //判断元素数是否为0  
    6.     if (count.get() == 0)  
    7.         return null;  
    8.     final ReentrantLock takeLock = this.takeLock;  
    9.     //获取获取锁  
    10.     takeLock.lock();  
    11.     try {  
    12.         //头节点的 next节点即为添加的第一个节点  
    13.         Node<E> first = head.next;  
    14.         //如果不为空则返回该节点  
    15.         if (first == null)  
    16.             return null;  
    17.         else  
    18.             return first.item;  
    19.     } finally {  
    20.         //释放锁  
    21.         takeLock.unlock();  
    22.     }  
    23. }  

            peek方法从头节点直接就可以获取到第一个添加的元素,所以效率是比较高的。如果不存在则返回null。

            2)poll方法

            poll方法获取并移除此队列的头,如果此队列为空,则返回 null。

    Java代码  收藏代码
    1. /** 
    2.  * 获取并移除此队列的头,如果此队列为空,则返回 null 
    3.  */  
    4. public E poll() {  
    5.     final AtomicInteger count = this.count;  
    6.     //判断元素数量  
    7.     if (count.get() == 0)  
    8.         return null;  
    9.     E x = null;  
    10.     int c = -1;  
    11.     final ReentrantLock takeLock = this.takeLock;  
    12.     //获取获取锁  
    13.     takeLock.lock();  
    14.     try {  
    15.         //再次判断元素数量  
    16.         if (count.get() > 0) {  
    17.             //调用extract方法获取第一个元素  
    18.             x = extract();  
    19.             //c=count++  
    20.             c = count.getAndDecrement();  
    21.             //如果队列中含有元素  
    22.             if (c > 1)  
    23.                 //唤醒读取线程  
    24.                 notEmpty.signal();  
    25.         }  
    26.     } finally {  
    27.         //释放锁  
    28.         takeLock.unlock();  
    29.     }  
    30.     //如果队列已满  
    31.     if (c == capacity)  
    32.         //唤醒等待中的添加线程  
    33.         signalNotFull();  
    34.     return x;  
    35. }  

            poll与 peek方法不同在于poll获取完元素后移除这个元素,获取与移除是通过 extract()方法实现的。

            注意:其中需要注意的是最后部分代码:

    Java代码  收藏代码
    1. //如果队列已满  
    2. if (c == capacity)  
    3.     //唤醒等待中的添加线程  
    4.     signalNotFull();  

            肯定会有朋友有以下疑问:

            1)队列都已经满了,还需要唤醒添加线程干什么?

            2)线程满了就不应该再向里面添加元素了啊?

            3)signalNotFull方法是干什么的?

        signalNotFull方法的作用是唤醒等待中的put线程,signalNotFull只能被 take/poll方法调用,以下是 signalNotFull方法的源代码:

    Java代码  收藏代码
    1. /** 
    2.  * 唤醒等待中的put线程,只能被 take/poll方法调用 
    3.  */  
    4. private void signalNotFull() {  
    5.     final ReentrantLock putLock = this.putLock;  
    6.     //获取锁  
    7.     putLock.lock();  
    8.     try {  
    9.         //唤醒添加线程  
    10.         notFull.signal();  
    11.     } finally {  
    12.         //释放锁  
    13.         putLock.unlock();  
    14.     }  
    15. }  

          前两点问题其实转换一下角度就能很好的理解了,虽然队列已经满了,但是此时本线程已经完成了添加,但是其他线程还在等待获取条件进行添加,如果不去主动唤醒的话,那么这些添加操作就只能无限期的等待下去,所以这些等待的添加操作就会失效。所以此时需要唤醒已经排队的添加线程,虽然他们已经无法添加元素至队列。

            3)extract方法

            extract方法用于获取并移除头节点。

    Java代码  收藏代码
    1. /** 
    2.  * 获取并移除头节点 
    3.  */  
    4. private E extract() {  
    5.     //获取第一个节点,即 head的下一个元素  
    6.     Node<E> first = head.next;  
    7.     //将head指向此元素  
    8.     head = first;  
    9.     //获取元素值  
    10.     E x = first.item;  
    11.     //清除first的item元素为空,即head元素的item为空  
    12.     first.item = null;  
    13.     //返回  
    14.     return x;  
    15. }  

            这里需要注意的是这里指的头节点并不是 head,而是 head的 next所指 Node的 item元素。因为 head的 item永远为 null。last的 next永远为 null。

            4)take方法

            获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。

    Java代码  收藏代码
    1. /** 
    2.  * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要) 
    3.  */  
    4. public E take() throws InterruptedException {  
    5.     E x;  
    6.     int c = -1;  
    7.     final AtomicInteger count = this.count;  
    8.     final ReentrantLock takeLock = this.takeLock;  
    9.     //获取可中断锁  
    10.     takeLock.lockInterruptibly();  
    11.     try {  
    12.         try {  
    13.             //判断队列是否含有元素  
    14.             while (count.get() == 0)  
    15.                 //没有元素就阻塞获取线程,因为没有元素所以获取线程也就没有必要运行  
    16.                 notEmpty.await();  
    17.         } catch (InterruptedException ie) {  
    18.             //失败就唤醒获取线程  
    19.             notEmpty.signal();   
    20.             throw ie;  
    21.         }  
    22.         //调用 extract方法获取元素  
    23.         x = extract();  
    24.         //计数c的新值  
    25.         c = count.getAndDecrement();  
    26.         //如果元素数大于1  
    27.         if (c > 1)  
    28.             //唤醒获取线程  
    29.             notEmpty.signal();  
    30.     } finally {  
    31.         //释放锁  
    32.         takeLock.unlock();  
    33.     }  
    34.     //如果队列已满  
    35.     if (c == capacity)  
    36.         //唤醒还在等待的put线程  
    37.         signalNotFull();  
    38.     return x;  
    39. }  

            与 poll方法类似,只是take方法采用阻塞的方式来获取元素。

            7.其他方法

            1)remainingCapacity方法

    Java代码  收藏代码
    1. /** 
    2.  * 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量 
    3.  */  
    4. public int remainingCapacity() {  
    5.     return capacity - count.get();  
    6. }  

            也就是返回可以立即添加元素的数量。

            2)iterator方法

            iterator方法返回在队列中的元素上按适当顺序进行迭代的迭代器。返回的 Iterator 是一个“弱一致”的迭代器,从不抛出 ConcurrentModificationException,并且确保可遍历迭代器构造后所存在的所有元素,并且可能(但并不保证)反映构造后的所有修改。 

    Java代码  收藏代码
    1. /** 
    2.  * 返回Itr实例 
    3.  */  
    4. public Iterator<E> iterator() {  
    5.     return new Itr();  
    6. }  

            iterator方法返回的是一个Itr内部类的实例,通过这个实例可以遍历整个队列。以下是Itr内部类的源代码:

    Java代码  收藏代码
    1. private class Itr implements Iterator<E> {  
    2.     //当前节点  
    3.     private Node<E> current;  
    4.     private Node<E> lastRet;  
    5.     //当前元素  
    6.     private E currentElement;  
    7.   
    8.     Itr() {  
    9.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
    10.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
    11.         //获取获取与添加锁  
    12.         putLock.lock();  
    13.         takeLock.lock();  
    14.         try {  
    15.             current = head.next;  
    16.             if (current != null)  
    17.                 currentElement = current.item;  
    18.         } finally {  
    19.             takeLock.unlock();  
    20.             putLock.unlock();  
    21.         }  
    22.     }  
    23.   
    24.     public boolean hasNext() {  
    25.         return current != null;  
    26.     }  
    27.   
    28.     public E next() {  
    29.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
    30.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
    31.         putLock.lock();  
    32.         takeLock.lock();  
    33.         try {  
    34.             if (current == null)  
    35.                 throw new NoSuchElementException();  
    36.             E x = currentElement;  
    37.             lastRet = current;  
    38.             current = current.next;  
    39.             if (current != null)  
    40.                 currentElement = current.item;  
    41.             return x;  
    42.         } finally {  
    43.             takeLock.unlock();  
    44.             putLock.unlock();  
    45.         }  
    46.     }  
    47.   
    48.     public void remove() {  
    49.         if (lastRet == null)  
    50.             throw new IllegalStateException();  
    51.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
    52.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
    53.         putLock.lock();  
    54.         takeLock.lock();  
    55.         try {  
    56.             Node<E> node = lastRet;  
    57.             lastRet = null;  
    58.             Node<E> trail = head;  
    59.             Node<E> p = head.next;  
    60.             while (p != null && p != node) {  
    61.                 trail = p;  
    62.                 p = p.next;  
    63.             }  
    64.             if (p == node) {  
    65.                 p.item = null;  
    66.                 trail.next = p.next;  
    67.                 if (last == p)  
    68.                     last = trail;  
    69.                 int c = count.getAndDecrement();  
    70.                 if (c == capacity)  
    71.                     notFull.signalAll();  
    72.             }  
    73.         } finally {  
    74.             takeLock.unlock();  
    75.             putLock.unlock();  
    76.         }  
    77.     }  
    78. }  

             Itr类不复杂,我就不详细解释了。

            3)清除方法

            clear,drainTo等方法与 ArrayBlockingQueue类似,这里就不说了。

            8,.LinkedBlockingQueue与 ArrayBlockingQueue

            1)内部实现不同

            ArrayBlockingQueue内部队列存储使用的是数组:

    Java代码  收藏代码
    1. private final E[] items;  

            而 LinkedBlockingQueue内部队列存储使用的是Node节点内部类:

    Java代码  收藏代码
    1. static class Node<E> {  
    2.     /** The item, volatile to ensure barrier separating write and read */  
    3.     volatile E item;  
    4.     Node<E> next;  
    5.     Node(E x) { item = x; }  
    6. }  

            2)队列中锁的实现不同

    Java代码  收藏代码
    1. /** LinkedBlockingQueue的获取锁 */  
    2. private final ReentrantLock takeLock = new ReentrantLock();  
    3.   
    4. /** LinkedBlockingQueue的添加锁 */  
    5. private final ReentrantLock putLock = new ReentrantLock();  
    6.   
    7.   
    8. /** ArrayBlockingQueue的唯一锁 */  
    9. private final ReentrantLock lock;  

            从源代码就可以看出 ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加与获取使用的是同一个锁;而 LinkedBlockingQueue实现的队列中的锁是分离的,即添加用的是 putLock,获取是 takeLock。

            3)初始化条件不同

            ArrayBlockingQueue实现的队列中必须指定队列的大小。

            LinkedBlockingQueue实现的队列中可以不指定队列的大小,默认容量为Integer.MAX_VALUE。

            4)操作不同

            ArrayBlockingQueue无论是添加还是获取使用的是同一个锁,所以添加的同时就不能读取,读取的同时就不能添加,所以锁方面性能不如 LinkedBlockingQueue。

            LinkedBlockingQueue读取与添加操作使用不同的锁,因为其内部实现的特殊性,添加的时候只需要修改 last即可,而不会影响 head节点。而获取时也只需要修改 head节点即可,同样不会影响 last节点。所以在添加获取方面理论上性能会高于 ArrayBlockingQueue。

            所以 LinkedBlockingQueue更适合实现生产者-消费者队列。

  • 相关阅读:
    牛客网 二叉树的镜像 JAVA
    牛客网 反转链表 JAVA
    牛客网 调整数组顺序使奇数位于偶数前面 JAVA
    Integer to Roman LeetCode Java
    Valid Number leetcode java
    Longest Common Prefix
    Wildcard Matching leetcode java
    Regular Expression Matching
    Longest Palindromic Substring
    Add Binary LeetCode Java
  • 原文地址:https://www.cnblogs.com/chencanjian/p/9350389.html
Copyright © 2011-2022 走看看