zoukankan      html  css  js  c++  java
  • Exchanger类

    1.简述

      Exchanger是适用在两个线程之间数据交换的并发工具类,它的作用是找到一个同步点,当两个线程都执行到了同步点(exchange方法)之后(有一个没有执行到就一直等待,也可以设置等待超时时间),就将自身线程的数据与对方交换。

      Exchanger使用场景

    • 线程间交互数据。

    2.Exchanger的常用方法

    /**构造方法
     */
    //创建一个新的Exchanger
    Exchanger()
    
    /**常用方法
     */
    //exchange方法用于交互数据V
    V exchange(V x)
    //延迟一定时间交换数据
    V exchange(V x, long timeout, TimeUnit unit)
    View Code

    3.Exchanger的源码分析

      Exchanger的算法核心是通过一个可以交换数据的slot和一个可以带有数据item的参与者。

      Exchanger的主要属性

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    //arena(Slot数组)的容量。设置这个值用来避免竞争。
    private static final int CAPACITY = 32;
    //arena最大不会超过FULL,避免空间浪费。如果单核或者双核CPU,FULL=0,只有一个SLot可以用。
    private static final int FULL = Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
    //自旋等待次数。单核情况下,自旋次数为0;多核情况下为大多数系统线程上下文切换的平均值。该值设置太大会消耗CPU。
    private static final int SPINS = (NCPU == 1) ? 0 : 2000;
    //若在超时机制下,自旋次数更少,因为多个检测超时的时间,这是一个经验值。
    private static final int TIMED_SPINS = SPINS / 20;
    
    private static final class Node extends AtomicReference<Object> {
        //创建这个节点的线程提供的用于交换的数据。
        public final Object item;
        //等待唤醒的线程
        public volatile Thread waiter;
        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }  
     
    //一个Slot就是一对线程交换数据的地方。这里对Slot做了缓存行填充,能够避免伪共享问题。虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。
    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }  
    //Slot数组,在需要时才进行初始化,用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。
    private volatile Slot[] arena = new Slot[CAPACITY];
    //正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,这个值会递增。当一个线程自旋等待超时后,这个值会递减。
    private final AtomicInteger max = new AtomicInteger(); 
    View Code

      Exchanger的exchange方法

    /**
     * 等待其他线程到达交换点,然后与其进行数据交换。
     * 如果其他线程到来,那么交换数据,返回。
     * 如果其他线程未到来,那么当前线程等待,直到如下情况发生:
     *   1.有其他线程来进行数据交换。
     *   2.当前线程被中断。
     */
    public V exchange(V x) throws InterruptedException {
        //检测当前线程是否被中断。
        if (!Thread.interrupted()) {
            //进行数据交换。
            Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
            //检测结果是否为null。
            if (v == NULL_ITEM)
                return null;
            //检测是否被取消。
            if (v != CANCEL)
                return (V)v;
            //清除中断标记。
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }
    /**
     * 等待其他线程到达交换点,然后与其进行数据交换。
     * 如果其他线程到来,那么交换数据,返回。
     * 如果其他线程未到来,那么当前线程等待,直到如下情况发生:
     *   1.有其他线程来进行数据交换。
     *   2.当前线程被中断。
     *   3.超时。
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        //检测当前线程是否被中断。
        if (!Thread.interrupted()) {
            //进行数据交换。
            Object v = doExchange((x == null) ? NULL_ITEM : x,
                                  true, unit.toNanos(timeout));
            //检测结果是否为null。
            if (v == NULL_ITEM)
                return null;
            //检测是否被取消。
            if (v != CANCEL)
                return (V)v;
            if (!Thread.interrupted())
                throw new TimeoutException();
        }
        throw new InterruptedException();
    }
    
    /**doExchange方法,进行数据交换
     */
    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);
        //根据thread id计算出自己要去的那个交易位置(slot)
        int index = hashIndex();
        int fails = 0;
    
        for (;;) {
            Object y;
            Slot slot = arena[index];
            //slot = null,创建一个slot,然后会回到for循环,再次开始
            if (slot == null)
                createSlot(index);
            else if ((y = slot.get()) != null &&//slot里面有人等着(有Node),则尝试和其交换
                     slot.compareAndSet(y, null)) {//关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
                Node you = (Node)y;
                //把Node里面的东西,换成自己的
                if (you.compareAndSet(null, item)) {
                    //唤醒对方
                    LockSupport.unpark(you.waiter);
                    //自己把对方的东西拿走
                    return you.item;
                }//关键点2:如果运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始
            }
            else if (y == null &&//slot里面为空(没有Node),则自己把位置占住
                     slot.compareAndSet(null, me)) {
                //如果是0这个位置,自己阻塞,等待别人来交换
                if (index == 0)
                    return timed ?
                        awaitNanos(me, slot, nanos) :
                        await(me, slot);
                //不是0这个位置,自旋等待
                Object v = spinWait(me, slot);
                //自旋等待的时候,运气好,有人来交换了,返回
                if (v != CANCEL)
                    return v;
                //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环
                me = new Node(item);
                int m = max.get();
                if (m > (index >>>= 1))
                    max.compareAndSet(m, m - 1);
            }
            else if (++fails > 1) {//失败 case1: slot有人,要交互,但被人家抢了  case2: slot没人,自己要占位置,又被人家抢了
                int m = max.get();
                //3次匹配失败,把index扩大,再次开始for循环
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;
                else if (--index < 0)
                    index = m;
            }
        }
    }
    
    /**
     * 在下标为0的Slot上等待获取其他线程填充的值。
     * 如果在Slot被填充之前超时或者被中断,那么操作失败。
     */
    private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                //如果已经被其他线程填充了值,那么返回这个值。
                return v;
            long now = System.nanoTime();
            if (w == null)
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins; //先自旋几次。
                else if (node.waiter == null)
                    node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。
                else if (w.isInterrupted())
                    tryCancel(node, slot); //如果当前线程被中断,尝试取消node。
                else
                    LockSupport.parkNanos(node, nanos); //阻塞给定的时间。
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点
                return scanOnTimeout(node);
        }
    }
    View Code

    4.Exchanger的使用示例

    public class Test {
        public static void main(String[] args) throws Exception {
            final Exchanger<String> exgr = new Exchanger<String>();
            new Thread((new Runnable() {
                @Override
                public void run() {
                    try {
                        String A = Thread.currentThread().getName()+"的数据";
                        System.out.println(Thread.currentThread().getName()+"交互前:"+ A);
                        A = exgr.exchange(A);
                        System.out.println(Thread.currentThread().getName()+"交互后:"+ A);
                    } catch (InterruptedException e) {
                    }
                }
            }), "线程1").start();
    
            new Thread((new Runnable() {
                @Override
                public void run() {
                    try {
                        String B = Thread.currentThread().getName()+"的数据";
                        System.out.println(Thread.currentThread().getName()+"交互前:"+ B);
                        B = exgr.exchange(B);
                        System.out.println(Thread.currentThread().getName()+"交互后:"+ B);
                    } catch (InterruptedException e) {
                    }
                }
            }), "线程2").start();
        }
    }
    View Code

    5.总结

      Exchange和SynchronousQueue类似,都是通过两个线程操作同一个对象实现数据交换,只不过就像我们开始说的,SynchronousQueue使用的是同一个属性,通过不同的isData来区分,多线程并发时,使用了队列进行排队。Exchange使用了一个对象里的两个属性,item和match,不需要isData 属性了,因为在Exchange里面,没有isData这个语义。而多线程并发时,使用数组来控制,每个线程访问数组中不同的槽。

  • 相关阅读:
    IL查看泛型
    IL查看委托
    IL查看override
    为VS集成IL环境
    HashTable Dictionary HashMap
    C#集合类
    Resharper团队协作之TODO
    怪物彈珠Monster Strike 攻略
    [Editor]Unity Editor类常用方法
    C# Reflection BindingFlags
  • 原文地址:https://www.cnblogs.com/bl123/p/14189113.html
Copyright © 2011-2022 走看看