zoukankan      html  css  js  c++  java
  • BlockQueue中ArrayBlockingQueue和LinkedBlockingQueue比较

    LinkedBlockingQueue是BlockingQueue的一种使用Link List的实现,它对头和尾(取和添加操作)采用两把不同的锁,相对于ArrayBlockingQueue提高了吞吐量。它也是一种阻塞型的容器,适合于实现“消费者生产者”模式。

    ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

    正因为LinkedBlockingQueue使用两个独立的锁控制数据同步,所以可以使存取两种操作并行执行,从而提高并发效率。而ArrayBlockingQueue使用一把锁,造成在存取两种操作争抢一把锁,而使得性能相对低下。LinkedBlockingQueue可以不设置队列容量,默认为Integer.MAX_VALUE.其容易造成内存溢出,一般要设置其值。

     LinkedBlockingQueue底层的定义如下:

    Java代码  收藏代码
    1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     static class Node<E> {  
    5.         /** The item, volatile to ensure barrier separating write and read */  
    6.         volatile E item;  
    7.         Node<E> next;  
    8.         Node(E x) { item = x; }  
    9.     }  
    10.   
    11.     // 支持原子操作  
    12.     private final AtomicInteger count = new AtomicInteger(0);  
    13.   
    14.     // 链表的头和尾  
    15.     private transient Node<E> head;  
    16.     private transient Node<E> last;  
    17.   
    18.     // 针对取和添加操作的两把锁及其上的条件  
    19.    private final ReentrantLock takeLock = new ReentrantLock();  
    20.     private final Condition notEmpty = takeLock.newCondition();  
    21.     private final ReentrantLock putLock = new ReentrantLock();  
    22.     private final Condition notFull = putLock.newCondition();  
    23.   
    24.    ...  
    25. }  

        LinkedBlockingQueue的添加操作:

    Java代码  收藏代码
    1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private void insert(E x) {  
    5.         last = last.next = new Node<E>(x);  
    6.     }  
    7.   
    8.     /** 
    9.      * signal方法在被调用时,当前线程必须拥有该condition相关的锁! 
    10.      * Signal a waiting take. Called only from put/offer (which do not 
    11.      * otherwise ordinarily lock takeLock.) 
    12.      */  
    13.     private void signalNotEmpty() {  
    14.         final ReentrantLock takeLock = this.takeLock;  
    15.         takeLock.lock();  
    16.         try {  
    17.             notEmpty.signal();  
    18.         } finally {  
    19.             takeLock.unlock();  
    20.         }  
    21.     }  
    22.   
    23.     public void put(E o) throws InterruptedException {  
    24.         if (o == nullthrow new NullPointerException();  
    25.         int c = -1;  
    26.         final ReentrantLock putLock = this.putLock;  
    27.         final AtomicInteger count = this.count;  
    28.         // 使用putLock  
    29.         putLock.lockInterruptibly();  
    30.         try {  
    31.             try {  
    32.                   // 当容量已满时,等待notFull条件  
    33.             while (count.get() == capacity)  
    34.                     notFull.await();  
    35.             } catch (InterruptedException ie) {  
    36.                 notFull.signal(); // propagate to a non-interrupted thread  
    37.                 throw ie;  
    38.             }  
    39.             insert(o);  
    40.             // 取出当前值,并将原数据增加1  
    41.             c = count.getAndIncrement();  
    42.             // 容量不满,再次激活notFull上等待的put线程  
    43.         if (c + 1 < capacity)  
    44.                 notFull.signal();  
    45.         } finally {  
    46.             putLock.unlock();  
    47.         }  
    48.         // 必须先释放putLock再在notEmpty上signal,否则会造成死锁  
    49.      if (c == 0)  
    50.             signalNotEmpty();  
    51.     }  
    52.   
    53.   ...  
    54. }  

        LinkedBlockingQueue的取操作:

    Java代码  收藏代码
    1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private E extract() {  
    5.         Node<E> first = head.next;  
    6.         head = first;  
    7.         E x = first.item;  
    8.         first.item = null;  
    9.         return x;  
    10.     }  
    11.   
    12.     private void signalNotFull() {  
    13.         final ReentrantLock putLock = this.putLock;  
    14.         putLock.lock();  
    15.         try {  
    16.             notFull.signal();  
    17.         } finally {  
    18.             putLock.unlock();  
    19.         }  
    20.     }  
    21.   
    22.     public E take() throws InterruptedException {  
    23.         E x;  
    24.         int c = -1;  
    25.         final AtomicInteger count = this.count;  
    26.         final ReentrantLock takeLock = this.takeLock;  
    27.         // 使用takeLock  
    28.         takeLock.lockInterruptibly();  
    29.         try {  
    30.             try {  
    31.                   // 若容量为空,等待notEmpty  
    32.                 while (count.get() == 0)  
    33.                     notEmpty.await();  
    34.             } catch (InterruptedException ie) {  
    35.                 notEmpty.signal(); // propagate to a non-interrupted thread  
    36.                 throw ie;  
    37.             }  
    38.   
    39.             x = extract();  
    40.             c = count.getAndDecrement();  
    41.             // 再次激活notEmpty  
    42.             if (c > 1)  
    43.                 notEmpty.signal();  
    44.         } finally {  
    45.             takeLock.unlock();  
    46.         }  
    47.         // take执行之前容量已满,则激活notFull  
    48.         if (c == capacity)  
    49.             signalNotFull();  
    50.         return x;  
    51.     }  
    52.   
    53.   ...  

     ArrayBlockingQueue底层定义如下:

    Java代码  收藏代码
    1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     // 使用循环数组来实现queue,初始时takeIndex和putIndex均为0  
    5.     private final E[] items;  
    6.     private transient int takeIndex;  
    7.     private transient int putIndex;  
    8.     private int count;  
    9.   
    10.     // 用于并发的锁和条件  
    11.    private final ReentrantLock lock;  
    12.     private final Condition notEmpty;  
    13.     private final Condition notFull;  
    14.   
    15.     /** 
    16.      * 循环数组 
    17.      * Circularly increment i. 
    18.      */  
    19.     final int inc(int i) {  
    20.         return (++i == items.length)? 0 : i;  
    21.     }  
    22.   
    23.     public ArrayBlockingQueue(int capacity, boolean fair) {  
    24.         if (capacity <= 0)  
    25.             throw new IllegalArgumentException();  
    26.         this.items = (E[]) new Object[capacity];  
    27.         // 分配锁及该锁上的condition  
    28.         lock = new ReentrantLock(fair);  
    29.         notEmpty = lock.newCondition();  
    30.         notFull =  lock.newCondition();  
    31.     }  
    32.   
    33.   ...  
    34. }  

       ArrayBlockingQueue的取操作:

    Java代码  收藏代码
    1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private E extract() {  
    5.         final E[] items = this.items;  
    6.         E x = items[takeIndex];  
    7.         items[takeIndex] = null;  
    8.         takeIndex = inc(takeIndex);  
    9.         --count;  
    10.        // 激发notFull条件  
    11.         notFull.signal();  
    12.         return x;  
    13.     }  
    14.   
    15.      /** 
    16.         * condition的await的语义如下: 
    17.      * 与condition相关的锁以原子方式释放,并禁用该线程 
    18.      * 方法返回时,线程必须获得与该condition相关的锁 
    19.      */  
    20.     public E take() throws InterruptedException {  
    21.         final ReentrantLock lock = this.lock;  
    22.         lock.lockInterruptibly();  
    23.         try {  
    24.             try {  
    25.                   // 等待notEmpty的条件  
    26.                 while (count == 0)  
    27.                     notEmpty.await();  
    28.             } catch (InterruptedException ie) {  
    29.                 notEmpty.signal(); // propagate to non-interrupted thread  
    30.                 throw ie;  
    31.             }  
    32.             E x = extract();  
    33.             return x;  
    34.         } finally {  
    35.             lock.unlock();  
    36.         }  
    37.     }  
    38.   
    39.   ...  
    40. }  

       ArrayBlockingQueue的写操作:

    Java代码  收藏代码
    1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
    2.         implements BlockingQueue<E>, java.io.Serializable {  
    3.   
    4.     private void insert(E x) {  
    5.         items[putIndex] = x;  
    6.         putIndex = inc(putIndex);  
    7.         ++count;  
    8.         notEmpty.signal();  
    9.     }  
    10.   
    11.     public void put(E o) throws InterruptedException {  
    12.         if (o == nullthrow new NullPointerException();  
    13.         final E[] items = this.items;  
    14.         final ReentrantLock lock = this.lock;  
    15.         lock.lockInterruptibly();  
    16.         try {  
    17.             try {  
    18.                   // 等待notFull条件  
    19.            while (count == items.length)  
    20.                     notFull.await();  
    21.             } catch (InterruptedException ie) {  
    22.                 notFull.signal(); // propagate to non-interrupted thread  
    23.                 throw ie;  
    24.             }  
    25.             insert(o);  
    26.         } finally {  
    27.             lock.unlock();  
    28.         }  
    29.     }  
    30.   
    31.   ...  
    32. }  

        注意:ArrayBlockingQueue在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。

    通过保证在临界区上多个线程的相互排斥,线程间可以完全避免竞争状态的发生,但是有时候还是需要线程之间的相互协作。使用条件(Condition)便于线程间通信。一个线程可以指定在某种条件下该做什么。标间是通过调用Lock对象的newCoditionn()方法来实现线程之间的相互通信的。

     一旦创建一个条件,就可使用await()、signal()、signalAll()方法来实现线程间通信。await()方法可以让当前线程都处于等待状态,知道条件放生。signal()方法唤醒一个等待的线程,而signalAll()方法唤醒所有等待线程。

    注:

    Lock接口的 线程请求锁的 几个方法:

    lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。
    tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。
    带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。

    下面的lockInterruptibly()就稍微难理解一些。

    先说说线程的打扰机制,每个线程都有一个 打扰 标志。这里分两种情况,
    1. 线程在sleep或wait,join, 此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException;(thread在做IO操作时也可能有类似行为,见java thread api)
    2. 此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并 作出处理。

    lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。

    引自:http://zhuhui-zj.iteye.com/blog/784193

  • 相关阅读:
    Quicksum -SilverN
    uva 140 bandwidth (好题) ——yhx
    uva 129 krypton factors ——yhx
    uva 524 prime ring problem——yhx
    uva 10976 fractions again(水题)——yhx
    uva 11059 maximum product(水题)——yhx
    uva 725 division(水题)——yhx
    uva 11853 paintball(好题)——yhx
    uva 1599 ideal path(好题)——yhx
    uva 1572 self-assembly ——yhx
  • 原文地址:https://www.cnblogs.com/shz365/p/3766246.html
Copyright © 2011-2022 走看看