BlockingQueue也是JUC的重要知识,而且包含的BlockingQueue的种类还比较多,本篇先分析ArrayBlockingQueue ,主要分为三部分
1 创建
2 放数据
3 取数据
一 创建
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
BlockingQueue和HashMap可不一样啊,必须指定好初始值的大小,且可不能扩容
可以看内部,其实就是通过重入锁和Condition实现的
看看还有哪些重要的属性,这样的数据结构不难理解,底层存储是一个数组,要从数据取值就要有下标。而BlockingQueue是支持并发的,所以必须得有重入锁进行同步
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null;
二 放数据
操作 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | - | - |
offer的特殊值表示操作成功返回true,失败了返回false
poll也是的
我们首先分析三个放数据的方法,按照offer put add的顺序,因为add就我看源码的经验看用的真心不多
1 offer
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0;//到头了就返回为0 count++; notEmpty.signal(); }
2 put
public void put(E e) throws InterruptedException {//具有抛中断异常的能力 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//用condition的await方法阻塞线程 enqueue(e); } finally { lock.unlock(); } }
3 add
add方法定义在 AbstracQueue里面
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");//抛异常的逻辑 }
三 取数据
其实取数据的逻辑相比放数据要复杂好多,还是按照 poll take remove的方式分析
1 poll
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex];//纯数组操作 items[takeIndex] = null;//原位置放null if (++takeIndex == items.length)//到头了就返回0 takeIndex = 0; count--;//数量减一 if (itrs != null) itrs.elementDequeued(); notFull.signal();//唤醒Condition中的等待线程 return x; }
2 take()
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//靠的就是这个阻塞当前线程 return dequeue(); } finally { lock.unlock(); } }
3 remove
定义在 AbstracQueue里面,里面调用poll,如果返回值是null,直接抛异常
public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }
四 查看数据
查看数据有两个方法 element和peek,先看peek
1 peek
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty,就是从数组拿值 } finally { lock.unlock(); } }
2 element
element定义在AbstracQueue
public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); }
五 总结
从上面的源码看,凡是定义在AbstractQueue里的都抛异常
阻塞都是通过conditioin的await方法