原文连接:SynchronousQueue原理解析
源码解析
1、SynchronousQueue.java
1 public class SynchronousQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 //Transferer是一个抽象类,SynchronousQueue内部有2个Transferer的子类,分别是TransferQueue和TransferStack 5 // 6 private transient volatile Transferer<E> transferer; 7 8 //默认构造方法的线程等待队列是不保证顺序的 9 public SynchronousQueue() { 10 this(false); 11 } 12 13 //如果fair为true,那SynchronousQueue所采用的是能保证先进先出的TransferQueue,也就是先被挂起的线程会先返回 14 public SynchronousQueue(boolean fair) { 15 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); 16 } 17 18 //向SynchronousQueue中添加数据,如果此时线程队列中没有获取数据的线程的话,当前的线程就会挂起等待 19 public void put(E e) throws InterruptedException { 20 //添加的数据不能是null 21 if (e == null) throw new NullPointerException(); 22 //可以看到添加的方法调用的是transfer方法,如果添加失败会抛出InterruptedException异常 23 //后面我们可以在transfer方法的源码中调用put方法添加数据在当前线程被中断时才会返回null 24 //这里相当于继续把线程中断的InterruptedException向上抛出 25 if (transferer.transfer(e, false, 0) == null) { 26 Thread.interrupted(); 27 throw new InterruptedException(); 28 } 29 } 30 31 //不带超时时间的offer方法,如果此时没有线程正在等待获取数据的话transfer就会返回null,也就是添加数据失败 32 public boolean offer(E e) { 33 if (e == null) throw new NullPointerException(); 34 return transferer.transfer(e, true, 0) != null; 35 } 36 37 //带超时时间的offer方法,与上面的不同的是这个方法会等待一个超时时间,如果时间过了还没有线程来获取数据就会返回失败 38 public boolean offer(E e, long timeout, TimeUnit unit) 39 throws InterruptedException { 40 if (e == null) throw new NullPointerException(); 41 //添加的数据被其他线程成功获取,返回成功 42 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) 43 return true; 44 //如果添加数据失败了,有可能是线程被中断了,不是的话直接返回false 45 if (!Thread.interrupted()) 46 return false; 47 //是线程被中断的话就向上跑出InterruptedException异常 48 throw new InterruptedException(); 49 } 50 51 //take方法用于从队列中取数据,如果此时没有添加数据的线程被挂起,那当前线程就会被挂起等待 52 public E take() throws InterruptedException { 53 E e = transferer.transfer(null, false, 0); 54 //成功获取数据 55 if (e != null) 56 return e; 57 //没有获取到数据,同时又退出挂起状态了,那说明线程被中断了,向上抛出InterruptedException 58 Thread.interrupted(); 59 throw new InterruptedException(); 60 } 61 62 //poll方法同样用于获取数据 63 public E poll() { 64 return transferer.transfer(null, true, 0); 65 } 66 67 //带超时时间的poll方法,如果超时时间到了还没有线程插入数据,就会返回失败 68 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 69 E e = transferer.transfer(null, true, unit.toNanos(timeout)); 70 //返回结果有2种情况 71 //e != null表示成功取到数据了 72 //!Thread.interrupted()表示返回失败了,且是因为超时失败的,此时e是null 73 if (e != null || !Thread.interrupted()) 74 return e; 75 //返回失败了,并且是因为当前线程被中断了 76 throw new InterruptedException(); 77 } 78 79 //可以看到SynchronousQueue的isEmpty方法一直返回的是true,因为SynchronousQueue没有任何容量 80 public boolean isEmpty() { 81 return true; 82 } 83 84 //同样的size方法也返回0 85 public int size() { 86 return 0; 87 } 88 89 <!--下面我们看看TransferQueue的具体实现,TransferQueue中的关键方法就是transfer方法了--> 90 91 //先看看TransferQueue的父类Transferer,比较简单,就是提供了一个transfer方法,需要子类具体实现 92 abstract static class Transferer<E> { 93 abstract E transfer(E e, boolean timed, long nanos); 94 } 95 96 //TransferQueue 97 static final class TransferQueue<E> extends Transferer<E> { 98 99 //内部的节点类,用于表示一个请求 100 //这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出 101 static final class QNode { 102 volatile QNode next; // next node in queue 103 volatile Object item; // CAS'ed to or from null 104 //请求所在的线程 105 volatile Thread waiter; // to control park/unpark 106 //用于判断是入队还是出队,true表示的是入队操作,也就是添加数据 107 final boolean isData; 108 109 QNode(Object item, boolean isData) { 110 this.item = item; 111 this.isData = isData; 112 } 113 114 //可以看到QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全 115 //compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量 116 //第三个参数表示参数期待的值,第四个参数表示更新后的值 117 //下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功 118 boolean casNext(QNode cmp, QNode val) { 119 return next == cmp && 120 U.compareAndSwapObject(this, NEXT, cmp, val); 121 } 122 123 //方法的原理同上面的类似,这里就是更新item的值了 124 boolean casItem(Object cmp, Object val) { 125 return item == cmp && 126 U.compareAndSwapObject(this, ITEM, cmp, val); 127 } 128 129 //方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了 130 void tryCancel(Object cmp) { 131 U.compareAndSwapObject(this, ITEM, cmp, this); 132 } 133 134 //调用tryCancel方法后item就会是this,就表示当前任务被取消了 135 boolean isCancelled() { 136 return item == this; 137 } 138 139 //表示当前任务已经被返回了 140 boolean isOffList() { 141 return next == this; 142 } 143 144 // Unsafe mechanics 145 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 146 private static final long ITEM; 147 private static final long NEXT; 148 149 static { 150 try { 151 ITEM = U.objectFieldOffset 152 (QNode.class.getDeclaredField("item")); 153 NEXT = U.objectFieldOffset 154 (QNode.class.getDeclaredField("next")); 155 } catch (ReflectiveOperationException e) { 156 throw new Error(e); 157 } 158 } 159 } 160 161 //首节点 162 transient volatile QNode head; 163 //尾部节点 164 transient volatile QNode tail; 165 /** 166 * Reference to a cancelled node that might not yet have been 167 * unlinked from queue because it was the last inserted node 168 * when it was cancelled. 169 */ 170 transient volatile QNode cleanMe; 171 172 //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点 173 TransferQueue() { 174 QNode h = new QNode(null, false); // initialize to dummy node. 175 head = h; 176 tail = h; 177 } 178 179 //transfer方法用于提交数据或者是获取数据 180 @SuppressWarnings("unchecked") 181 E transfer(E e, boolean timed, long nanos) { 182 QNode s = null; // constructed/reused as needed 183 //如果e不为null,就说明是添加数据的入队操作 184 boolean isData = (e != null); 185 186 for (;;) { 187 QNode t = tail; 188 QNode h = head; 189 if (t == null || h == null) // saw uninitialized value 190 continue; // spin 191 192 //当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲 193 if (h == t || t.isData == isData) { // empty or same-mode 194 QNode tn = t.next; 195 //这是一个检查,确保t指向队尾 196 if (t != tail) // inconsistent read 197 continue; 198 //tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环 199 if (tn != null) { // lagging tail 200 advanceTail(t, tn); 201 continue; 202 } 203 //如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null 204 //这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null 205 if (timed && nanos <= 0L) // can't wait 206 return null; 207 //如果没有超时时间或者超时时间不为0的话就创建新的节点 208 if (s == null) 209 s = new QNode(e, isData); 210 //使tail的next指向新的节点 211 if (!t.casNext(null, s)) // failed to link in 212 continue; 213 //更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点 214 advanceTail(t, s); // swing tail and wait 215 //如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交 216 Object x = awaitFulfill(s, e, timed, nanos); 217 //当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true 218 if (x == s) { // wait was cancelled 219 //将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了 220 clean(t, s); 221 return null; 222 } 223 224 //如果s还没有被 225 if (!s.isOffList()) { // not already unlinked 226 advanceHead(t, s); // unlink if head 227 if (x != null) // and forget fields 228 s.item = s; 229 s.waiter = null; 230 } 231 return (x != null) ? (E)x : e; 232 233 } 234 //提交操作的时候刚刚好有反向的操作在等待 235 else { // complementary-mode 236 QNode m = h.next; // node to fulfill 237 if (t != tail || m == null || h != head) 238 continue; // inconsistent read 239 240 Object x = m.item; 241 //这里先判断m是否是有效的操作 242 if (isData == (x != null) || // m already fulfilled 243 x == m || // m cancelled 244 !m.casItem(x, e)) { // lost CAS 245 advanceHead(h, m); // dequeue and retry 246 continue; 247 } 248 249 //更新头部节点 250 advanceHead(h, m); // successfully fulfilled 251 //唤醒m节点的被挂起的线程 252 LockSupport.unpark(m.waiter); 253 //返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功 254 return (x != null) ? (E)x : e; 255 } 256 } 257 } 258 259 <!--下面看看执行挂起线程的方法awaitFulfill--> 260 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { 261 /* Same idea as TransferStack.awaitFulfill */ 262 //首先获取超时时间 263 final long deadline = timed ? System.nanoTime() + nanos : 0L; 264 //当前操作所在的线程 265 Thread w = Thread.currentThread(); 266 //线程被挂起或是进入超时等待之前阻止自旋的次数 267 int spins = (head.next == s) 268 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) 269 : 0; 270 for (;;) { 271 //这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记 272 if (w.isInterrupted()) 273 s.tryCancel(e); 274 Object x = s.item; 275 //x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法 276 if (x != e) 277 return x; 278 //这里先判断超时的时间是否过了 279 if (timed) { 280 nanos = deadline - System.nanoTime(); 281 if (nanos <= 0L) { 282 s.tryCancel(e); 283 continue; 284 } 285 } 286 //这里通过多几次循环来避免直接挂起线程 287 if (spins > 0) 288 --spins; 289 else if (s.waiter == null) 290 s.waiter = w; 291 else if (!timed) 292 //park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒 293 LockSupport.park(this); 294 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) 295 //parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒 296 LockSupport.parkNanos(this, nanos); 297 } 298 } 299 300 } 301 }
2、构造函数
1 public SynchronousQueue() { 2 this(false); 3 } 4 5 public SynchronousQueue(boolean fair) { 6 // 通过 fair 值来决定公平性和非公平性 7 // 公平性使用TransferQueue,非公平性采用TransferStack 8 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); 9 }
3、公平模式 TransferQueue
1 /** 2 * 这是一个非常典型的 queue , 它有如下的特点 3 * 1. 整个队列有 head, tail 两个节点 4 * 2. 队列初始化时会有个 dummy 节点 5 * 3. 这个队列的头节点是个 dummy 节点/ 或 哨兵节点, 所以操作的总是队列中的第二个节点(AQS的设计中也是这也) 6 */ 7 8 /** 头节点 */ 9 transient volatile QNode head; 10 /** 尾节点 */ 11 transient volatile QNode tail; 12 /** 13 * Reference to a cancelled node that might not yet have been 14 * unlinked from queue because it was last inserted node 15 * when it was cancelled 16 */ 17 /** 18 * 对应 中断或超时的 前继节点,这个节点存在的意义是标记, 它的下个节点要删除 19 * 何时使用: 20 * 当你要删除 节点 node, 若节点 node 是队列的末尾, 则开始用这个节点, 21 * 为什么呢? 22 * 大家知道 删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当 节点 B 是整个队列中的末尾元素时, 23 * 一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像 24 * ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用 25 */ 26 transient volatile QNode cleanMe; 27 28 TransferQueue(){ 29 /** 30 * 构造一个 dummy node, 而整个 queue 中永远会存在这样一个 dummy node 31 * dummy node 的存在使得 代码中不存在复杂的 if 条件判断 32 */ 33 QNode h = new QNode(null, false); 34 head = h; 35 tail = h; 36 } 37 38 /** 39 * 推进 head 节点,将 老节点的 oldNode.next = this, help gc, 40 * 这种和 ConcurrentLinkedQueue 中一样 41 */ 42 void advanceHead(QNode h, QNode nh){ 43 if(h == head && unsafe.compareAndSwapObject(this, headOffset, h, nh)){ 44 h.next = h; // forget old next help gc 45 } 46 } 47 48 /** 更新新的 tail 节点 */ 49 void advanceTail(QNode t, QNode nt){ 50 if(tail == t){ 51 unsafe.compareAndSwapObject(this, tailOffset, t, nt); 52 } 53 } 54 55 /** CAS 设置 cleamMe 节点 */ 56 boolean casCleanMe(QNode cmp, QNode val){ 57 return cleanMe == cmp && unsafe.compareAndSwapObject(this, cleanMeOffset, cmp, val); 58 }
4、公平模式 TransferQueue中定义了QNode类
1 static final class QNode { 2 // next 域 3 volatile QNode next; 4 // item数据项 5 volatile Object item; 6 // 等待线程,用于park/unpark 7 volatile Thread waiter; // to control park/unpark 8 //模式,表示当前是数据还是请求,只有当匹配的模式相匹配时才会交换 9 final boolean isData; 10 11 QNode(Object item, boolean isData) { 12 this.item = item; 13 this.isData = isData; 14 } 15 ...
5、公平模式 TransferQueue transfer() 方法
1 /** 2 * Puts or takes an item 3 * 主方法 4 * 5 * @param e if non-null, the item to be handed to a consumer; 6 * if null, requests that transfer return an item 7 * offered by producer. 8 * @param timed if this operation should timeout 9 * @param nanos the timeout, in nanosecond 10 * @return 11 */ 12 @Override 13 E transfer(E e, boolean timed, long nanos) { 14 /** 15 * Basic algorithm is to loop trying to take either of 16 * two actions: 17 * 18 * 1. If queue apparently empty or holding same-mode nodes, 19 * try to add node to queue of waiters, wait to be 20 * fulfilled (or cancelled) and return matching item. 21 * 22 * 2. If queue apparently contains waiting items, and this 23 * call is of complementary mode, try to fulfill by CAS'ing 24 * item field of waiting node and dequeuing it, and then 25 * returning matching item. 26 * 27 * In each case, along the way, check for gurading against 28 * seeing uninitialized head or tail value. This never 29 * happens in current SynchronousQueue, but could if 30 * callers held non-volatile/final ref to the 31 * transferer. The check is here anyway because it places 32 * null checks at top of loop, which is usually faster 33 * than having them implicity interspersed 34 * 35 * 这个 producer / consumer 的主方法, 主要分为两种情况 36 * 37 * 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node 38 * 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配 39 * timeout/interrupt awaitFulfill方法返回的是 node 本身 40 * 匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的) 41 * 42 * 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点, 43 * 进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue 44 */ 45 QNode s = null; // constrcuted/reused as needed 46 boolean isData = (e != null); // 1.判断 e != null 用于区分 producer 与 consumer 47 48 for(;;){ 49 QNode t = tail; 50 QNode h = head; 51 if(t == null || h == null){ // 2. 数据未初始化, continue 重来 52 continue; // spin 53 } 54 if(h == t || t.isData == isData){ // 3. 队列为空, 或队列尾节点和自己相同 (注意这里是和尾节点比价, 下面进行匹配时是和 head.next 进行比较) 55 QNode tn = t.next; 56 if(t != tail){ // 4. tail 改变了, 重新再来 57 continue; 58 } 59 if(tn != null){ // 5. 其他线程添加了 tail.next, 所以帮助推进 tail 60 advanceTail(t, tn); 61 continue; 62 } 63 if(timed && nanos <= 0){ // 6. 调用的方法的 wait 类型的, 并且 超时了, 直接返回 null, 直接见 SynchronousQueue.poll() 方法,说明此 poll 的调用只有当前队列中正好有一个与之匹配的线程在等待被【匹配才有返回值 64 return null; 65 } 66 if(s == null){ 67 s = new QNode(e, isData); // 7. 构建节点 QNode 68 } 69 if(!t.casNext(null, s)){ // 8. 将 新建的节点加入到 队列中 70 continue; 71 } 72 73 advanceTail(t, s); // 9. 帮助推进 tail 节点 74 Object x = awaitFulfill(s, e, timed, nanos); // 10. 调用awaitFulfill, 若节点是 head.next, 则进行一些自旋, 若不是的话, 直接 block, 知道有其他线程 与之匹配, 或它自己进行线程的中断 75 if(x == s){ // 11. 若 (x == s)节点s 对应额线程 wait 超时 或线程中断, 不然的话 x == null (s 是 producer) 或 是正真的传递值(s 是 consumer) 76 clean(t, s); // 12. 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe 77 return null; 78 } 79 80 if(!s.isOffList()){ // 13. 节点 s 没有 offlist 81 advanceHead(t, s); // 14. 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了) 82 if(x != null){ // and forget fields 83 s.item = s; 84 } 85 s.waiter = null; // 15. 释放线程 ref 86 } 87 88 return (x != null) ? (E)x :e; 89 90 }else{ // 16. 进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注意 队列刚开始构建时 有个 dummy node, 而且 head 节点永远是个 dummy node 这个和 AQS 中一样的) 91 QNode m = h.next; // 17. 获取 head.next 准备开始匹配 92 if(t != tail || m == null || h != head){ 93 continue; // 18. 不一致读取, 有其他线程改变了队列的结构inconsistent read 94 } 95 96 /** producer 和 consumer 匹配操作 97 * 1. 获取 m的 item (注意这里的m是head的next节点 98 * 2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对 99 * 3. x == m 判断是否 节点m 是否已经进行取消了, 具体看(QNOde#tryCancel) 100 * 4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况) 101 * 5. 若 cas操作成功则将h节点dequeue 102 * 103 * 疑惑: 为什么将h进行 dequeue, 而不是 m节点 104 * 答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next 105 */ 106 Object x = m.item; 107 if(isData == (x != null) || // 19. 两者的模式是否匹配 (因为并发环境下 有可能其他的线程强走了匹配的节点) 108 x == m || // 20. m 节点 线程中断或者 wait 超时了 109 !m.casItem(x, e) // 21. 进行 CAS 操作 更改等待线程的 item 值(等待的有可能是 concumer / producer) 110 ){ 111 advanceHead(h, m); // 22.推进 head 节点 重试 (尤其 21 操作失败) 112 continue; 113 } 114 115 advanceHead(h, m); // 23. producer consumer 交换数据成功, 推进 head 节点 116 LockSupport.unpark(m.waiter); // 24. 换线等待中的 m 节点, 而在 awaitFulfill 方法中 因为 item 改变了, 所以 x != e 成立, 返回 117 return (x != null) ? (E)x : e; // 25. 操作到这里若是 producer, 则 x != null, 返回 x, 若是consumer, 则 x == null,.返回 producer(其实就是 节点m) 的 e 118 } 119 } 120 121 }
6、公平模式 TransferQueueawaitFulfill
() 方法
1 // 自旋或者等待,直到填充完毕 2 // 这里的策略是什么呢?如果自旋次数不够了,通常是 16 次,但还有超过 1 秒的时间,就阻塞等待被唤醒。 3 // 如果时间到了,就取消这次的入队行为。 4 // 返回的是 Node 本身 5 // s.item 就是 e 6 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { 7 final long deadline = timed ? System.nanoTime() + nanos : 0L; 8 Thread w = Thread.currentThread(); 9 int spins = ((head.next == s) ?// 如果成功将 tail.next 覆盖了 tail,如果有超时机制,则自旋 32 次,如果没有超时机制,则自旋 32 *16 = 512次 10 (timed ? maxTimedSpins : maxUntimedSpins) : 0); 11 for (;;) { 12 if (w.isInterrupted())// 当前线程被中断 13 s.tryCancel(e);// 尝试取消这个 item 14 Object x = s.item;// 获取到这个 tail 的 item 15 if (x != e) // 如果不相等,说明 node 中的 item 取消了,返回这个 item。 16 // 这里是唯一停止循环的地方。当 s.item 已经不是当初的哪个 e 了,说明要么是时间到了被取消了,要么是线程中断被取消了。 17 // 当然,不仅仅只有这2种 “意外” 情况,还有一种情况是:当另一个线程拿走了这个数据,并修改了 item,也会通过这个判断,返回被“修改”过的 item。 18 return x; 19 if (timed) {// 如果有时间限制 20 nanos = deadline - System.nanoTime(); 21 if (nanos <= 0L) {// 如果时间到了 22 s.tryCancel(e);// 尝试取消 item,供上面的 x != e 判断 23 continue;// 重来 24 } 25 } 26 if (spins > 0)// 如果还有自旋次数 27 --spins;// 减一 28 else if (s.waiter == null)// 如果自旋不够,且 tail 的等待线程还没有赋值 29 s.waiter = w;// 当前线程赋值给 tail 的等待线程 30 else if (!timed)// 如果自旋不够,且如果线程赋值过了,且没有限制时间,则 wait,(危险操作) 31 LockSupport.park(this); 32 else if (nanos > spinForTimeoutThreshold)// 如果自旋不够,且如果限制了时间,且时间还剩余超过 1 秒,则 wait 剩余时间。 33 // 主要目的就是等待,等待其他线程唤醒这个节点所在的线程。 34 LockSupport.parkNanos(this, nanos); 35 } 36 }
====
View Code
链接:https://www.jianshu.com/p/af6f83c78506