简介
一个基于链表的阻塞队列,FIFO的顺序,head指向的元素等待时间最长,tail指向的元素等待时间最短,新元素从队列尾部添加,检索元素从队列头部开始,队列的容量,默认是Integer#MAX_VALUE。
源码分析
内部类Node
1 static class Node<E> { 2 E item; // 结点的值 3 4 Node<E> next; // 指向下一个结点 5 6 Node(E x) { // 构造方法 7 item = x; 8 } 9 }
属性
1 private final int capacity; // 队列的容量,大小 2 3 private final AtomicInteger count = new AtomicInteger(); // 当前队列里元素的个数 4 5 transient Node<E> head; // 头结点,head.item = null 6 7 private transient Node<E> last; // 尾结点,last.next = null 8 9 private final ReentrantLock takeLock = new ReentrantLock(); // 可重入锁,take元素时,需持有该锁 10 11 private final Condition notEmpty = takeLock.newCondition(); // take锁上的条件,队列空时等待,不空时通知 12 13 private final ReentrantLock putLock = new ReentrantLock(); // 可重入锁,put元素时,需持有该锁 14 15 private final Condition notFull = putLock.newCondition(); // put锁上的条件,队列满时等待,不满时通知
通知方法
1 private void signalNotEmpty() { // 通知在take锁上等待的线程 2 final ReentrantLock takeLock = this.takeLock; 3 takeLock.lock(); // 加锁 4 try { 5 notEmpty.signal(); // 通知 6 } finally { 7 takeLock.unlock(); // 释放 8 } 9 } 10 11 private void signalNotFull() { // 通知在put锁上等待的线程 12 final ReentrantLock putLock = this.putLock; 13 putLock.lock(); // 加锁 14 try { 15 notFull.signal(); // 通知 16 } finally { 17 putLock.unlock(); // 释放 18 } 19 }
元素入队
1 private void enqueue(Node<E> node) { // 队尾入队 2 last = last.next = node; // last的next域指向新结点,last后移(指向新加入的结点) 3 }
元素出队
1 private E dequeue() { // 队首出队 2 Node<E> h = head; // 获得头结点 3 Node<E> first = h.next; // 活动第一个有效(item != null)结点(head结点的next结点) 4 h.next = h; // next域指向自己,帮助GC 5 head = first; // head后移 6 E x = first.item; // 取得结点值 7 first.item = null; // 置空 8 return x; // 返回 9 }
加锁与释放
1 void fullyLock() { // 加锁 2 putLock.lock(); 3 takeLock.lock(); 4 } 5 6 void fullyUnlock() { // 释放 7 takeLock.unlock(); 8 putLock.unlock(); 9 }
构造方法
1 public LinkedBlockingQueue() { // 构造方法 2 this(Integer.MAX_VALUE); 3 } 4 5 public LinkedBlockingQueue(int capacity) { // 构造方法 6 if (capacity <= 0) 7 throw new IllegalArgumentException(); 8 this.capacity = capacity; 9 last = head = new Node<E>(null); // 初始时,last和head指向一个DUMMY结点 10 } 11 12 public LinkedBlockingQueue(Collection<? extends E> c) { 13 this(Integer.MAX_VALUE); 14 final ReentrantLock putLock = this.putLock; 15 putLock.lock(); // 加锁,可见性 16 try { 17 int n = 0; 18 for (E e : c) { 19 if (e == null) 20 throw new NullPointerException(); // 空指针 21 if (n == capacity) 22 throw new IllegalStateException("Queue full"); // 越界 23 enqueue(new Node<E>(e)); // 元素入队 24 ++n; // 递增 25 } 26 count.set(n); // 设置当前队列里元素的个数 27 } finally { 28 putLock.unlock(); // 解锁 29 } 30 }
添加元素
put(E e)
1 public void put(E e) throws InterruptedException { // 添加元素 2 if (e == null) 3 throw new NullPointerException(); // 空指针 4 int c = -1; 5 Node<E> node = new Node<E>(e); // 创建新结点 6 final ReentrantLock putLock = this.putLock; // 获得put锁 7 final AtomicInteger count = this.count; // 获得当前元素的个数 8 putLock.lockInterruptibly(); // 加锁,响应中断 9 try { 10 while (count.get() == capacity) { // 队列满了 11 notFull.await(); // 要等一等 12 } 13 enqueue(node); // 入队 14 c = count.getAndIncrement(); // 获取队列的容量 15 if (c + 1 < capacity) // 不满,唤醒等待的线程 16 notFull.signal(); // 通知 17 } finally { 18 putLock.unlock(); // 解锁 19 } 20 if (c == 0) // 队列非空(c初始值为-1) 21 signalNotEmpty(); 22 }
offer(E e, long timeout, TimeUnit unit)
1 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 2 3 if (e == null) 4 throw new NullPointerException(); // 空指针 5 long nanos = unit.toNanos(timeout); 6 int c = -1; 7 final ReentrantLock putLock = this.putLock; // 获得put锁 8 final AtomicInteger count = this.count; // 获得当前元素的个数 9 putLock.lockInterruptibly(); // 加锁,响应中断 10 try { 11 while (count.get() == capacity) { // 队列满了 12 if (nanos <= 0) // 超时,返回 13 return false; 14 nanos = notFull.awaitNanos(nanos); // 等待响应的时间 15 } 16 enqueue(new Node<E>(e)); // 入队 17 c = count.getAndIncrement(); // 获取队列的容量 18 if (c + 1 < capacity) // 不满,唤醒等待的线程 19 notFull.signal(); // 通知 20 } finally { 21 putLock.unlock(); // 解锁 22 } 23 if (c == 0) 24 signalNotEmpty(); // 队列非空(c初始值为-1) 25 return true; 26 }
offer(E e)
1 public boolean offer(E e) { 2 if (e == null) 3 throw new NullPointerException(); // 空指针 4 final AtomicInteger count = this.count; // 获得当前元素的个数 5 if (count.get() == capacity) // 队列满了,直接返回失败 6 return false; 7 int c = -1; 8 Node<E> node = new Node<E>(e); // 新建结点 9 final ReentrantLock putLock = this.putLock; // 获得put锁 10 putLock.lock(); // 加锁 11 try { 12 if (count.get() < capacity) { // 不满 13 enqueue(node); // 入队 14 c = count.getAndIncrement(); // 加1 15 if (c + 1 < capacity) // 不满,通知 16 notFull.signal(); 17 } 18 } finally { 19 putLock.unlock(); // 解锁 20 } 21 if (c == 0) 22 signalNotEmpty(); // 不空,通知 23 return c >= 0; 24 }
获取元素
take()
1 public E take() throws InterruptedException { 2 E x; 3 int c = -1; 4 final AtomicInteger count = this.count; // 当前队列元素个数 5 final ReentrantLock takeLock = this.takeLock; // 获取take锁 6 takeLock.lockInterruptibly(); // 加锁,响应中断 7 try { 8 while (count.get() == 0) { // 队列空了 9 notEmpty.await(); // 等待 10 } 11 x = dequeue(); // 出队 12 c = count.getAndDecrement(); // 减1 13 if (c > 1) // 不空 14 notEmpty.signal(); // 通知 15 } finally { 16 takeLock.unlock(); // 解锁 17 } 18 if (c == capacity) // 获取元素之前,队列是满的,有线程在put元素时阻塞,当前线程take一个元素后,空出一个位置 19 signalNotFull(); // 通知 20 return x; 21 }
poll(long timeout, TimeUnit unit)
1 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 2 E x = null; 3 int c = -1; 4 long nanos = unit.toNanos(timeout); // 计算等待时间 5 final AtomicInteger count = this.count; // 当前队列元素个数 6 final ReentrantLock takeLock = this.takeLock; // 获得take锁 7 takeLock.lockInterruptibly(); // 加锁,响应中断 8 try { 9 while (count.get() == 0) { // 队列空了 10 if (nanos <= 0) // 超时 11 return null; 12 nanos = notEmpty.awaitNanos(nanos); // 等待指定时间 13 } 14 x = dequeue(); // 出队 15 c = count.getAndDecrement(); // 个数减1 16 if (c > 1) // 非空 17 notEmpty.signal(); // 通知 18 } finally { 19 takeLock.unlock(); 20 } 21 if (c == capacity) // 同take()方法 22 signalNotFull(); 23 return x; 24 }
poll()
1 public E poll() { 2 final AtomicInteger count = this.count; // 当前队列元素个数 3 if (count.get() == 0) // 队列空了,直接返回 4 return null; 5 E x = null; 6 int c = -1; 7 final ReentrantLock takeLock = this.takeLock; // 获得take锁 8 takeLock.lock(); // 加锁 9 try { 10 if (count.get() > 0) { // 非空 11 x = dequeue(); // 元素出队 12 c = count.getAndDecrement(); // 个数减1 13 if (c > 1) // 非空,通知 14 notEmpty.signal(); 15 } 16 } finally { 17 takeLock.unlock(); // 解锁 18 } 19 if (c == capacity) 20 signalNotFull(); // 同take()方法 21 return x; 22 }
peek()
1 public E peek() { // 只获取元素,不出队 2 if (count.get() == 0) // 队列为空,直接返回null 3 return null; 4 final ReentrantLock takeLock = this.takeLock; // 获得take锁 5 takeLock.lock(); // 解锁 6 try { 7 Node<E> first = head.next; // 取得第一个有效元素 8 if (first == null) // 为空,直接返回null 9 return null; 10 else 11 return first.item; // 返回结果 12 } finally { 13 takeLock.unlock(); // 解锁 14 } 15 }
剔除结点p
1 void unlink(Node<E> p, Node<E> trail) { // 剔除结点p 2 p.item = null; // 置空 3 trail.next = p.next; // 断开p, 连接p的next结点 4 if (last == p) // 如果p是尾结点,last指针前移 5 last = trail; 6 if (count.getAndDecrement() == capacity) // 同take()方法 7 notFull.signal(); 8 }
删除元素
1 public boolean remove(Object o) { // 删除元素o 2 if (o == null) 3 return false; 4 fullyLock(); // 加锁 5 try { // 从头结点开始遍历,找寻o元素所在的结点,并从中剔除它 6 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { 7 if (o.equals(p.item)) { 8 unlink(p, trail); // 剔除 9 return true; 10 } 11 } 12 return false; 13 } finally { 14 fullyUnlock(); // 解锁 15 } 16 }
迁徙
1 public int drainTo(Collection<? super E> c, int maxElements) { // 将当前队列里的元素移动到c中,并从当前队列里清除这些元素 2 if (c == null) 3 throw new NullPointerException(); // 空指针 4 if (c == this) 5 throw new IllegalArgumentException(); // 不合法参数 6 if (maxElements <= 0) // 参数校验 7 return 0; 8 boolean signalNotFull = false; 9 final ReentrantLock takeLock = this.takeLock; // 获得take锁 10 takeLock.lock(); // 加锁 11 try { 12 int n = Math.min(maxElements, count.get()); // 取其中较小值 13 Node<E> h = head; // 头结点 14 int i = 0; // 初始值 15 try { 16 while (i < n) { 17 Node<E> p = h.next; // 取得元素 18 c.add(p.item); // 添加到集合c中 19 p.item = null; // 置空 20 h.next = h; // 结点next域指向自己,帮助GC 21 h = p; // 元素出队 22 ++i; // 自增 23 } 24 return n; // 返回 25 } finally { 26 if (i > 0) { 27 head = h; // 更新头节点 28 signalNotFull = (count.getAndAdd(-i) == capacity); // 需要通知 29 } 30 } 31 } finally { 32 takeLock.unlock(); // 解锁 33 if (signalNotFull) 34 signalNotFull(); // 通知 35 } 36 }
行文至此结束。
尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_lbq.html