ConcurrentLinkedQueue
ConcurrentLinkedQueue 能解决什么问题?什么时候使用 ConcurrentLinkedQueue?
1)ConcurrentLinkedQueue 是基于单向链表实现的线程安全【基于 CAS 实现】的、无界、FIFO、非阻塞队列。
2)ConcurrentLinkedQueue 的 offer 和 poll 操作都是非阻塞的。
如何使用 ConcurrentLinkedQueue?
1)ConcurrentLinkedQueue 并发性能比 LinkedBlockingQueue 高,但是当无元素可用时,
频繁的自旋拉取会导致 CPU 飙升,所以当消费者没有拉取到元素时,建议休眠指定的时间后再重试。
使用 ConcurrentLinkedQueue 有什么风险?
1)由于是无界非阻塞队列,当生产速率持续大于消费速率时,会导致资源耗尽,内存溢出。
2)极高的并发场景下,自旋 CAS 长时间不成功会给 CPU 带来非常大的执行开销。
ConcurrentLinkedQueue 核心操作的实现原理?
创建实例
static final class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* 创建一个持有 item 的新节点
*/
Node(E item) {
ConcurrentLinkedQueue.ITEM.set(this, item);
}
/** Constructs a dead dummy node. */
Node() {}
// 写入后置节点
void appendRelaxed(Node<E> next) {
ConcurrentLinkedQueue.NEXT.set(this, next);
}
// 尝试原子更新 item
boolean casItem(E cmp, E val) {
return ConcurrentLinkedQueue.ITEM.compareAndSet(this, cmp, val);
}
}
/**
* 队列的第一个活性节点
* Invariants:
* - all live nodes are reachable from head via succ()
* 所有的节点都可以从 head 开始,通过递归 succ() 到达
* - head != null
* head 节点不为 null
* - (tmp = head).next != tmp || tmp != head
* 不会出现自连接
* Non-invariants:
* - head.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
*/
transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ()
* 最后一个节点总是可以从 tail 递归 succ() 方法到达
* - tail != null
* tail 节点不为 null
* Non-invariants:
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* - tail.next may or may not be self-linked.
*/
private transient volatile Node<E> tail;
/**
* 创建一个空的 ConcurrentLinkedQueue 实例
*/
public ConcurrentLinkedQueue() {
head = tail = new Node<>();
}
插入元素
/**
* 将元素 e 插入到队列尾部,由于是无界队列,该操作不会被阻塞 && 返回值永远是 true
*/
@Override
public boolean offer(E e) {
// 元素值不允许为 null
final Node<E> newNode = new Node<>(Objects.requireNonNull(e));
/**
* t:tail
* p:predecessor
* 1)读取尾节点
*/
for (Node<E> t = tail, p = t;;) {
// 读取尾节点的后置节点
final Node<E> q = p.next;
// 1)当前尾节点为最后一个节点
if (q == null) {
// p is last node
// 则尝试将新节点链接到其后面
if (ConcurrentLinkedQueue.NEXT.compareAndSet(p, null, newNode)) {
/**
* 成功的 CAS 操作使得元素 e 成为队列的节点。
*/
if (p != t) {
// 如果其他线程并发修改了 tail 节点,则尝试更新 tail 节点。
ConcurrentLinkedQueue.TAIL.weakCompareAndSet(this, t, newNode);
}
return true;
}
// Lost CAS race to another thread; re-read next
}
// 2)tail 节点和最后一个节点间隔为 2
else if (p == q) {
/**
* We have fallen off list. If tail is unchanged, it will also be off-list,
* in which case we need to jump to head, from which all live nodes are always reachable.
* Else the new tail is a better bet.
* 1)如果尾节点未改变,则读取头节点
* 2)否则读取新的尾节点
*/
p = t != (t = tail) ? t : head;
// 3)如果滞后了两个节点,则重新读取 tail 节点
} else {
/**
* Check for tail updates after two hops.
* 1)tail 节点和最后一个节点间隔为 2,并且尾部节点已经更新,则读取新的尾部节点
* 2)否则更新 p 为其后置节点 q
*/
p = p != t && t != (t = tail) ? t : q;
}
}
}
拉取元素
/**
* 移除并返回队列头部元素,如果队列为空,则返回 null
* created by ZXD at 8 Dec 2018 T 17:12:16
* @return
*/
@Override
public E poll() {
restartFromHead: for (;;) {
// 读取头节点
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
// 1)head 节点元素不为 null,则尝试将其更新为 null,此时的 head 不是傀儡节点。
if ((item = p.item) != null && p.casItem(item, null)) {
/**
* Successful CAS is the linearization point for item to be removed from this queue。
* 成功的 CAS 操作说明我们已经移除了队列头部元素
*/
if (p != h) {
// 头节点是哨兵节点,则一次移动两个位置,新头节点为数据节点
updateHead(h, (q = p.next) != null ? q : p);
}
// 返回目标元素
return item;
}
// 后置节点为 null
else if ((q = p.next) == null) {
// 将头结点 h 更新为 p
updateHead(h, p);
return null;
}
else if (p == q) {
continue restartFromHead;
}
}
}
}