zoukankan      html  css  js  c++  java
  • 【Java多线程】SynchronousQueue源码分析(二十四)

    原文连接:SynchronousQueue原理解析

    源码解析

    1、SynchronousQueue.java

      1 public class SynchronousQueue<E> extends AbstractQueue<E>
      2     implements BlockingQueue<E>, java.io.Serializable {
      3     
      4     //Transferer是一个抽象类,SynchronousQueue内部有2个Transferer的子类,分别是TransferQueue和TransferStack
      5     //
      6     private transient volatile Transferer<E> transferer;
      7 
      8     //默认构造方法的线程等待队列是不保证顺序的
      9     public SynchronousQueue() {
     10         this(false);
     11     }
     12 
     13     //如果fair为true,那SynchronousQueue所采用的是能保证先进先出的TransferQueue,也就是先被挂起的线程会先返回
     14     public SynchronousQueue(boolean fair) {
     15         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
     16     }
     17     
     18     //向SynchronousQueue中添加数据,如果此时线程队列中没有获取数据的线程的话,当前的线程就会挂起等待
     19     public void put(E e) throws InterruptedException {
     20         //添加的数据不能是null
     21         if (e == null) throw new NullPointerException();
     22         //可以看到添加的方法调用的是transfer方法,如果添加失败会抛出InterruptedException异常
     23         //后面我们可以在transfer方法的源码中调用put方法添加数据在当前线程被中断时才会返回null
     24         //这里相当于继续把线程中断的InterruptedException向上抛出
     25         if (transferer.transfer(e, false, 0) == null) {
     26             Thread.interrupted();
     27             throw new InterruptedException();
     28         }
     29     }
     30     
     31     //不带超时时间的offer方法,如果此时没有线程正在等待获取数据的话transfer就会返回null,也就是添加数据失败
     32     public boolean offer(E e) {
     33         if (e == null) throw new NullPointerException();
     34         return transferer.transfer(e, true, 0) != null;
     35     }
     36     
     37     //带超时时间的offer方法,与上面的不同的是这个方法会等待一个超时时间,如果时间过了还没有线程来获取数据就会返回失败
     38     public boolean offer(E e, long timeout, TimeUnit unit)
     39         throws InterruptedException {
     40         if (e == null) throw new NullPointerException();
     41         //添加的数据被其他线程成功获取,返回成功
     42         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
     43             return true;
     44         //如果添加数据失败了,有可能是线程被中断了,不是的话直接返回false
     45         if (!Thread.interrupted())
     46             return false;
     47         //是线程被中断的话就向上跑出InterruptedException异常
     48         throw new InterruptedException();
     49     }
     50     
     51     //take方法用于从队列中取数据,如果此时没有添加数据的线程被挂起,那当前线程就会被挂起等待
     52     public E take() throws InterruptedException {
     53         E e = transferer.transfer(null, false, 0);
     54         //成功获取数据
     55         if (e != null)
     56             return e;
     57         //没有获取到数据,同时又退出挂起状态了,那说明线程被中断了,向上抛出InterruptedException
     58         Thread.interrupted();
     59         throw new InterruptedException();
     60     }
     61     
     62     //poll方法同样用于获取数据
     63     public E poll() {
     64         return transferer.transfer(null, true, 0);
     65     }
     66     
     67     //带超时时间的poll方法,如果超时时间到了还没有线程插入数据,就会返回失败
     68     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
     69         E e = transferer.transfer(null, true, unit.toNanos(timeout));
     70         //返回结果有2种情况
     71         //e != null表示成功取到数据了
     72         //!Thread.interrupted()表示返回失败了,且是因为超时失败的,此时e是null
     73         if (e != null || !Thread.interrupted())
     74             return e;
     75         //返回失败了,并且是因为当前线程被中断了
     76         throw new InterruptedException();
     77     }
     78     
     79     //可以看到SynchronousQueue的isEmpty方法一直返回的是true,因为SynchronousQueue没有任何容量
     80     public boolean isEmpty() {
     81         return true;
     82     }
     83 
     84     //同样的size方法也返回0
     85     public int size() {
     86         return 0;
     87     }
     88     
     89     <!--下面我们看看TransferQueue的具体实现,TransferQueue中的关键方法就是transfer方法了-->
     90     
     91     //先看看TransferQueue的父类Transferer,比较简单,就是提供了一个transfer方法,需要子类具体实现
     92     abstract static class Transferer<E> {
     93         abstract E transfer(E e, boolean timed, long nanos);
     94     }
     95     
     96     //TransferQueue
     97     static final class TransferQueue<E> extends Transferer<E> {
     98     
     99         //内部的节点类,用于表示一个请求
    100         //这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出
    101         static final class QNode {
    102             volatile QNode next;          // next node in queue
    103             volatile Object item;         // CAS'ed to or from null
    104             //请求所在的线程
    105             volatile Thread waiter;       // to control park/unpark
    106             //用于判断是入队还是出队,true表示的是入队操作,也就是添加数据
    107             final boolean isData;
    108 
    109             QNode(Object item, boolean isData) {
    110                 this.item = item;
    111                 this.isData = isData;
    112             }
    113 
    114             //可以看到QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全
    115             //compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量
    116             //第三个参数表示参数期待的值,第四个参数表示更新后的值
    117             //下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功
    118             boolean casNext(QNode cmp, QNode val) {
    119                 return next == cmp &&
    120                     U.compareAndSwapObject(this, NEXT, cmp, val);
    121             }
    122 
    123             //方法的原理同上面的类似,这里就是更新item的值了
    124             boolean casItem(Object cmp, Object val) {
    125                 return item == cmp &&
    126                     U.compareAndSwapObject(this, ITEM, cmp, val);
    127             }
    128 
    129             //方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了
    130             void tryCancel(Object cmp) {
    131                 U.compareAndSwapObject(this, ITEM, cmp, this);
    132             }
    133 
    134             //调用tryCancel方法后item就会是this,就表示当前任务被取消了
    135             boolean isCancelled() {
    136                 return item == this;
    137             }
    138 
    139             //表示当前任务已经被返回了
    140             boolean isOffList() {
    141                 return next == this;
    142             }
    143 
    144             // Unsafe mechanics
    145             private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    146             private static final long ITEM;
    147             private static final long NEXT;
    148 
    149             static {
    150                 try {
    151                     ITEM = U.objectFieldOffset
    152                         (QNode.class.getDeclaredField("item"));
    153                     NEXT = U.objectFieldOffset
    154                         (QNode.class.getDeclaredField("next"));
    155                 } catch (ReflectiveOperationException e) {
    156                     throw new Error(e);
    157                 }
    158             }
    159         }
    160         
    161         //首节点
    162         transient volatile QNode head;
    163         //尾部节点
    164         transient volatile QNode tail;
    165         /**
    166          * Reference to a cancelled node that might not yet have been
    167          * unlinked from queue because it was the last inserted node
    168          * when it was cancelled.
    169          */
    170         transient volatile QNode cleanMe;
    171 
    172         //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点
    173         TransferQueue() {
    174             QNode h = new QNode(null, false); // initialize to dummy node.
    175             head = h;
    176             tail = h;
    177         }
    178         
    179         //transfer方法用于提交数据或者是获取数据
    180         @SuppressWarnings("unchecked")
    181         E transfer(E e, boolean timed, long nanos) {
    182             QNode s = null; // constructed/reused as needed
    183             //如果e不为null,就说明是添加数据的入队操作
    184             boolean isData = (e != null);
    185 
    186             for (;;) {
    187                 QNode t = tail;
    188                 QNode h = head;
    189                 if (t == null || h == null)         // saw uninitialized value
    190                     continue;                       // spin
    191 
    192                 //当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲
    193                 if (h == t || t.isData == isData) { // empty or same-mode
    194                     QNode tn = t.next;
    195                     //这是一个检查,确保t指向队尾
    196                     if (t != tail)                  // inconsistent read
    197                         continue;
    198                     //tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环
    199                     if (tn != null) {               // lagging tail
    200                         advanceTail(t, tn);
    201                         continue;
    202                     }
    203                     //如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null
    204                     //这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null
    205                     if (timed && nanos <= 0L)       // can't wait
    206                         return null;
    207                     //如果没有超时时间或者超时时间不为0的话就创建新的节点
    208                     if (s == null)
    209                         s = new QNode(e, isData);
    210                     //使tail的next指向新的节点
    211                     if (!t.casNext(null, s))        // failed to link in
    212                         continue;
    213                     //更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点
    214                     advanceTail(t, s);              // swing tail and wait
    215                     //如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交
    216                     Object x = awaitFulfill(s, e, timed, nanos);
    217                     //当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true
    218                     if (x == s) {                   // wait was cancelled
    219                         //将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了
    220                         clean(t, s);
    221                         return null;
    222                     }
    223                     
    224                     //如果s还没有被
    225                     if (!s.isOffList()) {           // not already unlinked
    226                         advanceHead(t, s);          // unlink if head
    227                         if (x != null)              // and forget fields
    228                             s.item = s;
    229                         s.waiter = null;
    230                     }
    231                     return (x != null) ? (E)x : e;
    232 
    233                 } 
    234                 //提交操作的时候刚刚好有反向的操作在等待
    235                 else {                            // complementary-mode
    236                     QNode m = h.next;               // node to fulfill
    237                     if (t != tail || m == null || h != head)
    238                         continue;                   // inconsistent read
    239 
    240                     Object x = m.item;
    241                     //这里先判断m是否是有效的操作
    242                     if (isData == (x != null) ||    // m already fulfilled
    243                         x == m ||                   // m cancelled
    244                         !m.casItem(x, e)) {         // lost CAS
    245                         advanceHead(h, m);          // dequeue and retry
    246                         continue;
    247                     }
    248                     
    249                     //更新头部节点
    250                     advanceHead(h, m);              // successfully fulfilled
    251                     //唤醒m节点的被挂起的线程
    252                     LockSupport.unpark(m.waiter);
    253                     //返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功
    254                     return (x != null) ? (E)x : e;
    255                 }
    256             }
    257         }
    258         
    259         <!--下面看看执行挂起线程的方法awaitFulfill-->
    260         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    261             /* Same idea as TransferStack.awaitFulfill */
    262             //首先获取超时时间
    263             final long deadline = timed ? System.nanoTime() + nanos : 0L;
    264             //当前操作所在的线程
    265             Thread w = Thread.currentThread();
    266             //线程被挂起或是进入超时等待之前阻止自旋的次数
    267             int spins = (head.next == s)
    268                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
    269                 : 0;
    270             for (;;) {
    271                 //这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记
    272                 if (w.isInterrupted())
    273                     s.tryCancel(e);
    274                 Object x = s.item;
    275                 //x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法
    276                 if (x != e)
    277                     return x;
    278                 //这里先判断超时的时间是否过了
    279                 if (timed) {
    280                     nanos = deadline - System.nanoTime();
    281                     if (nanos <= 0L) {
    282                         s.tryCancel(e);
    283                         continue;
    284                     }
    285                 }
    286                 //这里通过多几次循环来避免直接挂起线程
    287                 if (spins > 0)
    288                     --spins;
    289                 else if (s.waiter == null)
    290                     s.waiter = w;
    291                 else if (!timed)
    292                     //park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒
    293                     LockSupport.park(this);
    294                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
    295                     //parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒
    296                     LockSupport.parkNanos(this, nanos);
    297             }
    298         }
    299         
    300     }
    301 }
    View Code

    2、构造函数

    1  public SynchronousQueue() {
    2         this(false);
    3     }
    4 
    5     public SynchronousQueue(boolean fair) {
    6         // 通过 fair 值来决定公平性和非公平性
    7         // 公平性使用TransferQueue,非公平性采用TransferStack
    8         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    9     }
    View Code

    3、公平模式 TransferQueue

     1 /**
     2  *  这是一个非常典型的 queue , 它有如下的特点
     3  *  1. 整个队列有 head, tail 两个节点
     4  *  2. 队列初始化时会有个 dummy 节点
     5  *  3. 这个队列的头节点是个 dummy 节点/ 或 哨兵节点, 所以操作的总是队列中的第二个节点(AQS的设计中也是这也)
     6  */
     7 
     8 /** 头节点 */
     9 transient volatile QNode head;
    10 /** 尾节点 */
    11 transient volatile QNode tail;
    12 /**
    13  * Reference to a cancelled node that might not yet have been
    14  * unlinked from queue because it was last inserted node
    15  * when it was cancelled
    16  */
    17 /**
    18  * 对应 中断或超时的 前继节点,这个节点存在的意义是标记, 它的下个节点要删除
    19  * 何时使用:
    20  *      当你要删除 节点 node, 若节点 node 是队列的末尾, 则开始用这个节点,
    21  * 为什么呢?
    22  *      大家知道 删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当  节点 B 是整个队列中的末尾元素时,
    23  *      一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像
    24  *      ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用
    25  */
    26 transient volatile QNode cleanMe;
    27 
    28 TransferQueue(){
    29     /**
    30      * 构造一个 dummy node, 而整个 queue 中永远会存在这样一个 dummy node
    31      * dummy node 的存在使得 代码中不存在复杂的 if 条件判断
    32      */
    33     QNode h = new QNode(null, false);
    34     head = h;
    35     tail = h;
    36 }
    37 
    38 /**
    39  * 推进 head 节点,将 老节点的 oldNode.next = this, help gc,
    40  * 这种和 ConcurrentLinkedQueue 中一样
    41  */
    42 void advanceHead(QNode h, QNode nh){
    43     if(h == head && unsafe.compareAndSwapObject(this, headOffset, h, nh)){
    44         h.next = h; // forget old next help gc
    45     }
    46 }
    47 
    48 /** 更新新的 tail 节点 */
    49 void advanceTail(QNode t, QNode nt){
    50     if(tail == t){
    51         unsafe.compareAndSwapObject(this, tailOffset, t, nt);
    52     }
    53 }
    54 
    55 /** CAS 设置 cleamMe 节点 */
    56 boolean casCleanMe(QNode cmp, QNode val){
    57     return cleanMe == cmp && unsafe.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    58 }
    View Code

    4、公平模式 TransferQueue中定义了QNode类

     1 static final class QNode {
     2         // next 域
     3         volatile QNode next;
     4         // item数据项
     5         volatile Object item;
     6         //  等待线程,用于park/unpark
     7         volatile Thread waiter;       // to control park/unpark
     8         //模式,表示当前是数据还是请求,只有当匹配的模式相匹配时才会交换
     9         final boolean isData;
    10 
    11         QNode(Object item, boolean isData) {
    12             this.item = item;
    13             this.isData = isData;
    14         }
    15 ...

    5、公平模式 TransferQueue transfer() 方法

      1  /**
      2  * Puts or takes an item
      3  * 主方法
      4  *
      5  * @param e  if non-null, the item to be handed to a consumer;
      6  *           if null, requests that transfer return an item
      7  *           offered by producer.
      8  * @param timed if this operation should timeout
      9  * @param nanos the timeout, in nanosecond
     10  * @return
     11  */
     12 @Override
     13 E transfer(E e, boolean timed, long nanos) {
     14     /**
     15      * Basic algorithm is to loop trying to take either of
     16      * two actions:
     17      *
     18      * 1. If queue apparently empty or holding same-mode nodes,
     19      *    try to add node to queue of waiters, wait to be
     20      *    fulfilled (or cancelled) and return matching item.
     21      *
     22      * 2. If queue apparently contains waiting items, and this
     23      *    call is of complementary mode, try to fulfill by CAS'ing
     24      *    item field of waiting node and dequeuing it, and then
     25      *    returning matching item.
     26      *
     27      * In each case, along the way, check for gurading against
     28      * seeing uninitialized head or tail value. This never
     29      * happens in current SynchronousQueue, but could if
     30      * callers held non-volatile/final ref to the
     31      * transferer. The check is here anyway because it places
     32      * null checks at top of loop, which is usually faster
     33      * than having them implicity interspersed
     34      *
     35      * 这个 producer / consumer 的主方法, 主要分为两种情况
     36      *
     37      * 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node
     38      *      到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配
     39      *      timeout/interrupt awaitFulfill方法返回的是 node 本身
     40      *      匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的)
     41      *
     42      * 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,
     43      *      进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue
     44      */
     45     QNode s = null; // constrcuted/reused as needed
     46     boolean isData = (e != null); // 1.判断 e != null 用于区分 producer 与 consumer
     47 
     48     for(;;){
     49         QNode t = tail;
     50         QNode h = head;
     51         if(t == null || h == null){         // 2. 数据未初始化, continue 重来
     52             continue;                       // spin
     53         }
     54         if(h == t || t.isData == isData){   // 3. 队列为空, 或队列尾节点和自己相同 (注意这里是和尾节点比价, 下面进行匹配时是和 head.next 进行比较)
     55             QNode tn = t.next;
     56             if(t != tail){                  // 4. tail 改变了, 重新再来
     57                 continue;
     58             }
     59             if(tn != null){                 // 5. 其他线程添加了 tail.next, 所以帮助推进 tail
     60                 advanceTail(t, tn);
     61                 continue;
     62             }
     63             if(timed && nanos <= 0){        // 6. 调用的方法的 wait 类型的, 并且 超时了, 直接返回 null, 直接见 SynchronousQueue.poll() 方法,说明此 poll 的调用只有当前队列中正好有一个与之匹配的线程在等待被【匹配才有返回值
     64                 return null;
     65             }
     66             if(s == null){
     67                 s = new QNode(e, isData);  // 7. 构建节点 QNode
     68             }
     69             if(!t.casNext(null, s)){      // 8. 将 新建的节点加入到 队列中
     70                 continue;
     71             }
     72 
     73             advanceTail(t, s);             // 9. 帮助推进 tail 节点
     74             Object x = awaitFulfill(s, e, timed, nanos); // 10. 调用awaitFulfill, 若节点是 head.next, 则进行一些自旋, 若不是的话, 直接 block, 知道有其他线程 与之匹配, 或它自己进行线程的中断
     75             if(x == s){                   // 11. 若 (x == s)节点s 对应额线程 wait 超时 或线程中断, 不然的话 x == null (s 是 producer) 或 是正真的传递值(s 是 consumer)
     76                 clean(t, s);              // 12. 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe
     77                 return null;
     78             }
     79 
     80             if(!s.isOffList()){          // 13. 节点 s 没有 offlist
     81                 advanceHead(t, s);       // 14. 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了)
     82                 if(x != null){          // and forget fields
     83                     s.item = s;
     84                 }
     85                 s.waiter = null;       // 15. 释放线程 ref
     86             }
     87 
     88             return (x != null) ? (E)x :e;
     89 
     90         }else{                              // 16. 进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注意 队列刚开始构建时 有个 dummy node, 而且 head 节点永远是个 dummy node 这个和 AQS 中一样的)
     91             QNode m = h.next;               // 17. 获取 head.next 准备开始匹配
     92             if(t != tail || m == null || h != head){
     93                 continue;                  // 18. 不一致读取, 有其他线程改变了队列的结构inconsistent read
     94             }
     95 
     96             /** producer 和 consumer 匹配操作
     97              *  1. 获取 m的 item (注意这里的m是head的next节点
     98              *  2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对
     99              *  3. x == m 判断是否 节点m 是否已经进行取消了, 具体看(QNOde#tryCancel)
    100              *  4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况)
    101              *  5. 若 cas操作成功则将h节点dequeue
    102              *
    103              *  疑惑: 为什么将h进行 dequeue, 而不是 m节点
    104              *  答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next
    105              */
    106             Object x = m.item;
    107             if(isData == (x != null) ||    // 19. 两者的模式是否匹配 (因为并发环境下 有可能其他的线程强走了匹配的节点)
    108                     x == m ||               // 20. m 节点 线程中断或者 wait 超时了
    109                     !m.casItem(x, e)        // 21. 进行 CAS 操作 更改等待线程的 item 值(等待的有可能是 concumer / producer)
    110                     ){
    111                 advanceHead(h, m);          // 22.推进 head 节点 重试 (尤其 21 操作失败)
    112                 continue;
    113             }
    114 
    115             advanceHead(h, m);             // 23. producer consumer 交换数据成功, 推进 head 节点
    116             LockSupport.unpark(m.waiter); // 24. 换线等待中的 m 节点, 而在 awaitFulfill 方法中 因为 item 改变了,  所以 x != e 成立, 返回
    117             return (x != null) ? (E)x : e; // 25. 操作到这里若是 producer, 则 x != null, 返回 x, 若是consumer, 则 x == null,.返回 producer(其实就是 节点m) 的 e
    118         }
    119     }
    120 
    121 }
    View Code

    6、公平模式 TransferQueueawaitFulfill() 方法

     1 // 自旋或者等待,直到填充完毕
     2 // 这里的策略是什么呢?如果自旋次数不够了,通常是 16 次,但还有超过 1 秒的时间,就阻塞等待被唤醒。
     3 // 如果时间到了,就取消这次的入队行为。
     4 // 返回的是 Node 本身
     5 // s.item 就是 e 
     6 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
     7     final long deadline = timed ? System.nanoTime() + nanos : 0L;
     8     Thread w = Thread.currentThread();
     9     int spins = ((head.next == s) ?// 如果成功将 tail.next 覆盖了 tail,如果有超时机制,则自旋 32 次,如果没有超时机制,则自旋 32 *16 = 512次
    10                  (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    11     for (;;) {
    12         if (w.isInterrupted())// 当前线程被中断
    13             s.tryCancel(e);// 尝试取消这个 item
    14         Object x = s.item;// 获取到这个 tail 的 item
    15         if (x != e) // 如果不相等,说明 node 中的 item 取消了,返回这个 item。
    16             // 这里是唯一停止循环的地方。当 s.item 已经不是当初的哪个 e 了,说明要么是时间到了被取消了,要么是线程中断被取消了。
    17             // 当然,不仅仅只有这2种 “意外” 情况,还有一种情况是:当另一个线程拿走了这个数据,并修改了 item,也会通过这个判断,返回被“修改”过的 item。
    18             return x;
    19         if (timed) {// 如果有时间限制
    20             nanos = deadline - System.nanoTime();
    21             if (nanos <= 0L) {// 如果时间到了
    22                 s.tryCancel(e);// 尝试取消 item,供上面的 x != e 判断
    23                 continue;// 重来
    24             }
    25         }
    26         if (spins > 0)// 如果还有自旋次数
    27             --spins;// 减一
    28         else if (s.waiter == null)// 如果自旋不够,且 tail 的等待线程还没有赋值
    29             s.waiter = w;// 当前线程赋值给 tail 的等待线程
    30         else if (!timed)// 如果自旋不够,且如果线程赋值过了,且没有限制时间,则 wait,(危险操作)
    31             LockSupport.park(this);
    32         else if (nanos > spinForTimeoutThreshold)// 如果自旋不够,且如果限制了时间,且时间还剩余超过 1 秒,则 wait 剩余时间。
    33             // 主要目的就是等待,等待其他线程唤醒这个节点所在的线程。
    34             LockSupport.parkNanos(this, nanos);
    35     }
    36 }
    View Code

    ====

    View Code

    链接:https://www.jianshu.com/p/af6f83c78506

  • 相关阅读:
    IDEA右侧代码预览、代码地图消失(快捷键:Ctrl+Shift+g)
    vscode 清除多余空行
    StringJoiner
    redis远程连接
    el-container全屏布局(ElementUI)
    阿里云开启selinux无法启动系统问题
    fail模块场景(ansible)
    "***.sh" is read-only (add ! to override) 问题解决
    ansible 报错解决:ERROR! this task '****' has extra params, which is only allowed in the following modules:..
    java实现Excel定制导出(基于POI的工具类)
  • 原文地址:https://www.cnblogs.com/h--d/p/14596091.html
Copyright © 2011-2022 走看看