zoukankan      html  css  js  c++  java
  • Java同步数据结构之SynchronousQueue




    SynchronousQueue的实现目标主要就是阻塞每一个操作,直到与其互补的操作出现,才完成数据传递交接返回。例如,当一个线程执行put(提供数据的生产者)操作,如果队列/堆栈中上一次入队的操作是take(请求数据的消费者),这时它们形成互补,那么它们进行数据交接,双双出队并返回;如果队列/堆栈为空或上一次入队的操作也是put操作,那么当前线程只能入队等待,反之亦然。当然SynchronousQueue同时也提供了相应的非阻塞以及指定超时时间的互补操作 offer/poll,它们在执行的时候或在指定的超时时间内,如果没有出现与其互补的操作则立即返回,例如,一个线程执行无超时时间的offer操作,这时候如果队列/堆栈中不存在与其互补的操作 poll或take,那么它将立即出队返回(offer提供的数据也不会保留在队列/堆栈中),而不是像put那样非要等到一个与其互补的操作出现拿走它的数据,反之亦然。




     1 static void test1() throws Exception{
     2     SynchronousQueue sq = new SynchronousQueue();
     4     CompletableFuture putFuture = CompletableFuture.runAsync(() -> {
     6         try{
     7             sq.put("AS");//can't be offer
     8         }catch (Exception e){
     9             e.printStackTrace();
    10         }
    12     });
    14     TimeUnit.SECONDS.sleep(2); //等待2秒,才启动消费线程
    16     CompletableFuture takeResultFuture = CompletableFuture.supplyAsync(() -> {
    17         try{
    18             return sq.take();
    19             //or return sq.poll(5, TimeUnit.SECONDS);
    20         }catch (Exception e){
    21             e.printStackTrace();
    22         }
    23         return null;
    24     });
    26     CompletableFuture.allOf(putFuture, takeResultFuture);
    28     System.out.println(takeResultFuture.get());
    29 }
    View Code

    示例中, 一个线程先执行put(“AS”),另一个线程执行take,这样就可以完成数据的传递,最终takeResultFuture将得到“AS”,在上例中,put操作不能用offer替代,因为该线程先执行,offer在执行的时候另一个线程可能还没有执行,offer将立即返回,等到消费线程执行take时,已经没有与其互补的操作了,所以take将被阻塞,直到下一个与其互补的操作offer、put操作出现如果成功与其完成数据交接,才返回。另外,消费线程也可以使用 sq.poll(5, TimeUnit.SECONDS);替代,这时候poll会等待5秒,在这五秒之内,put线程肯定已经执行就绪,所以肯定可以与其发生数据交接。


    根据Java Doc, SynchronousQueue为了支持可选的公平策略,即为等待的生产者和消费者线程排序,分别扩展了W. N. Scherer III和M. L. Scott在2004年10月的第18届分布式计算年会上提出的“Nonblocking Concurrent Objects with Condition Synchronization”中描述的双堆栈和双队列算法。LIFO的堆栈用于非公平模式,FIFO的队列用于公平模式,两者的性能大体相似。FIFO通常在争用情况下支持更高的吞吐量,但LIFO在普通应用程序中维护更高的线程局部性。双队列/双堆栈的解释是指在任何给定时间要么持有数据(被put操作提供),要么请求数据(take操作),要么为空的队列。fulfill操作即是出队一个互补的节点,SynchronousQueue的内部是无锁的实现,都是基于CAS + Park 来实现线程的阻塞等待。


     1 /**
     2  * Shared internal API for dual stacks and queues. 用于双堆栈和队列共享内部API。
     3  */
     4 abstract static class Transferer<E> {
     6     /**
     7      * Performs a put or take. 执行put或take操作。
     8      *
     9      * @param e if non-null, the item to be handed to a consumer;//如果非空,则是被传递给消费者的数据;
    10      *          if null, requests that transfer return an item offered by producer.
    11      *          // 如果为空,则请求该transfer返回生产者提供的数据。
    13      * @param timed if this operation should timeout //如果该操作应该超时
    14      * @param nanos the timeout, in nanoseconds
    15      * @return if non-null, the item provided or received; if null,
    16      *         the operation failed due to timeout or interrupt --
    17      *         the caller can distinguish which of these occurred
    18      *         by checking Thread.interrupted.
    19        返回值如果非空,就是被提供或者接受的数据;如果返回值为空,则操作因为超时或者线程中断而失败,
    20        调用者可以通过Thread.interrupted查看中断标记来区分是中断还是超时。
    21      */
    22     abstract E transfer(E e, boolean timed, long nanos);
    23 }
    View Code



      1 /** Dual stack */ 双堆栈
      2 static final class TransferStack<E> extends Transferer<E> {
      3 /*
      4  * This extends Scherer-Scott dual stack algorithm, differing,
      5  * among other ways, by using "covering" nodes rather than
      6  * bit-marked pointers: Fulfilling operations push on marker
      7  * nodes (with FULFILLING bit set in mode) to reserve a spot
      8  * to match a waiting node.
     10    这扩展了scher - scott双堆栈算法,不同的是,它使用“覆盖”节点而不是位标记指针:
     11    在标记节点上执行push操作(在模式位上标记Fulfilling标志)来保留一个位置来匹配等待的节点。
     13    Fulfilling操作将标记成Fulfilling的节点压入堆栈来匹配等待的节点。
     14  */
     16 /* Modes for SNodes, ORed together in node fields */ snode的模式,在节点字段中组合在一起
     17 /** Node represents an unfulfilled consumer */ 表示unfulfilled的消费者
     18 static final int REQUEST    = 0;
     20 /** Node represents an unfulfilled producer */ 表示unfulfilled的生产者
     21 static final int DATA       = 1;
     23 /** Node is fulfilling another unfulfilled DATA or REQUEST */ 节点正在fulfilling另一个unfulfilled的数据或请求
     24 static final int FULFILLING = 2;
     26 /** Returns true if m has fulfilling bit set. */ 如果 m 有fulfilling标记位返回true,表示正在交接数据即fulfilling
     27 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
     29 /** Node class for TransferStacks. */
     30 static final class SNode {
     31     volatile SNode next;        // next node in stack           栈中的下一个节点
     32     volatile SNode match;       // the node matched to this   与此匹配的节点
     33     volatile Thread waiter;     // to control park/unpark     阻塞当前节点的线程
     34     Object item;                // data; or null for REQUESTs 节点数据;对于消费者来说为null
     35     int mode;                   //节点的模式
     37     //注意:item和mode字段不需要被volatile修饰,因为它们总是在其他volatile/原子操作之前写,之后读。
     38     // Note: item and mode fields don't need to be volatile 
     39     // since they are always written before, and read after,
     40     // other volatile/atomic operations.
     42     SNode(Object item) {
     43         this.item = item;
     44     }
     46     boolean casNext(SNode cmp, SNode val) {//将当前节点的下一个节点从cmp更新成val
     47         return cmp == next &&
     48             UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
     49     }
     51     /**
     52      * Tries to match node s to this node, if so, waking up thread. 尝试将s节点与当前节点匹配,如果匹配成功则唤醒线程。
     53      * Fulfillers call tryMatch to identify their waiters. Fulfillers 
     54      * Waiters block until they have been matched.
     55        Fulfillers 通过调用 tryMatch 确定它们的等待者。Fulfillers等待者被阻塞直到它们被匹配。
     56      *
     57      * @param s the node to match
     58      * @return true if successfully matched to s
     59      */
     60     boolean tryMatch(SNode s) {  //如果match为nul,尝试将节点s设置成当前节点的match
     61         if (match == null &&  //还没有被其它Fulfillers 匹配,那么尝试与它配对
     62             UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
     63             Thread w = waiter;
     64             if (w != null) {    // waiters need at most one unpark
     65                 waiter = null;
     66                 LockSupport.unpark(w);  //唤醒等待的线程Waiters。
     67             }
     68             return true; //返回true
     69         }
     70         /*
     71          * 已经和s匹配过了也返回true、否则返回false。
     72          * match == s这种情况就发生在transfer的情况3:即当前节点与s的匹配被其它线程抢先帮助完成了
     73          * 所以当该方法再次调用的时候,match其实已经指向了s。
     74          */
     75         return match == s; 
     76     }
     78     /**
     79      * Tries to cancel a wait by matching node to itself. 尝试通过将节点与自身匹配来取消等待。
     80      */
     81     void tryCancel() {
     82         UNSAFE.compareAndSwapObject(this, matchOffset, null, this); //将match设置成自己,避免被其它线程匹配
     83     }
     85     boolean isCancelled() {//如果节点match指向自己的话就是已经取消等待
     86         return match == this;
     87     }
     89     // Unsafe mechanics
     90     private static final sun.misc.Unsafe UNSAFE;
     91     private static final long matchOffset;
     92     private static final long nextOffset;
     94     static {
     95         try {
     96             UNSAFE = sun.misc.Unsafe.getUnsafe();
     97             Class<?> k = SNode.class;
     98             matchOffset = UNSAFE.objectFieldOffset
     99                 (k.getDeclaredField("match"));
    100             nextOffset = UNSAFE.objectFieldOffset
    101                 (k.getDeclaredField("next"));
    102         } catch (Exception e) {
    103             throw new Error(e);
    104         }
    105     }
    106 }
    108 /** The head (top) of the stack */  栈顶节点head
    109 volatile SNode head;
    111 boolean casHead(SNode h, SNode nh) { //更新栈顶节点,尝试将栈顶节点从h 更新成nh
    112     return h == head &&
    113         UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    114 }
    116 /**
    117  * Creates or resets fields of a node. Called only from transfer
    118  * where the node to push on stack is lazily created and
    119  * reused when possible to help reduce intervals between reads
    120  * and CASes of head and to avoid surges of garbage when CASes
    121  * to push nodes fail due to contention.
    123    创建或重置节点的字段。只有在要推入堆栈的节点是延迟创建并且可重用的时候从transfer调用,
    124    以帮助减少读操作和CAS置换head之间的间隔,并避免由于竞争而导致push节点的CAS操作失败时出现垃圾激增。
    126  */
    127 static SNode snode(SNode s, Object e, SNode next, int mode) {
    128     if (s == null) s = new SNode(e);//s为空才创建新节点,否则更新原节点的字段。
    129     s.mode = mode;
    130     s.next = next;
    131     return s;
    132 }
    View Code


    关于堆栈实现中的int型节点模式字段mode,其实它由两部分组成(两个二进制位),二进制位的最低位表示当前节点的操作类型(即是数据提供还是数据消费),第二位表示当前节点是否找到与其互补的节点,例如,当mode为0(REQUEST常量)表示当前节点是数据消费者(即take/poll操作),为1(DATA常量)表示数据提供者(put/offer操作),当与其互补节点到达时,mode的二进制将变成 1X:消费者的二进制即10,生产者的二进制即11,所以源码中的isFulfilling方法通过将mode与2(二进制10)按位与就可以得到二进制的高位指示的值,即是否找到互补节点,正在完成数据交接。


      1 /**
      2  * Puts or takes an item. 放置或者拿出一个item
      3  */
      4 @SuppressWarnings("unchecked")
      5 E transfer(E e, boolean timed, long nanos) {
      6     /*
      7      * Basic algorithm is to loop trying one of three actions: 
      8      *    基本算法是循环尝试下面三个动作之一:
      9      *
     10      * 1. If apparently empty or already containing nodes of same
     11      *    mode, try to push node on stack and wait for a match,
     12      *    returning it, or null if cancelled.
     13      *
     14      *      如果栈为空或者栈中已经存在了模式相同的节点(都是读或写),就尝试将当前节点压入栈顶等待被匹配,最后返回。
     15      *    如果当前节点在等待过程中被取消,则返回null。
     16      *
     17      * 2. If apparently containing node of complementary mode,
     18      *    try to push a fulfilling node on to stack, match
     19      *    with corresponding waiting node, pop both from
     20      *    stack, and return matched item. The matching or
     21      *    unlinking might not actually be necessary because of
     22      *    other threads performing action 3:
     23      *
     24      *      如果栈包含一个与当前操作模式互补(即可以匹配)的节点,则尝试将当前线程作为作为满足匹配条件(模式打上fulfilling标记)的新节点入栈,
     25      *    并和栈中相应的等待节点进行匹配,匹配完成之后将这一对相匹配的节点都从栈中弹出,返回匹配的数据。
     26      *      实际上匹配和出栈这两个节点不是必须的,因为可能其它线程执行了行为3,即帮助它们完成了匹配和出栈。
     27      *
     28      * 3. If top of stack already holds another fulfilling node,
     29      *    help it out by doing its match and/or pop
     30      *    operations, and then continue. The code for helping
     31      *    is essentially the same as for fulfilling, except
     32      *    that it doesn't return the item.
     33      *      
     34      *      如果当前栈顶元素找到了和它匹配的节点(即栈顶节点处于fulfilling模式),当前线程会尝试帮助它们完成匹配和弹出操作,
     35      *      然后继续处理当前线程自己的操作。帮助的代码与情况2中匹配的代码本质上是相同的,只是它不返回匹配的对象。
     36      *      
     37      */
     39     SNode s = null; // constructed/reused as needed
     40     int mode = (e == null) ? REQUEST : DATA; //传入对象为空表示消费者(take),不为空表示生产者(put)
     42     for (;;) {
     43         SNode h = head;
     44         // 情况1:如果栈为空或与栈顶节点模式相同
     45         if (h == null || h.mode == mode) {  
     46             //如果定时等待已经超时或者时间本身为0(offer)
     47             if (timed && nanos <= 0) {      
     48                 if (h != null && h.isCancelled()) //栈顶节点已经取消了,清除它,继续循环比较新的栈顶节点
     49                     casHead(h, h.next);     // pop cancelled node
     50                 else
     51                     //这里直接返回null,针对已经超时或者等待时间为0(offer),栈为空或栈顶节点模式相同,返回null
     52                     return null; 
     53             } 
     54             //构造新节点,压入栈顶。这里延迟创建节点实例可以避免CAS失败导致垃圾快速增长
     55             else if (casHead(h, s = snode(s, e, h, mode))) {
     56                 SNode m = awaitFulfill(s, timed, nanos); //等待被匹配或者取消(中断或超时)才返回
     57                 //在返回后,如果发现返回的结果等于自身,则说明在等待过程中已经取消了,则将节点从栈中清除
     58                 if (m == s) {               // wait was cancelled
     59                     clean(s);
     60                     return null; 
     61                 }
     62                 /*
     63                  * 阻塞醒来后,如果发现有新的节点插入到当前节点s的前面成为了新的栈顶节点
     64                  * 那么该新节点就是与当前节点s匹配的节点
     65                  /
     66                 if ((h = head) != null && h.next == s)
     67                     //同时将两个节点都出队,辅助s的匹配操作
     68                     casHead(h, s.next);     // help s's fulfiller
     69                 //是消费者(读)就返回匹配到的对象,是生产者(写)就返回本身携带的对象
     70                 return (E) ((mode == REQUEST) ? m.item : s.item);
     71             }
     72         } 
     73         /*
     74          * 情况2:栈不为空,并且与栈顶节点模式不同,并且栈顶节点还没有找到可以匹配节点(模式中没有Fulfilling标记)
     75          * 说明当前操作与栈顶节点h是可以匹配的互补节点。
     76          */
     77         else if (!isFulfilling(h.mode)) { // try to fulfill
     78             //栈顶节点已经取消了,重置栈顶节点继续循环,会重新判断当前操作与新的Head是否可以匹配
     79             if (h.isCancelled())            // already cancelled
     80                 casHead(h, h.next);         // pop and retry
     81             //构造新匹配节点(模式打上fulfilling标记),压入栈顶
     82             else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
     83                 /*
     84                  * 循环,让s与原来的head(h)进行匹配,匹配成功则返回匹配的数据,
     85                  * 匹配失败(等待被匹配者取消了等待)则移除那个与s互补的节点,继续与它的下一个节点进行匹配,
     86                  * 直到成功或到达堆栈结尾。若最终都没成功则重新外层循环。
     87                  */
     88                 for (;;) { // loop until matched or waiters disappear
     89                     //s.next 就是要与s匹配的模式不同的那个节点,也就是原来的栈顶节点
     90                     SNode m = s.next;       // m is s's match
     91                     /* 
     92                      * m == null,表示事先准备与s匹配的节点m取消了等待,并且后面已经没有节点了
     93                      * 因为如果m取消了,m.next不为空,那么此时m就将指向m.next,所以m == null,
     94                      * 其实也表示栈空(all waiters are gone)
     95                      */
     96                     if (m == null) {        // all waiters are gone
     97                         //把s节点从栈顶弹出,重新循环。
     98                         casHead(s, null);   // pop fulfill node
     99                         //将s重置为空,下一次循环重新创建实例
    100                         s = null;           // use new node next time
    101                         //退出内循环,继续外层主循环
    102                         break;              // restart main loop
    103                     }
    104                     SNode mn = m.next;
    105                     //尝试匹配,匹配成功会唤醒m节点对应的线程
    106                     if (m.tryMatch(s)) {
    107                         //匹配成功,将s、m都出队,m.next成为新的栈顶
    108                         casHead(s, mn);     // pop both s and m
    109                         //是消费者就返回匹配到的对象,是生产者就返回本身携带的对象
    110                         return (E) ((mode == REQUEST) ? m.item : s.item);
    111                     } else                  // lost match
    112                         /*
    113                          * 没有匹配成功,可能是s与m匹配的过程中,m节点取消了等待,将match指向了自己
    114                          * 将m节点移除,让s.next指向m.next即mn,继续循环让s与新的next进行匹配
    115                          */
    116                         s.casNext(m, mn);   // help unlink
    117                 }
    118             }
    119         } 
    120         //情况3:说明栈顶节点正在被匹配,mode变量的二进制位第2位为1,则辅助栈顶节点与其next节点完成匹配
    121         else {                            // help a fulfiller
    122             //m 和 h 就是理论上正在匹配的一对节点
    123             SNode m = h.next;               // m is h's match
    124             //m == null,表示事先准备与h匹配的节点m取消了等待
    125             if (m == null)                  // waiter is gone
    126                 //把栈顶节点弹出,重新循环
    127                 casHead(h, null);           // pop fulfilling node
    128             else {
    129                 //否则,帮助它们完成匹配
    130                 SNode mn = m.next;
    131                 //尝试匹配m 、 h
    132                 if (m.tryMatch(h))          // help match
    133                     //匹配成功,将h、m都出队,m.next成为新的栈顶
    134                     casHead(h, mn);         // pop both h and m
    135                 else                        // lost match
    136                     //没有匹配成功,可能是h与m匹配的过程中,m节点取消了等待,将match指向了自己
    137                     h.casNext(m, mn);       // help unlink
    138             }
    139         }
    140     }
    141 }
    143 /**
    144  * Spins/blocks until node s is matched by a fulfill operation.
    145  * 自旋/阻塞,直到节点s被fulfill操作匹配。
    147  * @param s the waiting node
    148  * @param timed true if timed wait true为定时等待
    149  * @param nanos timeout value
    150  * @return matched node, or s if cancelled 返回匹配的节点,或者如果取消返回s
    151  */
    152 SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    153     /*
    154      * When a node/thread is about to block, it sets its waiter
    155      * field and then rechecks state at least one more time
    156      * before actually parking, thus covering race vs
    157      * fulfiller noticing that waiter is non-null so should be
    158      * woken.
    159        当一个节点/线程将要阻塞时,它会设置它的waiter字段,然后在实际阻塞之前至少要重新检查一次状态,
    160        fulfiller会注意到waiter是非空的,因此应该被唤醒。
    162      *
    163      * When invoked by nodes that appear at the point of call
    164      * to be at the head of the stack, calls to park are
    165      * preceded by spins to avoid blocking when producers and
    166      * consumers are arriving very close in time.  This can
    167      * happen enough to bother only on multiprocessors.
    168        当调用点上出现在堆栈顶部的节点调用park时,调用之前会有自旋,以避免在生产者和消费者到达非常接近时阻塞。
    169        这种情况只会发生在多处理器上。
    170      *
    171      * The order of checks for returning out of main loop
    172      * reflects fact that interrupts have precedence over
    173      * normal returns, which have precedence over
    174      * timeouts. (So, on timeout, one last check for match is
    175      * done before giving up.) Except that calls from untimed
    176      * SynchronousQueue.{poll/offer} don't check interrupts
    177      * and don't wait at all, so are trapped in transfer
    178      * method rather than calling awaitFulfill.
    179        从主循环返回的检查顺序反映了这样一个事实:中断优先于正常返回,而正常返回优先于超时。
    180        (因此,在超时时,要在放弃之前进行最后一次匹配检查。) 除了来自非定时同步队列的调用。
    181        poll/offer的非定时调用不检查中断且根本不需要等待,因此只会在transfer方法中返回,而不会调用awaitFulfill。
    183      */
    184     final long deadline = timed ? System.nanoTime() + nanos : 0L;//如果有等待时间,计算超时截至时间
    185     Thread w = Thread.currentThread();
    186     int spins = (shouldSpin(s) ?  //如果应该自旋,获取定时等待或非定时等待的自旋次数
    187                  (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    188     for (;;) {
    189         if (w.isInterrupted()) 
    190             s.tryCancel(); //如果中断则尝试取消。尝试将match指向自己,避免再被其它线程匹配。
    191         SNode m = s.match;
    192         if (m != null)
    193             return m; //取消了则返回自身s,否则返回匹配的节点。
    194         if (timed) { //如果还没有被匹配或取消,是定时等待
    195             nanos = deadline - System.nanoTime();
    196             if (nanos <= 0L) { //等待时间已经超时,尝试取消。
    197                 s.tryCancel();
    198                 continue; //使用continue,在放弃之前会再循环一次,避免漏掉已经匹配的节点。
    199             }
    200         }
    201         //没有中断,没有取消,或定时等待没有超时
    202         if (spins > 0) //应该自旋,每次自旋都会判断下一次是否还需要自旋,以及自旋的次数。
    203             spins = shouldSpin(s) ? (spins-1) : 0;
    204         else if (s.waiter == null) //准备进入阻塞之前先设置waiter,下一次循环再次检测状态之后才阻塞
    205             s.waiter = w; // establish waiter so can park next iter
    206         else if (!timed) 
    207             LockSupport.park(this); //非定时阻塞
    208         else if (nanos > spinForTimeoutThreshold)
    209             LockSupport.parkNanos(this, nanos); //定时阻塞
    210     }
    211 }
    213 /**
    214    判断节点s是否应该自旋。
    215  * Returns true if node s is at head or there is an active fulfiller.
    216  * 如果s是栈顶节点或者堆栈为空或者当前时刻正好存在一个活动的fulfiller,返回true。
    217  * 
    218  * 堆栈为空或者s是栈顶节点应该自旋很好理解,因为s就是栈中当前时刻最优先出队/被匹配的节点。
    219  * 因此在自旋的过程中,随时可能有新的线程进来就可以完成与当前线程的匹配,从而减少当前线程的一次挂起;
    220  * 
    221  * 但是当前时刻存在一个活动的fulfiller为什么也应该自旋呢?存在的fulfiller当然是优先尝试与栈顶节点匹配,
    222  * 如果节点s不是栈顶,那么理论上来说是不大可能被该fulfiller线程匹配的,
    223  * 只有在该fulfiller匹配的过程中刚好位于该fulfiller与s节点之间的所有节点都取消了,自然而然就轮到s节点了。
    224  */
    225 boolean shouldSpin(SNode s) {
    226     SNode h = head;
    227     return (h == s || h == null || isFulfilling(h.mode));
    228 }
    230 /**
    231  * Unlinks s from the stack. 从堆栈中分离s节点,
    232    会清除栈顶至s.next(如果next已经取消,则至next.next)之间所有已经取消的节点。
    233  */
    234 void clean(SNode s) {
    235     // 将引用置null,方便垃圾回收。
    236     s.item = null;   // forget item
    237     s.waiter = null; // forget thread
    239     /*
    240      * At worst we may need to traverse entire stack to unlink
    241      * s. If there are multiple concurrent calls to clean, we
    242      * might not see s if another thread has already removed
    243      * it. But we can stop when we see any node known to
    244      * follow s. We use s.next unless it too is cancelled, in
    245      * which case we try the node one past. We don't check any
    246      * further because we don't want to doubly traverse just to
    247      * find sentinel.
    248      * 在最坏的情况下,我们可能需要遍历整个堆栈来分离s节点。
    249      * 因为如果有多个线程并发调用clean方法,由于另一个线程可能已经移除了s节点,我们可能找不到s节点。
    250      * 但是我们可以在遍历到s的下一个节点或者下下个节点(s.next已经取消)时停止。
    251      * 我们使用s的下一个节点s.next,除非它也被取消了,在这种情况下,我们尝试下下个节点。
    252      * 我们不再进一步检查该下下个节点是否取消,因为我们不想为了找到它而重复遍历。
    253      * 
    254      * 如果它的下下个节点也取消了,那么就是最坏的情况,要遍历整个堆栈。
    255      */
    257     SNode past = s.next;
    258     if (past != null && past.isCancelled()) //s.next已经取消,取下下个节点,不论它是否也取消了
    259         past = past.next;
    261     // Absorb cancelled nodes at head 移除堆栈头部已经取消的连续节点,重新确定栈顶节点
    262     SNode p;
    263     while ((p = head) != null && p != past && p.isCancelled())
    264         casHead(p, p.next);  
    267     // Unsplice embedded nodes
    268     while (p != null && p != past) { //移除当前有效栈顶节点与past节点之间已经取消的节点
    269         SNode n = p.next;
    270         if (n != null && n.isCancelled())
    271             p.casNext(n, n.next);
    272         else
    273             p = n;
    274     }
    275 }
    View Code

    关于堆栈实现的逻辑通过注释很容易理解, 简单来说,如果堆栈为空或当前操作与栈顶节点模式相同(都是生产者或都是消费者)那么构造成新的节点入栈成为新的栈顶以 自旋 + 阻塞的方式进行等待,如果发现当前操作与栈顶节点模式互补,那么将当前操作的模式打上fulfilling标记,然后入栈成为新的栈顶(入栈抢占栈顶的过程是可以被其它互补操作抢先的,但一旦入栈成功那么其它操作再也不能抢走与其匹配的next节点,除非它的next节点自己取消了等待),接着再完成该栈顶节点与其next节点的数据匹配交接,成功就唤醒等待节点将两个节点都出栈返回,失败(说明与其互补的节点中断或超时而取消了)则继续与栈中下下个节点(因为那些后续节点肯定都是与当前操作模式互补的)匹配交接数据,直到成功或者堆栈结尾,另外当标记为fulfilling的互补节点入栈之后,再还没完成匹配之前,后续到达的其它操作也会帮助它们完成数据交接匹配。




     最终如果take与put(B)完成了匹配,那么堆栈中最后只剩下put (A)在等待被互补操作匹配交接数据。






      1 /** Dual Queue */ 双队列
      2 static final class TransferQueue<E> extends Transferer<E> {
      3     /*
      4      * This extends Scherer-Scott dual queue algorithm, differing,
      5      * among other ways, by using modes within nodes rather than
      6      * marked pointers. The algorithm is a little simpler than
      7      * that for stacks because fulfillers do not need explicit
      8      * nodes, and matching is done by CAS'ing QNode.item field
      9      * from non-null to null (for put) or vice versa (for take).
     10      *
     11      * 这扩展了scher - scott双队列算法,与其他方法不同的是,它使用节点内的模式而不是标记指针。
     12      * 由于fulfillers不需要构造成新节点,匹配由互补节点自己完成的对节点数据的CAS更新操作,因此该算法比堆栈算法简单一些。
     13      * item字段从非null到null(用于put),反之亦然(用于take)。
     14      */
     16     /** Node class for TransferQueue. */
     17     static final class QNode {
     18         volatile QNode next;          // next node in queue     队列中的下一个节点
     19         volatile Object item;         // CAS'ed to or from null 节点数据
     20         volatile Thread waiter;       // to control park/unpark 阻塞当前节点的线程
     21         final boolean isData;         // 节点的读写标记,类似堆栈的模式
     23         QNode(Object item, boolean isData) {
     24             this.item = item;
     25             this.isData = isData;
     26         }
     28         boolean casNext(QNode cmp, QNode val) {//将当前节点的下一个节点从cmp更新成val
     29             return next == cmp &&
     30                 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
     31         }
     33         boolean casItem(Object cmp, Object val) {//将当前节点的数据从cmp更新成val
     34             return item == cmp &&
     35                 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
     36         }
     38         /**
     39          * Tries to cancel by CAS'ing ref to this as item. 尝试通过将节点数据指向自身来取消等待。
     40          */
     41         void tryCancel(Object cmp) {
     42             UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
     43         }
     45         boolean isCancelled() { //如果节点数据item指向自己的话就是已经取消等待
     46             return item == this;
     47         }
     49         /**
     50          * Returns true if this node is known to be off the queue
     51          * because its next pointer has been forgotten due to
     52          * an advanceHead operation.
     53          * 如果当前节点已经不在队列中,即通过advanceHead操作将next指向自己了,则返回true。
     54          */
     55         boolean isOffList() {
     56             return next == this;
     57         }
     59         // Unsafe mechanics
     60         private static final sun.misc.Unsafe UNSAFE;
     61         private static final long itemOffset;
     62         private static final long nextOffset;
     64         static {
     65             try {
     66                 UNSAFE = sun.misc.Unsafe.getUnsafe();
     67                 Class<?> k = QNode.class;
     68                 itemOffset = UNSAFE.objectFieldOffset
     69                     (k.getDeclaredField("item"));
     70                 nextOffset = UNSAFE.objectFieldOffset
     71                     (k.getDeclaredField("next"));
     72             } catch (Exception e) {
     73                 throw new Error(e);
     74             }
     75         }
     76     }
     78     /** Head of queue */ 
     79     transient volatile QNode head; //队列头
     80     /** Tail of queue */
     81     transient volatile QNode tail; //队列尾
     82     /**
     83      * Reference to a cancelled node that might not yet have been
     84      * unlinked from queue because it was the last inserted node
     85      * when it was cancelled.
     86      * 指向一个已取消的节点,该节点可能尚未从队列中解除链接,因为它是在取消时最后插入的节点。
     87      */
     88     transient volatile QNode cleanMe;
     90     //初始化队列的时候,创建一个虚拟节点,既是头节点也是尾节点。
     91     TransferQueue() {
     92         QNode h = new QNode(null, false); // initialize to dummy node.
     93         head = h;
     94         tail = h;
     95     }
     97     /**
     98      * Tries to cas nh as new head; if successful, unlink
     99      * old head's next node to avoid garbage retention.
    100      * 尝试将头节点从h设置成nh,如果成功将原来的节点next指向自己,方便垃圾回收。
    101      */
    102     void advanceHead(QNode h, QNode nh) {
    103         if (h == head &&
    104             UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
    105             h.next = h; // forget old next
    106     }
    108     /**
    109      * Tries to cas nt as new tail. 尝试将尾节点从t设置成nt
    110      */
    111     void advanceTail(QNode t, QNode nt) {
    112         if (tail == t)
    113             UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    114     }
    116     /**
    117      * Tries to CAS cleanMe slot.尝试将cleanMe从cmp重置为val
    118      */
    119     boolean casCleanMe(QNode cmp, QNode val) {
    120         return cleanMe == cmp &&
    121             UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    122     }
    View Code


      1     /**
      2      * Puts or takes an item. 放置或者拿出一个item
      3      */
      4     @SuppressWarnings("unchecked")
      5     E transfer(E e, boolean timed, long nanos) {
      6         /* Basic algorithm is to loop trying to take either of
      7          * two actions:
      8          * 基本算法是循环尝试下面两个动作之一:
      9          *
     10          * 1. If queue apparently empty or holding same-mode nodes,
     11          *    try to add node to queue of waiters, wait to be
     12          *    fulfilled (or cancelled) and return matching item.
     13          *
     14          *    如果队列为空或者持有相同模式的节点,则尝试将节点加入到等待队列中,等待被fulfilled(或者取消)并返回匹配的数据。
     15          *
     16          * 2. If queue apparently contains waiting items, and this
     17          *    call is of complementary mode, try to fulfill by CAS'ing
     18          *    item field of waiting node and dequeuing it, and then
     19          *    returning matching item.
     20          *
     21          *    如果队列中包含等待节点,并且当前操作是与其模式互补的,尝试通过CAS设置等待节点的item字段来fulfill,并将等待节点出队,然后返回匹配的数据。
     22          *
     23          * In each case, along the way, check for and try to help
     24          * advance head and tail on behalf of other stalled/slow
     25          * threads.
     26          *
     27          * 在每一种情况下,在整个过程中,检查并试图帮助推进头和尾代表其他停滞/缓慢的线程。
     28          *
     29          * The loop starts off with a null check guarding against
     30          * seeing uninitialized head or tail values. This never
     31          * happens in current SynchronousQueue, but could if
     32          * callers held non-volatile/final ref to the
     33          * transferer. The check is here anyway because it places
     34          * null checks at top of loop, which is usually faster
     35          * than having them implicitly interspersed.
     36          *
     37          * 循环以null检查开始,以防止看到未初始化的头节点和尾节点。
     38          * 这在当前的同步队列中从未发生过,但如果调用者持有 非volatile/final的transferer引用,则可能发生。
     39          * 无论如何,检查都在这里,因为它将null检查放在循环的顶部,这通常比隐式地穿插检查要快。
     40          */
     42         QNode s = null; // constructed/reused as needed
     43         boolean isData = (e != null); //传入对象为空表示消费者(take),不为空表示生产者(put)
     45         for (;;) {
     46             QNode t = tail;
     47             QNode h = head;
     48             //如果transferer还未初始化完成,自旋等待。
     49             //这里只会在调用者持有的transferer引用没有被volatile/final修饰的时候才可能发生。
     50             //因为构造transferer实例与将其赋值给一个非volatile/final变量之间不存在happens-before 关系。
     51             if (t == null || h == null)         // saw uninitialized value
     52                 continue;                       // spin
     54             //情况1:队列为空或者持有(队尾节点)相同模式的节点
     55             if (h == t || t.isData == isData) { // empty or same-mode
     56                 QNode tn = t.next;
     57                 if (t != tail)                  // inconsistent read 有新节点入队,重新循环
     58                     continue;
     59                 if (tn != null) {               // lagging tail 有新节点入队,但还没来得及将新节点设置成尾节点。
     60                     advanceTail(t, tn);         //帮助新入队的节点完成成为新的尾节点,然后重新循环
     61                     continue;
     62                 }
     63                 if (timed && nanos <= 0)        // can't wait  定时等待超时,直接返回null
     64                     return null;
     65                 if (s == null)
     66                     s = new QNode(e, isData);    //构造新节点
     67                 //先尝试将该新节点挂在当前尾节点的后面,失败表示被其它新节点抢先入队,重新循环重试。
     68                 if (!t.casNext(null, s))        // failed to link in
     69                     continue;
     70                 //在将新节点挂到当前尾节点的后面只后,再将其设为新的尾节点。
     71                 advanceTail(t, s);              // swing tail and wait
     73                 //等待互补的节点来匹配或者等待中断超时
     74                 Object x = awaitFulfill(s, e, timed, nanos); 
     75                 //在等待返回后,如果发现返回的结果等于自身,则说明在等待过程中已经取消了(中断或超时),则将节点从队列中清除
     76                 if (x == s) {                   // wait was cancelled
     77                     clean(t, s);
     78                     return null;
     79                 }
     80                 //等待等到了互补的节点,如果当前节点还没有出队,将其出队
     81                 if (!s.isOffList()) {           // not already unlinked
     82                     //如果当前节点是排在最头部的节点,即 head->s-s1-s2...这种情况,那么既然s要出队,就需要重新设置head
     83                     //就是把原来的head出队了,当前节点变成了新的head节点。
     84                     advanceHead(t, s);          // unlink if head
     85                     if (x != null)              // and forget fields
     86                         s.item = s;              //如果当前节点是一个请求数据的消费者,需要把获取到的数据忘记,方便垃圾回收。
     87                     s.waiter = null;
     88                 }
     89                 return (x != null) ? (E)x : e; //消费者返回获取的数据,生产者返回自己提供的数据
     91             }
     92             //情况2:队列不为空,并且当前操作与其(尾节点)模式互补的        
     93             else {                            // complementary-mode
     94                 //head.next就是与其互补的节点,稍后会尝试与其匹配
     95                 QNode m = h.next;               // node to fulfill
     96                 //队列这时候发生了变化:
     97                 //(1)有新节点入队 t != tail,(2)等待的节点取消(中断或超时)了等待 m == null,(3)已经被其它线程抢先匹配了 h != head
     98                 if (t != tail || m == null || h != head) 
     99                     continue;                   // inconsistent read 重新循环
    101                 //只有在m还没有被其它线程匹配,也没有取消时才尝试与其配对(m.casItem(x, e))
    102                 //否则不论m是取消了,还是被其它线程抢先匹配了,都将其出队(advanceHead(h, m))
    103                 Object x = m.item;
    104                 if (isData == (x != null) ||    // m already fulfilled
    105                     x == m ||                   // m cancelled
    106                     !m.casItem(x, e)) {         // lost CAS
    107                     advanceHead(h, m);          // dequeue and retry
    108                     continue;
    109                 }
    110                 //成功匹配,将被匹配的m出队(其实还是出队的head,m变成了新的head)
    111                 advanceHead(h, m);              // successfully fulfilled
    112                 LockSupport.unpark(m.waiter);    //唤醒等待线程,使其返回
    113                 return (x != null) ? (E)x : e;    //当前操作是消费者则返回获取的数据,否则返回自己提供的数据
    114             }
    115         }
    116     }
    118     /**
    119      * Spins/blocks until node s is fulfilled.
    120      *
    121      * @param s the waiting node
    122      * @param e the comparison value for checking match
    123      * @param timed true if timed wait
    124      * @param nanos timeout value
    125      * @return matched item, or s if cancelled
    126      */
    127     Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    128         /* Same idea as TransferStack.awaitFulfill */
    129         final long deadline = timed ? System.nanoTime() + nanos : 0L;
    130         Thread w = Thread.currentThread();
    131         //如果当前节点s就是排在最前面的(head.next),很可能立即就有互补线程来匹配,所以应该自旋
    132         int spins = ((head.next == s) ?
    133                      (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    134         for (;;) {
    135             if (w.isInterrupted()) //如果中断了就取消(将item指向自身)
    136                 s.tryCancel(e);
    137             Object x = s.item;
    138             if (x != e) //节点已经取消(中断或超时)或者被互补线程匹配了数据,可用返回了
    139                 return x;
    140             if (timed) {
    141                 nanos = deadline - System.nanoTime();
    142                 if (nanos <= 0L) { //定时等待超时了,也取消,然后返回
    143                     s.tryCancel(e);
    144                     continue;
    145                 }
    146             }
    147             if (spins > 0) //自旋
    148                 --spins;
    149             else if (s.waiter == null) // 准备进入阻塞之前先设置waiter,下一次循环再次检测状态之后才阻塞
    150                 s.waiter = w;
    151             else if (!timed)
    152                 LockSupport.park(this);    //非定时阻塞
    153             else if (nanos > spinForTimeoutThreshold)
    154                 LockSupport.parkNanos(this, nanos); //定时阻塞
    155         }
    156     }
    158     /**
    159      * Gets rid of cancelled node s with original predecessor pred.使用原来的前驱节点pred删除已取消的节点s。
    160      */
    161     void clean(QNode pred, QNode s) {
    162         s.waiter = null; // forget thread
    163         /*
    164          * At any given time, exactly one node on list cannot be
    165          * deleted -- the last inserted node. To accommodate this,
    166          * if we cannot delete s, we save its predecessor as
    167          * "cleanMe", deleting the previously saved version
    168          * first. At least one of node s or the node previously
    169          * saved can always be deleted, so this always terminates.
    170          * 
    171          * 清理节点的时候,只有非尾节点才能立即被清理,对于尾节点(也即最后一个入队的节点)不能立即清理,
    172          * 而是将其前驱节点保存到 cleanMe字段,等下一次调用clean方法的时候,发现尾节点已经变化了,
    173          * 即上一次保存的节点由尾节点变成了非尾节点的时候才将其清理掉,这是对尾节点的延迟清理方式。
    175          * 在任意时刻,都不能删除队列中最后插入的那个节点。为了适应这种情况,
    176          * 如果我们不能删除节点s,我们将其前驱节点保存到"cleanMe"字段,先删除之前保存的版本。
    177          * 至少节点s或者之前保存的节点总是能够被删除,所以结束。
    178          */
    180          //如果s与其前驱节点已经断开连接,不做处理,直接返回。
    181         while (pred.next == s) { // Return early if already unlinked
    182             QNode h = head;
    183             QNode hn = h.next;   // Absorb cancelled first node as head
    184             /*
    185              * 如果第一个有效节点(head.next)已经取消了,需要将其出队。
    186              * 这里并不是直接使head.next出队,而是使head出队,使head.next成为head。
    187              * advanceHead还会将出队的head.next指向自身(h.next = h)使原来的head出队.
    188              */
    189             if (hn != null && hn.isCancelled()) {
    190                 advanceHead(h, hn);
    191                 continue;
    192             }
    193             QNode t = tail;      // Ensure consistent read for tail
    194             if (t == h) 
    195                 return; //队列为空,返回
    196             QNode tn = t.next;
    197             if (t != tail) //尾节点已经变化,即有新节点加入到队列中,重新循环
    198                 continue;
    199             //这里说明有新节点入队,但是它只是将新节点设置为原来tail的next,还未来得及将新节点设置为新的tail
    200             if (tn != null) { 
    201                 advanceTail(t, tn); //帮助它完成身份的转变--改变tail指向新的尾节点
    202                 continue;
    203             }
    204             //s不是尾节点,尝试将s从队列中断开连接,即将pred.next指向s.next,成功返回
    205             if (s != t) {        // If not tail, try to unsplice
    206                 QNode sn = s.next;
    207                 if (sn == s || pred.casNext(s, sn))
    208                     return;
    209             }
    210             //s是尾节点,
    211             QNode dp = cleanMe;
    212             /*
    213              * cleanMe表示上一次推迟清理的节点的前驱节点。
    214              * cleanMe不为空,表示已经有需要清理的节点,就先尝试清理上一次取消的节点
    215              * 
    216              */
    217             if (dp != null) {    // Try unlinking previous cancelled node
    218                 QNode d = dp.next;  //d就是上一次需要被清理的节点
    219                 QNode dn;
    220                 /*
    221                  * 下面一大堆判断条件,其实就是只有在需要被清理的节点d:
    222                  * (1)还存在队列中,(2)确实已经取消了,(3)并且不是尾节点,这些情况下才清理。
    223                  *  注意清理的方法dp.casNext(d, dn)是直接放在if判断条件里面的,
    224                  *  如果d已经不存在了或者被本次调用清理了就将cleanMe还原成null
    225                  */
    226                 if (d == null ||               // d is gone or          d消失了
    227                     d == dp ||                 // d is off list or      d已经出队了
    228                     !d.isCancelled() ||        // d not cancelled or    d没有被取消
    229                     (d != t &&                 // d not tail and    d不是尾节点
    230                      (dn = d.next) != null &&  //   has successor    d还有后继节点
    231                      dn != d &&                //   that is on list    d还没有断开与队列的连接
    232                      dp.casNext(d, dn)))       // d unspliced        尝试断开d与队列的连接
    234                     casCleanMe(dp, null);         //重置cleanMe为空
    235                 /*
    236                  * 与上一次取消的节点是同一个前驱节点,如果上面的if语句已经清理成功,那么s已经被清理
    237                  * 否则,s的前驱依然保存在cleanMe字段中,只有等下一次在清理。
    238                  */
    239                 if (dp == pred)            
    240                     return;      // s is already saved node
    241             } else if (casCleanMe(null, pred)) //将前驱节点保存到cleanMe字段,返回
    242                 return;          // Postpone cleaning s 推迟清理s节点
    243         }
    244     }
    246     private static final sun.misc.Unsafe UNSAFE;
    247     private static final long headOffset;
    248     private static final long tailOffset;
    249     private static final long cleanMeOffset;
    250     static {
    251         try {
    252             UNSAFE = sun.misc.Unsafe.getUnsafe();
    253             Class<?> k = TransferQueue.class;
    254             headOffset = UNSAFE.objectFieldOffset
    255                 (k.getDeclaredField("head"));
    256             tailOffset = UNSAFE.objectFieldOffset
    257                 (k.getDeclaredField("tail"));
    258             cleanMeOffset = UNSAFE.objectFieldOffset
    259                 (k.getDeclaredField("cleanMe"));
    260         } catch (Exception e) {
    261             throw new Error(e);
    262         }
    263     }
    264 }
    View Code

    首先队列的数据存取方式与LinkedBlockingQueue相类似,最开始队列有指向同一个实例的首尾节点head和tail,一但有节点入队(总是从尾部入队)则成为新的尾节点tail,某一个时刻不为空的队列数据像这样:head- > n1- > n2 ->n3 ....nN(tail),即队列有一个非实际的head节点,所有真正的实例节点都在它后面排列;队列中元素的出队都是从对头开始的,假设现在n1要出队,那么实际出队的节点其实是head,只不过将n1携带的数据剥离掉使其成为了新的不携带数据的head:n1(head) -> n2 -> n3....nN(tail)。







     1 private transient volatile Transferer<E> transferer;
     3 /**
     4  * Creates a {@code SynchronousQueue} with nonfair access policy. 默认创建非公平策略的实例
     5  */
     6 public SynchronousQueue() {
     7     this(false);
     8 }
    10 /**
    11  * Creates a {@code SynchronousQueue} with the specified fairness policy.
    12  * 传入true,创建公平的实例,等待线程使用FIFO队列排队
    13  */
    14 public SynchronousQueue(boolean fair) {
    15     transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    16 }
    View Code



     1 /**
     2  * 将指定的元素添加到此队列中,等待另一个线程接收它或者被中断。
     3  * 不允许放入null元素,
     4  */
     5 public void put(E e) throws InterruptedException {
     6     if (e == null) throw new NullPointerException();
     7     if (transferer.transfer(e, false, 0) == null) {
     8         Thread.interrupted();
     9         throw new InterruptedException();
    10     }
    11 }
    13 /**
    14  * 如果另一个线程正在等待接收指定的元素,则将其插入此队列。
    15  * 只有当已经有一个线程正在等待接收数据,该方法立即返回true,否则立即返回false
    16  */
    17 public boolean offer(E e) {
    18     if (e == null) throw new NullPointerException();
    19     return transferer.transfer(e, true, 0) != null;
    20 }
    View Code

    offer的超时版本的方法并没有贴出来,put的实现是transferer.transfer(e, false, 0),第二个参数timed是false,所以put是非定时的无限制等待方法,直到有线程取走该线程放入的数据或被中断。而offer的实现是transferer.transfer(e, true, 0),第二个参数timed是true,所以是定时等待,但是第三个表示等待时长的参数是0,所以offer会立即返回,如果调用offer的时候存在一个线程正在等待接收数据,则offer方法立即返回true,否则立即返回false。


     1 /**
     2  * 检索并删除此队列的头,等待直到另一个线程插入它或者被中断
     3  */
     4 public E take() throws InterruptedException {
     5     E e = transferer.transfer(null, false, 0);
     6     if (e != null)
     7         return e;
     8     Thread.interrupted();
     9     throw new InterruptedException();
    10 }
    13 /**
    14  * 如果另一个线程当前正在放入数据,则检索并删除此队列的头。
    15  */
    16 public E poll() {
    17     return transferer.transfer(null, true, 0);
    18 }
    View Code

    poll的超时版本的方法并没有贴出来,take的实现是transferer.transfer(null, false, 0),第二个参数timed是false,所以take是非定时的无限制等待方法,直到有线程放入数据被该线程获取到或被中断。而poll的实现是transferer.transfer(null, true, 0),第二个参数timed是true,所以是定时等待,但是第三个表示等待时长的参数是0,所以poll会立即返回,如果调用poll的时候存在一个线程正在放入数据,并被当前线程接收则poll返回接收到的数据,否则返回null。




  • 相关阅读:
    结对项目-四则运算 “软件”之升级版
    第三次作业:个人项目-小学四则运算 “软件”之初版
  • 原文地址:https://www.cnblogs.com/txmfz/p/10346856.html
Copyright © 2011-2022 走看看