前言
前面介绍完了队列Queue/BlockingQueue的实现类,接下来介绍双端队列Deque/BlockingDeque的实现类之一LinkedBlockingDeque,它是一种基于链表的可选容量的同时支持FIFO、LIFO的阻塞双端队列。
比起前面介绍的队列实现中的节点都有一个指向下一个节点的next字段引用,双端队列为了能够实现能够从尾部添加元素,在队列节点类中不但有指向下一个节点的next字段,而且还有一个指向前一个前驱节点的字段prev。就是因为双端队列的节点同时持有指向下一个节点(后继)的next字段和指向前一个节点(前驱)的prev字段,所以才能实现双端队列特有的可以从对头、对尾两端插入和移除元素的双向特性。
LinkedBlockingDeque在实现的时候采用了单个非公平的可重入锁ReentrantLock保证队列的线程安全,通过两个Condition(notEmpty,notFull)实现阻塞唤醒机制,所以它的逻辑其实非常简单。LinkedBlockingDeque不但使用单个可重入锁来保证线程安全,而且几乎100%的方法都加了该锁(包括迭代器,toString、size()方法),所以它的性能可想而知有多么差,尤其是在竞争激烈的时候,它在双端队列中的地位就如同ArrayBlockingQueue在队列Queue的家族中的地位,所以在多线程环境下LinkedBlockingDeque基本上没有使用的价值,最好使用双端队列的另一个实现ConcurrentLinkedDeque。
LinkedBlockingDeque在对首尾节点进行出队的时候,采用了将数据item置空以及将相应后继或前驱指向自身的方式辅助垃圾回收,而内部节点进行移除的时候,仅仅将数据item置空,没有改变节点内部的前驱和后继节点的引用,这保证了迭代器的弱一致性,避免迭代器异常终止。LinkedBlockingDeque内部有两个字段first、last分别指向对头第一个节点和队尾最后一个节点,它们有如下的不变性:①队列为空时, first,last都为null;②队列不为空时,first,last的数据项item都不为null(所以它不存在虚拟节点);③队列不为空时,头节点first没有前驱节点,尾节点last没有后继节点。
LinkedBlockingDeque的大多数操作在恒定时间内运行完成(不计阻塞消耗的时间)。而remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove()和bulk操作,所有这些操作都在线性时间内运行完成。
源码解读
首先看它的一些基础字段、节点内部类以及构造方法:
1 /** Doubly-linked list node class 双链表节点类 */ 2 static final class Node<E> { 3 /** 4 * 数据项,如果该节点已被删除,则为空。 5 */ 6 E item; 7 8 /** 9 * 前驱为下面其中之一 10 * - 正常的前驱节点 11 * - 当前节点自身(自链接),意味着它的前驱是last(例如当前节点作为last从队尾出队之后) 12 * - 空,意味着没有前驱节点(例如first) 13 */ 14 Node<E> prev; 15 16 /** 17 * 后继为下面其中之一: 18 * - 正常的后继节点 19 * - 当前节点自身(自链接),意味着它的后继节点是first(例如当前节点作为first从队头出队之后) 20 * - 空,意味着没有后继节点(例如last) 21 */ 22 Node<E> next; 23 24 Node(E x) { 25 item = x; 26 } 27 } 28 29 //不变性: 30 //队列为空时, first,last都为null 31 //队列不为空时,first,last的数据项都不为null 32 //队列不为空时,头节点first没有前驱节点,尾节点last没有后继节点 33 /** 34 * Pointer to first node. 头节点 35 * Invariant: (first == null && last == null) || 头尾节点都为空 36 * (first.prev == null && first.item != null) 头节点的数据项不为空但没有前驱 37 */ 38 transient Node<E> first; 39 40 /** 41 * Pointer to last node. 尾节点 42 * Invariant: (first == null && last == null) ||头尾节点都为空 43 * (last.next == null && last.item != null) 尾节点的数据项不为空但没有后继 44 */ 45 transient Node<E> last; 46 47 /** Number of items in the deque 双端队列中数据项的数量*/ 48 private transient int count; 49 50 /** Maximum number of items in the deque 双端队列最大容量*/ 51 private final int capacity; 52 53 /** Main lock guarding all access 保护所有访问的锁*/ 54 final ReentrantLock lock = new ReentrantLock(); 55 56 /** Condition for waiting takes 消费阻塞条件*/ 57 private final Condition notEmpty = lock.newCondition(); 58 59 /** Condition for waiting puts 生产阻塞条件*/ 60 private final Condition notFull = lock.newCondition(); 61 62 //默认容量为Integer.MAX_VALUE 63 public LinkedBlockingDeque() { 64 this(Integer.MAX_VALUE); 65 } 66 //指定容量 67 public LinkedBlockingDeque(int capacity) { 68 if (capacity <= 0) throw new IllegalArgumentException(); 69 this.capacity = capacity; 70 } 71 //创建一个容量为Integer.MAX_VALUE,并且初始状态包含给定集合中的元素的实例 72 public LinkedBlockingDeque(Collection<? extends E> c) { 73 this(Integer.MAX_VALUE); 74 final ReentrantLock lock = this.lock; 75 lock.lock(); // 没有竞争,但是为了可见性而加锁 76 try { 77 for (E e : c) { 78 if (e == null) 79 throw new NullPointerException(); 80 if (!linkLast(new Node<E>(e))) //将节点链接到队尾,如果已满则返回false。 81 throw new IllegalStateException("Deque full"); 82 } 83 } finally { 84 lock.unlock(); 85 } 86 }
可见节点内部类有前驱prev和后继next字段,LinkedBlockingDeque内部维护两个字段first、last分别指向对头和队尾,队列在为空的时候,first和last都为空。LinkedBlockingDeque在不指定容量时大小为Integer.MAX_VALUE。内部使用一把非公平可重入锁保证线程安全。
通过LinkedBlockingDeque的构造方法指定容量可以防止内存的过度扩展,如果没有指定容量大小,默认容量为Integer.MAX_VALUE。每次插入元素时都会动态创建链接节点,除非这会使deque超出容量限制。
入队实现
头部入队
1 /** 2 * 将节点链接为第一个元素,如果已满则返回false。 3 */ 4 private boolean linkFirst(Node<E> node) { 5 // assert lock.isHeldByCurrentThread(); 6 if (count >= capacity) //已满,返回false 7 return false; 8 Node<E> f = first; 9 node.next = f; //旧first置为当前节点的后继节点 10 first = node; //当前节点成为新的first 11 if (last == null) //如果队尾为空,使其指向当前节点 12 last = node; 13 else 14 f.prev = node; //将当前节点置为旧first的前驱 15 ++count; 16 notEmpty.signal(); 17 return true; 18 }
linkFirst是从对头入队新节点的具体逻辑实现(被其它入队方法调用),看起来很简单:队列为空表示是第一个入队节点,则first、last都指向该节点;否则在原来的first节点前面插入该新节点,该新节点成为新的first,成为原first的前驱,原first成为该节点的后继。
尾部入队
1 /** 2 * 链接节点作为最后一个元素,如果已满则返回false。 3 */ 4 private boolean linkLast(Node<E> node) { 5 // assert lock.isHeldByCurrentThread(); 6 if (count >= capacity) 7 return false; //已满,返回false 8 Node<E> l = last; 9 node.prev = l;//旧last置为当前节点的前驱节点 10 last = node; //当前节点成为新的last 11 if (first == null) 12 first = node; //如果队尾为空,使其指向当前节点 13 else 14 l.next = node;//将当前节点置为旧last的后继 15 ++count; 16 notEmpty.signal(); 17 return true; 18 }
linkLast是从队尾入队新节点的具体逻辑实现(被其它入队方法调用),看起来很简单:队列为空表示是第一个入队节点,则first、last都指向该节点;否则在原来的last节点后面插入该新节点,该新节点成为新的last,成为原last的后继,原last成为该节点的前驱。
头部出队
1 /** 2 * 删除并返回第一个元素,如果为空则返回null。 3 */ 4 private E unlinkFirst() { 5 // assert lock.isHeldByCurrentThread(); 6 Node<E> f = first; 7 if (f == null) 8 return null; //队列为空,返回null 9 Node<E> n = f.next; 10 E item = f.item; 11 f.item = null; //将第一个节点数据置空 12 f.next = f; // 将第一个节点自链接,辅助垃圾回收 13 first = n; //将第一个节点的后继置为first 14 if (n == null) 15 last = null; //队列为空了,把first,last都置为null 16 else 17 n.prev = null;//否则把新的first的前驱置为null 18 --count; 19 notFull.signal(); 20 return item; 21 }
unlinkFirst是从对头移除元素的具体逻辑(被其它出队方法调用),看起来很简单:队列为空时返回null;否则将first节点出队,返回first节点的数据项之前将其数据item置为null,后继指向自身,first的后继节点成为新的first,新first 的前驱保持为null,如果移除之后队列为空了,则将first、last都置为null,还原到最初状态。
尾部出队
1 /** 2 * 删除并返回最后一个元素,如果为空则返回null。 3 */ 4 private E unlinkLast() { 5 // assert lock.isHeldByCurrentThread(); 6 Node<E> l = last; 7 if (l == null) 8 return null; //队列为空,返回null 9 Node<E> p = l.prev; 10 E item = l.item; 11 l.item = null; //将最后一个节点数据置空 12 l.prev = l; // 将最后一个节点自链接,辅助垃圾回收 13 last = p; //将最后一个节点的前驱置为新的last 14 if (p == null) 15 first = null; //队列为空了,把first,last都置为null 16 else 17 p.next = null;//否则把新的last的后继置为null 18 --count; 19 notFull.signal(); 20 return item; 21 }
unlinkLast是从对尾移除元素的具体逻辑(被其它出队方法调用),看起来很简单:队列为空时返回null;否则将last节点出队,返回last节点的数据项之前将其数据item置为null,前驱指向自身,last的前驱节点成为新的last,新last的后继保持为null,如果移除之后队列为空了,则将first、last都置为null,还原到最初状态。
内部节点移除
1 /** 2 * Unlinks x. 断开节点x 3 */ 4 void unlink(Node<E> x) { 5 // assert lock.isHeldByCurrentThread(); 6 Node<E> p = x.prev; 7 Node<E> n = x.next; 8 if (p == null) { //当前节点没有前驱,则说明是first,直接使用unlinkFirst出队 9 unlinkFirst(); 10 } else if (n == null) {//当前节点没有后继,说明是last,直接使用unlinkLast出队 11 unlinkLast(); 12 } else { 13 //是非端点的内部节点,直接改变前驱和后继节点的相关链接 14 p.next = n; 15 n.prev = p; 16 x.item = null; //将当前节点数据item置空 17 //注意:这里并没有将节点本身的前驱后继指针改变,是为了防止迭代器异常 18 // Don't mess with x's links. They may still be in use by 19 // an iterator. 20 --count; 21 notFull.signal(); 22 } 23 }
unlink是内部节点移除的逻辑(被其它加锁方法调用),逻辑依然很简单,只是要注意的是,对内部节点的移除,只是将其数据item置空,然后让它的前驱和后继节点链接起来,而没有直接改变被移除节点对其它节点的链接,是为了保证迭代器的正常工作。
以上这些方法基本算组成了LinkedBlockingDeque其它方法的真正实现,那些真正使用时调用的public方法都是基于这些方法实现的,包括阻塞/超时方法也仅仅是在这些方法的基础上加入锁的等待唤醒机制。例如如下这些方法都是基于这些方法实现的:
1 public boolean offerFirst(E e) { 2 if (e == null) throw new NullPointerException(); 3 Node<E> node = new Node<E>(e); 4 final ReentrantLock lock = this.lock; 5 lock.lock(); 6 try { 7 return linkFirst(node); 8 } finally { 9 lock.unlock(); 10 } 11 } 12 13 /** 14 * @throws NullPointerException {@inheritDoc} 15 */ 16 public boolean offerLast(E e) { 17 if (e == null) throw new NullPointerException(); 18 Node<E> node = new Node<E>(e); 19 final ReentrantLock lock = this.lock; 20 lock.lock(); 21 try { 22 return linkLast(node); 23 } finally { 24 lock.unlock(); 25 } 26 } 27 public void putFirst(E e) throws InterruptedException { 28 if (e == null) throw new NullPointerException(); 29 Node<E> node = new Node<E>(e); 30 final ReentrantLock lock = this.lock; 31 lock.lock(); 32 try { 33 while (!linkFirst(node)) 34 notFull.await(); 35 } finally { 36 lock.unlock(); 37 } 38 } 39 public void putLast(E e) throws InterruptedException { 40 if (e == null) throw new NullPointerException(); 41 Node<E> node = new Node<E>(e); 42 final ReentrantLock lock = this.lock; 43 lock.lock(); 44 try { 45 while (!linkLast(node)) 46 notFull.await(); 47 } finally { 48 lock.unlock(); 49 } 50 } 51 public boolean offerFirst(E e, long timeout, TimeUnit unit) 52 throws InterruptedException { 53 if (e == null) throw new NullPointerException(); 54 Node<E> node = new Node<E>(e); 55 long nanos = unit.toNanos(timeout); 56 final ReentrantLock lock = this.lock; 57 lock.lockInterruptibly(); 58 try { 59 while (!linkFirst(node)) { 60 if (nanos <= 0) 61 return false; 62 nanos = notFull.awaitNanos(nanos); 63 } 64 return true; 65 } finally { 66 lock.unlock(); 67 } 68 }
包括removeFirstOccurrence、removeLastOccurrence
1 //从first开始遍历,拿走item相等的,并断开节点 2 public boolean removeFirstOccurrence(Object o) { 3 if (o == null) return false; 4 final ReentrantLock lock = this.lock; 5 lock.lock(); 6 try { 7 for (Node<E> p = first; p != null; p = p.next) { 8 if (o.equals(p.item)) { 9 unlink(p); 10 return true; 11 } 12 } 13 return false; 14 } finally { 15 lock.unlock(); 16 } 17 } 18 //从last开始向前遍历,拿走item相等的,并断开节点 19 public boolean removeLastOccurrence(Object o) { 20 if (o == null) return false; 21 final ReentrantLock lock = this.lock; 22 lock.lock(); 23 try { 24 for (Node<E> p = last; p != null; p = p.prev) { 25 if (o.equals(p.item)) { 26 unlink(p); 27 return true; 28 } 29 } 30 return false; 31 } finally { 32 lock.unlock(); 33 } 34 }
这些方法汇总如下:①操作队头的方法
操作队头的方法 | ||||
失败抛异常 | 失败返回特殊值 | 条件不满足阻塞 | 条件不满足等待给定时长 | |
入队 | push/addFirst(e) |
offerFirst(e) |
putFirst(e) | offerFirst(e, time, unit) |
出队(移除) | pop/remove/removeFirst() |
poll/pollFirst() |
take/takeFirst() |
poll/pollFirst(time, unit) |
获取但不移除 |
element/getFirst() |
peek/peekFirst() | 不支持 | 不支持 |
②操作队尾的方法:
操作队尾的方法 | ||||
失败抛异常 | 失败返回特殊值 | 条件不满足阻塞 | 条件不满足等待给定时长 | |
入队 |
add/addLast(e) |
offer/offerLast(e) |
put/putLast(e) | offer/offerLast(e, time, unit) |
出队(移除) | removeLast() |
pollLast() |
takeLast() |
pollLast(time, unit) |
获取但不移除 | getLast() |
peekLast() |
不支持 | 不支持 |
其它一些方法,drainTo从对头开始一个一个的将全部或指定数量的节点的item数据添加到另一个集合中,并移除相应的节点,toArray从队头开始获取但不移除全部或指定数量节点的数据至数组中,contains(Object o)从队头开始依次比较节点的数据item,找到一个相等的立即返回true,遍历完都没找到返回false,这些方法并没有做item是否为空的判断,这是因为所以方法都是需要先获取同一个锁,所以在这些方法执行的时候不存在那样的节点。
clear()方法从对头first开始一个一个的将每一个节点的数据,前驱,后驱清空,执行完之后队列为空。LinkedBlockingDeque的方法都很简单就不一一介绍了。
迭代器
LinkedBlockingDeque支持正向和逆向的两种迭代器,分别是方法iterator、descendingIterator:
1 //按正确的顺序返回deque中元素的迭代器。元素将按从第一个(head)到最后一个(tail)的顺序返回。 2 //返回的迭代器是弱一致的。 3 public Iterator<E> iterator() { 4 return new Itr(); 5 } 6 7 //以相反的顺序返回deque中元素的迭代器。元素将按从最后(tail)到第一个(head)的顺序返回。 8 //返回的迭代器是弱一致的。 9 public Iterator<E> descendingIterator() { 10 return new DescendingItr(); 11 }
它们的逻辑主要是由一个内部抽象类AbstractItr来实现,而iterator和descendingIterator仅仅实现了AbstractItr的抽象方法,用来指示迭代器的开始位置和迭代方法,为了保证迭代器的弱一致性,迭代器在创建实例的时候就已经拿到了第一个节点next和其节点数据,为了实现迭代器的remove方法,迭代器还保留了迭代的上一个节点lastRet,用于获取迭代器的下一个节点的主要逻辑由succ方法实现:
1 /** 2 * 返回给定非空节点的后续节点,但可能已被删除。 3 */ 4 private Node<E> succ(Node<E> n) { 5 for (;;) { 6 Node<E> s = nextNode(n); 7 if (s == null) //队列没有更多元素了,空或者到头了 8 return null; 9 else if (s.item != null)//节点有效 10 return s; 11 else if (s == n) //自链接表示是正常的从队列头或者尾出队,所以直接拿最近的头或者尾即可 12 return firstNode(); 13 else 14 n = s; //否则表示s是被移除的内部节点,继续看下一个 15 } 16 }
可见迭代器会排除那些被移除的无效节点,迭代器的其它方法很简单就不一一列举了。
可拆分迭代器Spliterator
LinkedBlockingDeque的可拆分迭代器由内部类LBDSpliterator实现,它不像普通迭代器那样可以支持正向和反向迭代,可拆分迭代器仅支持正向的拆分迭代:
1 public Spliterator<E> spliterator() { 2 return new LBDSpliterator<E>(this); 3 }
LinkedBlockingDeque的可拆分迭代器虽然也需要获取同一把锁,但是不同于其它所有方法,可拆分迭代器会队节点的数据item进行null值判断,只对item不为空的数据做处理,该可拆分迭代器的实现和LinkedTransferQueue的实现几乎一致,tryAdvance从对头开始查找获取队列中第一个item不为空的数据节点的数据做指定的操作,forEachRemaining从队头开始循环遍历当前队列中item不为空的数据节点的数据做指定的操作源码都很简单,就不贴代码了,至于它的拆分方法trySplit,其实和ConcurrentLinkedQueue拆分方式是一样的,代码都几乎一致,它不是像ArrayBlockingQueue那样每次分一半,而是第一次只拆一个元素,第二次拆2个,第三次拆三个,依次内推,拆分的次数越多,拆分出的新迭代器分的得元素越多,直到一个很大的数MAX_BATCH(33554432) ,后面的迭代器每次都分到这么多的元素,拆分的实现逻辑很简单,每一次拆分结束都记录下拆分到哪个元素,下一次拆分从上次结束的位置继续往下拆分,直到没有元素可拆分了返回null。
总结
LinkedBlockingDeque是以单个锁实现的基于链表的可选容量的双端队列,可以从对头、对尾两端插入和移除元素,这主要是通过节点同时持有前驱节点和后继节点的引用来达到同时支持正向反向的操作,它的迭代器支持正向和反向两种迭代方式。但是由于它的所有方法(几乎100%)都需要获取同一把独占锁,因此在竞争激烈的多线程并发环境下吞吐量非常底下。