zoukankan      html  css  js  c++  java
  • JUC源码分析-集合篇(十)LinkedTransferQueue

    JUC源码分析-集合篇(十)LinkedTransferQueue

    LinkedTransferQueue(LTQ) 相比 BlockingQueue 更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立 Java 内存模型中的 happens-before 关系的方式)。Doug Lea 说从功能角度来讲,LinkedTransferQueue 实际上是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。而且 LinkedTransferQueue 更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

    1. LinkedTransferQueue 概况

    推荐一篇 LinkedTransferQueue 的介绍:http://ifeve.com/java-transfer-queue/

    1.1 TransferQueue 接口

    LinkedTransferQueue 实现了 TransferQueue 接口,下面就主要介绍一下这个接口。 TransferQueue 继承了 BlockingQueue(BlockingQueue 又继承了 Queue)并扩展了一些新方法。BlockingQueue(和Queue)是 JDK5 中加入的接口,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。

    TransferQueue 则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的 transfer 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立 Java 内存模型中的 happens-before 关系的方式)。

    TransferQueue 还包括了其他的一些方法:两个 tryTransfer 方法,一个是非阻塞的,另一个带有 timeout 参数设置超时时间的。还有两个辅助方法 hasWaitingConsumer() 和 getWaitingConsumerCount()。

    1.2 LinkedTransferQueue 特点

    当我第一次看到 LinkedTransferQueue 时,首先想到了已有的实现类 SynchronousQueue。SynchronousQueue 的队列长度为 0,特别是对于两个线程之间传递元素这种用例。

    LinkedTransferQueue 相比 SynchronousQueue 用处更广、更好用,因为你可以决定是使用 BlockingQueue 的方法(译者注:例如put方法)还是确保一次传递完成(译者注:即transfer方法)。在队列中已有元素的情况下,调用 transfer 方法,可以确保队列中被传递元素之前的所有元素都能被处理。

    LinkedTransferQueue 的性能分别是 SynchronousQueue 的3倍(非公平模式)和14倍(公平模式)。因为像 ThreadPoolExecutor 这样的类在任务传递时都是使用 SynchronousQueue,所以使用 LinkedTransferQueue 来代替 SynchronousQueue 也会使得 ThreadPoolExecutor 得到相应的性能提升。

    下面你可以参考这往篇文章实现一个自己的 LinkedTransferQueue:http://ifeve.com/customizing-concurrency-classes-11-2/#more-7388

    2. LTQ 原理

    LTQ 内部采用的是一种非常不同的队列,即松弛型双重队列(Dual Queues with Slack):http://ifeve.com/buglinkedtransferqueue-bug/#more-11117

    强烈建议大家读一下 Doug Lea 的 java doc 文档,对 LTQ 的数据结构有很清楚的说明。

    2.1 双重队列(Dual Queues)

    /**
     * A FIFO dual queue may be implemented using a variation of the
     * Michael & Scott (M&S) lock-free queue algorithm
     * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
     * It maintains two pointer fields, "head", pointing to a
     * (matched) node that in turn points to the first actual
     * (unmatched) queue node (or null if empty); and "tail" that
     * points to the last node on the queue (or again null if
     * empty). For example, here is a possible queue with four data
     * elements:
     *
     *  head                tail
     *    |                   |
     *    v                   v
     *    M -> U -> U -> U -> U
     *    
     *  M(matched)  U(unmatched)
     */
    

    翻译:FIFO 双队列可以使用 Michael & Scott(M&S)无锁队列算法的变体实现。它维护两个指针字段: head 指向第一个不匹配节点(M)的前驱节点(如果为空则为空);tail 指向队列中的最后一个节点(如果为空则为空)。

    双重是指有两种类型相互对立的节点(Node.isData==false或true),并且我理解的每种节点都有三种状态:

    1. UNMATCHED 节点构造完成,刚进入队列的状态
    2. MATCHED 节点备置为“满足”状态,即入队节点标识的线程成功取得或者传递了数据
    3. CANCELED 节点被置为取消状态,即入队节点标识的线程因为超时或者中断决定放弃等待

    2.2 松弛度(Slack)

    /**
     * 在更新head/tail和查找中寻求平衡,大多数场景1~3比较合适。
     * 本质上:是增加对 volatile 变量读操作来减少了对 volatile 变量的写操作
     * 而对 volatile 变量的写操作开销要远远大于读操作,因此使用Slack能增加效率
     * 
     * We introduce here an approach that lies between the extremes of
     * never versus always updating queue (head and tail) pointers.
     * This offers a tradeoff between sometimes requiring extra
     * traversal steps to locate the first and/or last unmatched
     * nodes, versus the reduced overhead and contention of fewer
     * updates to queue pointers. For example, a possible snapshot of
     * a queue is:
     *
     *  head           tail
     *    |              |
     *    v              v
     *    M -> M -> U -> U -> U -> U
     *
     * The best value for this "slack" (the targeted maximum distance
     * between the value of "head" and the first unmatched node, and
     * similarly for "tail") is an empirical matter. We have found
     * that using very small constants in the range of 1-3 work best
     * over a range of platforms. Larger values introduce increasing
     * costs of cache misses and risks of long traversal chains, while
     * smaller values increase CAS contention and overhead.
     */
    

    为了节省 CAS 操作的开销,LTQ 引入了“松弛度”的概念:在节点被匹配(被删除)之后,不会立即更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个“松弛阀值”之后才会更新(在 LTQ 中,这个值为 2)。这个“松弛阀值”一般为1-3,如果太大会降低缓存命中率,并且会增加遍历链的长度;太小会增加 CAS 的开销。另外在 ConcurrentLinkedQueue 也有相应的应用:hops 设计意图

    2.3 节点自链接

    已匹配节点的 next 引用会指向自身。如果 GC 延迟回收,已删除节点链会积累的很长,此时垃圾收集会耗费高昂的代价,并且所有刚匹配的节点也不会被回收。为了避免这种情况,我们在 CAS 向后推进 head 时,会把已匹配的 head 的"next"引用指向自身(即“自链接节点”),这样就限制了连接已删除节点的长度(我们也采取类似的方法,清除在其他节点字段中可能的垃圾保留值)。如果在遍历时遇到一个自链接节点,那就表明当前线程已经滞后于另外一个更新 head 的线程,此时就需要重新获取 head 来遍历。

    所以,在 LTQ 中,数据在某个线程的“某一时刻”可能存在下面这种形式:

    LTQ节点

    • unmatched node:未被匹配的节点。可能是一个生产者节点(item!=null),也可能是一个消费者节点(item==null)。
    • matched node:已经被匹配的节点。可能是一个生产者节点(item!=null)的数据已经被一个消费者拿走;也可能是一个消费者节点(item==null)已经被一个生产者填充上数据。

    3. 数据结构

    数据结构

    3.1 Node 节点

    static final class QNode {
        volatile Object item;         // 节点包含的数据,非空表示生产者,空者是消费者
        final boolean isData;         // 表示该节点由生产者创建还是由消费者创建,生产者true,消费者false  
        volatile Thread waiter;       // 等待在该节点上的线程。to control park/unpark
        volatile QNode next;          // 指向队列中的下一个节点
    }
    

    Node 节点本身就是一个原子性操作,对节点的属性 item、waiter、next 都是原子性操作。

    • forgetNext 是将节点踢出队列。
    • forgetContents 是将节点踢出队列后,将节点的属性 item 和 waiter 置空或自连接,便于 GC 垃圾回收。
    • isMatched 判断节点是否已经匹配成功。
    • isUnmatchedRequest 是否是请求节点,且还未匹配成功。
    • cannotPrecede 能否将指定的节点 node 追加到当前节点后。node 节点属性与当前节点相反,且当前节点还未进行匹配则不能追加。
    • tryMatchData 尝试匹配数据节点,匹配成功返回 true,即将 item 设置为 null 成功

    3.2 LinkedTransferQueue

    transient volatile Node head;
    private transient volatile Node tail;
    
    // 立刻、异步、同步、超时返回
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer
    

    LinkedTransferQueue 主要方法介绍:

    • transfer 阻塞式的将数据从一个线程传递到另一个线程。
    • tryTransfer 则是非阻塞式的将数据从一个线程传递到另一个线程。
    • xfer 最核心的方法。将数据从一个线程传递到另一个线程。
    • tryAppend 将节点添加到队列中。
    • awaitMatch 匹配节点。
    public LinkedTransferQueue() {
    }
    

    LinkedTransferQueue 初始化时什么也没做,也就是说 head=tail=null。

    4. 源码分析

    4.1 核心方法 xfer

    /** 
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed
    
        retry:
        for (;;) {                            // restart on append race
            // 1. 尝试匹配
            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                // 1.2 p节点还未匹配则尝试进行匹配,为什么不调用 !p.isMatched() ????
                if (item != p && (item != null) == isData) { // unmatched
                    // 1.3 两个节点的模式一样,则直接跳出循环,尝试入队
                    if (isData == haveData)   // can't match
                        break;
                    // 1.4 p匹配成功
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            // 1.5 p已经匹配,直接将n设置为头节点。h -> p -> n
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            // 1.6 有其它线程更新了头节点,再次判断 slack<2。
                            //     h -> q 如果 q.isMatched() 则可以将 q.next 设置为头节点
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);   // 唤醒等待的线程后直接返回
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                // 1.7 p==p.next 则说明p已经出队,失效了。需要重新从头节点开始匹配
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }
    
            // 2. 到了这一步,只有未匹配上一种情况。根据how判断节点是否要入队并等待其它线程匹配
            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                // 2.1 节点尝试入队,入队失败继续尝试
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                // 2.2 等待其它线程匹配成功后唤醒当前线程
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
    

    说明:xfer 大致可以分三部分:

    1. 首先,节点尝试和队列中已经的元素进行匹配,匹配成功(1.4)则唤醒等待节点的线程后直接返回。匹配成功只要松弛度大于等于2(h -> p -> n),需要重新将头节点设置为 n。
    2. 其次,匹配失败则调用 tryAppend 尝试入队(2.1),入队失败后则自旋直至入队成功,入队后线程会自旋或被挂起;
    3. 最后,调用 awaitMatch 方法(2.2),等待其它线程匹配上后唤醒该线程。

    如果在上述操作中没有找到匹配节点,则根据参数 how 做不同的处理:

    • NOW(poll, tryTransfer):立即返回。
    • SYNC(transfer, take):通过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,然后自旋或阻塞当前线程直到节点被匹配或者取消返回。
    • ASYNC(offer, put, add):通过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,异步直接返回。
    • TIMED(timed poll, tryTransfer):通过 tryAppend 方法插入一个新的节点 s(item=e,isData = haveData)到队列尾,然后自旋或阻塞当前线程直到节点被匹配或者取消或等待超时返回。
    // 1. NOW(poll, tryTransfer)
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }
    // 2. SYNC(transfer, take)
    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }
    // 3. ASYNC(offer, put, add)
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    // 4. TIMED(timed poll, tryTransfer)
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
    

    这里可以看到如果使用异步(ASYNC)的方式时线程不会阻塞,如 offer 时同一线程的数据节点也可以入队,也就是存储的数据长度不再是 0,这也是和 SynchronousQueue 一个很大的不同点。所以 Doug Lea 说从功能角度来讲,LinkedTransferQueue 实际上是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。

    4.2 入队 tryAppend

    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            // 1. 节点初始化
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            // 2. 节点s不能追加到p节点后。①p和s的模式不同且②p还未匹配
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            // 3. p 不是尾节点
            else if ((n = p.next) != null)    // not last; keep traversing
                // t -> p 时 tail 改变则需要重新定位到尾节点
                // p节点已经出队则需要从 head 开始重新遍历
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            // 4. 有其它线程添加节点时继续自旋,直到成功
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            // 5. 终于添加到队列中。尝试更新尾节点
            else {
                // 如果 p!=t 则队列情况如下,需要更新尾节点: t -> p -> s
                if (p != t) {                 // update if slack now >= 2
                    // 5.1 其它线程已经更新 tail,重新进行下面三个条件的判断
                    // 5.2 t.next.next!=null 则需要重新更新 tail。至于s!=t则是此时t没有踢出队列
                    while ((tail != t || !casTail(t, s)) &&     
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
    

    说明:添加给定节点 s 到队列尾并返回 s 的前继节点;失败时(与其他不同模式线程竞争失败)返回 null,此时 s 的前继节点 p 的模式和 s 不同且 p 尚未被匹配,如 s 为请求节点,p 为数据节点且未匹配则不能将 s 追加到 p 后面。

    4.3 匹配 awaitMatch

    /**
     * Spins/yields/blocks 直到s节点matched或canceled
     *
     * @param s the waiting node
     * @param pred s的前驱节点,如果没有前驱节点则为s自己
     * @param e s节点的原始值
     * @param timed true时限时等待,false时无限等待
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed
    
        for (;;) {
            Object item = s.item;
            // 1. item已经被修改,说明匹配成功。返回匹配后的值
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            // 2. 超时,返回匹配前的值
            if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }
            // 3. 设置自旋次数
            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            // 4. 自旋,有很小的概率调用 yeild
            } else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            // 5. 设置等待线程,让其它线程唤醒
            } else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            // 6. 阻塞直至其它线程唤醒,继续循环直到匹配成功或超时退出
            } else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            } else {
                LockSupport.park(this);
            }
        }
    }
    

    说明:当前操作为同步操作时,会调用 awaitMatch 方法阻塞等待匹配,成功返回匹配后节点 item,超时则返回匹配前节点的 item 值 e。在等待期间如果线程被中断或等待超时,则取消匹配,并调用 unsplice 方法解除节点 s 和其前继节点的链接。

    // 计算自旋次数 FRONT_SPINS=1<<7,CHAINED_SPINS=1<<6
    private static int spinsFor(Node pred, boolean haveData) {
        if (MP && pred != null) {
            if (pred.isData != haveData)      // phase change
                return FRONT_SPINS + CHAINED_SPINS;
            if (pred.isMatched())             // probably at front
                return FRONT_SPINS;
            if (pred.waiter == null)          // pred apparently spinning
                return CHAINED_SPINS;
        }
        return 0;   // 单核CPU时不自旋
    }
    

    4.4 其它方法说明

    4.4.1 数据节点个数 size

    // 统计数据节点个数
    public int size() {
        return countOfMode(true);
    }
    // 统计请求节点个数
    public int getWaitingConsumerCount() {
        return countOfMode(false);
    }
    
    private int countOfMode(boolean data) {
        int count = 0;
        for (Node p = head; p != null; ) {
            if (!p.isMatched()) {       // p未匹配且属性指定的data模式则 ++count
                if (p.isData != data)
                    return 0;
                if (++count == Integer.MAX_VALUE) // saturated
                    break;
            }
            Node n = p.next;            
            if (n != p)     // 下一个节点
                p = n;
            else {          // p节点失效则计数器归0,重新从 head 遍历
                count = 0;
                p = head;
            }
        }
        return count;
    }
    

    4.4.2 包含 contains

    // 查找是否包含指定的数据节点 item=o
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node p = head; p != null; p = succ(p)) {
            Object item = p.item;
            if (p.isData) {
                if (item != null && item != p && o.equals(item))
                    return true;
            } else if (item == null)    // 有请求节点了,不用再匹配
                break;
        }
        return false;
    }
    // 后继节点,如果节点失效,直接从 head 开始
    final Node succ(Node p) {
        Node next = p.next;
        return (p == next) ? head : next;
    }
    

    4.4.3 是否有请求节点 hasWaitingConsumer

    // 是否有请求节点
    public boolean hasWaitingConsumer() {
        return firstOfMode(false) != null;
    }
    
    // 查找第一个 isData 模式的未匹配节点
    private Node firstOfMode(boolean isData) {
        for (Node p = head; p != null; p = succ(p)) {
            if (!p.isMatched())
                return (p.isData == isData) ? p : null;
        }
        return null;
    }
    

    参考:

    1. 《JUC源码分析-集合篇(六):LinkedTransferQueue》:https://www.jianshu.com/p/42ceaed2afe6

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    vmware-tools安装
    UBUNTU 安装教程
    CANO入门(三)
    CANOE入门(二)
    CANOE入门(一)
    ubuntu 常见错误--Could not get lock /var/lib/dpkg/lock
    vmware-tools安装
    root权限
    ARM嵌入式开发中的GCC内联汇编__asm__
    OpenCV3.1.0+VS2015开发环境配置
  • 原文地址:https://www.cnblogs.com/binarylei/p/10940441.html
Copyright © 2011-2022 走看看