zoukankan      html  css  js  c++  java
  • Java并发之LinkedBlockingQueue

    上一篇我们已经学习过了 ArrayBlockingQueue的知识及相关方法的使用,这一篇我们就来再学习一下ArrayBlockingQueue的亲戚 LinkedBlockingQueue。在集合类中 ArrayList与 LinkedList会常常拿来比较,ArrayList内部实现是基于数组的,而 LinkedList内部实现是基于链表,所以他们之间会有很多不同,但是本文不会去重点讨论,感兴趣的朋友可以参考我之前发过的几篇文章,那么有请本节的主角 LinkedBlockingQueue!

            LinkedBlockingQueue

            LinkedBlockingQueue是一个一个基于已链接节点的、范围任意(相对而论)的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。 

            可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。 

            LinkedBlockingQueue及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。 

            我们已经学习过了 ArrayBlockingQueue,所以学习 LinkedBlockingQueue就自然比较轻松,所以本文对于已经明确的相关概念就不做过多介绍了,而是重点放在两者的区别之上。

            1.成员变量

            与ArrayBlockingQueue不同 LinkedBlockingQueue的成员变量有些变化,以下是 LinkedBlockingQueue的成员变量:

    Java代码  收藏代码
    1. /** 容量范围,默认值为 Integer.MAX_VALUE */  
    2. private final int capacity;  
    3.   
    4. /** 当前队列中元素数量 */  
    5. private final AtomicInteger count = new AtomicInteger(0);  
    6.   
    7. /** 头节点 */  
    8. private transient Node<E> head;  
    9.   
    10. /** 尾节点 */  
    11. private transient Node<E> last;  
    12.   
    13. /** take, poll等方法的锁 */  
    14. private final ReentrantLock takeLock = new ReentrantLock();  
    15.   
    16. /** 获取队列的 Condition(条件)实例 */  
    17. private final Condition notEmpty = takeLock.newCondition();  
    18.   
    19. /** put, offer等方法的锁 */  
    20. private final ReentrantLock putLock = new ReentrantLock();  
    21.   
    22. /** 插入队列的 Condition(条件)实例 */  
    23. private final Condition notFull = putLock.newCondition();  

            1)首先 LinkedBlockingQueue明确了容量变量,当为指定容量时,默认容量为Int的最大值Integer.MAX_VALUE。

            2)队列元素数量变量 count采用的是 AtomicInteger ,而不是普通的Int型。CAS相关可参考http://286.iteye.com/blog/2295165

            3)LinkedBlockingQueue内部队列实现使用的是 Node节点类,这与 LinkedList类似。

            4)最后也是最重要的一点,那就是获取与插入操作分成了两个锁:takeLock与 putLock来处理,这点下面还会重点分析。

            2.构造方法

            有三个构造方法,分别为默认,指定容量,指定容量和初始元素。

    Java代码  收藏代码
    1. /** 
    2.  * 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 
    3.  */  
    4. public LinkedBlockingQueue() {  
    5.     this(Integer.MAX_VALUE);  
    6. }  
    7.   
    8. /** 
    9.  * 创建一个具有给定(固定)容量的 LinkedBlockingQueue 
    10.  */  
    11. public LinkedBlockingQueue(int capacity) {  
    12.     if (capacity <= 0)  
    13.         throw new IllegalArgumentException();  
    14.     this.capacity = capacity;  
    15.     last = head = new Node<E>(null);  
    16. }  
    17.   
    18. /** 
    19.  * 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue, 
    20.  * 最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。 
    21.  */  
    22. public LinkedBlockingQueue(Collection<? extends E> c) {  
    23.     this(Integer.MAX_VALUE);  
    24.     for (E e : c)  
    25.         add(e);  
    26. }  

            默认构造方法创建一个容量为 Integer.MAX_VALUE的 LinkedBlockingQueue实例。

            第二种构造方法,指定了队列容量,首先判断指定容量是否大于零,否则抛出异常。然后为 capacity 赋值,最后创建空节点,并指向 head与 last,两者的 item与 next此时均为 null。

     

            最后一种,利用循环向队列中添加指定集合中的元素。

            3.Node类

            LinkedBlockingQueue内部列表实现是使用的 Node内部类,Node类也并不复杂,以下是其源代码:

    Java代码  收藏代码
    1. /** 
    2.  * 节点类 
    3.  */  
    4. static class Node<E> {  
    5.     /** volatile保障读写分离 */  
    6.     volatile E item;  
    7.     Node<E> next;  
    8.   
    9.     Node(E x) {  
    10.         item = x;  
    11.     }  
    12. }  

            item用于表示元素对象,next指向链表的下一个节点。



            LinkedBlockingQueue的大部分方法其实是与  ArrayBlockingQueue类似的,所以本文就只介绍不同于ArrayBlockingQueue的相关方法。

            4.添加元素

            1)add方法

            add方法相同就不介绍了,同样调用的是offer方法。

            2)offer方法

            将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常。 

            与ArrayBlockingQueue不同,LinkedBlockingQueue多了一些容量方面的判断。

    Java代码  收藏代码
    1. /** 
    2.  * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量) 
    3.  * 在成功时返回 true,如果此队列已满,则返回 false。 
    4.  * 当使用有容量限制的队列时,此方法通常要优于 add 方法, 
    5.  * 后者可能无法插入元素,而只是抛出一个异常。  
    6.  */  
    7. public boolean offer(E e) {  
    8.     //判断添加元素是否为null  
    9.     if (e == null)  
    10.         throw new NullPointerException();  
    11.     //第一点不同,使用原子类操作count,因为有两个锁  
    12.     final AtomicInteger count = this.count;  
    13.     //判断容量,队列是否已满  
    14.     if (count.get() == capacity)  
    15.         return false;  
    16.     int c = -1;  
    17.     final ReentrantLock putLock = this.putLock;  
    18.     //获取添加锁  
    19.     putLock.lock();  
    20.     try {  
    21.         //再次判断,如果队列未满  
    22.         if (count.get() < capacity) {  
    23.             //插入元素  
    24.             insert(e);  
    25.             //增加元素数count  
    26.             c = count.getAndIncrement();  
    27.             if (c + 1 < capacity)  
    28.                 //未满则唤醒添加线程  
    29.                 notFull.signal();  
    30.         }  
    31.     } finally {  
    32.         //释放锁  
    33.         putLock.unlock();  
    34.     }  
    35.     //c等于0说明添加成功  
    36.     if (c == 0)  
    37.         //唤醒读取线程  
    38.         signalNotEmpty();  
    39.     return c >= 0;  
    40. }  

            可以看到offer方法的关键在于 insert方法。

            3)insert方法

             insert方法非常简单,但是却不要小看。

    Java代码  收藏代码
    1. /** 
    2.  * 再队尾添加元素 
    3.  */  
    4. private void insert(E x) {  
    5.     last = last.next = new Node<E>(x);  
    6. }  

            首先,根据指定参数x创建一个Node实例。

            然后,将原尾节点的next指向此节点。

            最后,将尾节点设置尾此节点。

            这样新添加的节点就成为了新的尾节点。



     

            当向链表中添加第一个节点时,因为在初始化时

    Java代码  收藏代码
    1. last = head = new Node<E>(null);  

            所以此时 head与 last指向的是同一个对象new Node<E>(null)。

            之后将last.next指向x。

    Java代码  收藏代码
    1. last.next = new Node<E>(x);  

            因为此时 head与 last是同一个对象,所以 head.next也指向x。

            最后将 last指向x。

    Java代码  收藏代码
    1. last =  new Node<E>(x);  

            这样 head的next就指向了 last。此时head中的 item仍为 null。

            4)put方法

            将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。

    Java代码  收藏代码
    1. /** 
    2.  * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用 
    3.  */  
    4. public void put(E e) throws InterruptedException {  
    5.     //判断添加元素是否为null  
    6.     if (e == null)  
    7.         throw new NullPointerException();  
    8.     int c = -1;  
    9.     final ReentrantLock putLock = this.putLock;  
    10.     final AtomicInteger count = this.count;  
    11.     //获取插入的可中断锁  
    12.     putLock.lockInterruptibly();  
    13.     try {  
    14.         try {  
    15.             //判断队列是否已满  
    16.             while (count.get() == capacity)  
    17.                 //如果已满则阻塞添加线程  
    18.                 notFull.await();  
    19.         } catch (InterruptedException ie) {  
    20.             //失败就唤醒添加线程  
    21.             notFull.signal();   
    22.             throw ie;  
    23.         }  
    24.         //添加元素  
    25.         insert(e);  
    26.         //修改c值  
    27.         c = count.getAndIncrement();  
    28.         //根据c值判断队列是否已满  
    29.         if (c + 1 < capacity)  
    30.             //未满则唤醒添加线程  
    31.             notFull.signal();  
    32.     } finally {  
    33.         //释放锁  
    34.         putLock.unlock();  
    35.     }  
    36.     //c等于0代表添加成功  
    37.     if (c == 0)  
    38.         signalNotEmpty();  
    39. }  

            5.获取元素

            1)peek方法

            peek方法获取但不移除此队列的头;如果此队列为空,则返回 null。

    Java代码  收藏代码
    1. /** 
    2.  * 获取但不移除此队列的头;如果此队列为空,则返回 null 
    3.  */  
    4. public E peek() {  
    5.     //判断元素数是否为0  
    6.     if (count.get() == 0)  
    7.         return null;  
    8.     final ReentrantLock takeLock = this.takeLock;  
    9.     //获取获取锁  
    10.     takeLock.lock();  
    11.     try {  
    12.         //头节点的 next节点即为添加的第一个节点  
    13.         Node<E> first = head.next;  
    14.         //如果不为空则返回该节点  
    15.         if (first == null)  
    16.             return null;  
    17.         else  
    18.             return first.item;  
    19.     } finally {  
    20.         //释放锁  
    21.         takeLock.unlock();  
    22.     }  
    23. }  

            peek方法从头节点直接就可以获取到第一个添加的元素,所以效率是比较高的。如果不存在则返回null。

            2)poll方法

            poll方法获取并移除此队列的头,如果此队列为空,则返回 null。

    Java代码  收藏代码
    1. /** 
    2.  * 获取并移除此队列的头,如果此队列为空,则返回 null 
    3.  */  
    4. public E poll() {  
    5.     final AtomicInteger count = this.count;  
    6.     //判断元素数量  
    7.     if (count.get() == 0)  
    8.         return null;  
    9.     E x = null;  
    10.     int c = -1;  
    11.     final ReentrantLock takeLock = this.takeLock;  
    12.     //获取获取锁  
    13.     takeLock.lock();  
    14.     try {  
    15.         //再次判断元素数量  
    16.         if (count.get() > 0) {  
    17.             //调用extract方法获取第一个元素  
    18.             x = extract();  
    19.             //c=count++  
    20.             c = count.getAndDecrement();  
    21.             //如果队列中含有元素  
    22.             if (c > 1)  
    23.                 //唤醒读取线程  
    24.                 notEmpty.signal();  
    25.         }  
    26.     } finally {  
    27.         //释放锁  
    28.         takeLock.unlock();  
    29.     }  
    30.     //如果队列已满  
    31.     if (c == capacity)  
    32.         //唤醒等待中的添加线程  
    33.         signalNotFull();  
    34.     return x;  
    35. }  

            poll与 peek方法不同在于poll获取完元素后移除这个元素,获取与移除是通过 extract()方法实现的。

            注意:其中需要注意的是最后部分代码:

    Java代码  收藏代码
    1. //如果队列已满  
    2. if (c == capacity)  
    3.     //唤醒等待中的添加线程  
    4.     signalNotFull();  

            肯定会有朋友有以下疑问:

            1)队列都已经满了,还需要唤醒添加线程干什么?

            2)线程满了就不应该再向里面添加元素了啊?

            3)signalNotFull方法是干什么的?

        signalNotFull方法的作用是唤醒等待中的put线程,signalNotFull只能被 take/poll方法调用,以下是 signalNotFull方法的源代码:

    Java代码  收藏代码
    1. /** 
    2.  * 唤醒等待中的put线程,只能被 take/poll方法调用 
    3.  */  
    4. private void signalNotFull() {  
    5.     final ReentrantLock putLock = this.putLock;  
    6.     //获取锁  
    7.     putLock.lock();  
    8.     try {  
    9.         //唤醒添加线程  
    10.         notFull.signal();  
    11.     } finally {  
    12.         //释放锁  
    13.         putLock.unlock();  
    14.     }  
    15. }  

          前两点问题其实转换一下角度就能很好的理解了,虽然队列已经满了,但是此时本线程已经完成了添加,但是其他线程还在等待获取条件进行添加,如果不去主动唤醒的话,那么这些添加操作就只能无限期的等待下去,所以这些等待的添加操作就会失效。所以此时需要唤醒已经排队的添加线程,虽然他们已经无法添加元素至队列。

            3)extract方法

            extract方法用于获取并移除头节点。

    Java代码  收藏代码
    1. /** 
    2.  * 获取并移除头节点 
    3.  */  
    4. private E extract() {  
    5.     //获取第一个节点,即 head的下一个元素  
    6.     Node<E> first = head.next;  
    7.     //将head指向此元素  
    8.     head = first;  
    9.     //获取元素值  
    10.     E x = first.item;  
    11.     //清除first的item元素为空,即head元素的item为空  
    12.     first.item = null;  
    13.     //返回  
    14.     return x;  
    15. }  

            这里需要注意的是这里指的头节点并不是 head,而是 head的 next所指 Node的 item元素。因为 head的 item永远为 null。last的 next永远为 null。

            4)take方法

            获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。

    Java代码  收藏代码
    1. /** 
    2.  * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要) 
    3.  */  
    4. public E take() throws InterruptedException {  
    5.     E x;  
    6.     int c = -1;  
    7.     final AtomicInteger count = this.count;  
    8.     final ReentrantLock takeLock = this.takeLock;  
    9.     //获取可中断锁  
    10.     takeLock.lockInterruptibly();  
    11.     try {  
    12.         try {  
    13.             //判断队列是否含有元素  
    14.             while (count.get() == 0)  
    15.                 //没有元素就阻塞获取线程,因为没有元素所以获取线程也就没有必要运行  
    16.                 notEmpty.await();  
    17.         } catch (InterruptedException ie) {  
    18.             //失败就唤醒获取线程  
    19.             notEmpty.signal();   
    20.             throw ie;  
    21.         }  
    22.         //调用 extract方法获取元素  
    23.         x = extract();  
    24.         //计数c的新值  
    25.         c = count.getAndDecrement();  
    26.         //如果元素数大于1  
    27.         if (c > 1)  
    28.             //唤醒获取线程  
    29.             notEmpty.signal();  
    30.     } finally {  
    31.         //释放锁  
    32.         takeLock.unlock();  
    33.     }  
    34.     //如果队列已满  
    35.     if (c == capacity)  
    36.         //唤醒还在等待的put线程  
    37.         signalNotFull();  
    38.     return x;  
    39. }  

            与 poll方法类似,只是take方法采用阻塞的方式来获取元素。

            7.其他方法

            1)remainingCapacity方法

    Java代码  收藏代码
    1. /** 
    2.  * 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量 
    3.  */  
    4. public int remainingCapacity() {  
    5.     return capacity - count.get();  
    6. }  

            也就是返回可以立即添加元素的数量。

            2)iterator方法

            iterator方法返回在队列中的元素上按适当顺序进行迭代的迭代器。返回的 Iterator 是一个“弱一致”的迭代器,从不抛出 ConcurrentModificationException,并且确保可遍历迭代器构造后所存在的所有元素,并且可能(但并不保证)反映构造后的所有修改。 

    Java代码  收藏代码
    1. /** 
    2.  * 返回Itr实例 
    3.  */  
    4. public Iterator<E> iterator() {  
    5.     return new Itr();  
    6. }  

            iterator方法返回的是一个Itr内部类的实例,通过这个实例可以遍历整个队列。以下是Itr内部类的源代码:

    Java代码  收藏代码
    1. private class Itr implements Iterator<E> {  
    2.     //当前节点  
    3.     private Node<E> current;  
    4.     private Node<E> lastRet;  
    5.     //当前元素  
    6.     private E currentElement;  
    7.   
    8.     Itr() {  
    9.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
    10.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
    11.         //获取获取与添加锁  
    12.         putLock.lock();  
    13.         takeLock.lock();  
    14.         try {  
    15.             current = head.next;  
    16.             if (current != null)  
    17.                 currentElement = current.item;  
    18.         } finally {  
    19.             takeLock.unlock();  
    20.             putLock.unlock();  
    21.         }  
    22.     }  
    23.   
    24.     public boolean hasNext() {  
    25.         return current != null;  
    26.     }  
    27.   
    28.     public E next() {  
    29.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
    30.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
    31.         putLock.lock();  
    32.         takeLock.lock();  
    33.         try {  
    34.             if (current == null)  
    35.                 throw new NoSuchElementException();  
    36.             E x = currentElement;  
    37.             lastRet = current;  
    38.             current = current.next;  
    39.             if (current != null)  
    40.                 currentElement = current.item;  
    41.             return x;  
    42.         } finally {  
    43.             takeLock.unlock();  
    44.             putLock.unlock();  
    45.         }  
    46.     }  
    47.   
    48.     public void remove() {  
    49.         if (lastRet == null)  
    50.             throw new IllegalStateException();  
    51.         final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;  
    52.         final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;  
    53.         putLock.lock();  
    54.         takeLock.lock();  
    55.         try {  
    56.             Node<E> node = lastRet;  
    57.             lastRet = null;  
    58.             Node<E> trail = head;  
    59.             Node<E> p = head.next;  
    60.             while (p != null && p != node) {  
    61.                 trail = p;  
    62.                 p = p.next;  
    63.             }  
    64.             if (p == node) {  
    65.                 p.item = null;  
    66.                 trail.next = p.next;  
    67.                 if (last == p)  
    68.                     last = trail;  
    69.                 int c = count.getAndDecrement();  
    70.                 if (c == capacity)  
    71.                     notFull.signalAll();  
    72.             }  
    73.         } finally {  
    74.             takeLock.unlock();  
    75.             putLock.unlock();  
    76.         }  
    77.     }  
    78. }  

             Itr类不复杂,我就不详细解释了。

            3)清除方法

            clear,drainTo等方法与 ArrayBlockingQueue类似,这里就不说了。

            8,.LinkedBlockingQueue与 ArrayBlockingQueue

            1)内部实现不同

            ArrayBlockingQueue内部队列存储使用的是数组:

    Java代码  收藏代码
    1. private final E[] items;  

            而 LinkedBlockingQueue内部队列存储使用的是Node节点内部类:

    Java代码  收藏代码
    1. static class Node<E> {  
    2.     /** The item, volatile to ensure barrier separating write and read */  
    3.     volatile E item;  
    4.     Node<E> next;  
    5.     Node(E x) { item = x; }  
    6. }  

            2)队列中锁的实现不同

    Java代码  收藏代码
    1. /** LinkedBlockingQueue的获取锁 */  
    2. private final ReentrantLock takeLock = new ReentrantLock();  
    3.   
    4. /** LinkedBlockingQueue的添加锁 */  
    5. private final ReentrantLock putLock = new ReentrantLock();  
    6.   
    7.   
    8. /** ArrayBlockingQueue的唯一锁 */  
    9. private final ReentrantLock lock;  

            从源代码就可以看出 ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加与获取使用的是同一个锁;而 LinkedBlockingQueue实现的队列中的锁是分离的,即添加用的是 putLock,获取是 takeLock。

            3)初始化条件不同

            ArrayBlockingQueue实现的队列中必须指定队列的大小。

            LinkedBlockingQueue实现的队列中可以不指定队列的大小,默认容量为Integer.MAX_VALUE。

            4)操作不同

            ArrayBlockingQueue无论是添加还是获取使用的是同一个锁,所以添加的同时就不能读取,读取的同时就不能添加,所以锁方面性能不如 LinkedBlockingQueue。

            LinkedBlockingQueue读取与添加操作使用不同的锁,因为其内部实现的特殊性,添加的时候只需要修改 last即可,而不会影响 head节点。而获取时也只需要修改 head节点即可,同样不会影响 last节点。所以在添加获取方面理论上性能会高于 ArrayBlockingQueue。

            所以 LinkedBlockingQueue更适合实现生产者-消费者队列。

  • 相关阅读:
    尖峰冲击测试(spike Testing)
    mysql返回记录的ROWNUM(转)
    SQL2005四个排名函数(row_number、rank、dense_rank和ntile)的比较
    JUnit编写单元测试代码注意点小结
    Linux下Tomcat的启动、关闭、杀死进程
    linux下oracle11g R2的启动与关闭监听、数据库
    linux下使用yum安装mysql详解
    VC++ 实现文件与应用程序关联
    C++ 去掉字符串首尾的 x20 字符
    VC++ 线程同步 总结
  • 原文地址:https://www.cnblogs.com/chencanjian/p/9350389.html
Copyright © 2011-2022 走看看