zoukankan      html  css  js  c++  java
  • Exchanger

    本文参考群主的博客http://cmsblogs.com/?p=2269

    Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用。更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    class Producer implements Runnable
    {  
      
        // 要被相互交换的数据类型。  
        private List<String> buffer;  
      
        // 用来同步 producer和consumer  
        private final Exchanger<List<String>> exchanger;  
      
        public Producer(List<String> buffer, Exchanger<List<String>> exchanger)  
        {  
            this.buffer = buffer;  
            this.exchanger = exchanger;  
        }  
      
        public void run()  
        {  
            // 实现10次交换  
            for (int i = 0; i < 10; i++)
            {  
                buffer.add("第" + i + "次生产者的数据" + i);  
                try  
                {  
                    // 调用exchange方法来与consumer交换数据  
                    System.out.println("第" + i + "次生产者在等待.....");  
                    buffer = exchanger.exchange(buffer);  
                    System.out.println("第" + i + "次生产者交换后的数据:" + buffer.get(i));  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
      
      
    class Consumer implements Runnable
    {  
        // 用来相互交换  
        private List<String> buffer;  
      
        // 用来同步 producer和consumer  
        private final Exchanger<List<String>> exchanger;  
      
        public Consumer(List<String> buffer, Exchanger<List<String>> exchanger)  
        {  
            this.buffer = buffer;  
            this.exchanger = exchanger;  
        }  
      
        public void run()  
        {  
            // 实现10次交换  
            for (int i = 0; i < 10; i++)  
            {  
                buffer.add("第" + i + "次消费者的数据" + i);  
                try  
                {  
                    // 调用exchange方法来与consumer交换数据  
                    System.out.println("第" + i + "次消费者在等待.....");  
                    buffer = exchanger.exchange(buffer);  
                    System.out.println("第" + i + "次消费者交换后的数据:" + buffer.get(i));  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
      
    //主类
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    public class Core
    {
        public static void main(String[] args)
        {
            // 创建2个buffers,分别给producer和consumer使用
            List<String> buffer1 = new ArrayList<String>();
            List<String> buffer2 = new ArrayList<String>();
    
            // 创建Exchanger对象,用来同步producer和consumer
            Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
    
            // 创建Producer对象和Consumer对象
            Producer producer = new Producer(buffer1, exchanger);
            Consumer consumer = new Consumer(buffer2, exchanger);
    
            // 创建线程来执行producer和consumer并开始线程
            Thread threadProducer = new Thread(producer);
            Thread threadConsumer = new Thread(consumer);
            threadProducer.start();
            threadConsumer.start();
        }
    }

    在Exchanger中,如果一个线程已经到达了exchanger节点时,对于它的伙伴节点的情况有三种:

    1. 如果它的伙伴节点在该线程到达之前已经调用了exchanger方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
    2. 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
    3. 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。

    Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者。

    for (;;) {
            if (slot is empty) {                       // offer
              place item in a Node;
              if (can CAS slot from empty to node) {
                wait for release;
                return matching item in node;
              }
            }
            else if (can CAS slot from node to empty) { // release
              get the item in node;
              set matching item in node;
              release waiting thread;
            }
            // else retry on CAS failure
          }

    Exchanger中定义了如下几个重要的成员变量:

    private final Participant participant;
    private volatile Node[] arena;
    private volatile Node slot;

    participant的作用是为每个线程保留唯一的一个Node节点。

    slot为单个槽,arena为数组槽。他们都是Node类型。在这里可能会感觉到疑惑,slot作为Exchanger交换数据的场景,应该只需要一个就可以了啊?为何还多了一个Participant 和数组类型的arena呢?一个slot交换场所原则上来说应该是可以的,但实际情况却不是如此,多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组arena。通过数组arena来安排不同的线程使用不同的slot来降低竞争问题,并且可以保证最终一定会成对交换数据。但是Exchanger不是一来就会生成arena数组来降低竞争,只有当产生竞争是才会生成arena数组。那么怎么将Node与当前线程绑定呢?Participant ,Participant 的作用就是为每个线程保留唯一的一个Node节点,它继承ThreadLocal,同时在Node节点中记录在arena中的下标index。

    Node定义如下:

     @sun.misc.Contended static final class Node {
            int index;              // arena的下标;
            int bound;              // 上一次记录的Exchanger.bound
            int collides;           // 在当前bound下CAS失败的次数
            int hash;               // 伪随机数,用于自旋;
            Object item;            // 这个线程的当前项,也就是需要交换的数据;
            volatile Object match;  // 做releasing操作的线程传递的项;
            volatile Thread parked; //挂起时设置线程值,其他情况下为null;
        }

    exchange(V x)

    exchange(V x):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

     public V exchange(V x) throws InterruptedException {
            Object v;
            Object item = (x == null) ? NULL_ITEM : x; // translate null args
            if ((arena != null ||
                 (v = slotExchange(item, false, 0L)) == null) &&
                ((Thread.interrupted() || // disambiguates null return
                  (v = arenaExchange(item, false, 0L)) == null)))
                throw new InterruptedException();
            return (v == NULL_ITEM) ? null : (V)v;
        }

    这个方法比较好理解:arena为数组槽,如果为null,则执行slotExchange()方法,否则判断线程是否中断,如果中断值抛出InterruptedException异常,没有中断则执行arenaExchange()方法。整套逻辑就是:如果slotExchange(Object item, boolean timed, long ns)方法执行失败了就执行arenaExchange(Object item, boolean timed, long ns)方法,最后返回结果V。

    NULL_ITEM 为一个空节点,其实就是一个Object对象而已,slotExchange()为单个slot交换。

    slotExchange(Object item, boolean timed, long ns)

    private final Object slotExchange(Object item, boolean timed, long ns) {
            // 获取当前线程的节点 p
            Node p = participant.get();
            // 当前线程
            Thread t = Thread.currentThread();
            // 线程中断,直接返回
            if (t.isInterrupted())
                return null;
            // 自旋
            for (Node q;;) {
                //slot != null
                if ((q = slot) != null) {
                    //尝试CAS替换
                    if (U.compareAndSwapObject(this, SLOT, q, null)) {
                        Object v = q.item;      // 当前线程的项,也就是交换的数据
                        q.match = item;         // 做releasing操作的线程传递的项
                        Thread w = q.parked;    // 挂起时设置线程值
                        // 挂起线程不为null,线程挂起
                        if (w != null)
                            U.unpark(w);
                        return v;
                    }
                    //如果失败了,则创建arena
                    //bound 则是上次Exchanger.bound
                    if (NCPU > 1 && bound == 0 &&
                            U.compareAndSwapInt(this, BOUND, 0, SEQ))
                        arena = new Node[(FULL + 2) << ASHIFT];
                }
                //如果arena != null,直接返回,进入arenaExchange逻辑处理
                else if (arena != null)
                    return null;
                else {
                    p.item = item;
                    if (U.compareAndSwapObject(this, SLOT, null, p))
                        break;
                    p.item = null;
                }
            }
    
            /*
             * 等待 release
             * 进入spin+block模式
             */
            int h = p.hash;
            long end = timed ? System.nanoTime() + ns : 0L;
            int spins = (NCPU > 1) ? SPINS : 1;
            Object v;
            while ((v = p.match) == null) {
                if (spins > 0) {
                    h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                    if (h == 0)
                        h = SPINS | (int)t.getId();
                    else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                        Thread.yield();
                }
                else if (slot != p)
                    spins = SPINS;
                else if (!t.isInterrupted() && arena == null &&
                        (!timed || (ns = end - System.nanoTime()) > 0L)) {
                    U.putObject(t, BLOCKER, this);
                    p.parked = t;
                    if (slot == p)
                        U.park(false, ns);
                    p.parked = null;
                    U.putObject(t, BLOCKER, null);
                }
                else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                    v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                    break;
                }
            }
            U.putOrderedObject(p, MATCH, null);
            p.item = null;
            p.hash = h;
            return v;
        }

    程序首先通过participant获取当前线程节点Node。检测是否中断,如果中断return null,等待后续抛出InterruptedException异常。

    如果slot不为null,则进行slot消除,成功直接返回数据V,否则失败,则创建arena消除数组。

    如果slot为null,但arena不为null,则返回null,进入arenaExchange逻辑。

    如果slot为null,且arena也为null,则尝试占领该slot,失败重试,成功则跳出循环进入spin+block(自旋+阻塞)模式。

    在自旋+阻塞模式中,首先取得结束时间和自旋次数。如果match(做releasing操作的线程传递的项)为null,其首先尝试spins+随机次自旋(改自旋使用当前节点中的hash,并改变之)和退让。当自旋数为0后,假如slot发生了改变(slot != p)则重置自旋数并重试。否则假如:当前未中断&arena为null&(当前不是限时版本或者限时版本+当前时间未结束):阻塞或者限时阻塞。假如:当前中断或者arena不为null或者当前为限时版本+时间已经结束:不限时版本:置v为null;限时版本:如果时间结束以及未中断则TIMED_OUT;否则给出null(原因是探测到arena非空或者当前线程中断)。

    match不为空时跳出循环。

    arenaExchange(Object item, boolean timed, long ns)

     private final Object arenaExchange(Object item, boolean timed, long ns) {
            Node[] a = arena;
            Node p = participant.get();
            for (int i = p.index;;) {                      // access slot at i
                int b, m, c; long j;                       // j is raw array offset
                Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
                if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                    Object v = q.item;                     // release
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                else if (i <= (m = (b = bound) & MMASK) && q == null) {
                    p.item = item;                         // offer
                    if (U.compareAndSwapObject(a, j, null, p)) {
                        long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                        Thread t = Thread.currentThread(); // wait
                        for (int h = p.hash, spins = SPINS;;) {
                            Object v = p.match;
                            if (v != null) {
                                U.putOrderedObject(p, MATCH, null);
                                p.item = null;             // clear for next use
                                p.hash = h;
                                return v;
                            }
                            else if (spins > 0) {
                                h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                                if (h == 0)                // initialize hash
                                    h = SPINS | (int)t.getId();
                                else if (h < 0 &&          // approx 50% true
                                         (--spins & ((SPINS >>> 1) - 1)) == 0)
                                    Thread.yield();        // two yields per wait
                            }
                            else if (U.getObjectVolatile(a, j) != p)
                                spins = SPINS;       // releaser hasn't set match yet
                            else if (!t.isInterrupted() && m == 0 &&
                                     (!timed ||
                                      (ns = end - System.nanoTime()) > 0L)) {
                                U.putObject(t, BLOCKER, this); // emulate LockSupport
                                p.parked = t;              // minimize window
                                if (U.getObjectVolatile(a, j) == p)
                                    U.park(false, ns);
                                p.parked = null;
                                U.putObject(t, BLOCKER, null);
                            }
                            else if (U.getObjectVolatile(a, j) == p &&
                                     U.compareAndSwapObject(a, j, p, null)) {
                                if (m != 0)                // try to shrink
                                    U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                                p.item = null;
                                p.hash = h;
                                i = p.index >>>= 1;        // descend
                                if (Thread.interrupted())
                                    return null;
                                if (timed && m == 0 && ns <= 0L)
                                    return TIMED_OUT;
                                break;                     // expired; restart
                            }
                        }
                    }
                    else
                        p.item = null;                     // clear offer
                }
                else {
                    if (p.bound != b) {                    // stale; reset
                        p.bound = b;
                        p.collides = 0;
                        i = (i != m || m == 0) ? m : m - 1;
                    }
                    else if ((c = p.collides) < m || m == FULL ||
                             !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                        p.collides = c + 1;
                        i = (i == 0) ? m : i - 1;          // cyclically traverse
                    }
                    else
                        i = m + 1;                         // grow
                    p.index = i;
                }
            }
        }

    首先通过participant取得当前节点Node,然后根据当前节点Node的index去取arena中相对应的节点node。前面提到过arena可以确保不同的slot在arena中是不会相冲突的,那么是怎么保证的呢?

    arena = new Node[(FULL + 2) << ASHIFT];

    取得arena中的node节点后,如果定位的节点q 不为空,且CAS操作成功,则交换数据,返回交换的数据,唤醒等待的线程。

    如果q等于null且下标在bound & MMASK范围之内,则尝试占领该位置,如果成功,则采用自旋 + 阻塞的方式进行等待交换数据。

    如果下标不在bound & MMASK范围之内获取由于q不为null但是竞争失败的时候:消除p。加入bound 不等于当前节点的bond(b != p.bound),则更新p.bound = b,collides = 0 ,i = m或者m – 1。如果冲突的次数不到m 获取m 已经为最大值或者修改当前bound的值失败,则通过增加一次collides以及循环递减下标i的值;否则更新当前bound的值成功:我们令i为m+1即为此时最大的下标。最后更新当前index的值。

    Exchanger使用、原理都比较好理解,但是这个源码看起来真心有点儿复杂,是真心难看懂,但是这种交换的思路Doug Lea在后续博文中还会提到,例如SynchronousQueue、LinkedTransferQueue。

        其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

      1. 我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
      2. 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
      3. 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
      4. 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
      5. 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。
      6. 如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
  • 相关阅读:
    在Windows Mobile 中如何改变Datagrid中字的颜色?
    使用FileSystemWatcher监视文件更改
    MVVM体验记之DataGrid绑定
    实现IDisposable以实现更优雅的代码
    适当使用enum做数据字典
    web.config customErrors无法处理的is not a valid virtual path
    Windows下使用skipfish
    MVVM打造无限级TreeView
    System.Drawing.Image在Save之后Type变了
    Windows下使用RatProxy
  • 原文地址:https://www.cnblogs.com/gudulijia/p/6892120.html
Copyright © 2011-2022 走看看