zoukankan      html  css  js  c++  java
  • Exchanger 相关整理

    1. 简介

    • Exchanger(交换者)是自 JDK 1.5 起开始提供的工具套件,源于 java.util.concurrent 包。
      • 是一个用于线程间协作的工具类。
      • Exchanger 用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
    • 此类提供对外的操作是同步的。
    • 用于 成对 出现的线程之间交换数据。
    • 可以视作双向的同步队列。
    • 可应用于基因算法、流水线设计等场景。

    2. Exchanger 的原理

    • Exchanger 用于进行线程间的数据交换。
      • 它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
      • 两个线程通过 exchange() 方法交换数据, 如果第一个线程先执行 exchange() 方法,会一直等待第二个线程也执行 exchange(),当两个线程都到达同步点时,两个线程交换数据,将本线程生产出来的数据传递给对方。
      • 使用 Exchanger 的重点是成对的线程使用 exchange() 方法。
    • 这个类提供一个无参构造函数,两个重载的范型 exchange() 方法。
    public V exchange(V x) throws InterruptedException
    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
    • 在 Exchanger 中,如果一个线程已经到达了 exchanger() 时,对于其伙伴结点的情况分为三种。
      • 如果伙伴结点在该线程到达之前已经调用了 exchanger() 方法,则唤醒该伙伴结点然后进行数据交换,得到各自数据返回。
      • 如果伙伴结点还没有到达交换点,则该线程被挂起,等待伙伴结点到达后被唤醒,完成数据交换。
      • 如果当前线程被中断了则抛出异常,或者等待超时,则抛出超时异常。
    • Exchanger 有单槽位和多槽位之分,单个槽位在同一时刻只能用于两个线程交换数据,这样在竞争比较激烈的时候,会影响到性能,多个槽位就是多个线程可以同时进行两个的数据交换,彼此之间不受影响,这样可以很好的提高吞吐量。

    数据结构

    @sun.misc.Contended 
    static final class Node {
        int index;              // arena的下标,多个槽位的时候利用
        int bound;              // 上一次记录的Exchanger.bound;
        int collides;           // 在当前bound下CAS失败的次数;
        int hash;               // 用于自旋;
        Object item;            // 这个线程的当前项,也就是需要交换的数据;
        volatile Object match;  // 交换的数据
        volatile Thread parked; // 线程
    }
    /**
     * Value representing null arguments/returns from public
     * methods. Needed because the API originally didn't disallow null
     * arguments, which it should have.
     * 如果交换的数据为 null,则用NULL_ITEM  代替
     */
    private static final Object NULL_ITEM = new Object();
    • Node 定义中,index,bound,collides 用于多槽位。
    • item 是当前线程需要交换的数据。
    • match 是和其它线程交换后的数据,初始为 null。
    • parked 是记录线程,用于阻塞和唤醒线程。

    2.1 单槽 Exchanger

    • Node 是每个线程自身用于数据交换的结点,每个 Node 就代表了每个线程,为了保证线程安全,把线程的 Node 结点放在 ThreadLocal
    • slot 为单槽。
    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    /**
     * The bound for spins while waiting for a match. The actual
     * number of iterations will on average be about twice this value
     * due to randomization. Note: Spinning is disabled when NCPU==1.
     */
    private static final int SPINS = 1 << 10; // 自旋次数
    /**
     * Slot used until contention detected.
     */
    private volatile Node slot; // 用于交换数据的槽位
    /**
     * Per-thread state  每个线程的数据,ThreadLocal 子类
     */
    private final Participant participant;
    
    /** The corresponding thread local class */
     static final class Participant extends ThreadLocal<Node> {
         // 初始值返回Node
         public Node initialValue() { return new Node(); }
     }

    exchange 方法

    • 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

    没有设定超时时间的 exchange 方法

    public V exchange(V x) throws InterruptedException {
            Object v;
            Object item = (x == null) ? NULL_ITEM : x; // 转换成空对象
            // arena == null, 路由到slotExchange(单槽交换), 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange(多槽交换),返回null,则抛出中断异常
            if ((arena != null || (v = slotExchange(item, false, 0L)) == null)
                    && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))
                throw new InterruptedException();
            return (v == NULL_ITEM) ? null : (V) v;
    }
    • arena 为多槽位,如果为 null,则执行 slotExchange() 单槽方法,否则判断线程是否中断,如果中断值抛出 InterruptedException 异常,没有中断则执行 arenaExchange() 多槽方法,如果该方法返回 null,抛出中断异常,最后返回结果。

    具有超时功能的 exchange 方法

    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            Object v;
            Object item = (x == null) ? NULL_ITEM : x;// 转换成空对象
            long ns = unit.toNanos(timeout);
            // arena == null, 路由到slotExchange(单槽交换), 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange(多槽交换),返回null,则抛出中断异常
            if ((arena != null || (v = slotExchange(item, true, ns)) == null)
                    && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null)))
                throw new InterruptedException();
            if (v == TIMED_OUT)// 超时
                throw new TimeoutException();
            return (v == NULL_ITEM) ? null : (V) v;
    }
    • 增加超时的判断。

    slotExchange 方法

    private final Object slotExchange(Object item, boolean timed, long ns) {
            Node p = participant.get(); // 获取当前线程携带的Node
            Thread t = Thread.currentThread(); // 当前线程
            if (t.isInterrupted()) // 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记
                return null;
            for (Node q;;) {
                if ((q = slot) != null) { // slot不为null, 说明已经有线程在这里等待了
                    if (U.compareAndSwapObject(this, SLOT, q, null)) { // 将slot重新设置为null, CAS操作
                        Object v = q.item; // 取出等待线程携带的数据
                        q.match = item; // 将当前线程的携带的数据交给等待线程
                        Thread w = q.parked; // 可能存在的等待线程(可能中断,不等了)
                        if (w != null)
                            U.unpark(w); // 唤醒等待线程
                        return v; // 返回结果,交易成功
                    }
                    // CPU的个数多于1个,并且bound为0时创建 arena,并将bound设置为SEQ大小
                    if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
                        arena = new Node[(FULL + 2) << ASHIFT]; // 根据CPU的个数估计Node的数量
                } else if (arena != null)
                    return null; // 如果slot为null, 但arena不为null, 则转而路由到arenaExchange方法
                else { // 最后一种情况,说明当前线程先到,则占用此slot
                    p.item = item; // 将携带的数据卸下,等待别的线程来交易
                    if (U.compareAndSwapObject(this, SLOT, null, p)) // 将slot的设为当前线程携带的Node
                        break; // 成功则跳出循环
                    p.item = null; // 失败,将数据清除,继续循环
                }
            }
            // 当前线程等待被释放, spin -> yield -> block/cancel
            int h = p.hash; // 伪随机,用于自旋
            long end = timed ? System.nanoTime() + ns : 0L; // 如果timed为true,等待超时的时间点; 0表示没有设置超时
            int spins = (NCPU > 1) ? SPINS : 1; // 自旋次数
            Object v;
            while ((v = p.match) == null) { // 一直循环,直到有线程来交易
                if (spins > 0) { // 自旋,直至spins不大于0
                    h ^= h << 1; // 伪随机算法, 目的是等h小于0(随机的)
                    h ^= h >>> 3;
                    h ^= h << 10;
                    if (h == 0) // 初始值
                        h = SPINS | (int) t.getId();
                    else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                        Thread.yield(); // 等到h < 0, 而spins的低9位也为0(防止spins过大,CPU空转过久),让出CPU时间片,每一次等待有两次让出CPU的时机(SPINS >>> 1)
                } else if (slot != p) // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好
                    spins = SPINS;
                // 如果线程没被中断,且arena还没被创建,并且没有超时
                else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                    U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上
                    p.parked = t; // 挂在此结点上的阻塞着的线程
                    if (slot == p)
                        U.park(false, ns); // 阻塞, 等着被唤醒或中断
                    p.parked = null; // 醒来后,解除与结点的联系
                    U.putObject(t, BLOCKER, null); // 解除阻塞对象
                } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超时或其它(取消),给其它线程腾出slot
                    v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                    break;
                }
            }
            // 归位
            U.putOrderedObject(p, MATCH, null);
            p.item = null;
            p.hash = h;
            return v;
    }
    • 执行流程。
      1. 检查 slot 是否为空(null),不为空,说明已经有线程在此等待,尝试占领该槽位,如果占领成功,与等待线程交换数据,并唤醒等待线程,交易结束,返回。
      2. 如果占领槽位失败,创建 arena,继续步骤 1 尝试抢占 slot,直至 slot 为空,或者抢占成功,交易结束返回。
      3. 如果 slot 为空,则判断 arena 是否为空,如果 arena 不为空,返回 null,重新路由到 arenaExchange 方法。
      4. 如果 arena 为空,说明当前线程是先到达的,尝试占有 slot,如果成功,将 slot 标记为自己占用,跳出循环,继续步骤 5,如果失败,则继续步骤 1。
      5. 当前线程等待被释放,等待的顺序是先自旋(spin),不成功则让出 CPU 时间片(yield),最后还不行就阻塞(block),spin -> yield -> block。
      6. 如果超时(设置超时的话)或被中断,则退出循环。
      7. 最后,重置数据,下次重用,返回结果,结束。
    slotExchange 流程图

    2.2 多槽 Exchanger

    • 一个 Node 数组 arena,代表了很多的槽位
    private static final int ASHIFT = 7; // 两个有效槽(slot -> Node)之间的字节地址长度(内存地址,以字节为单位),1 << 7至少为缓存行的大小,防止伪共享 
    private static final int MMASK = 0xff; // 场地(一排槽,arena -> Node[])的可支持的最大索引,可分配的大小为 MMASK + 1
    private static final int SEQ = MMASK + 1; // bound的递增单元,确立其唯一性
    private static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的个数,用于场地大小和自旋控制
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 最大的arena索引
    private static final int SPINS = 1 << 10; // 自旋次数,NCPU = 1时,禁用
    private static final Object NULL_ITEM = new Object();// 空对象,对应null
    private static final Object TIMED_OUT = new Object();// 超时对象,对应timeout
    // 多个线程交换/多槽位
    private volatile Node[] arena;

    arenaExchange 方法

    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena; // 交换场地,一排slot
        Node p = participant.get(); // 获取当前线程携带的Node   p.index 初始值为 0
        for (int i = p.index;;) { // arena的索引,数组下标
            int b, m, c;
            long j; // 原数组偏移量,包括填充值
            // 从场地中选出偏移地址为(i << ASHIFT) + ABASE的内存值,也即真正可用的Node
            //如果i为0,j相当于是 "第一个"槽位
            Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 此槽位不为null, 说明已经有线程在这里等了,重新将其设置为null, CAS操作
                Object v = q.item; // 取出等待线程携带的数据
                q.match = item; // 将当前线程携带的数据交给等待线程
                Thread w = q.parked; // 可能存在的等待线程
                if (w != null)
                    U.unpark(w); // 唤醒等待线程
                return v; // 返回结果, 交易成功
            } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 有效交换位置,且槽位为空
                p.item = item; // 将携带的数据卸下,等待别的线程来交易
                if (U.compareAndSwapObject(a, j, null, p)) { // 槽位占领成功
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // 计算出超时结束时间点
                    Thread t = Thread.currentThread(); // 当前线程
                    for (int h = p.hash, spins = SPINS;;) { // 一直循环,直到有别的线程来交易,或超时,或中断
                        Object v = p.match; // 检查是否有别的线程来交换数据
                        if (v != null) { // 有则返回
                            U.putOrderedObject(p, MATCH, null); // match重置,等着下次使用
                            p.item = null; // 清空,下次接着使用
                            p.hash = h;
                            return v; // 返回结果,交易结束
                        } else if (spins > 0) { // 自旋
                            h ^= h << 1;
                            h ^= h >>> 3;
                            h ^= h << 10; // 移位加异或,伪随机
                            if (h == 0) // 初始值
                                h = SPINS | (int) t.getId();
                            else if (h < 0 && // SPINS >>> 1, 一半的概率
                                    (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield(); // 每一次等待有两次让出CPU的时机
                        } else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS; // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好
                        else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上
                            p.parked = t; // 挂在此结点上的阻塞着的线程
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns); // 阻塞, 等着被唤醒或中断
                            p.parked = null; // 醒来后,解除与结点的联系
                            U.putObject(t, BLOCKER, null); // 解除阻塞对象
                        } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0) // 尝试缩减
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 更新bound, 高位递增,低位 -1
                            p.item = null; // 重置
                            p.hash = h;
                            i = p.index >>>= 1; // 索引减半,为的是快速找到汇合点(最左侧)
                            if (Thread.interrupted())// 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记
                                return null;
                            if (timed && m == 0 && ns <= 0L) // 超时
                                return TIMED_OUT;
                            break; // 重新开始
                        }
                    }
                } else
                    p.item = null; // 重置
            } else {
                if (p.bound != b) { // 别的线程更改了bound,重置collides为0, i的情况如下:当i != m, 或者m = 0时,i = m; 否则,i = m-1; 从右往左遍历
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1; // index 左移
                } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound, 高位递增,低位 +1
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1; // 左移,遍历槽位,m == FULL时,i == 0(最左侧),重置i = m, 重新从右往左循环遍历
                } else
                    i = m + 1; // 槽位增长
                p.index = i;
            }
        }
    }
    • 执行流程。
      1. 从场地中选出偏移地址为(i << ASHIFT)+ ABASE 的内存值,也即第 i 个真正可用的 Node,判断其槽位是否为空,为空,进入步骤 2。
        • 不为空,说明有线程在此等待,尝试抢占该槽位,抢占成功,交换数据,并唤醒等待线程,返回,结束。
        • 没有抢占成功,进入步骤 9。
      2. 检查索引是否越界,越界,进入步骤 9。没有越界,进入步骤 3。
      3. 尝试占有该槽位,抢占失败,进入步骤 1。抢占成功,进入步骤 4。
      4. 检查 match,是否有线程来交换数据,如果有,交换数据,结束。如果没有,进入步骤 5。
      5. 检查 spin 是否大于 0,如果不大于 0,进入步骤 6。
        • 如果大于 0,检查 hash 是否小于 0,并且 spin 减半或为 0,如果不是,进入步骤 4。
        • 如果是,让出 CPU 时间,过一会儿,进入步骤 4。
      6. 检查是否中断,m 达到最小值,是否超时,如果没有中断,没有超时,并且 m 达到最小值,阻塞,过一会儿进入步骤 4。否则,进入步骤 7。
      7. 没有线程交换数据,尝试丢弃原有的槽位重新开始,丢弃失败,进入步骤 4。否则,进入步骤 8。
      8. bound 减 1(m>0),索引减半。
        • 检查是否中断或超时,如果没有,进入步骤 1。
        • 否则,返回,结束。
      9. 检查 bound 是否发生变化,如果变化,重置 collides,索引重置为 m 或左移,转向步骤 1。否则,进入步骤 10。
      10. 检查 collides 是否达到最大值,如果没有,进入步骤13。否则,进入步骤 11。
      11. m 是否达到 FULL,是,进入步骤13。否则,进入步骤 12。
      12. CAS bound 加 1 是否成功,如果成功,i 置为 m+1,槽位增长,进入步骤 1。否则,进入步骤 13。
      13. collides 加 1,索引左移,进入步骤 1。
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
    
    • 通过 participant 取得当前结点 Node,然后根据当前结点 Node 的 index 去取 arena 中相对应的结点。

    伪随机

    h ^= h << 1; 
    h ^= h >>> 3; 
    h ^= h << 10;
    
    • xorshift 算法。
      • T = (I + La)(I + Rb)(I + Lc)。
        • L 代表左移。
        • R 代表右移。
        • a,b,c 分别为代码中的 1,3,10。
        • I 代表矩阵 {0, 1} 共 32 位,即是 int 类型的二进制。
        • T 代表的是随机算法。
    • 伪随机通过 xorshift 算法模拟随机,为了达到更好的随机效果,周期自然是越大越好。
      • 周期 指的是,当给定一个输入,得到的输出再作为下一次的输入,如此反复,直到某次输出恰巧等于最初的输入,这便是随机算法的一个周期。
      • int 类型的最大周期应该是遍历该类型所有的值(0 除外(奇异矩阵),如果是 0 ,输出便一直是 0,不能随机),即 max(2^31-1) - min(-2^31) = 2^32 - 1 = 4294967295

    为什么选用 1,3,10

    • 当 a,b,c 分别为 1,3,10 时,周期刚好是 2^32 - 1 = 4294967295
    • 以下几种组合也是可以的。
    [4294967295] (1, 3, 10) (2, 7, 7) (2, 7, 9) (5, 9, 7) (7, 1, 9) (7, 7, 2) (7, 9, 5)
    

    为什么要有两次左移和一次右移

    • 虽然只一次左移+异或就能达到随机的效果。
    • 但是第一次左移(I + La)可以让高位多 1,右移(I + Rb)可以让低位多 1,高位低位都参与计算,可以增加随机性,第二次左移(I + Lc),再进行真正的随机计算。

    自旋等待

    private static final int SPINS = 1 << 10;
    else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // h < 0,一半的概率
           Thread.yield(); // 每一次等待有两次让出CPU的时机
    
    • 等待其它线程交换数据时,会进行自旋等待,自旋的过程中,当前线程会有 2 次让出 CPU 的时机。
      • SPINS 为 1024, ((1024 >>>1) -1) = 511 = 0111111111,spins 默认为 1024 循环递减。
      • 当 spins 的最高位为 0 或 1 并且其它位为 0 时(0 或 512)进行 (&) 计算的结果为 0。

    arena 的创建

    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
    private static final int ASHIFT = 7;
    
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    private static final int MMASK = 0xff;      // 255
    ......
    if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ))
          arena = new Node[(FULL + 2) << ASHIFT];
    
    • slotExchange() 方法中存在竞争时,会构建 arena。
      • 初始化 arena 时会设置 bound 为 SEQ(SEQ=MMASK + 1),255 + 1 = 256。
      • NCPU 为到 Java 虚拟机可用的处理器数量。Runtime.getRuntime().availableProcessors()
      • 假设 NCPU 为 2,则 arena 数组大小为 384(2 >>> 1 然后 (1+2) << 7)。
    private static final sun.misc.Unsafe U;
    private static final int ABASE;
    U = sun.misc.Unsafe.getUnsafe();
    Class<?> ak = Node[].class;
    s = U.arrayIndexScale(ak);
    ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
    

    FULL 和 ASHIFT 的定义

    • arena 数组很大,但里面并不是每个位置都被使用了,还有一些是没有使用的。
      • 通过 UnsafearrayBaseOffset(ak) 方法可以返回 arena 数组中第一个元素的偏移地址。
      • 通过 arrayIndexScale(ak) 方法可以返回 arena 数组中每一个元素占用的大小,也就是元素与元素之间的间隔,即 1 << ASHIFT 为 128。
        • ABASE = arrayBaseOffset + (1 << ASHIFT) 是 arena 的起始位置加上 128 位这个偏移量。
        • arena 实际使用了 ABASE 做为起始位置,那么其前 128 位的位置都是没有使用的。
        • 那么要访问 arena 的第 N 个元素(结点),偏移量 offset 为 arrayBaseOffset + N * arrayIndexScale
      • @sun.misc.Contended 注解 和 1 << ASHIFT 主要是用于避免 伪共享1 << ASHIFT 可以避免两个 Node 在同一个共享区(缓存行)。
        • 主流缓存行大小一般为 32 字节到 256 字节,128 个地址位基本覆盖到了常见的处理器平台。
        • arena 数组中元素(结点)的分布间隔为 128 个整数倍地址位,也就是说最小相差 128 个地址位。
     
    arena 数组结构
    Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
     if (q != null && U.compareAndSwapObject(a, j, q, null)) {
            Object v = q.item;   // 获取槽位中结点 q 的数据
            q.match = item;      // 把当前线程的数据交换给它
            Thread w = q.parked; // 获得槽位中结点 q 对应的线程对象
            if (w != null)     
                U.unpark(w);     //唤醒该线程
            return v;
    }
    

    bound 和 collides

    • bound 是上一次记录的 Exchanger.bound。
      • bound 会记录 最大有效 的 arena 索引,是动态变化的,竞争激烈时(槽位全满)增加, 槽位空旷时减小。
      • bound + SEQ 确立其唯一性(版本),低 8 位记录 有效索引
    • collides 是在当前 bound 下 CAS 失败的次数。
      • 最大为 m,m(bound & MMASK)为当前 bound 下最大有效索引。
      • 槽位最大值为 MMASK(255),bound 最大值也就是 255,m 和 i 的范围为 [0,255]。
      • 从右往左遍历,等到 collides == m 时,有效索引的槽位已经遍历完,这时需要增长槽位。
      • 增长的方式是重置 bound(依赖 SEQ 更新其版本,低位 + 1),同时 collides 重置。
    private static final int MMASK = 0xff;
    private static final int SEQ = MMASK + 1;
    ......
    // MASK: 00000000000000000000000011111111
    //  SEQ: 00000000000000000000000100000000(MASK + 1)
    //    1: 00000000000000000000000000000001
    if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
    // 当 bound 为 0 时,bound 被更新为 SEQ
    
    //第一次更新
    //b0: 00000000000000000000000100000000
    U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)
    //SEQ+1: 00000000000000000000000100000001
    //b0+SEQ+1=b1: 00000000000000000000000200000001
    
    //第二次更新
    //b1+SEQ: 00000000000000000000000300000001
    //第二次是 -1 的情况
    U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1)
    //b1+SEQ-1=b2: 00000000000000000000000300000000
    
    • bound + SEQ 是版本递增的过程,b + SEQ + 1 后再 b + SEQ - 1,实际经历了两个版本,并且会将 collides 重置。
    • 下图中去除了实际存在的未使用位置,只保留了数组中被使用的位置。
      • 其中被使用的位置数量最大值为 MMASK(255),FULL <= MMASK。
      • 当前线程进入 " 第一个 " 槽位,发现有其它线程在交换数据,则增加 1 个槽位并且 bound 递增,此时最大有效索引为 1。
        • m 等于 1,i 范围为 [0,1],p.index 等于 1。
      • 当前线程进入后续槽位(包含之前增加的槽位),如果发现同样有其它线程在交换数据,则继续增加槽位,bound 递增。
      • 当前线程进入后续槽位(包含之前增加的槽位),没有元素(结点),则尝试占据该槽位,占据成功则等待其它线程。
        • 当等待超时则删除该槽位,再次从头开始遍历有效索引,寻找其它线程交换数据。
     
    bound 操作
    • bound 版本唯一性的作用主要用于更新索引,将有效索引更新到最右侧位置,使得可以再次从右向左(从头)遍历。
      • 如果没有 bound 的版本唯一性,便没有索引更新,就会一直往左遍历竞争激烈的槽位。
      • 如果没有 bound 的版本唯一性,还会使得 bound 只增不减,影响效率。

    3. 总结

    • 当前线程 A 和其它线程 B(一个或多个)在槽中交换数据。
      1. 单槽方法(slotExchange)执行,A 发现 B 已经在槽中,则尝试交换数据,如果成功,则进入第 2 步骤。如果失败则说明有其它线程已经在和 B 进行数据交换,则进入第 5 步骤。
      2. 交换数据成功,则交换结束。也可能超时或者中断,造成交换失败,只能从头开始。
      3. 到达槽位,未发现其它线程,则尝试占位,抢占成功,则自旋等待其它线程交换数据,进入第 4 步骤。抢占失败,则说明被其它线程抢占了槽位,则进入第 5 步骤。
      4. 其它线程来交换数据,成功则交换结束。如果等待超时则寻找其它线程进行交换,先删除一个槽位,再从头开始寻找其它线程交换数据。也有可能会被中断。
      5. 转为多槽方法(arenaExchange)执行,挨个寻找槽中是否有可交换数据的对象,如果发现交换对象且尝试交换数据成功,则进入第 2 步骤。如果为空槽,则占据并等待其它线程来交换数据,进入第 4 步骤。
      6. 尝试多次交换都未成功,则增加槽位,然后再从头开始。

    参考资料

    https://blog.csdn.net/carson0408/article/details/79477280
    https://blog.csdn.net/u014634338/article/details/78385521
    https://blog.csdn.net/chenssy/article/details/72550933
    https://www.cnblogs.com/d-homme/p/9387948.html
    https://www.cnblogs.com/aniao/p/aniao_exchanger.html

  • 相关阅读:
    使用某些 DOCTYPE 时会导致 document.body.scrollTop 失效
    VB.NET 笔记1
    知识管理系统Data Solution研发日记之一 场景设计与需求列出
    知识管理系统Data Solution研发日记之五 网页下载,转换,导入
    折腾了这么多年的.NET开发,也只学会了这么几招 软件开发不是生活的全部,但是好的生活全靠它了
    分享制作精良的知识管理系统 博客园博客备份程序 Site Rebuild
    知识管理系统Data Solution研发日记之四 片段式数据解决方案
    知识管理系统Data Solution研发日记之二 应用程序系列
    知识管理系统Data Solution研发日记之七 源代码与解决方案
    知识管理系统Data Solution研发日记之三 文档解决方案
  • 原文地址:https://www.cnblogs.com/youngao/p/12573935.html
Copyright © 2011-2022 走看看