zoukankan      html  css  js  c++  java
  • 【1】【JUC】JDK1.8源码分析之ArrayBlockingQueue,LinkedBlockingQueue

    概要:

    ArrayBlockingQueue的内部是通过一个可重入锁ReentrantLock和两个Condition条件对象来实现阻塞

    注意这两个Condition即ReentrantLock的Condition:ReentrantLock的内部类Sync继承了AQS这个抽象类,Sync的newCondition()返回AQS的ConditionObject

    关键:队列已经满时入队,将当前线程加入notFull这个Condition等待队列,唤醒等待notEmpty条件的线程。

    关键:队列为空时出队,将当前线程加入notEmpty这个Condition等待队列,唤醒等待notFull条件的线程

    AQS同步队列与Condition等待队列协同机制概要

    AQS维护了一个同步队列

    Condition是JUC的一个接口,AQS的ConditionObject实现了这个接口,维护了一个等待队列(等待signal信号的队列)

     线程调用reentrantLock.lock()时,线程被加入到AQS同步队列中;

    线程A调用condition.await()方法时,线程A从AQS同步队列中被移除,被加入到Condition等待队列,等待signal信号

    线程B调用signal()方法,Condition等待队列中有一个节点A,把它取出来(A)加入到AQS同步队列中。这时候线程A并没有被唤醒(signal可以指定唤醒哪个condition)

    只有发送singal信号的线程调用reentrantLock.unLock()后,因为它(线程A)已经被加入到AQS同步队列并且成为同步队列头结点,所以线程才会被唤醒。

    一、前言

      在完成Map下的并发集合后,现在来分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一个阻塞型队列,支持多任务并发操作,有了之前看源码的积累,再看ArrayBlockingQueue源码会很容易,下面开始正文。

      ArrayBlockingQueue的内部是通过一个可重入锁ReentrantLock和两个Condition条件对象来实现阻塞

      由于Java中的阻塞队列接口BlockingQueue继承自Queue接口,因此先来看看阻塞队列接口为我们提供的主要方法

    public interface BlockingQueue<E> extends Queue<E> {
    
        //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
        //在成功时返回 true,如果此队列已满,则抛IllegalStateException。 
        boolean add(E e); 
    
        //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量) 
        // 将指定的元素插入此队列的尾部,如果该队列已满, 
        //则在到达指定的等待时间之前等待可用的空间,该方法可中断 
        boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 
    
        //将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。 
        void put(E e) throws InterruptedException; 
    
        //获取并移除此队列的头部,如果没有元素则等待(阻塞), 
        //直到有元素将唤醒等待线程执行该操作 
        E take() throws InterruptedException; 
    
        //获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束
        E poll(long timeout, TimeUnit unit) throws InterruptedException; 
    
        //从此队列中移除指定元素的单个实例(如果存在)。 
        boolean remove(Object o); 
    }
    
        //除了上述方法还有继承自Queue接口的方法 
        //获取但不移除此队列的头元素,没有则跑异常NoSuchElementException 
        E element(); 
    
        //获取但不移除此队列的头;如果此队列为空,则返回 null。 
        E peek(); 
    
        //获取并移除此队列的头,如果此队列为空,则返回 null。 
        E poll();
    View Code

    这里我们把上述操作进行分类

    • 插入方法

      • add(E e) : 添加成功返回true,失败抛IllegalStateException异常,add调用了offer函数
      • offer(E e) : 成功返回 true,如果此队列已满,则返回 false
      • put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞
    • 删除方法:

      • remove(Object o) :移除指定元素,成功返回true,失败返回false
      • poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
      • take():获取并移除此队列头元素,若没有元素则一直阻塞
    • 检查方法

      • element() :获取但不移除此队列的头元素,没有元素则抛异常
      • peek() :获取但不移除此队列的头;若队列为空,则返回 null

    阻塞队列的对元素的增删查操作主要就是上述的三类方法,通常情况下我们都是通过这3类方法操作阻塞队列

    二、ArrayBlockingQueue数据结构

      通过源码分析,并且可以对比ArrayList可知,ArrayBlockingQueue的底层数据结构是数组,数据结构如下

      说明:ArrayBlockingQueue底层采用数组存放数据,对数组的访问添加了锁的机制,使其能够支持多线程并发。

    三、ArrayBlockingQueue源码分析

      3.1 类的继承关系  

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {}

    说明:可以看到ArrayBlockingQueue继承了AbstractQueue抽象类,AbstractQueue定义了对队列的基本操作;同时实现了BlockingQueue接口,BlockingQueue表示阻塞型的队列,其对队列的操作可能会抛出异常;同时也实现了Searializable接口,表示可以被序列化。

      3.2 类的属性  

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        // 版本序列号
        private static final long serialVersionUID = -817911632652898426L;
        // 存放实际元素的数组
        final Object[] items;
        // 取元素索引
        int takeIndex;
        // 获取元素索引
        int putIndex;
        // 队列中的项
        int count;
        // 可重入锁
        final ReentrantLock lock;
        // 等待获取条件
        private final Condition notEmpty;
        // 等待存放条件
        private final Condition notFull;
        // 迭代器
        transient Itrs itrs = null;
    }
    View Code

     说明:从类的属性中可以清楚的看到其底层的结构是Object类型的数组,取元素和存元素有不同的索引,有一个可重入锁ReentrantLock,两个条件Condition。对ReentrantLock和Condition不太熟悉的读者可以参考笔者的这篇博客,【JUC】JDK1.8源码分析之ReentrantLock(三)

    注意这两个Condition即ReentrantLock的Condition:

    关键:队列已经满时入队,将当前线程加入notFull这个Condition等待队列,唤醒等待notEmpty条件的线程。

    关键:队列为空时出队,将当前线程加入notEmpty这个Condition等待队列,唤醒等待notFull条件的线程

        // 等待获取条件
        private final Condition notEmpty; 
       // 等待存放条件
        private final Condition notFull;

    值得注意的是ArrayBlockingQueue通过一个ReentrantLock来同时控制添加线程与移除线程的并发访问,这点与LinkedBlockingQueue区别很大(稍后会分析)。

      3.3 类的构造函数

      1. ArrayBlockingQueue(int)型构造函数 

    public ArrayBlockingQueue(int capacity) {
            // 调用两个参数的构造函数
            this(capacity, false);
        }
    View Code

    说明:该构造函数用于创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。

      2. ArrayBlockingQueue(int, boolean)型构造函数  

    public ArrayBlockingQueue(int capacity, boolean fair) {
            // 初始容量必须大于0
            if (capacity <= 0)
                throw new IllegalArgumentException();
            // 初始化数组
            this.items = new Object[capacity];
            // 初始化可重入锁
            lock = new ReentrantLock(fair);
            // 初始化等待条件
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    View Code

    说明:该构造函数用于创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。

    ArrayBlockingQueue内部的阻塞队列是通过重入锁ReenterLock和Condition条件队列实现的,所以ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。创建公平与非公平阻塞队列代码如下:

     //默认非公平阻塞队列
    ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
    //公平阻塞队列

    ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);

      3. ArrayBlockingQueue(int, boolean, Collection<? extends E>)型构造函数 

    public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            // 调用两个参数的构造函数
            this(capacity, fair);
            // 可重入锁
            final ReentrantLock lock = this.lock;
            // 上锁
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) { // 遍历集合
                        // 检查元素是否为空
                        checkNotNull(e);
                        // 存入ArrayBlockingQueue中
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) { // 当初始化容量小于传入集合的大小时,会抛出异常
                    throw new IllegalArgumentException();
                }
                // 元素数量
                count = i;
                // 初始化存元素的索引
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    View Code

    说明:该构造函数用于创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。

      3.4 核心函数分析

      1. put函数  

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            // 获取可重入锁
            final ReentrantLock lock = this.lock;
            // 如果当前线程未被中断,则获取锁
            lock.lockInterruptibly();
            try {
                while (count == items.length) // 判断元素是否已满
                    // 若满,则等待
                    notFull.await();
                // 入队列
                enqueue(e);
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    View Code

    关键队列已经满时notFull.await();将当前线程加入notFull这个Condition等待队列

    说明:put函数用于存放元素,在当前线程被中断时会抛出异常,并且当队列已经满时,会阻塞一直等待。其中,put会调用enqueue函数,enqueue函数源码如下

    private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            // 获取数组
            final Object[] items = this.items;
            // 将元素放入
            items[putIndex] = x;
            if (++putIndex == items.length) // 放入后存元素的索引等于数组长度(表示已满)
                // 重置存索引为0
                putIndex = 0;
            // 元素数量加1
            count++;
            // 唤醒在notEmpty条件上等待的线程
            notEmpty.signal();
        }
    View Code

    说明:enqueue函数用于将元素存入底层Object数组中,并且会唤醒等待notEmpty条件的线程

      2. offer函数 

    public boolean offer(E e) {
            // 检查元素不能为空
            checkNotNull(e);
            // 可重入锁
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lock();
            try {
                if (count == items.length) // 元素个数等于数组长度,则返回
                    return false; 
                else { // 添加进数组
                    enqueue(e);
                    return true;
                }
            } finally {
                // 释放数组
                lock.unlock();
            }
        }
    View Code

    说明:offer函数也用于存放元素,在调用ArrayBlockingQueue的add方法时,会间接的调用到offer函数,offer函数添加元素不会抛出异常,当底层Object数组已满时,则返回false,否则,会调用enqueue函数,将元素存入底层Object数组。并唤醒等待notEmpty条件的线程。

      add函数:

    //add方法实现,间接调用了offer(e)
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    View Code

     到此我们对三个添加方法即put,offer,add都分析完毕,其中offer,add在正常情况下都是无阻塞的添加,而put方法是阻塞添加由源码可知put在队列满时await(),offer在队列满时返回false。这就是阻塞队列的添加过程。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。

    这里的add方法和offer方法实现比较简单,其中需要注意的是enqueue(E x)方法,其方法内部通过putIndex索引直接将元素添加到数组items中,这里可能会疑惑的是当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0,这是因为当前队列执行元素获取时总是从队列头部获取,而添加元素从中从队列尾部获取所以当队列索引(从0开始)与数组长度相等时,下次我们就需要从数组头部开始添加了,如下图演示 (队列的头尾是takeIndex和putIndex,不是数组的头尾

      3. take函数  

    public E take() throws InterruptedException {
            // 可重入锁
            final ReentrantLock lock = this.lock;
            // 如果当前线程未被中断,则获取锁,中断会抛出异常
            lock.lockInterruptibly();
            try {
                while (count == 0) // 元素数量为0,即Object数组为空
                    // 则等待notEmpty条件
                    notEmpty.await();
                // 出队列
                return dequeue();
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    View Code

    关键:队列为空时notEmpty.await();将当前线程加入notEmpty这个Condition等待队列

    说明:take函数用于从ArrayBlockingQueue中获取一个元素,其与put函数相对应,在当前线程被中断时会抛出异常,并且当队列为空时,会阻塞一直等待。其中,take会调用dequeue函数,dequeue函数源码如下

    private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            // 取元素
            E x = (E) items[takeIndex];
            // 该索引的值赋值为null
            items[takeIndex] = null;
            // 取值索引等于数组长度
            if (++takeIndex == items.length)
                // 重新赋值取值索引
                takeIndex = 0;
            // 元素个数减1
            count--;
            if (itrs != null) 
                itrs.elementDequeued();
            // 唤醒在notFull条件上等待的线程
            notFull.signal();
            return x;
        }
    View Code

    说明:dequeue函数用于将取元素,并且会唤醒等待notFull条件的线程

      4. poll函数 

    public E poll() {
            // 重入锁
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lock();
            try {
                // 若元素个数为0则返回null,否则,调用dequeue,出队列
                return (count == 0) ? null : dequeue();
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    View Code

    说明:poll(),获取并删除队列头元素,队列没有数据就返回null,内部通过dequeue()方法删除头元素

      接着看remove(Object o)方法:

    public boolean remove(Object o) {
        if (o == null) return false;
        //获取数组数据
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();//加锁
        try {
            //如果此时队列不为null,这里是为了防止并发情况
            if (count > 0) {
                //获取下一个要添加元素时的索引
                final int putIndex = this.putIndex;
                //获取当前要被删除元素的索引
                int i = takeIndex;
                //执行循环查找要删除的元素
                do {
                    //找到要删除的元素
                    if (o.equals(items[i])) {
                        removeAt(i);//执行删除
                        return true;//删除成功返回true
                    }
                    //当前删除索引执行加1后判断是否与数组长度相等
                    //若为true,说明索引已到数组尽头,将i设置为0
                    if (++i == items.length)
                        i = 0; 
                } while (i != putIndex);//继承查找
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    //根据索引删除元素,实际上是把删除索引之后的元素往前移动一个位置
    void removeAt(final int removeIndex) {
    
         final Object[] items = this.items;
          //先判断要删除的元素是否为当前队列头元素
          if (removeIndex == takeIndex) {
              //如果是直接删除
              items[takeIndex] = null;
              //当前队列头元素加1并判断是否与数组长度相等,若为true设置为0
              if (++takeIndex == items.length)
                  takeIndex = 0;
              count--;//队列元素减1
              if (itrs != null)
                  itrs.elementDequeued();//更新迭代器中的数据
          } else {
          //如果要删除的元素不在队列头部,
          //那么只需循环迭代把删除元素后面的所有元素往前移动一个位置
              //获取下一个要被添加的元素的索引,作为循环判断结束条件
              final int putIndex = this.putIndex;
              //执行循环
              for (int i = removeIndex;;) {
                  //获取要删除节点索引的下一个索引
                  int next = i + 1;
                  //判断是否已为数组长度,如果是从数组头部(索引为0)开始找
                  if (next == items.length)
                      next = 0;
                   //如果查找的索引不等于要添加元素的索引,说明元素可以再移动
                  if (next != putIndex) {
                      items[i] = items[next];//把后一个元素前移覆盖要删除的元
                      i = next;
                  } else {
                  //在removeIndex索引之后的元素都往前移动完毕后清空最后一个元素
                      items[i] = null;
                      this.putIndex = i;
                      break;//结束循环
                  }
              }
              count--;//队列元素减1
              if (itrs != null)
                  itrs.removedAt(removeIndex);//更新迭代器数据
          }
          notFull.signal();//唤醒添加线程
        }
    View Code

     remove(Object o)方法的删除过程相对复杂些,因为该方法并不是直接从队列头部删除元素。首先线程先获取锁,再一步判断队列count>0,这点是保证并发情况下删除操作安全执行。接着获取下一个要添加源的索引putIndex以及takeIndex索引 ,作为后续循环的结束判断,因为只要putIndex与takeIndex不相等就说明队列没有结束。然后通过while循环找到要删除的元素索引,执行removeAt(i)方法删除,在removeAt(i)方法中实际上做了两件事,一是首先判断队列头部元素是否为删除元素,如果是直接删除,并唤醒添加线程,二是如果要删除的元素并不是队列头元素,那么执行循环操作,从要删除元素的索引removeIndex之后的元素都往前移动一个位置,那么要删除的元素就被removeIndex之后的元素替换,从而也就完成了删除操作。

      5. clear函数  

    public void clear() {
            // 数组
            final Object[] items = this.items;
            // 可重入锁
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lock();
            try {
                // 保存元素个数
                int k = count;
                if (k > 0) { // 元素个数大于0
                    // 存数元素索引
                    final int putIndex = this.putIndex;
                    // 取元素索引
                    int i = takeIndex;
                    do {
                        // 赋值为null
                        items[i] = null;
                        if (++i == items.length) // 重新赋值i
                            i = 0;
                    } while (i != putIndex);
                    // 重新赋值取元素索引
                    takeIndex = putIndex;
                    // 元素个数为0
                    count = 0;
                    if (itrs != null)
                        itrs.queueIsEmpty();
                    for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull条件的线程,则逐一唤醒
                        notFull.signal();
                }
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    View Code

    说明:clear函数用于清空ArrayBlockingQueue,并且会释放所有等待notFull条件的线程(存放元素的线程)。

    四、示例

      下面给出一个具体的示例来演示ArrayBlockingQueue的使用

    package com.hust.grid.leesf.collections;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    class PutThread extends Thread {
        private ArrayBlockingQueue<Integer> abq;
        public PutThread(ArrayBlockingQueue<Integer> abq) {
            this.abq = abq;
        }
        
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("put " + i);
                    abq.put(i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    class GetThread extends Thread {
        private ArrayBlockingQueue<Integer> abq;
        public GetThread(ArrayBlockingQueue<Integer> abq) {
            this.abq = abq;
        }
        
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("take " + abq.take());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    public class ArrayBlockingQueueDemo {
        public static void main(String[] args) {
            ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10);
            PutThread p1 = new PutThread(abq);
            GetThread g1 = new GetThread(abq);
            
            p1.start();
            g1.start();
        }
    }
    View Code
    运行结果:
    put 0
    take 0
    put 1
    take 1
    put 2
    take 2
    put 3
    take 3
    put 4
    take 4
    put 5
    take 5
    put 6
    take 6
    put 7
    take 7
    put 8
    take 8
    put 9
    take 9
    View Code

    说明:示例中使用了两个线程,一个用于存元素,一个用于读元素,存和读各10次,每个线程存一个元素或者读一个元素后都会休眠100ms,可以看到结果是交替打印,并且首先打印的肯定是put线程语句(因为若取线程先取元素,此时队列并没有元素,其会阻塞,等待存线程存入元素),并且最终程序可以正常结束。

    五、总结

      总的来说,有了前面分析的基础,分析ArrayBlockingQueue就会非常的简单,ArrayBlockingQueue是通过ReentrantLock和Condition条件来保证多线程的正确访问的。

    LinkedBlockingQueue的实现原理剖析       

    LinkedBlockingQueue是一个基于链表的阻塞队列。每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制(2个reentrantlock)也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量和ArrayBlockingQueue一样有notFull和notEmpty两个condition。这里再次强调如果没有给LinkedBlockingQueue指定容量大小,其默认值将是Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。至于LinkedBlockingQueue的实现原理图与ArrayBlockingQueue是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的Condition条件对象作为等待队列,用于挂起take线程和put线程。

    添加方法的实现原理

    对于添加方法,主要指的是add,offer以及put,这里先看看add方法和offer方法的实现

    public boolean add(E e) {
         if (offer(e))
             return true;
         else
             throw new IllegalStateException("Queue full");
    }
    View Code

    从源码可以看出,add方法间接调用的是offer方法,如果add方法添加失败将抛出IllegalStateException异常,添加成功则返回true,那么下面我们直接看看offer的相关方法实现

    public boolean offer(E e) {
         //添加元素为null直接抛出异常
         if (e == null) throw new NullPointerException();
          //获取队列的个数
          final AtomicInteger count = this.count;
          //判断队列是否已满
          if (count.get() == capacity)
              return false;
          int c = -1;
          //构建节点
          Node<E> node = new Node<E>(e);
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
              //再次判断队列是否已满,考虑并发情况
              if (count.get() < capacity) {
                  enqueue(node);//添加元素
                  c = count.getAndIncrement();//拿到当前未添加新元素时的队列长度
                  //如果容量还没满
                  if (c + 1 < capacity)
                      notFull.signal();//唤醒下一个添加线程,执行添加操作
              }
          } finally {
              putLock.unlock();
          }
          // 由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等到线程,因此count肯定会变化。
          //这里的if条件表示如果队列中还有1条数据
          if (c == 0) 
            signalNotEmpty();//如果还存在数据那么就唤醒消费锁
        return c >= 0; // 添加成功返回true,否则返回false
      }
    
    //入队操作
    private void enqueue(Node<E> node) {
         //队列尾节点指向新的node节点
         last = last.next = node;
    }
    
    //signalNotEmpty方法
    private void signalNotEmpty() {
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lock();
              //唤醒获取并删除元素的线程
              notEmpty.signal();
          } finally {
              takeLock.unlock();
          }
      }
    View Code

    这里的Offer()方法做了两件事,第一件事是判断队列是否满,满了就直接释放锁,没满就将节点封装成Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象notFull上的添加线程。第二件事是,判断是否需要唤醒等到在notEmpty条件对象上的消费线程。这里我们可能会有点疑惑,为什么添加完成后是继续唤醒在条件对象notFull上的添加线程而不是像ArrayBlockingQueue那样直接唤醒notEmpty条件对象上的消费线程?而又为什么要当if (c == 0)时才去唤醒消费线程呢?

    • 唤醒添加线程的原因,在添加新元素完成后,会判断队列是否已满,不满就继续唤醒在条件对象notFull上的添加线程,这点与前面分析的ArrayBlockingQueue很不相同,在ArrayBlockingQueue内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为ArrayBlockingQueue只用了一个ReenterLock同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于LinkedBlockingQueue来说就不一样了其内部对添加线程和消费线程分别使用了各自的ReenterLock锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,当然offer方法并不会挂起,而是直接结束,只有put方法才会当队列满时才执行挂起操作。注意消费线程的执行过程也是如此。这也是为什么LinkedBlockingQueue的吞吐量要相对大些的原因。

    • 为什么要判断if (c == 0)时才去唤醒消费线程呢,这是因为消费线程一旦被唤醒是一直在消费的(前提是有数据),所以c值是一直在变化的,c值是添加完元素前队列的大小,此时c只可能是0或c>0,如果是c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是c+1,那么有数据就直接唤醒等待消费线程,如果没有就结束啦,等待下一次的消费操作。如果c>0那么消费线程就不会被唤醒,只能等待下一个消费操作(poll、take、remove)的调用,那为什么不是条件c>0才去唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前c>0,那么很可能上一次调用的消费线程后,数据并没有被消费完,条件队列上也就不存在等待的消费线程了,所以c>0唤醒消费线程得意义不是很大,当然如果添加线程一直添加元素,那么一直c>0,消费线程执行的换就要等待下一次调用消费操作了(poll、take、remove)。

    LinkedBlockingQueue和ArrayBlockingQueue迥异

    通过上述的分析,对于LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及内部实现原理我们已较为熟悉了,这里我们就对它们两间的区别来个小结

    1.队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题

    2.数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表

    3.由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

    4.两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

    https://blog.csdn.net/javazejian/article/details/77410889

  • 相关阅读:
    python中如何将两个list合并成一个list,不用for语句
    python print的用法
    Pandas Timestamp 和 python 中 datetime 的互相转换
    python eval, exec. compile
    python 用 __all__ 暴露接口
    python functiontools 模块
    Python 修饰符, 装饰符
    Python 字典(Dictionary) update()方法
    Python :random 随机数生成
    Pandas dataframe 标记删除重复记录
  • 原文地址:https://www.cnblogs.com/twoheads/p/9629092.html
Copyright © 2011-2022 走看看