zoukankan      html  css  js  c++  java
  • 014-多线程-基础-Exchanger-行线程间的数据交换

    一、简介

      Exchanger类允许在两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据。也就是第一个线程的数据进入到第二个线程中,第二线程的数据进入到第一个线程中。

      Exchanger可以用于校对工作的场景。

      Exchanger只有一个构造函数:

    public Exchanger() {
        participant = new Participant();
    }

      这个类提供对外的接口非常简洁,两个重载的范型exchange方法:

    // 除非当前线程被中断,否则一直等待另一个线程到达这个交换点,然后将交换的数据    x传输给它,并收到另一个线程传过来的数据。
    public V exchange(V x) throws InterruptedException
     
    // 和上一个方法功能基本一样,只不过这个方法增加了等待超时时间
    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    1.1、Exchanger源码详解

      Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者。源码中的描述如下:

    for (;;) {
        if (slot is empty) {                       // offer
            place item in a Node;
            if (can CAS slot from empty to node) {
            wait for release;
            return matching item in node;
            }
        }
        else if (can CAS slot from node to empty) { // release
            get the item in node;
            set matching item in node;
            release waiting thread;
        }
        // else retry on CAS failure
    }

      Exchanger中定义了如下几个重要的成员变量:

    /**
    * Per-thread state
    */
    private final Participant participant;
     
    /**
    * Elimination array; null until enabled (within slotExchange).
    * Element accesses use emulation of volatile gets and CAS.
    */
    private volatile Node[] arena;
     
    /**
    * Slot used until contention detected.
    */
    private volatile Node slot;

    participant的作用是为每个线程关联一个Node对象。Participant继承自ThreadLocal:

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    Node类定义如下:

    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

    index:arena的下标,多个槽位的时候利用;
    bound:上一次记录的Exchanger.bound;
    collides:在当前bound下CAS失败的次数;
    hash:伪随机数,用于自旋;
    item:这个线程的当前项,也就是需要交换的数据;
    match:交换的数据;
    parked:挂起时设置线程值,其他情况下为null;

    看exchange(V x)方法。

    exchange(V x)方法

    如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。该方法源码如下:

    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
                (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
                (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    exchange(V x)方法主要步骤如下:

    1. 判断程序要交换的数据是否为空指针,若为空指针,则将NULL_ITEM设置为要交换的数据,NULL_ITEM是一个来代替空指针的对象,它定义为:

    private static final Object NULL_ITEM = new Object();

    2. 若arena为null,则通过slotExchange(Object item, boolean timed, long ns)方法来交换数据;否则,若arena不为null,则运行下一步骤。

    3. 判断程序中断状态,若程序没有被中断,则运行arenaExchange(Object item, boolean timed, long ns)方法来交换数据;否则,抛出InterruptedException异常。

    4. 返回交换后的数据,若数据为NULL_ITEM,则将其转换为空指针null。

    在整个过程中,最主要的就是那两个数据交换方法,我们先来看一看slotExchange(Object item, boolean timed, long ns)方法。

    slotExchange(Object item, boolean timed, long ns)方法
    该方法源码如下:

    private final Object slotExchange(Object item, boolean timed, long ns) {
        // 获取与线程相关联的Node对象
        Node p = participant.get();
        // 获取当前线程对象
        Thread t = Thread.currentThread();
        // 判断线程中断状态
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;
        
        // 进入自旋
        for (Node q;;) {
            // 如果slot不为null,表明已经有其他线程等待交换数据
            if ((q = slot) != null) {
                // 通过CAS交换数据信息,成功则返回交换数据
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    // 获取其他线程交换的数据
                    Object v = q.item;
                    // 槽位内值被改为参数item,这是等待线程需要的数据
                    q.match = item;
                    // 获取等待线程
                    Thread w = q.parked;
                    // 等待线程不为null,则将其唤醒
                    if (w != null)
                        U.unpark(w);
                    // 返回拿到的数据
                    return v;
                }
                // CAS操作失败,则创建arena用于竞争
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            // 如果arena不为null,方法返回null,随后进入arenaExchange方法
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            // 否则,q(slot)为空,通过CAS尝试将slot设置为p,失败之后自旋重试,成功则跳出自旋,进入spin+block模式
            else {
                p.item = item;
                // 将slot设置为占据该slot线程所对应的Node
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }
     
        // 等待release
        // 若exchange操作有时间限制,则先计算结束时间和自旋次数,进入自旋+阻塞
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        // 直到成功交换到数据
        while ((v = p.match) == null) {
            // 自旋
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            // 其它线程来交换数据了,另一个线程修改了solt,但是还没有设置match数据,这时可以再稍等一会
            else if (slot != p)
                spins = SPINS;
            // 需要阻塞当前线程,等待其它线程来交换数据
            else if (!t.isInterrupted() && arena == null &&
                        (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                // 设置Node节点中的parked属性为当前线程,当其他线程要交换数据时,需要通过parked属性来唤醒该线程
                p.parked = t;
                // 阻塞当前线程
                if (slot == p)
                    U.park(false, ns);
                // 当前线程被唤醒之后,做一些清除操作
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            // 交换失败,重置slot
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 清除match信息
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        // 返回交换得到的数据,若失败,则返回值为null
        return v;
    }

    slotExchange(Object item, boolean timed, long ns)方法的整个业务逻辑如下所示:

    当一个线程来交换数据时,槽位(slot)有两种状态:

    如果发现槽位(solt)没有数据,即slot为null,这时,当前线程就可以通过CAS操作占据slot,若CAS操作成功,则slot就已经被当前线程占据。如果失败,则有可能其他线程抢先了占据了slot,当前线程需要重头开始循环。占据slot成功的线程,需要等待其它线程来进行数据交换,此时,当前线程需要进行一段时间的自旋:

    若在线程自旋期间,有其他线程来交换数据,则获取交换数据后,直接返回数据,而不用阻塞该进程。

    若在线程自旋期间,没有其他线程来交换数据,那么就需要阻塞当前线程,在阻塞之前,还需要进行一次槽位判断,若槽位发生了变化,说明有其它线程来交换数据了,此时会延长当前线程的自旋时间,可能数据交换马上就完成;若槽位没有发生变化,则直接挂起当前线程,等待其他线程来交换数据,在另一个线程交换数据完成之后,另一个线程会唤醒与之配对交换的线程(即前面被挂起的线程),被唤醒的线程,继续执行,拿到交换的数据之后,直接返回,若出现了超时、被中断的情况,则返回值为null。

    如果发现槽位(solt)已有数据,即slot不为null,这表明已经有其它线程占据了槽位,正在等待交换数据,那么当前线程就可以尝试进行数据交换,首先要通过CAS操作设置slot变量值,若CAS成功,则表示当前线程可以进行数据交换,否则,若CAS失败,则表示有其他线程抢先交换了数据,那这时,多个线程产生了竞争,然后,当前线程就会创建arena数组来避免竞争,用于后续的数据交换。

    slotExchange(Object item, boolean timed, long ns)方法的整个执行步骤就是这样了,下面我们再看一看arenaExchange(Object item, boolean timed, long ns)方法的执行步骤。

    arenaExchange(Object item, boolean timed, long ns)方法
    在介绍该方法之前,我们需要先来了解一下与之相关的数据结构

    在前面介绍的Node类,被加上了一个@sun.misc.Contended注解,这个是用来避免伪共享的,关于伪共享的详解,可以看这篇博客。在Exchanger类中,ASHIFT就是用来避免伪共享的:

    /**
        * The byte distance (as a shift value) between any two used slots
        * in the arena.  1 << ASHIFT should be at least cacheline size.
        */
    private static final int ASHIFT = 7;

    对ASHIFT进行详细说明,下面看一看arenaExchange(Object item, boolean timed, long ns)方法:

    private final Object arenaExchange(Object item, boolean timed, long ns) {
        // Node数组,具有多个槽位
        Node[] a = arena;
        // 获取与线程相关联的Node对象
        Node p = participant.get();
        // p.index初始值为0
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            // 在数组中,根据索引i取出数据,j相当于该线程要访问的第一个槽位
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 该槽位有数据,即已经有线程在此槽位等待交换数据
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                // 进行数据交换
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // bound是最大的有效位置,和MMASK相与,得到真正存储数据的最大索引值
            // 如果i小于最大索引,且对应槽位为空
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // 将需要交换的数据赋值给p
                p.item = item;                         // offer
                // 通过CAS来设置该槽位的数据,等待其他线程来交换
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    // 自旋
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        // 有其他线程来和该线程交换数据
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                        (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        // 其它线程来交换数据了,另一个线程修改了槽位数据,但是还没有设置match数据,这时可以再稍等一会
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        // m == 0表明已经到达arena数组中最小的存储数据槽位,当前线程需要阻塞在这里
                        else if (!t.isInterrupted() && m == 0 &&
                                    (!timed ||
                                    (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            // 再次检查槽位,看看在阻塞前,有没有线程来交换数据
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        // 当前这个槽位一直没有线程来交换数据,可以换个槽位试试
                        else if (U.getObjectVolatile(a, j) == p &&
                                    U.compareAndSwapObject(a, j, p, null)) {
                            // 更新bound
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            // 减小索引值,往最小的存储数据槽位的方向挪动
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            // 超时
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                // 占据槽位失败
                else
                    p.item = null;                     // clear offer
            }
            // i不在有效索引范围内,或者对应槽位已经被其它线程抢先交换了
            else {
                // 更新p.bound
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    // bound的CAS失败次数初始为0
                    p.collides = 0;
                    // i如果到达最大值,就递减
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                            !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    // 更新bound的CAS失败次数
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                // 递增i
                else
                    i = m + 1;                         // grow
                // 更新index
                p.index = i;
            }
        }
    }

    在slotExchange方法中,当存在竞争时,会创建arena数组:

    arena = new Node[(FULL + 2) << ASHIFT];

    在创建arena数组之前,会先设置bound为SEQ(SEQ=MMASK + 1),即bound的初始值为256:

    /**
    * The maximum supported arena index. The maximum allocatable
    * arena size is MMASK + 1. Must be a power of two minus one, less
    * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
    * for the expected scaling limits of the main algorithms.
    */
    private static final int MMASK = 0xff;
     
    /**
    * Unit for sequence/version bits of bound field. Each successful
    * change to the bound also adds SEQ.
    */
    private static final int SEQ = MMASK + 1;

    arena的大小为(FULL + 2) << ASHIFT,因为1 << ASHIFT 是用于避免伪共享的,因此实际有效的Node 只有FULL + 2 个。

    然后通过以下代码来获取arena中的节点:

    Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);

    ABASE定义如下:

    Class<?> ak = Node[].class;
    // ABASE absorbs padding in front of element 0
    ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

    U.arrayBaseOffset(ak)就是计算存放第一个元素的内存地址,相对于数组对象起始地址的内存偏移量。可以看出,ABASE是计算出了一个新的起始地址,其前面的(1 << ASHIFT)位置都没有被利用。

    当一个线程来交换数据时,若计算出的槽位索引有效,那对应的槽位有两种状态:

    如果发现槽位(solt)没有数据,即slot为null,这时,当前线程就可以通过CAS操作占据slot,

    若CAS操作成功
    则slot就已经被当前线程占据。然后该线程会采用自旋+阻塞的方式进行等待交换数据。只有当槽位是第一个(m==0,i <= m)时,线程才会阻塞,否则,若长时间没有其他线程来交换数据,当前线程会换个槽位等待,首先,线程会将旧槽位的值通过CAS置为null,然后更新bound,索引值减半(i = p.index >>>= 1),如果设置了超时,需要进行超时判断,若发生超时,则直接返回。

    若CAS操作失败
    则有可能其他线程抢先了占据了slot,则将p.item设置为null,重新自旋。

    如果发现槽位(solt)已有数据,即slot不为null,这表明已经有其它线程占据了槽位,正在等待交换数据,那么当前线程就可以尝试进行数据交换,首先要通过CAS操作设置slot变量值,若CAS成功,则表示当前线程可以进行数据交换,否则,若CAS失败,则表示有其他线程抢先交换了数据,那这时,多个线程产生了竞争,那么就更新bound和p.index。

    arenaExchange(Object item, boolean timed, long ns)方法的运行逻辑总结如下:

    当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。 第一个槽位并不是指的数组中的第一个,而是逻辑第一个,因为存在伪共享,多槽位中,部分空间没有被利用。

  • 相关阅读:
    分享一个一直在用的golang单测小脚本
    JakeCoffman/Cron定时任务库核心实现源码解析
    uniapp h5部署二级目录
    Selenium
    Unable to connect to the server: x509: cannot validate certificate for 172.25.97.19 because it doesn't contain any IP SANs
    python
    chrome
    edit-plus 添加单引号 ''
    CALL_AND_RETRY_LAST Allocation failed
    nacos 客户端异常:SocketTimeoutException: connect timed out
  • 原文地址:https://www.cnblogs.com/bjlhx/p/11131553.html
Copyright © 2011-2022 走看看