目录:
- 什么是ArrayBlockingQueue
- 为什么要有ArrayBlockingQueue
- 如何使用ArrayBlockingQueue
- ArrayBlockingQueue源码分析
- LinkedBlockingQueue源码分析
什么是ArrayBlockingQueue
首先在说明ArrayBlockingQueue前,我们需要只要ArrayBlockingQueue是实现与BlockingQueue的,而BlockingQueue是一个阻塞队列。
也就是说ArrayBlockingQueue其实也是一个阻塞队列,只不过是众多阻塞队列中的一种实现。
你可以简单的查阅一下源码就可以知道,ArrayBlockingQueue是一个通过数组实现的线程安全的有界的阻塞队列。
- 线程安全:内部通过互斥锁保护了资源的竞争,实现了多线程的资源互斥访问。
- 有界:指对应的数组是有界限的。
- 阻塞队列:指多个线程竞争资源时,当某个资源已被其它线程获取到,其它线程需要阻塞等待。
为什么要有ArrayBlockingQueue
其实不管是ArrayBlockingQueue还是BlockingQueue的其它实现,如DelayQueue、LinkedBlockingQueue等都是用于应对与多线程并发场景。
如何使用ArrayBlockingQueue
ArrayBlockingQueue的使用很简单,就和数组一样,但你要熟悉下它的api,其实也就是BlockingQueue定义的一些方法。
1 public interface BlockingQueue<E> extends Queue<E> { 2 /** 3 * 将给定元素添加到队列,成功返回true,否则false。基于offer实现,添加失败,也就是队列满了则抛出异常。 4 * 如果是往“限定长度”的队列中添加,推荐使用offer()方法。 5 */ 6 boolean add(E e); 7 8 /** 9 * 将给定元素添加到队列,成功返回true,否则false。 10 * e为空则抛出空指针异常。 11 */ 12 boolean offer(E e); 13 14 /** 15 * 将给定元素添加到队列,若队列中无多余空间,则会一直阻塞,知道有对于空间。 16 */ 17 void put(E e) throws InterruptedException; 18 19 /** 20 * 将给定元素在规定时间内添加到队列,成功返回true,否则false。 21 */ 22 boolean offer(E e, long timeout, TimeUnit unit) 23 throws InterruptedException; 24 25 /** 26 * 从队列中获取值,若队列中无值则会一直阻塞,知道有值才会返回。 27 */ 28 E take() throws InterruptedException; 29 30 /** 31 * 在给定时间内从队列中取值,时间到了后还未取到则调用普通的poll方法,为null则就返回null。 32 */ 33 E poll(long timeout, TimeUnit unit) 34 throws InterruptedException; 35 36 /** 37 * 获取队列中剩余的空间。 38 */ 39 int remainingCapacity(); 40 41 /** 42 * 从队列中移除指定的值。 43 */ 44 boolean remove(Object o); 45 46 /** 47 * 判断队列中是否存在该值。 48 */ 49 public boolean contains(Object o); 50 51 /** 52 * 将队列中的值全部移除,并设置到指定的集合中。 53 */ 54 int drainTo(Collection<? super E> c); 55 56 /** 57 * 从队列中移除maxElements数量,并设置到指定的集合中。 58 */ 59 int drainTo(Collection<? super E> c, int maxElements); 60 }
ArrayBlockingQueue源码分析
1、声明:
1 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 }
没啥好说的,继承自AbstractQueue,实现与BlockingQueue。
2、属性:
1 /** 2 * Serialization ID. 3 */ 4 private static final long serialVersionUID = -817911632652898426L; 5 6 /** 存储数据的数组 */ 7 final Object[] items; 8 9 /** take, poll, peek or remove的下一个索引 */ 10 int takeIndex; 11 12 /** put, offer, or add的下一个索引 */ 13 int putIndex; 14 15 /** 队列中元素的个数 */ 16 int count; 17 18 /** 重入锁 */ 19 final ReentrantLock lock; 20 21 /** 队列不为空的条件 */ 22 private final Condition notEmpty; 23 24 /** 队列未满的条件 */ 25 private final Condition notFull; 26 27 /** 28 * 当前活动迭代器的共享状态 29 */ 30 transient Itrs itrs = null;
通过属性我们就可以看出ArrayBlockingQueue的底层是通过数据来实现的,并且还使用了ReentrantLock控制资源的访问,以及两个Condition。
- 数组的容量是在构造函器中指定的,后面会说到。
- Condition notEmpty,某线程A获取数据时发现队列还无数据,就会执行notEmpty.await()进行等待;直到另一个线程B插入了一条数据后,便会调用notEmpty.signal()唤醒notEmpty上等待的线程A。
- Condition notFull,某线程C插入数据的时候,发现数组已满,则执行notFull.await()进行等待;直到某个线程D取出数据后,便会调用notFull.signal()唤醒notFull上等待的线程C。
3、构造器:
1 /** 2 * 创建指定容量的ArrayBlockingQueue对象。 3 */ 4 public ArrayBlockingQueue(int capacity) { 5 // 默认采用非公平锁策略 6 this(capacity, false); 7 } 8 9 /** 10 * 创建指定容量的ArrayBlockingQueue对象,可指定锁的策略(公平或非公平)。 11 */ 12 public ArrayBlockingQueue(int capacity, boolean fair) { 13 if (capacity <= 0) 14 throw new IllegalArgumentException(); 15 this.items = new Object[capacity]; 16 lock = new ReentrantLock(fair); 17 notEmpty = lock.newCondition(); 18 notFull = lock.newCondition(); 19 } 20 21 /** 22 * 创建指定容量的ArrayBlockingQueue对象,可指定锁的策略(公平或非公平),并且可指定数组初始化数据。 23 */ 24 public ArrayBlockingQueue(int capacity, boolean fair, 25 Collection<? extends E> c) { 26 this(capacity, fair); 27 28 final ReentrantLock lock = this.lock; 29 lock.lock(); // Lock only for visibility, not mutual exclusion 30 try { 31 int i = 0; 32 try { 33 for (E e : c) { 34 checkNotNull(e); 35 items[i++] = e; 36 } 37 } catch (ArrayIndexOutOfBoundsException ex) { 38 throw new IllegalArgumentException(); 39 } 40 count = i; 41 putIndex = (i == capacity) ? 0 : i; 42 } finally { 43 lock.unlock(); 44 } 45 }
4、函数:
看下实现于BlockingQueue的函数即可,因为不是很复杂,我觉得不用再进一步的说明了。
LinkedBlockingQueue源码分析
与ArrayBlockingQueue类似,只不过基于链表实现,不再赘述。