抛出异常 | 特殊值 | 阻塞 | 超时 | |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
【1】BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity,超出此容量,便无法无阻塞地put 附加元素。没有任何内部容量约束的BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。
【2】BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection
接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
【3】BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁 lock.lock();//请求锁直到获得锁(不可以被interrupte) try { if (count == items.length)//如果队列已经满了 return false; else { insert(e); return true; } } finally { lock.unlock();// } } 看insert方法: private void insert(E x) { items[putIndex] = x; //增加全局index的值。 /* Inc方法体内部: final int inc(int i) { return (++i == items.length)? 0 : i; } 这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满) */ putIndex = inc(putIndex); ++count; notEmpty.signal();//wake up one waiting thread }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted try { try { while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态 notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != items.length) { insert(e); return true; } if (nanos <= 0) return false; try { //如果没有被 signal/interruptes,需要等待nanos时间才返回 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
public boolean add(E e) { return super.add(e); } 父类: public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
用三个数字来维护这个队列中的数据变更: /** items index for next take, poll or remove */ private int takeIndex; /** items index for next put, offer, or add. */ private int putIndex; /** Number of items in the queue */ private int count;