zoukankan      html  css  js  c++  java
  • 【JDK源码分析】同步工具Exchanger,它的内部实现原理你看懂了吗?

    前言

    Exchanger应该算并发包中工具使用相对少的,因为它主要用于线程之间交换数据,它的用法比较简单在不同线程之间使用exchange方法交换数据,但是内部实现比较巧妙,使用了unsafe的CAS原子操作、自旋来解决冲突问题,下面我们通过源码一探究竟。

    源码

    先看看源码注释中关于核心算法的介绍

            for (;;) {
                if (slot is empty) { 
                    // slot为空时,将item 设置到Node 中                   
                    place item in a Node;
                    if (can CAS slot from empty to node) {
                        // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
                        wait for release;
                        // 被唤醒后返回node中匹配到的item
                        return matching item in node;
                    }
                } else if (can CAS slot from node to empty) { // release
                    // 将slot设置为空
                    // 获取node中的item,将需要交换的数据设置到匹配的item
                    get the item in node;
                    set matching item in node;
                    // 唤醒等待的线程
                    release waiting thread;
                }
                // else retry on CAS failure
            }

    比如有2条线程A和B,A线程交换数据时,发现slot为空,则将需要交换的数据放在slot中等待其它线程进来交换数据,等线程B进来,读取A设置的数据,然后设置线程B需要交换的数据,然后唤醒A线程,原理就是这么简单。当时当多个线程之间进行交换数据时就会出现问题,所以Exchanger加入了slot数组。

    Exchanger 属性及构造器

        // 用于左移Node数组下标,从而得出数据在内存中的偏移量来获取数据,避免伪共享
        private static final int ASHIFT = 7;
        // note数组最大下标
        private static final int MMASK = 0xff;
        // 用于递增bound,每次加一个SEQ
        private static final int SEQ = MMASK + 1;
        // CPU核心数
        private static final int NCPU = Runtime.getRuntime().availableProcessors();
        // 当前数组最大的下标(多处理器情况下)
        static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
        // 自旋次数,CPU核心为1个时,自旋被禁用
        private static final int SPINS = 1 << 10;
        // 空对象,用于当线程exchange方法中参数为null时传递给其他线程的对象
        private static final Object NULL_ITEM = new Object();
        // 用于超时时传递的对象
        private static final Object TIMED_OUT = new Object();
        // Participant 继承了ThreadLocal,也就是说该对象用于获取每条线程中存放的值
        private final Participant participant;
        // 多个线程交换
        private volatile Node[] arena;
        // 用于2个线程交换
        private volatile Node slot;
        // 该值主要用于与
        private volatile int bound; 
        // 通过unsafe用于CAS操作
        private static final sun.misc.Unsafe U;
        private static final long BOUND;
        private static final long SLOT;
        private static final long MATCH;
        private static final long BLOCKER;
        private static final int ABASE;
        static {
            int s;
            try {
                U = sun.misc.Unsafe.getUnsafe();
                Class<?> ek = Exchanger.class;
                Class<?> nk = Node.class;
                Class<?> ak = Node[].class;
                Class<?> tk = Thread.class;
                // bound属性在Exchanger对象中的偏移地址
                BOUND = U.objectFieldOffset
                    (ek.getDeclaredField("bound"));
                // slot属性在Exchanger对象中的偏移地址   
                SLOT = U.objectFieldOffset
                    (ek.getDeclaredField("slot"));
                // slot属性在Node对象中的偏移地址
                MATCH = U.objectFieldOffset
                    (nk.getDeclaredField("match"));
               // parkBlocker属性在Thread对象中的偏移地址
                BLOCKER = U.objectFieldOffset
                    (tk.getDeclaredField("parkBlocker"));
                // 获取Node[]数组中每个元素的大小,这里是4
                s = U.arrayIndexScale(ak);
                // ABASE absorbs padding in front of element 0
                // 获取Node[]数组中第一个元素的偏移地址 + 128
                ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
    
            } catch (Exception e) {
                throw new Error(e);
            }
            if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
                // 这里是为了保证 Node数组中的元素不会争用一个缓存行
                throw new Error("Unsupported array scale");
        }

    构造器及内部类

        public Exchanger() {
            participant = new Participant();
        }
        // 内部类,用于记录每个线程的状态
        static final class Participant extends ThreadLocal<Node> {
            public Node initialValue() { return new Node(); }
        }
        // 包含需要交换的数据等信息
        // Contended为 JDK8 新增的注解,用于避免伪共享,提高程序性能
        @sun.misc.Contended static final class Node {
            // arana数组中的下标
            int index;  
            // 上一次记录的bound            
            int bound;  
            // cas操作失败的次数            
            int collides; 
            // 用于自旋的伪随机数         
            int hash;               // Pseudo-random for spins
            // 当前线程需要交换的数据
            Object item;            // This thread's current item
            // 匹配线程交换的数据
            volatile Object match;  // Item provided by releasing thread
            // 记录当前挂起的线程
            volatile Thread parked; // Set to this thread when parked, else null
        }     

    方法exchange

        // 交换数据,参数X为本线程提供给其它线程的数据
        public V exchange(V x) throws InterruptedException {
            Object v;
            // 当参数为null时需要将item设置为空的对象
            Object item = (x == null) ? NULL_ITEM : x; // translate null args
            // 注意到这里的这个表达式是整个方法的核心
            if ((arena != null ||
                 (v = slotExchange(item, false, 0L)) == null) &&
                ((Thread.interrupted() || // disambiguates null return
                  (v = arenaExchange(item, false, 0L)) == null)))
                throw new InterruptedException();
            return (v == NULL_ITEM) ? null : (V)v;
        }

    仔细看if里的条件表达式,得知: 
    只有当arena为null时,才会执行slotExchange方法; 
    当arena不为null或者(arena为null且slotExchange方法返回null)时,此时线程未中断,才会执行arenaExchange方法; 
    线程中断时,就会直接抛出线程中断异常。 
    下面我们来看slotExchange方法

      1     // timed 为true表示设置了超时时间,ns为>0的值,反之没有设置超时时间
      2     private final Object slotExchange(Object item, boolean timed, long ns) {
      3         // 获取当前线程node对象
      4         Node p = participant.get();
      5         Thread t = Thread.currentThread();
      6         if (t.isInterrupted()) // preserve interrupt status so caller can recheck
      7             // 线程中断返回null
      8             return null;
      9         
     10         // 自旋
     11         for (Node q;;) {
     12             // 将slot值赋给q
     13             if ((q = slot) != null) {
     14                 // slot 不为null,即表示已有线程已经把需要交换的数据设置在slot中了
     15                 // 通过CAS将slot设置成null
     16                 if (U.compareAndSwapObject(this, SLOT, q, null)) {
     17                     // CAS操作成功后,将slot中的item赋值给对象v,以便返回。
     18                     // 这里也是就读取之前线程要交换的数据
     19                     Object v = q.item;
     20                     // 将当前线程需要交给的数据设置在q中的match
     21                     q.match = item;
     22                     // 获取被挂起的线程
     23                     Thread w = q.parked;
     24                     if (w != null)
     25                         // 如果线程不为null,唤醒它
     26                         U.unpark(w);
     27                     // 返回其他线程给的V
     28                     return v;
     29                 }
     30                 // CAS 操作失败,表示有其它线程竞争,在此线程之前将数据已取走
     31                 // create arena on contention, but continue until slot null
     32                 if (NCPU > 1 && bound == 0 &&
     33                     U.compareAndSwapInt(this, BOUND, 0, SEQ))
     34                     // CPU为多核心
     35                     // bound == 0 表示arena数组未初始化过,CAS操作bound将其增加SEQ
     36                     // 初始化arena数组
     37                     arena = new Node[(FULL + 2) << ASHIFT];
     38             }
     39             // 上面分析过,只有当arena才会执行slotExchange方法的
     40             // 所以表示刚好已有其它线程加入进来将arena初始化
     41             else if (arena != null)
     42                 // 这里就需要去执行arenaExchange
     43                 return null; // caller must reroute to arenaExchange
     44             else {
     45                 // 这里表示当前线程是以第一个线程进来交换数据
     46                 // 或者表示之前的数据交换已进行完毕,这里可以看作是第一个线程
     47                 // 将需要交换的数据先存放在当前线程变量p中
     48                 p.item = item;
     49                 // 将需要交换的数据通过CAS设置到交换区slot
     50                 if (U.compareAndSwapObject(this, SLOT, null, p))
     51                     // 交换成功后跳出自旋
     52                     break;
     53                 // CAS操作失败,表示有其它线程刚好先于当前线程将数据设置到交换区slot
     54                 // 将当前线程变量中的item设置为null,然后自旋获取其它线程存放在交换区slot的数据
     55                 p.item = null;
     56             }
     57         }
     58         // 执行到这里表示当前线程已将需要的交换的数据放置于交换区slot中了,
     59         // 等待其它线程交换数据然后唤醒当前线程
     60         // await release
     61         int h = p.hash;
     62         long end = timed ? System.nanoTime() + ns : 0L;
     63         // 自旋次数
     64         int spins = (NCPU > 1) ? SPINS : 1;
     65         Object v;
     66         // 自旋等待直到p.match不为null,也就是说等待其它线程将需要交换的数据放置于交换区slot
     67         while ((v = p.match) == null) {
     68             // 下面的逻辑主要是自旋等待,直到spins递减到0为止
     69             if (spins > 0) {
     70                 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
     71                 if (h == 0)
     72                     h = SPINS | (int)t.getId();
     73                 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
     74                     Thread.yield();
     75             }
     76             // slot 和 p本应该是相等,除非其它线程执行了第16行代码中的CAS操作将slot置为null,
     77             // 还未来得及设置match的值,此时只需要自旋等待第21行代码被其它线程执行,
     78             // 这样p.match才会不为null跳出循环
     79             else if (slot != p)
     80                 spins = SPINS;
     81             // 此处表示未设置超时或者时间未超时
     82             else if (!t.isInterrupted() && arena == null &&
     83                      (!timed || (ns = end - System.nanoTime()) > 0L)) {
     84                 // 设置线程t被当前对象阻塞
     85                 U.putObject(t, BLOCKER, this);
     86                 // 给p挂机线程的值赋值
     87                 p.parked = t;
     88                 if (slot == p)
     89                     // 如果slot还没有被置为null,也就表示暂未有线程过来交换数据,需要将当前线程挂起
     90                     U.park(false, ns);
     91                 // 线程被唤醒,将被挂起的线程设置为null
     92                 p.parked = null;
     93                 // 设置线程t未被任何对象阻塞
     94                 U.putObject(t, BLOCKER, null);
     95             }
     96             // 不是以上条件时(可能是arena已不为null或者超时)
     97             else if (U.compareAndSwapObject(this, SLOT, p, null)) {
     98                 // arena不为null则v为null,其它为超时则v为超市对象TIMED_OUT,并且跳出循环
     99                 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
    100                 break;
    101             }
    102         }
    103         // 取走match值,并将p中的match置为null
    104         U.putOrderedObject(p, MATCH, null);
    105         // 设置item为null
    106         p.item = null;
    107         p.hash = h;
    108         // 返回交换值
    109         return v;
    110     }

    再来看arenaExchange方法,此方法被执行时表示多个线程进入交换区交换数据,arena数组已被初始化,此方法中的一些处理方式和slotExchange比较类似,它是通过遍历arena数组找到需要交换的数据

        // timed 为true表示设置了超时时间,ns为>0的值,反之没有设置超时时间
        private final Object arenaExchange(Object item, boolean timed, long ns) {
            Node[] a = arena;
            // 获取当前线程中的存放的node
            Node p = participant.get();
            //index初始值0
            for (int i = p.index;;) {                      // access slot at i
                // 遍历,如果在数组中找到数据则直接交换并唤醒线程,
                // 如未找到则将需要交换给其它线程的数据放置于数组中
                int b, m, c; long j;                       // j is raw array offset
                // 其实这里就是向右遍历数组,只是用到了元素在内存偏移的偏移量
                // q实际为arena数组偏移(i + 1) *  128个地址位上的node
                Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
                // 如果q不为null,并且CAS操作成功,将下标j的元素置为null
                if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                    // 表示当前线程已发现有交换的数据,然后获取数据,唤醒等待的线程
                    Object v = q.item;                     // release
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // q 为null 并且 i 未超过数组边界
                else if (i <= (m = (b = bound) & MMASK) && q == null) {
                    // 将需要给其它线程的item赋予给p中的item
                    p.item = item;                         // offer
                    if (U.compareAndSwapObject(a, j, null, p)) {
                        // 交换成功
                        long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                        Thread t = Thread.currentThread(); // wait
                        // 自旋直到有其它线程进入,遍历到该元素并与其交换,同时当前线程被唤醒
                        for (int h = p.hash, spins = SPINS;;) {
                            Object v = p.match;
                            if (v != null) {
                                // 其它线程设置的需要交换的数据match不为null
                                // 将match设置null,item设置为null
                                U.putOrderedObject(p, MATCH, null);
                                p.item = null;             // clear for next use
                                p.hash = h;
                                return v;
                            }
                            else if (spins > 0) {
                                // 递减自旋次数
                                h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                                if (h == 0)                // initialize hash
                                    h = SPINS | (int)t.getId();
                                else if (h < 0 &&          // approx 50% true
                                         (--spins & ((SPINS >>> 1) - 1)) == 0)
                                    Thread.yield();        // two yields per wait
                            }
                            else if (U.getObjectVolatile(a, j) != p)
                                // 和slotExchange方法中的类似,arena数组中的数据已被CAS设置
                                // match值还未设置,让其再自旋会等待match被设置
                                spins = SPINS;       // releaser hasn't set match yet
                            else if (!t.isInterrupted() && m == 0 &&
                                     (!timed ||
                                      (ns = end - System.nanoTime()) > 0L)) {
                                // 设置线程t被当前对象阻塞
                                U.putObject(t, BLOCKER, this); // emulate LockSupport
                                // 线程t赋值
                                p.parked = t;              // minimize window
                                if (U.getObjectVolatile(a, j) == p)
                                    // 数组中对象还相等,表示线程还未被唤醒,唤醒线程
                                    U.park(false, ns);
                                p.parked = null;
                                // 设置线程t未被任何对象阻塞
                                U.putObject(t, BLOCKER, null);
                            }
                            else if (U.getObjectVolatile(a, j) == p &&
                                     U.compareAndSwapObject(a, j, p, null)) {
                                // 这里给bound增加加一个SEQ
                                if (m != 0)                // try to shrink
                                    U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                                p.item = null;
                                p.hash = h;
                                i = p.index >>>= 1;        // descend
                                if (Thread.interrupted())
                                    return null;
                                if (timed && m == 0 && ns <= 0L)
                                    return TIMED_OUT;
                                break;                     // expired; restart
                            }
                        }
                    }
                    else
                        // 交换失败,表示有其它线程更改了arena数组中下标i的元素
                        p.item = null;                     // clear offer
                }
                else {
                    // 此时表示下标不在bound & MMASK或q不为null但CAS操作失败
                    // 需要更新bound变化后的值
                    if (p.bound != b) {                    // stale; reset
                        p.bound = b;
                        p.collides = 0;
                        // 反向遍历
                        i = (i != m || m == 0) ? m : m - 1;
                    }
                    else if ((c = p.collides) < m || m == FULL ||
                             !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                        // 记录CAS失败的次数
                        p.collides = c + 1;
                        // 循环遍历
                        i = (i == 0) ? m : i - 1;          // cyclically traverse
                    }
                    else
                        // 此时表示bound值增加了SEQ+1
                        i = m + 1;                         // grow
                    // 设置下标
                    p.index = i;
                }
            }
        }

    总结

    读到这里是不是还是感觉有很多疑问?

    1. 先看为什么ASHIFT设置成7,这是为了尽量避免slot数组中不同的元素在同一个缓存行上,<< ASHIFT 左移7位,表示至少移动了128地址位,而我们主流的缓存行大小一般为32字节到256字节,所以128个地址位基本覆盖到了常见的处理器平台。arena数组中元素的分布如图,它们之间间隔128个整数倍地址位,也就是说最小相差128个地址位。 
      arena数组
    2. 为什么Node类用@sun.misc.Contended注解呢?该注解是jdk8新增的注解,是为了解决之前手动填充数据的问题。填充数据也是为了避免arena数组中的不同的元素共享同一个缓存行,导致多线程修改数据时性能受到影响。

    参考: 
    伪共享(False Sharing)

  • 相关阅读:
    Joshua Bloch错了? ——适当改变你的Builder模式实现
    集成基于OAuth协议的单点登陆
    集成基于CAS协议的单点登陆
    数据库设计中的Soft Delete模式
    完成C++不能做到的事
    ExtJS in Review
    DTO – 服务实现中的核心数据
    保存好你的密码 —— 从芝麻金融被攻破说起
    WPF
    C# 反编译防范
  • 原文地址:https://www.cnblogs.com/d-homme/p/9387948.html
Copyright © 2011-2022 走看看