大纲
- BlockingQueue接口
- ArrayBlockingQueue
一、BlockingQueue接口
public interface BlockingQueue<E> extends Queue<E> { //add、offer向队列插值,返回插入是否成功 boolean add(E e); boolean offer(E e); //向队列插值,队列满则阻塞至队列非满 void put(E e) throws InterruptedException; //offer方法加上超时,超时返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //从队列拿值,队列空则阻塞至队列非空 E take() throws InterruptedException; //向队列拿值,返回是否成功,超时返回false E poll(long timeout, TimeUnit unit) throws InterruptedException; //队列剩余空间 int remainingCapacity(); //删除 boolean remove(Object o); //是否包含 boolean contains(Object o); //移除队列中的值到集合中 int drainTo(Collection<? super E> c); //maxElements移除最大个数 int drainTo(Collection<? super E> c, int maxElements); }
二、ArrayBlockingQueue
主要成员变量
/** 队列 */ final Object[] items; /** 拿值的索引,用于 take, poll, peek, remove 方法*/ int takeIndex; /** 存值的索引, 用于 next put, offer, or, add 方法*/ int putIndex; /** 队列中元素数量 */ int count; /** 锁,两个condition都是由这个锁创建 */ final ReentrantLock lock; /** 可以拿值的Condition(不空可拿) */ private final Condition notEmpty; /** 可以存值的Condition(不满可存) */ private final Condition notFull;
构造函数
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { //初始化锁与condition,默认非公平所 if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); //将集合中元素拷贝至队列 final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } //队列数量等于集合数量 count = i; //存索引赋值:队列满赋0,不满赋集合数量 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
主要方法
取:
//非阻塞 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //阻塞 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //队列大小为0拿值线程阻塞 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } //出队 private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; //对应putindex if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒存值线程 notFull.signal(); return x; }
存:
//入队 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } //非阻塞 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } //阻塞 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列满则阻塞 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
阻塞队列阻塞的存取值方法通过2个condition的await和signal方法完成:当队列空时notEmpty.await取值阻塞,当队列满时notFull.await存值阻塞;入队操作完成时调用notEmpty.signal唤醒取值线程,出队才做完成时唤醒存值线程。