zoukankan      html  css  js  c++  java
  • Exchanger 源码分析

    Exchanger

    此类提供对外的操作是同步的;
    用于成对出现的线程之间交换数据【主场景】;
    可以视作双向的同步队列;
    可应用于基因算法、流水线设计、数据校对等场景
    

    创建实例

        /**
         * arena 数组中两个已使用的 slot 之间的索引距离,将它们分开以避免错误的共享
         */
        private static final int ASHIFT = 5;
    
        /**
         * arena 数组的最大索引值,最大 size 值为 0xff+1
         */
        private static final int MMASK = 0xff;
    
        /**
         * Unit for sequence/version bits of bound field. Each successful
         * change to the bound also adds SEQ.
         */
        private static final int SEQ = MMASK + 1;
    
        /** JVM 的 CPU 核数,用于自旋和扩容控制 */
        private static final int NCPU = Runtime.getRuntime().availableProcessors();
    
        /**
         *  arena 的最大索引值:原则上可以让所有线程不发生竞争
         */
        static final int FULL = NCPU >= MMASK << 1 ? MMASK : NCPU >>> 1;
    
        /**
         * 当前线程阻塞等待匹配节点前的自旋次数,CPU==1 时不进行自旋
         */
        private static final int SPINS = 1 << 10;
    
        /**
         * Value representing null arguments/returns from public methods.
         * 旧 API 不支持 null 值所以需要适配。
         */
        private static final Object NULL_ITEM = new Object();
    
        /**
         *  交换超时的返回值对象
         */
        private static final Object TIMED_OUT = new Object();
    
        @jdk.internal.vm.annotation.Contended static final class Node {
            // Arena index
            int index;          
            // Last recorded value of Exchanger.bound
            int bound;              
            // Number of CAS failures at current bound
            int collides;  
            // 自旋的伪随机数
            int hash;               
            // 线程的当前数据对象
            Object item;           
            // 匹配线程的数据对象
            volatile Object match; 
            // 驻留阻塞线程
            volatile Thread parked; 
        }
    
        /** 参与者 */
        static final class Participant extends ThreadLocal<Node> {
            @Override
            public Node initialValue() { return new Node(); }
        }
    
        /**
         *  每个线程的状态
         */
        private final Participant participant;
    
        /**
         *  消除数组,只在出现竞争时初始化。
         */
        private volatile Node[] arena;
    
        /**
         *  未发生竞争时使用的 slot
         */
        private volatile Node slot;
    
        /**
         * The index of the largest valid arena position, OR'ed with SEQ
         * number in high bits, incremented on each update.  The initial
         * update from 0 to SEQ is used to ensure that the arena array is
         * constructed only once.
         */
        private volatile int bound;
    
        /**
         * Creates a new Exchanger.
         */
        public Exchanger() {
            participant = new Participant();
        }
    

    线程间交换数据

        /**
         *  阻塞等待其他线程到达交换点后执行数据交换,支持中断
         */
        @SuppressWarnings("unchecked")
        public V exchange(V x) throws InterruptedException {
            Object v;
            // 将目标对象 v 进行编码
            final Object item = x == null ? NULL_ITEM : x; // translate null args
            /**
             * 1)arena==null,表示未出现线程竞争,则使用 slot 进行数据交换
             * 2)线程已经中断,则抛出 InterruptedException
             * 3)arena!=null,则使用 arena 中的 slot 进行数据交换
             */
            if ((arena != null ||
                    (v = slotExchange(item, false, 0L)) == null) &&
                    (Thread.interrupted() || // disambiguates null return
                            (v = arenaExchange(item, false, 0L)) == null)) {
                throw new InterruptedException();
            }
            // 解码目标对象
            return v == NULL_ITEM ? null : (V)v;
        }
    
        /**
         *  未出现竞争时的数据交换方式
         * @param item  需要交换的目标对象
         * @param timed 是否是超时模式
         * @param ns    超时的纳秒数
         * @return 
         *  1)目标线程的数据对象
         *  2)null slot 交换出现竞争、线程被中断
         *  3)TIMED_OUT 交换超时
         */
        private final Object slotExchange(Object item, boolean timed, long ns) {
            // 读取参与者节点
            final Node p = participant.get();
            // 读取当前线程
            final Thread t = Thread.currentThread();
            // 线程被设置了中断标识,则返回 null
            if (t.isInterrupted()) {
                return null;
            }
    
            for (Node q;;) {
                // 1)已经有线程在阻塞等待交换数据
                if ((q = slot) != null) {
                    // 将 slot 置为 null
                    if (SLOT.compareAndSet(this, q, null)) {
                        // 读取目标对象
                        final Object v = q.item;
                        // 写入交换对象
                        q.match = item;
                        // 如果线程在阻塞等待
                        final Thread w = q.parked;
                        if (w != null) {
                            // 则唤醒交换线程
                            LockSupport.unpark(w);
                        }
                        // 返回交换到的对象
                        return v;
                    }
                    /**
                     * NCPU > 1 多核 CPU 才会启用竞技场 && 
                     * 设置最大有效的 arena 索引值
                     */
                    if (NCPU > 1 && bound == 0 &&
                            BOUND.compareAndSet(this, 0, SEQ)) {
                        // 创建竞技场
                        arena = new Node[FULL + 2 << ASHIFT];
                    }
                }
                // 2)启用了 arena
                else if (arena != null) {
                    return null; // caller must reroute to arenaExchange
                // 3)slot 为空 && 未启用 arena
                } else {
                    // 写入交换数据
                    p.item = item;
                    // 将 Node 写入 slot,成功则退出循环
                    if (SLOT.compareAndSet(this, null, p)) {
                        break;
                    }
                    // 出现竞争,则重试
                    p.item = null;
                }
            }
    
            // 等待释放
            int h = p.hash;
            // 计算截止时间
            final long end = timed ? System.nanoTime() + ns : 0L;
            // 计算自旋次数,多核 CPU 为 1024
            int spins = NCPU > 1 ? SPINS : 1;
            Object v;
            // 只要没有匹配的交换数据
            while ((v = p.match) == null) {
                // 1)自旋还未完成
                if (spins > 0) {
                    h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                    if (h == 0) {
                        h = SPINS | (int)t.getId();
                    } else if (h < 0 && (--spins & (SPINS >>> 1) - 1) == 0) {
                        Thread.yield();
                    }
                }
                // 2)slot 已经更新
                else if (slot != p) {
                    spins = SPINS;
                /**
                 * 3)线程未中断 && 未启用竞技场 && 不是超时模式;
                 *  如果是超时模式,则计算剩余时间,当前还未超时  
                 */
                } else if (!t.isInterrupted() && arena == null &&
                        (!timed || (ns = end - System.nanoTime()) > 0L)) {
                    // 写入驻留线程
                    p.parked = t;
                    // 如果 slot 未更新,没有线程来进行数据交换
                    if (slot == p) {
                        // 1)阻塞等待
                        if (ns == 0L) {
                            LockSupport.park(this);
                        // 2)超时阻塞等待 
                        } else {
                            LockSupport.parkNanos(this, ns);
                        }
                    }
                    // 线程释放后,清空 parked
                    p.parked = null;
                }
                // 如果线程被中断或已经超时,则将 slot 清空
                else if (SLOT.compareAndSet(this, p, null)) {
                    // 如果是超时,则返回 TIMED_OUT;线程中断,则返回 null
                    v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                    break;
                }
            }
            // 清空 match
            MATCH.setRelease(p, null);
            // 清空 item
            p.item = null;
            p.hash = h;
            // 返回交换到的数据对象
            return v;
        }
    
        /**
         * Exchange function when arenas enabled. See above for explanation.
         *
         * @param item  需要交换的目标对象
         * @param timed 是否是超时模式
         * @param ns    超时的纳秒数
         * @return
         *  1)目标线程的数据对象
         *  2)null 线程被中断
         *  3)TIMED_OUT 交换超时
         */
        private final Object arenaExchange(Object item, boolean timed, long ns) {
            // 读取 arena
            final Node[] a = arena;
            // 读取数组长度
            final int alen = a.length;
            // 读取当前线程的参与者,初始值为 0
            final Node p = participant.get();
            for (int i = p.index;;) { // access slot at i
                int b, m, c;
                // 一般为 31
                int j = (i << ASHIFT) + (1 << ASHIFT) - 1;
                if (j < 0 || j >= alen) {
                    j = alen - 1;
                }
                // 读取指定 slot 的 Node
                final Node q = (Node) AA.getAcquire(a, j);
                // 1)目标 slot 已经有线程在等待交换数据,则尝试清空 slot
                if (q != null && AA.compareAndSet(a, j, q, null)) {
                    // 读取目标对象
                    final Object v = q.item; // release
                    // 写入交换对象
                    q.match = item;
                    final Thread w = q.parked;
                    if (w != null) {
                        // 唤醒驻留线程
                        LockSupport.unpark(w);
                    }
                    // 返回交换到的值
                    return v;
                // 2)目标索引 i 在有效索引范围内 && slot 为 null 
                } else if (i <= (m = (b = bound) & MMASK) && q == null) {
                    // 写入 item
                    p.item = item; // offer
                    // 写入节点
                    if (AA.compareAndSet(a, j, null, p)) {
                        // 计算截止时间
                        final long end = timed && m == 0 ? System.nanoTime() + ns : 0L;
                        // 读取当前线程
                        final Thread t = Thread.currentThread(); // wait
                        // 读取自旋次数 1024
                        for (int h = p.hash, spins = SPINS;;) {
                            // 读取匹配数据
                            final Object v = p.match;
                            // 1)已经有线程将交换数据写入
                            if (v != null) {
                                MATCH.setRelease(p, null);
                                p.item = null; // clear for next use
                                p.hash = h;
                                return v;
                            // 2)自旋还未结束 
                            } else if (spins > 0) {
                                h ^= h << 1;
                                h ^= h >>> 3;
                                h ^= h << 10; // xorshift
                                if (h == 0) {
                                    h = SPINS | (int) t.getId();
                                } else if (h < 0 && // approx 50% true
                                        (--spins & (SPINS >>> 1) - 1) == 0) {
                                    Thread.yield(); // two yields per wait
                                }
                            // 3)slot 已经更新  
                            } else if (AA.getAcquire(a, j) != p) {
                                spins = SPINS; // releaser hasn't set match yet
                            // 4) 线程未中断、未超时
                            } else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                                // 写入驻留线程
                                p.parked = t; // minimize window
                                // 如果 slot 未更新,则线程被阻塞
                                if (AA.getAcquire(a, j) == p) {
                                    if (ns == 0L) {
                                        LockSupport.park(this);
                                    } else {
                                        LockSupport.parkNanos(this, ns);
                                    }
                                }
                                p.parked = null;
                            // 5)slot 未更新 && 线程超时或中断,则清空 slot   
                            } else if (AA.getAcquire(a, j) == p && AA.compareAndSet(a, j, p, null)) {
                                if (m != 0) {
                                    BOUND.compareAndSet(this, b, b + SEQ - 1);
                                }
                                p.item = null;
                                p.hash = h;
                                i = p.index >>>= 1; // descend
                                // 线程被中断
                                if (Thread.interrupted()) {
                                    return null;
                                }
                                // 线程超时
                                if (timed && m == 0 && ns <= 0L) {
                                    return TIMED_OUT;
                                }
                                break; // expired; restart
                            }
                        }
                    // 2)写入 slot 出现竞争   
                    } else {
                        p.item = null; // clear offer
                    }
                } else {
                    if (p.bound != b) { // stale; reset
                        p.bound = b;
                        p.collides = 0;
                        i = i != m || m == 0 ? m : m - 1;
                    } else if ((c = p.collides) < m || m == FULL || !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
                        p.collides = c + 1;
                        i = i == 0 ? m : i - 1; // cyclically traverse
                    } else {
                        i = m + 1; // grow
                    }
                    p.index = i;
                }
            }
        }
    
  • 相关阅读:
    Educational Codeforces Round 85 D. Minimum Euler Cycle(模拟/数学/图)
    Educational Codeforces Round 85 C. Circle of Monsters(贪心)
    NOIP 2017 提高组 DAY1 T1小凯的疑惑(二元一次不定方程)
    Educational Codeforces Round 85 B. Middle Class(排序/贪心/水题)
    Educational Codeforces Round 85 A. Level Statistics(水题)
    IOS中的三大事件
    用Quartz 2D画小黄人
    strong、weak、copy、assign 在命名属性时候怎么用
    用代码生成UINavigationController 与UITabBarController相结合的简单QQ框架(部分)
    Attempting to badge the application icon but haven't received permission from the user to badge the application错误解决办法
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10124099.html
Copyright © 2011-2022 走看看