zoukankan      html  css  js  c++  java
  • BlockQueue中ArrayBlockingQueue和LinkedBlockingQueue比较

    LinkedBlockingQueue是BlockingQueue的一种使用Link List的实现,它对头和尾(取和添加操作)采用两把不同的锁,相对于ArrayBlockingQueue提高了吞吐量。它也是一种阻塞型的容器,适合于实现“消费者生产者”模式。

    ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

    正因为LinkedBlockingQueue使用两个独立的锁控制数据同步,所以可以使存取两种操作并行执行,从而提高并发效率。而ArrayBlockingQueue使用一把锁,造成在存取两种操作争抢一把锁,而使得性能相对低下。LinkedBlockingQueue可以不设置队列容量,默认为Integer.MAX_VALUE.其容易造成内存溢出,一般要设置其值。

     LinkedBlockingQueue底层的定义如下:

    Java代码  收藏代码
    1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     static class Node<E> {  
    5.         /** The item, volatile to ensure barrier separating write and read */  
    6.         volatile E item;  
    7.         Node<E> next;  
    8.         Node(E x) { item = x; }  
    9.     }  
    10.   
    11.     // 支持原子操作  
    12.     private final AtomicInteger count = new AtomicInteger(0);  
    13.   
    14.     // 链表的头和尾  
    15.     private transient Node<E> head;  
    16.     private transient Node<E> last;  
    17.   
    18.     // 针对取和添加操作的两把锁及其上的条件  
    19.    private final ReentrantLock takeLock = new ReentrantLock();  
    20.     private final Condition notEmpty = takeLock.newCondition();  
    21.     private final ReentrantLock putLock = new ReentrantLock();  
    22.     private final Condition notFull = putLock.newCondition();  
    23.   
    24.    ...  
    25. }  

        LinkedBlockingQueue的添加操作:

    Java代码  收藏代码
    1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private void insert(E x) {  
    5.         last = last.next = new Node<E>(x);  
    6.     }  
    7.   
    8.     /** 
    9.      * signal方法在被调用时,当前线程必须拥有该condition相关的锁! 
    10.      * Signal a waiting take. Called only from put/offer (which do not 
    11.      * otherwise ordinarily lock takeLock.) 
    12.      */  
    13.     private void signalNotEmpty() {  
    14.         final ReentrantLock takeLock = this.takeLock;  
    15.         takeLock.lock();  
    16.         try {  
    17.             notEmpty.signal();  
    18.         } finally {  
    19.             takeLock.unlock();  
    20.         }  
    21.     }  
    22.   
    23.     public void put(E o) throws InterruptedException {  
    24.         if (o == nullthrow new NullPointerException();  
    25.         int c = -1;  
    26.         final ReentrantLock putLock = this.putLock;  
    27.         final AtomicInteger count = this.count;  
    28.         // 使用putLock  
    29.         putLock.lockInterruptibly();  
    30.         try {  
    31.             try {  
    32.                   // 当容量已满时,等待notFull条件  
    33.             while (count.get() == capacity)  
    34.                     notFull.await();  
    35.             } catch (InterruptedException ie) {  
    36.                 notFull.signal(); // propagate to a non-interrupted thread  
    37.                 throw ie;  
    38.             }  
    39.             insert(o);  
    40.             // 取出当前值,并将原数据增加1  
    41.             c = count.getAndIncrement();  
    42.             // 容量不满,再次激活notFull上等待的put线程  
    43.         if (c + 1 < capacity)  
    44.                 notFull.signal();  
    45.         } finally {  
    46.             putLock.unlock();  
    47.         }  
    48.         // 必须先释放putLock再在notEmpty上signal,否则会造成死锁  
    49.      if (c == 0)  
    50.             signalNotEmpty();  
    51.     }  
    52.   
    53.   ...  
    54. }  

        LinkedBlockingQueue的取操作:

    Java代码  收藏代码
    1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private E extract() {  
    5.         Node<E> first = head.next;  
    6.         head = first;  
    7.         E x = first.item;  
    8.         first.item = null;  
    9.         return x;  
    10.     }  
    11.   
    12.     private void signalNotFull() {  
    13.         final ReentrantLock putLock = this.putLock;  
    14.         putLock.lock();  
    15.         try {  
    16.             notFull.signal();  
    17.         } finally {  
    18.             putLock.unlock();  
    19.         }  
    20.     }  
    21.   
    22.     public E take() throws InterruptedException {  
    23.         E x;  
    24.         int c = -1;  
    25.         final AtomicInteger count = this.count;  
    26.         final ReentrantLock takeLock = this.takeLock;  
    27.         // 使用takeLock  
    28.         takeLock.lockInterruptibly();  
    29.         try {  
    30.             try {  
    31.                   // 若容量为空,等待notEmpty  
    32.                 while (count.get() == 0)  
    33.                     notEmpty.await();  
    34.             } catch (InterruptedException ie) {  
    35.                 notEmpty.signal(); // propagate to a non-interrupted thread  
    36.                 throw ie;  
    37.             }  
    38.   
    39.             x = extract();  
    40.             c = count.getAndDecrement();  
    41.             // 再次激活notEmpty  
    42.             if (c > 1)  
    43.                 notEmpty.signal();  
    44.         } finally {  
    45.             takeLock.unlock();  
    46.         }  
    47.         // take执行之前容量已满,则激活notFull  
    48.         if (c == capacity)  
    49.             signalNotFull();  
    50.         return x;  
    51.     }  
    52.   
    53.   ...  

     ArrayBlockingQueue底层定义如下:

    Java代码  收藏代码
    1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     // 使用循环数组来实现queue,初始时takeIndex和putIndex均为0  
    5.     private final E[] items;  
    6.     private transient int takeIndex;  
    7.     private transient int putIndex;  
    8.     private int count;  
    9.   
    10.     // 用于并发的锁和条件  
    11.    private final ReentrantLock lock;  
    12.     private final Condition notEmpty;  
    13.     private final Condition notFull;  
    14.   
    15.     /** 
    16.      * 循环数组 
    17.      * Circularly increment i. 
    18.      */  
    19.     final int inc(int i) {  
    20.         return (++i == items.length)? 0 : i;  
    21.     }  
    22.   
    23.     public ArrayBlockingQueue(int capacity, boolean fair) {  
    24.         if (capacity <= 0)  
    25.             throw new IllegalArgumentException();  
    26.         this.items = (E[]) new Object[capacity];  
    27.         // 分配锁及该锁上的condition  
    28.         lock = new ReentrantLock(fair);  
    29.         notEmpty = lock.newCondition();  
    30.         notFull =  lock.newCondition();  
    31.     }  
    32.   
    33.   ...  
    34. }  

       ArrayBlockingQueue的取操作:

    Java代码  收藏代码
    1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private E extract() {  
    5.         final E[] items = this.items;  
    6.         E x = items[takeIndex];  
    7.         items[takeIndex] = null;  
    8.         takeIndex = inc(takeIndex);  
    9.         --count;  
    10.        // 激发notFull条件  
    11.         notFull.signal();  
    12.         return x;  
    13.     }  
    14.   
    15.      /** 
    16.         * condition的await的语义如下: 
    17.      * 与condition相关的锁以原子方式释放,并禁用该线程 
    18.      * 方法返回时,线程必须获得与该condition相关的锁 
    19.      */  
    20.     public E take() throws InterruptedException {  
    21.         final ReentrantLock lock = this.lock;  
    22.         lock.lockInterruptibly();  
    23.         try {  
    24.             try {  
    25.                   // 等待notEmpty的条件  
    26.                 while (count == 0)  
    27.                     notEmpty.await();  
    28.             } catch (InterruptedException ie) {  
    29.                 notEmpty.signal(); // propagate to non-interrupted thread  
    30.                 throw ie;  
    31.             }  
    32.             E x = extract();  
    33.             return x;  
    34.         } finally {  
    35.             lock.unlock();  
    36.         }  
    37.     }  
    38.   
    39.   ...  
    40. }  

       ArrayBlockingQueue的写操作:

    Java代码  收藏代码
    1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private void insert(E x) {  
    5.         items[putIndex] = x;  
    6.         putIndex = inc(putIndex);  
    7.         ++count;  
    8.         notEmpty.signal();  
    9.     }  
    10.   
    11.     public void put(E o) throws InterruptedException {  
    12.         if (o == nullthrow new NullPointerException();  
    13.         final E[] items = this.items;  
    14.         final ReentrantLock lock = this.lock;  
    15.         lock.lockInterruptibly();  
    16.         try {  
    17.             try {  
    18.                   // 等待notFull条件  
    19.            while (count == items.length)  
    20.                     notFull.await();  
    21.             } catch (InterruptedException ie) {  
    22.                 notFull.signal(); // propagate to non-interrupted thread  
    23.                 throw ie;  
    24.             }  
    25.             insert(o);  
    26.         } finally {  
    27.             lock.unlock();  
    28.         }  
    29.     }  
    30.   
    31.   ...  
    32. }  

        注意:ArrayBlockingQueue在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。

    通过保证在临界区上多个线程的相互排斥,线程间可以完全避免竞争状态的发生,但是有时候还是需要线程之间的相互协作。使用条件(Condition)便于线程间通信。一个线程可以指定在某种条件下该做什么。标间是通过调用Lock对象的newCoditionn()方法来实现线程之间的相互通信的。

     一旦创建一个条件,就可使用await()、signal()、signalAll()方法来实现线程间通信。await()方法可以让当前线程都处于等待状态,知道条件放生。signal()方法唤醒一个等待的线程,而signalAll()方法唤醒所有等待线程。

    注:

    Lock接口的 线程请求锁的 几个方法:

    lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。
    tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。
    带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。

    下面的lockInterruptibly()就稍微难理解一些。

    先说说线程的打扰机制,每个线程都有一个 打扰 标志。这里分两种情况,
    1. 线程在sleep或wait,join, 此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException;(thread在做IO操作时也可能有类似行为,见java thread api)
    2. 此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并 作出处理。

    lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。

    引自:http://zhuhui-zj.iteye.com/blog/784193

  • 相关阅读:
    上下文调用(call , apply , bind)
    源码学习第七天(水滴石穿)
    学习源码第六天(加油别放弃)
    学习源码第五天(难得可贵)
    学习源码第四天(昨天只看了一点正则,发现正则真的水很深,但很有魅力)
    简单谈谈$.merge()
    学习源码第三天(短暂的坚持)
    学习源码第二天(渐入佳境)
    jquery源码学习第一天
    经典面试题简单分析
  • 原文地址:https://www.cnblogs.com/shz365/p/3766246.html
Copyright © 2011-2022 走看看