/**
 * Creates a queue whose capacity is {@link Integer#MAX_VALUE}.
 *
 * <p>Simply forwards to the single-argument constructor with the largest
 * possible {@code int} as the capacity.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * Creates a queue with the given fixed capacity.
 *
 * <p>Both {@code head} and {@code last} start out referring to the same
 * freshly created node that carries a {@code null} item.
 *
 * @param capacity the capacity of this queue; must be greater than zero
 * @throws IllegalArgumentException if {@code capacity} is zero or negative
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
    // Same assignment order as the chained form: head first, then last.
    head = new Node<E>(null);
    last = head;
}
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { //如果队列满了的话,就阻塞 notFull.await(); } enqueue(node); //将节点入队 c = count.getAndIncrement(); //getAndIncrement 是获得旧值之后再加1 if (c + 1 < capacity) //再次检查队列,如果没满就换醒工作线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); //工作线程不为空的话就会唤醒消费线程 }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { //如果队列为空的话,就阻塞消费线程 notEmpty.await(); } x = dequeue(); //从队列中取走第一个节点 c = count.getAndDecrement(); if (c > 1) //再次判断如果队列不为空的话,就唤醒消费线程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) //判断如果队列没满的话,就唤醒生产线程 signalNotFull(); return x; }
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) //offer方法和put方法的区别就是在队列满了的时候put会阻塞生产线程,offer就直接return false 入队失败; return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) //poll方法与take方法的区别就是如果队列为空的话,poll直接返回null,take会阻塞消费线程 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; //peek方法与take方法的区别就是peek方法不会删除节点,只会返回第一个节点 takeLock.lock(); try { Node<E> first =; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
/**
 * Creates a queue with the given capacity and {@code fair} set to
 * {@code false}.
 *
 * <p>Forwards to the two-argument constructor.
 *
 * @param capacity the capacity of this queue
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
ublic ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //ArrayBlcokingQueue是有界的基于数组实现的队列 this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) //如果队列已满的话,就阻塞生产线程 notFull.await(); enqueue(e); } finally { lock.unlock(); } }
/**
 * Adds the given element by delegating to the superclass implementation.
 *
 * @param e the element to add
 * @return whatever the superclass {@code add} returns
 */
public boolean add(E e) {
    final boolean added = super.add(e);
    return added;
}
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); //如果队列满了,入队失败的话,会抛出异常 }
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) //如果队列满了的话,就返回false, return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); //如果队列为空的话,就返回null } finally { lock.unlock(); } }
/**
 * Removes and returns an element, waiting on {@code notEmpty} for as long
 * as {@code count} is zero.
 *
 * @return the element produced by {@code dequeue()}
 * @throws InterruptedException if interrupted while acquiring the lock or
 *         while waiting for an element
 */
public E take() throws InterruptedException {
    final ReentrantLock guard = this.lock;
    guard.lockInterruptibly();
    try {
        while (count == 0) {
            notEmpty.await();
        }
        final E taken = dequeue();
        return taken;
    } finally {
        guard.unlock();
    }
}
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty //只会返回第一个元素,不会删除元素 } finally { lock.unlock(); } }