zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(二十一、ArrayBlockingQueue、LinkedBlockingQueue源码分析)

    目录:

    • 什么是ArrayBlockingQueue
    • 为什么要有ArrayBlockingQueue
    • 如何使用ArrayBlockingQueue
    • ArrayBlockingQueue源码分析
    • LinkedBlockingQueue源码分析

    什么是ArrayBlockingQueue

    首先在说明ArrayBlockingQueue前,我们需要只要ArrayBlockingQueue是实现与BlockingQueue,而BlockingQueue是一个阻塞队列

    也就是说ArrayBlockingQueue其实也是一个阻塞队列,只不过是众多阻塞队列中的一种实现。

    你可以简单的查阅一下源码就可以知道,ArrayBlockingQueue是一个通过数组实现线程安全有界阻塞队列

    • 线程安全:内部通过互斥锁保护了资源的竞争,实现了多线程的资源互斥访问。
    • 有界:指对应的数组是有界限的。
    • 阻塞队列:指多个线程竞争资源时,当某个资源已被其它线程获取到,其它线程需要阻塞等待。

    为什么要有ArrayBlockingQueue

    其实不管是ArrayBlockingQueue还是BlockingQueue的其它实现,如DelayQueue、LinkedBlockingQueue等都是用于应对与多线程并发场景

    如何使用ArrayBlockingQueue

    ArrayBlockingQueue的使用很简单,就和数组一样,但你要熟悉下它的api,其实也就是BlockingQueue定义的一些方法。

     1 public interface BlockingQueue<E> extends Queue<E> {
     2     /**
     3      * 将给定元素添加到队列,成功返回true,否则false。基于offer实现,添加失败,也就是队列满了则抛出异常。
     4      * 如果是往“限定长度”的队列中添加,推荐使用offer()方法。
     5      */
     6     boolean add(E e);
     7 
     8     /**
     9      * 将给定元素添加到队列,成功返回true,否则false。
    10      * e为空则抛出空指针异常。
    11      */
    12     boolean offer(E e);
    13 
    14     /**
    15      * 将给定元素添加到队列,若队列中无多余空间,则会一直阻塞,知道有对于空间。
    16      */
    17     void put(E e) throws InterruptedException;
    18 
    19     /**
    20      * 将给定元素在规定时间内添加到队列,成功返回true,否则false。
    21      */
    22     boolean offer(E e, long timeout, TimeUnit unit)
    23         throws InterruptedException;
    24 
    25     /**
    26      * 从队列中获取值,若队列中无值则会一直阻塞,知道有值才会返回。
    27      */
    28     E take() throws InterruptedException;
    29 
    30     /**
    31      * 在给定时间内从队列中取值,时间到了后还未取到则调用普通的poll方法,为null则就返回null。
    32      */
    33     E poll(long timeout, TimeUnit unit)
    34         throws InterruptedException;
    35 
    36     /**
    37      * 获取队列中剩余的空间。
    38      */
    39     int remainingCapacity();
    40 
    41     /**
    42      * 从队列中移除指定的值。
    43      */
    44     boolean remove(Object o);
    45 
    46     /**
    47      * 判断队列中是否存在该值。
    48      */
    49     public boolean contains(Object o);
    50 
    51     /**
    52      * 将队列中的值全部移除,并设置到指定的集合中。
    53      */
    54     int drainTo(Collection<? super E> c);
    55 
    56     /**
    57      * 从队列中移除maxElements数量,并设置到指定的集合中。
    58      */
    59     int drainTo(Collection<? super E> c, int maxElements);
    60 }

    ArrayBlockingQueue源码分析

    1、声明:

    1 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    2         implements BlockingQueue<E>, java.io.Serializable {
    3 }

    没啥好说的,继承自AbstractQueue,实现与BlockingQueue。

    2、属性:

     1 /**
     2  * Serialization ID.
     3  */
     4 private static final long serialVersionUID = -817911632652898426L;
     5 
     6 /** 存储数据的数组 */
     7 final Object[] items;
     8 
     9 /** take, poll, peek or remove的下一个索引 */
    10 int takeIndex;
    11 
    12 /** put, offer, or add的下一个索引 */
    13 int putIndex;
    14 
    15 /** 队列中元素的个数 */
    16 int count;
    17 
    18 /** 重入锁 */
    19 final ReentrantLock lock;
    20 
    21 /** 队列不为空的条件 */
    22 private final Condition notEmpty;
    23 
    24 /** 队列未满的条件 */
    25 private final Condition notFull;
    26 
    27 /**
    28  * 当前活动迭代器的共享状态
    29  */
    30 transient Itrs itrs = null;

    通过属性我们就可以看出ArrayBlockingQueue的底层是通过数据来实现的,并且还使用了ReentrantLock控制资源的访问,以及两个Condition。

    • 数组的容量是在构造函器中指定的,后面会说到。
    • Condition notEmpty,某线程A获取数据时发现队列还无数据,就会执行notEmpty.await()进行等待;直到另一个线程B插入了一条数据后,便会调用notEmpty.signal()唤醒notEmpty上等待的线程A。
    • Condition notFull,某线程C插入数据的时候,发现数组已满,则执行notFull.await()进行等待;直到某个线程D取出数据后,便会调用notFull.signal()唤醒notFull上等待的线程C。

    3、构造器:

     1 /**
     2  * 创建指定容量的ArrayBlockingQueue对象。
     3  */
     4 public ArrayBlockingQueue(int capacity) {
     5     // 默认采用非公平锁策略
     6     this(capacity, false);
     7 }
     8 
     9 /**
    10  * 创建指定容量的ArrayBlockingQueue对象,可指定锁的策略(公平或非公平)。
    11  */
    12 public ArrayBlockingQueue(int capacity, boolean fair) {
    13     if (capacity <= 0)
    14         throw new IllegalArgumentException();
    15     this.items = new Object[capacity];
    16     lock = new ReentrantLock(fair);
    17     notEmpty = lock.newCondition();
    18     notFull =  lock.newCondition();
    19 }
    20 
    21 /**
    22  * 创建指定容量的ArrayBlockingQueue对象,可指定锁的策略(公平或非公平),并且可指定数组初始化数据。
    23  */
    24 public ArrayBlockingQueue(int capacity, boolean fair,
    25                           Collection<? extends E> c) {
    26     this(capacity, fair);
    27 
    28     final ReentrantLock lock = this.lock;
    29     lock.lock(); // Lock only for visibility, not mutual exclusion
    30     try {
    31         int i = 0;
    32         try {
    33             for (E e : c) {
    34                 checkNotNull(e);
    35                 items[i++] = e;
    36             }
    37         } catch (ArrayIndexOutOfBoundsException ex) {
    38             throw new IllegalArgumentException();
    39         }
    40         count = i;
    41         putIndex = (i == capacity) ? 0 : i;
    42     } finally {
    43         lock.unlock();
    44     }
    45 }

    4、函数:

    看下实现于BlockingQueue的函数即可,因为不是很复杂,我觉得不用再进一步的说明了。

    LinkedBlockingQueue源码分析

    与ArrayBlockingQueue类似,只不过基于链表实现,不再赘述。

  • 相关阅读:
    远程调用之RMI、Hessian、Burlap、Httpinvoker、WebService的比较
    遍历List/Map的时候删除成员遇到的奇怪问题
    Java事务处理
    ThreadLocal学习记录
    IntelliJ IDEA+Tomcat+Nginx运行git项目
    JavaIO和JavaNIO
    Spring MVC的启动过程
    Java中的集合类
    Java中的泛型
    Java 多线程的基本概念
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/13205043.html
Copyright © 2011-2022 走看看