前言
Exchanger应该算并发包中工具使用相对少的,因为它主要用于线程之间交换数据,它的用法比较简单在不同线程之间使用exchange方法交换数据,但是内部实现比较巧妙,使用了unsafe的CAS原子操作、自旋来解决冲突问题,下面我们通过源码一探究竟。
源码
先看看源码注释中关于核心算法的介绍
for (;;) { if (slot is empty) { // slot为空时,将item 设置到Node 中 place item in a Node; if (can CAS slot from empty to node) { // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒 wait for release; // 被唤醒后返回node中匹配到的item return matching item in node; } } else if (can CAS slot from node to empty) { // release // 将slot设置为空 // 获取node中的item,将需要交换的数据设置到匹配的item get the item in node; set matching item in node; // 唤醒等待的线程 release waiting thread; } // else retry on CAS failure }
比如有2条线程A和B,A线程交换数据时,发现slot为空,则将需要交换的数据放在slot中等待其它线程进来交换数据,等线程B进来,读取A设置的数据,然后设置线程B需要交换的数据,然后唤醒A线程,原理就是这么简单。当时当多个线程之间进行交换数据时就会出现问题,所以Exchanger加入了slot数组。
Exchanger 属性及构造器
// 用于左移Node数组下标,从而得出数据在内存中的偏移量来获取数据,避免伪共享 private static final int ASHIFT = 7; // note数组最大下标 private static final int MMASK = 0xff; // 用于递增bound,每次加一个SEQ private static final int SEQ = MMASK + 1; // CPU核心数 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // 当前数组最大的下标(多处理器情况下) static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 自旋次数,CPU核心为1个时,自旋被禁用 private static final int SPINS = 1 << 10; // 空对象,用于当线程exchange方法中参数为null时传递给其他线程的对象 private static final Object NULL_ITEM = new Object(); // 用于超时时传递的对象 private static final Object TIMED_OUT = new Object(); // Participant 继承了ThreadLocal,也就是说该对象用于获取每条线程中存放的值 private final Participant participant; // 多个线程交换 private volatile Node[] arena; // 用于2个线程交换 private volatile Node slot; // 该值主要用于与 private volatile int bound; // 通过unsafe用于CAS操作 private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; // bound属性在Exchanger对象中的偏移地址 BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); // slot属性在Exchanger对象中的偏移地址 SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); // slot属性在Node对象中的偏移地址 MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); // parkBlocker属性在Thread对象中的偏移地址 BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); // 获取Node[]数组中每个元素的大小,这里是4 s = U.arrayIndexScale(ak); // ABASE absorbs padding in front of element 0 // 获取Node[]数组中第一个元素的偏移地址 + 128 ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) // 这里是为了保证 Node数组中的元素不会争用一个缓存行 throw new Error("Unsupported array scale"); }
构造器及内部类
public Exchanger() { participant = new Participant(); } // 内部类,用于记录每个线程的状态 static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } } // 包含需要交换的数据等信息 // Contended为 JDK8 新增的注解,用于避免伪共享,提高程序性能 @sun.misc.Contended static final class Node { // arana数组中的下标 int index; // 上一次记录的bound int bound; // cas操作失败的次数 int collides; // 用于自旋的伪随机数 int hash; // Pseudo-random for spins // 当前线程需要交换的数据 Object item; // This thread's current item // 匹配线程交换的数据 volatile Object match; // Item provided by releasing thread // 记录当前挂起的线程 volatile Thread parked; // Set to this thread when parked, else null }
方法exchange
// 交换数据,参数X为本线程提供给其它线程的数据 public V exchange(V x) throws InterruptedException { Object v; // 当参数为null时需要将item设置为空的对象 Object item = (x == null) ? NULL_ITEM : x; // translate null args // 注意到这里的这个表达式是整个方法的核心 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
仔细看if里的条件表达式,得知: 只有当arena为null时,才会执行slotExchange方法;
当arena不为null或者(arena为null且slotExchange方法返回null)时,此时线程未中断,才会执行arenaExchange方法;
线程中断时,就会直接抛出线程中断异常。
下面我们来看slotExchange方法
1 // timed 为true表示设置了超时时间,ns为>0的值,反之没有设置超时时间 2 private final Object slotExchange(Object item, boolean timed, long ns) { 3 // 获取当前线程node对象 4 Node p = participant.get(); 5 Thread t = Thread.currentThread(); 6 if (t.isInterrupted()) // preserve interrupt status so caller can recheck 7 // 线程中断返回null 8 return null; 9 10 // 自旋 11 for (Node q;;) { 12 // 将slot值赋给q 13 if ((q = slot) != null) { 14 // slot 不为null,即表示已有线程已经把需要交换的数据设置在slot中了 15 // 通过CAS将slot设置成null 16 if (U.compareAndSwapObject(this, SLOT, q, null)) { 17 // CAS操作成功后,将slot中的item赋值给对象v,以便返回。 18 // 这里也是就读取之前线程要交换的数据 19 Object v = q.item; 20 // 将当前线程需要交给的数据设置在q中的match 21 q.match = item; 22 // 获取被挂起的线程 23 Thread w = q.parked; 24 if (w != null) 25 // 如果线程不为null,唤醒它 26 U.unpark(w); 27 // 返回其他线程给的V 28 return v; 29 } 30 // CAS 操作失败,表示有其它线程竞争,在此线程之前将数据已取走 31 // create arena on contention, but continue until slot null 32 if (NCPU > 1 && bound == 0 && 33 U.compareAndSwapInt(this, BOUND, 0, SEQ)) 34 // CPU为多核心 35 // bound == 0 表示arena数组未初始化过,CAS操作bound将其增加SEQ 36 // 初始化arena数组 37 arena = new Node[(FULL + 2) << ASHIFT]; 38 } 39 // 上面分析过,只有当arena才会执行slotExchange方法的 40 // 所以表示刚好已有其它线程加入进来将arena初始化 41 else if (arena != null) 42 // 这里就需要去执行arenaExchange 43 return null; // caller must reroute to arenaExchange 44 else { 45 // 这里表示当前线程是以第一个线程进来交换数据 46 // 或者表示之前的数据交换已进行完毕,这里可以看作是第一个线程 47 // 将需要交换的数据先存放在当前线程变量p中 48 p.item = item; 49 // 将需要交换的数据通过CAS设置到交换区slot 50 if (U.compareAndSwapObject(this, SLOT, null, p)) 51 // 交换成功后跳出自旋 52 break; 53 // CAS操作失败,表示有其它线程刚好先于当前线程将数据设置到交换区slot 54 // 将当前线程变量中的item设置为null,然后自旋获取其它线程存放在交换区slot的数据 55 p.item = null; 56 } 57 } 58 // 执行到这里表示当前线程已将需要的交换的数据放置于交换区slot中了, 59 // 等待其它线程交换数据然后唤醒当前线程 60 // await release 61 int h = p.hash; 62 long end = timed ? System.nanoTime() + ns : 0L; 63 // 自旋次数 64 int spins = (NCPU > 1) ? SPINS : 1; 65 Object v; 66 // 自旋等待直到p.match不为null,也就是说等待其它线程将需要交换的数据放置于交换区slot 67 while ((v = p.match) == null) { 68 // 下面的逻辑主要是自旋等待,直到spins递减到0为止 69 if (spins > 0) { 70 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; 71 if (h == 0) 72 h = SPINS | (int)t.getId(); 73 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) 74 Thread.yield(); 75 } 76 // slot 和 p本应该是相等,除非其它线程执行了第16行代码中的CAS操作将slot置为null, 77 // 还未来得及设置match的值,此时只需要自旋等待第21行代码被其它线程执行, 78 // 这样p.match才会不为null跳出循环 79 else if (slot != p) 80 spins = SPINS; 81 // 此处表示未设置超时或者时间未超时 82 else if (!t.isInterrupted() && arena == null && 83 (!timed || (ns = end - System.nanoTime()) > 0L)) { 84 // 设置线程t被当前对象阻塞 85 U.putObject(t, BLOCKER, this); 86 // 给p挂机线程的值赋值 87 p.parked = t; 88 if (slot == p) 89 // 如果slot还没有被置为null,也就表示暂未有线程过来交换数据,需要将当前线程挂起 90 U.park(false, ns); 91 // 线程被唤醒,将被挂起的线程设置为null 92 p.parked = null; 93 // 设置线程t未被任何对象阻塞 94 U.putObject(t, BLOCKER, null); 95 } 96 // 不是以上条件时(可能是arena已不为null或者超时) 97 else if (U.compareAndSwapObject(this, SLOT, p, null)) { 98 // arena不为null则v为null,其它为超时则v为超市对象TIMED_OUT,并且跳出循环 99 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; 100 break; 101 } 102 } 103 // 取走match值,并将p中的match置为null 104 U.putOrderedObject(p, MATCH, null); 105 // 设置item为null 106 p.item = null; 107 p.hash = h; 108 // 返回交换值 109 return v; 110 }
再来看arenaExchange方法,此方法被执行时表示多个线程进入交换区交换数据,arena数组已被初始化,此方法中的一些处理方式和slotExchange比较类似,它是通过遍历arena数组找到需要交换的数据
// timed 为true表示设置了超时时间,ns为>0的值,反之没有设置超时时间 private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 获取当前线程中的存放的node Node p = participant.get(); //index初始值0 for (int i = p.index;;) { // access slot at i // 遍历,如果在数组中找到数据则直接交换并唤醒线程, // 如未找到则将需要交换给其它线程的数据放置于数组中 int b, m, c; long j; // j is raw array offset // 其实这里就是向右遍历数组,只是用到了元素在内存偏移的偏移量 // q实际为arena数组偏移(i + 1) * 128个地址位上的node Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 如果q不为null,并且CAS操作成功,将下标j的元素置为null if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 表示当前线程已发现有交换的数据,然后获取数据,唤醒等待的线程 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // q 为null 并且 i 未超过数组边界 else if (i <= (m = (b = bound) & MMASK) && q == null) { // 将需要给其它线程的item赋予给p中的item p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { // 交换成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 自旋直到有其它线程进入,遍历到该元素并与其交换,同时当前线程被唤醒 for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { // 其它线程设置的需要交换的数据match不为null // 将match设置null,item设置为null U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { // 递减自旋次数 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的类似,arena数组中的数据已被CAS设置 // match值还未设置,让其再自旋会等待match被设置 spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 设置线程t被当前对象阻塞 U.putObject(t, BLOCKER, this); // emulate LockSupport // 线程t赋值 p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) // 数组中对象还相等,表示线程还未被唤醒,唤醒线程 U.park(false, ns); p.parked = null; // 设置线程t未被任何对象阻塞 U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // 这里给bound增加加一个SEQ if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else // 交换失败,表示有其它线程更改了arena数组中下标i的元素 p.item = null; // clear offer } else { // 此时表示下标不在bound & MMASK或q不为null但CAS操作失败 // 需要更新bound变化后的值 if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; // 反向遍历 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 记录CAS失败的次数 p.collides = c + 1; // 循环遍历 i = (i == 0) ? m : i - 1; // cyclically traverse } else // 此时表示bound值增加了SEQ+1 i = m + 1; // grow // 设置下标 p.index = i; } } }
总结
读到这里是不是还是感觉有很多疑问?
- 先看为什么
ASHIFT
设置成7,这是为了尽量避免slot数组中不同的元素在同一个缓存行上,<< ASHIFT
左移7位,表示至少移动了128地址位,而我们主流的缓存行大小一般为32字节到256字节,所以128个地址位基本覆盖到了常见的处理器平台。arena数组中元素的分布如图,它们之间间隔128个整数倍地址位,也就是说最小相差128个地址位。 - 为什么Node类用
@sun.misc.Contended
注解呢?该注解是jdk8新增的注解,是为了解决之前手动填充数据的问题。填充数据也是为了避免arena数组中的不同的元素共享同一个缓存行,导致多线程修改数据时性能受到影响。