JUC源码分析-集合篇(十)LinkedTransferQueue
LinkedTransferQueue(LTQ) 相比 BlockingQueue 更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立 Java 内存模型中的 happens-before 关系的方式)。Doug Lea 说从功能角度来讲,LinkedTransferQueue 实际上是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。而且 LinkedTransferQueue 更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。
1. LinkedTransferQueue 概况
推荐一篇 LinkedTransferQueue 的介绍:http://ifeve.com/java-transfer-queue/
1.1 TransferQueue 接口
LinkedTransferQueue 实现了 TransferQueue 接口,下面就主要介绍一下这个接口。 TransferQueue 继承了 BlockingQueue(BlockingQueue 又继承了 Queue)并扩展了一些新方法。BlockingQueue(和Queue)是 JDK5 中加入的接口,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。
TransferQueue 则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立 Java 内存模型中的 happens-before 关系的方式)。
TransferQueue 还包括了其他的一些方法:两个 tryTransfer 方法,一个是非阻塞的,另一个带有 timeout 参数设置超时时间的。还有两个辅助方法 hasWaitingConsumer() 和 getWaitingConsumerCount()。
1.2 LinkedTransferQueue 特点
当我第一次看到 LinkedTransferQueue 时,首先想到了已有的实现类 SynchronousQueue。SynchronousQueue 的队列长度为 0,特别是对于两个线程之间传递元素这种用例。
LinkedTransferQueue 相比 SynchronousQueue 用处更广、更好用,因为你可以决定是使用 BlockingQueue 的方法(译者注:例如put方法)还是确保一次传递完成(译者注:即transfer方法)。在队列中已有元素的情况下,调用 transfer 方法,可以确保队列中被传递元素之前的所有元素都能被处理。
LinkedTransferQueue 的性能分别是 SynchronousQueue 的3倍(非公平模式)和14倍(公平模式)。因为像 ThreadPoolExecutor 这样的类在任务传递时都是使用 SynchronousQueue,所以使用 LinkedTransferQueue 来代替 SynchronousQueue 也会使得 ThreadPoolExecutor 得到相应的性能提升。
下面你可以参考这往篇文章实现一个自己的 LinkedTransferQueue:http://ifeve.com/customizing-concurrency-classes-11-2/#more-7388
2. LTQ 原理
LTQ 内部采用的是一种非常不同的队列,即松弛型双重队列(Dual Queues with Slack):http://ifeve.com/buglinkedtransferqueue-bug/#more-11117
强烈建议大家读一下 Doug Lea 的 java doc 文档,对 LTQ 的数据结构有很清楚的说明。
2.1 双重队列(Dual Queues)
/**
* A FIFO dual queue may be implemented using a variation of the
* Michael & Scott (M&S) lock-free queue algorithm
* (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
* It maintains two pointer fields, "head", pointing to a
* (matched) node that in turn points to the first actual
* (unmatched) queue node (or null if empty); and "tail" that
* points to the last node on the queue (or again null if
* empty). For example, here is a possible queue with four data
* elements:
*
* head tail
* | |
* v v
* M -> U -> U -> U -> U
*
* M(matched) U(unmatched)
*/
翻译:FIFO 双队列可以使用 Michael & Scott(M&S)无锁队列算法的变体实现。它维护两个指针字段: head 指向第一个不匹配节点(M)的前驱节点(如果为空则为空);tail 指向队列中的最后一个节点(如果为空则为空)。
双重是指有两种类型相互对立的节点(Node.isData==false或true),并且我理解的每种节点都有三种状态:
- UNMATCHED 节点构造完成,刚进入队列的状态
- MATCHED 节点备置为“满足”状态,即入队节点标识的线程成功取得或者传递了数据
- CANCELED 节点被置为取消状态,即入队节点标识的线程因为超时或者中断决定放弃等待
2.2 松弛度(Slack)
/**
* 在更新head/tail和查找中寻求平衡,大多数场景1~3比较合适。
* 本质上:是增加对 volatile 变量读操作来减少了对 volatile 变量的写操作
* 而对 volatile 变量的写操作开销要远远大于读操作,因此使用Slack能增加效率
*
* We introduce here an approach that lies between the extremes of
* never versus always updating queue (head and tail) pointers.
* This offers a tradeoff between sometimes requiring extra
* traversal steps to locate the first and/or last unmatched
* nodes, versus the reduced overhead and contention of fewer
* updates to queue pointers. For example, a possible snapshot of
* a queue is:
*
* head tail
* | |
* v v
* M -> M -> U -> U -> U -> U
*
* The best value for this "slack" (the targeted maximum distance
* between the value of "head" and the first unmatched node, and
* similarly for "tail") is an empirical matter. We have found
* that using very small constants in the range of 1-3 work best
* over a range of platforms. Larger values introduce increasing
* costs of cache misses and risks of long traversal chains, while
* smaller values increase CAS contention and overhead.
*/
为了节省 CAS 操作的开销,LTQ 引入了“松弛度”的概念:在节点被匹配(被删除)之后,不会立即更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个“松弛阀值”之后才会更新(在 LTQ 中,这个值为 2)。这个“松弛阀值”一般为1-3,如果太大会降低缓存命中率,并且会增加遍历链的长度;太小会增加 CAS 的开销。另外在 ConcurrentLinkedQueue 也有相应的应用:hops 设计意图
2.3 节点自链接
已匹配节点的 next 引用会指向自身。如果 GC 延迟回收,已删除节点链会积累的很长,此时垃圾收集会耗费高昂的代价,并且所有刚匹配的节点也不会被回收。为了避免这种情况,我们在 CAS 向后推进 head 时,会把已匹配的 head 的"next"引用指向自身(即“自链接节点”),这样就限制了连接已删除节点的长度(我们也采取类似的方法,清除在其他节点字段中可能的垃圾保留值)。如果在遍历时遇到一个自链接节点,那就表明当前线程已经滞后于另外一个更新 head 的线程,此时就需要重新获取 head 来遍历。
所以,在 LTQ 中,数据在某个线程的“某一时刻”可能存在下面这种形式:
- unmatched node:未被匹配的节点。可能是一个生产者节点(item!=null),也可能是一个消费者节点(item==null)。
- matched node:已经被匹配的节点。可能是一个生产者节点(item!=null)的数据已经被一个消费者拿走;也可能是一个消费者节点(item==null)已经被一个生产者填充上数据。
3. 数据结构
3.1 Node 节点
static final class QNode {
volatile Object item; // 节点包含的数据,非空表示生产者,空者是消费者
final boolean isData; // 表示该节点由生产者创建还是由消费者创建,生产者true,消费者false
volatile Thread waiter; // 等待在该节点上的线程。to control park/unpark
volatile QNode next; // 指向队列中的下一个节点
}
Node 节点本身就是一个原子性操作,对节点的属性 item、waiter、next 都是原子性操作。
- forgetNext 是将节点踢出队列。
- forgetContents 是将节点踢出队列后,将节点的属性 item 和 waiter 置空或自连接,便于 GC 垃圾回收。
- isMatched 判断节点是否已经匹配成功。
- isUnmatchedRequest 是否是请求节点,且还未匹配成功。
- cannotPrecede 能否将指定的节点 node 追加到当前节点后。node 节点属性与当前节点相反,且当前节点还未进行匹配则不能追加。
- tryMatchData 尝试匹配数据节点,匹配成功返回 true,即将 item 设置为 null 成功
3.2 LinkedTransferQueue
transient volatile Node head;
private transient volatile Node tail;
// 立刻、异步、同步、超时返回
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
LinkedTransferQueue 主要方法介绍:
- transfer 阻塞式的将数据从一个线程传递到另一个线程。
- tryTransfer 则是非阻塞式的将数据从一个线程传递到另一个线程。
- xfer 最核心的方法。将数据从一个线程传递到另一个线程。
- tryAppend 将节点添加到队列中。
- awaitMatch 匹配节点。
public LinkedTransferQueue() {
}
LinkedTransferQueue 初始化时什么也没做,也就是说 head=tail=null。
4. 源码分析
4.1 核心方法 xfer
/**
* @param e the item or null for take
* @param haveData true if this is a put, else a take
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos timeout in nanosecs, used only if mode is TIMED
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
// 1. 尝试匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
// 1.2 p节点还未匹配则尝试进行匹配,为什么不调用 !p.isMatched() ????
if (item != p && (item != null) == isData) { // unmatched
// 1.3 两个节点的模式一样,则直接跳出循环,尝试入队
if (isData == haveData) // can't match
break;
// 1.4 p匹配成功
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
// 1.5 p已经匹配,直接将n设置为头节点。h -> p -> n
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
// 1.6 有其它线程更新了头节点,再次判断 slack<2。
// h -> q 如果 q.isMatched() 则可以将 q.next 设置为头节点
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter); // 唤醒等待的线程后直接返回
return LinkedTransferQueue.<E>cast(item);
}
}
// 1.7 p==p.next 则说明p已经出队,失效了。需要重新从头节点开始匹配
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 2. 到了这一步,只有未匹配上一种情况。根据how判断节点是否要入队并等待其它线程匹配
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
// 2.1 节点尝试入队,入队失败继续尝试
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
// 2.2 等待其它线程匹配成功后唤醒当前线程
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
说明:xfer 大致可以分三部分:
- 首先,节点尝试和队列中已经的元素进行匹配,匹配成功(1.4)则唤醒等待节点的线程后直接返回。匹配成功只要松弛度大于等于2(h -> p -> n),需要重新将头节点设置为 n。
- 其次,匹配失败则调用 tryAppend 尝试入队(2.1),入队失败后则自旋直至入队成功,入队后线程会自旋或被挂起;
- 最后,调用 awaitMatch 方法(2.2),等待其它线程匹配上后唤醒该线程。
如果在上述操作中没有找到匹配节点,则根据参数 how 做不同的处理:
NOW(poll, tryTransfer)
:立即返回。SYNC(transfer, take)
:通过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,然后自旋或阻塞当前线程直到节点被匹配或者取消返回。ASYNC(offer, put, add)
:通过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,异步直接返回。TIMED(timed poll, tryTransfer)
:通过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,然后自旋或阻塞当前线程直到节点被匹配或者取消或等待超时返回。
// 1. NOW(poll, tryTransfer)
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
// 2. SYNC(transfer, take)
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
// 3. ASYNC(offer, put, add)
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
// 4. TIMED(timed poll, tryTransfer)
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
这里可以看到如果使用异步(ASYNC)的方式时线程不会阻塞,如 offer 时同一线程的数据节点也可以入队,也就是存储的数据长度不再是 0,这也是和 SynchronousQueue 一个很大的不同点。所以 Doug Lea 说从功能角度来讲,LinkedTransferQueue 实际上是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。
4.2 入队 tryAppend
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 1. 节点初始化
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
// 2. 节点s不能追加到p节点后。①p和s的模式不同且②p还未匹配
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
// 3. p 不是尾节点
else if ((n = p.next) != null) // not last; keep traversing
// t -> p 时 tail 改变则需要重新定位到尾节点
// p节点已经出队则需要从 head 开始重新遍历
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
// 4. 有其它线程添加节点时继续自旋,直到成功
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
// 5. 终于添加到队列中。尝试更新尾节点
else {
// 如果 p!=t 则队列情况如下,需要更新尾节点: t -> p -> s
if (p != t) { // update if slack now >= 2
// 5.1 其它线程已经更新 tail,重新进行下面三个条件的判断
// 5.2 t.next.next!=null 则需要重新更新 tail。至于s!=t则是此时t没有踢出队列
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
说明:添加给定节点 s 到队列尾并返回 s 的前继节点;失败时(与其他不同模式线程竞争失败)返回 null,此时 s 的前继节点 p 的模式和 s 不同且 p 尚未被匹配,如 s 为请求节点,p 为数据节点且未匹配则不能将 s 追加到 p 后面。
4.3 匹配 awaitMatch
/**
* Spins/yields/blocks 直到s节点matched或canceled
*
* @param s the waiting node
* @param pred s的前驱节点,如果没有前驱节点则为s自己
* @param e s节点的原始值
* @param timed true时限时等待,false时无限等待
* @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 1. item已经被修改,说明匹配成功。返回匹配后的值
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
// 2. 超时,返回匹配前的值
if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
// 3. 设置自旋次数
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
// 4. 自旋,有很小的概率调用 yeild
} else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
// 5. 设置等待线程,让其它线程唤醒
} else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
// 6. 阻塞直至其它线程唤醒,继续循环直到匹配成功或超时退出
} else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
} else {
LockSupport.park(this);
}
}
}
说明:当前操作为同步操作时,会调用 awaitMatch 方法阻塞等待匹配,成功返回匹配后节点 item,超时则返回匹配前节点的 item 值 e。在等待期间如果线程被中断或等待超时,则取消匹配,并调用 unsplice 方法解除节点 s 和其前继节点的链接。
// 计算自旋次数 FRONT_SPINS=1<<7,CHAINED_SPINS=1<<6
private static int spinsFor(Node pred, boolean haveData) {
if (MP && pred != null) {
if (pred.isData != haveData) // phase change
return FRONT_SPINS + CHAINED_SPINS;
if (pred.isMatched()) // probably at front
return FRONT_SPINS;
if (pred.waiter == null) // pred apparently spinning
return CHAINED_SPINS;
}
return 0; // 单核CPU时不自旋
}
4.4 其它方法说明
4.4.1 数据节点个数 size
// 统计数据节点个数
public int size() {
return countOfMode(true);
}
// 统计请求节点个数
public int getWaitingConsumerCount() {
return countOfMode(false);
}
private int countOfMode(boolean data) {
int count = 0;
for (Node p = head; p != null; ) {
if (!p.isMatched()) { // p未匹配且属性指定的data模式则 ++count
if (p.isData != data)
return 0;
if (++count == Integer.MAX_VALUE) // saturated
break;
}
Node n = p.next;
if (n != p) // 下一个节点
p = n;
else { // p节点失效则计数器归0,重新从 head 遍历
count = 0;
p = head;
}
}
return count;
}
4.4.2 包含 contains
// 查找是否包含指定的数据节点 item=o
public boolean contains(Object o) {
if (o == null) return false;
for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
if (p.isData) {
if (item != null && item != p && o.equals(item))
return true;
} else if (item == null) // 有请求节点了,不用再匹配
break;
}
return false;
}
// 后继节点,如果节点失效,直接从 head 开始
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
4.4.3 是否有请求节点 hasWaitingConsumer
// 是否有请求节点
public boolean hasWaitingConsumer() {
return firstOfMode(false) != null;
}
// 查找第一个 isData 模式的未匹配节点
private Node firstOfMode(boolean isData) {
for (Node p = head; p != null; p = succ(p)) {
if (!p.isMatched())
return (p.isData == isData) ? p : null;
}
return null;
}
参考:
- 《JUC源码分析-集合篇(六):LinkedTransferQueue》:https://www.jianshu.com/p/42ceaed2afe6
每天用心记录一点点。内容也许不重要,但习惯很重要!