zoukankan      html  css  js  c++  java
  • 【JUC源码解析】SynchronousQueue

    简介

    SynchronousQueue是一种特殊的阻塞队列,该队列没有容量。

    【存数据线程】到达队列后,若发现没有【取数据线程】在此等待,则【存数据线程】便入队等待,直到有【取数据线程】来取数据,并释放【存数据线程】;

    同理,【取数据线程】到达队列后,若发现没有【存数据线程】在此等待,则【取数据线程】便入队等待,直到有【存数据线程】来存数据,并释放【取数据线程】。

    公平模式,由伪栈实现,TransferStack

    公平模式,由伪队列实现,TransferQueue

    源码解析

    Transferer

    1     abstract static class Transferer<E> {
    2         abstract E transfer(E e, boolean timed, long nanos); // 实现一个put或者take操作
    3     }

    TransferStack

    非公平模式

    属性

    1         static final int REQUEST = 0; // 未得到数据的消费者
    2         static final int DATA = 1; // 未交出数据的生产者
    3         static final int FULFILLING = 2; // 正在与另外一个消费者/生产者匹配
    4         volatile SNode head; // stack的头结点

    内部类SNode

    属性

    1             volatile SNode next; // 指向下一个结点
    2             volatile SNode match; // 与当前结点匹配的结点
    3             volatile Thread waiter; // 记录当前线程
    4             Object item; // 数据,对于消费者,为空
    5             int mode; // 模式,取值:REQUEST, DATA, FULFILLING

    构造方法

    1             SNode(Object item) {
    2                 this.item = item;
    3             }

    关键方法

     1             boolean casNext(SNode cmp, SNode val) { // CAS next属性,cmp -> val
     2                 return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
     3             }
     4 
     5             boolean tryMatch(SNode s) { // 尝试匹配结点s和当前结点
     6                 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // 如果匹配成功
     7                     Thread w = waiter;
     8                     if (w != null) {
     9                         waiter = null;
    10                         LockSupport.unpark(w); // 唤醒阻塞的线程
    11                     }
    12                     return true;
    13                 }
    14                 return match == s;
    15             }
    16 
    17             void tryCancel() { // CAS match属性,null -> this
    18                 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    19             }
    20 
    21             boolean isCancelled() { // 判断此结点是否已经被取消
    22                 return match == this;
    23             }

    关键方法

     1         static boolean isFulfilling(int m) { // 查看结点是否处于匹配模式
     2             return (m & FULFILLING) != 0;
     3         }
     4 
     5         boolean casHead(SNode h, SNode nh) { // CAS head属性,h -> nh
     6             return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
     7         }
     8 
     9         static SNode snode(SNode s, Object e, SNode next, int mode) { // 创建一个新的结点,或者重置s结点的相关域
    10             if (s == null)
    11                 s = new SNode(e);
    12             s.mode = mode;
    13             s.next = next;
    14             return s;
    15         }
    16         
    17         boolean shouldSpin(SNode s) { // 是否应该自旋等待
    18             SNode h = head;
    19             return (h == s || h == null || isFulfilling(h.mode));
    20         }

    transfer()

    基本的算法是在一个无限循环中,每次执行下面三种情况的其中一种:

    1. 如果当前栈为空,或者已经包含与当前结点模式相同的结点,尝试入栈,并一直等待,直到遇到与之匹配(模式互补)的结点前来将其唤醒,并返回结果:a. 如果被取消,则返回null; b.如果当前结点是消费者,则返回匹配结点的数据;c. 如果当前结点是生产者,则返回当前结点的数据。
    2. 如果栈中包含与当前结点模式互补的结点,则设置当前结点的模式为FULFILLING,并尝试入栈,和对应的结点互相匹配,完成后,双双出栈,并返回生产者的数据。
    3. 如果栈顶结点的模式是FULFILLING,说明此刻有结点正在配对,当前线程帮助它们配对和弹出栈,然后在处理自己的事情,继续循环执行相应的操作。
     1         E transfer(E e, boolean timed, long nanos) {
     2             SNode s = null;
     3             int mode = (e == null) ? REQUEST : DATA; // 当前结点的模式
     4             for (;;) {
     5                 SNode h = head;
     6                 if (h == null || h.mode == mode) { // 栈为空,或者栈顶结点与当前结点的模式相同
     7                     if (timed && nanos <= 0) { // 超时
     8                         if (h != null && h.isCancelled())
     9                             casHead(h, h.next); // 弹出已经取消的结点
    10                         else
    11                             return null;
    12                     } else if (casHead(h, s = snode(s, e, h, mode))) { // 构建结点入栈
    13                         SNode m = awaitFulfill(s, timed, nanos); // 在此等待匹配结点唤醒自己
    14                         if (m == s) { // 已取消
    15                             clean(s); // 清理工作
    16                             return null;
    17                         }
    18                         if ((h = head) != null && h.next == s)
    19                             casHead(h, s.next); // 帮助配对成功的结点出栈
    20                         return (E) ((mode == REQUEST) ? m.item : s.item); // 返回生产者的数据
    21                     }
    22                 } else if (!isFulfilling(h.mode)) { // 尝试匹配结点
    23                     if (h.isCancelled()) // 已经被取消
    24                         casHead(h, h.next); // 更新head
    25                     else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // 设置为FULFILLING模式入栈
    26                         for (;;) { // 寻找匹配者
    27                             SNode m = s.next; // m是s的匹配者
    28                             if (m == null) { // 为空
    29                                 casHead(s, null); // 弹出已经配对的结点
    30                                 s = null; // 下一次重新构造结点
    31                                 break; // 重新开始
    32                             }
    33                             SNode mn = m.next;
    34                             if (m.tryMatch(s)) {
    35                                 casHead(s, mn); // 弹出s和m
    36                                 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回生产者的数据
    37                             } else // 失败
    38                                 s.casNext(m, mn); // 剔除m
    39                         }
    40                     }
    41                 } else {
    42                     SNode m = h.next; // m是h的匹配者
    43                     if (m == null) // 为空
    44                         casHead(h, null); // 弹出已经配对的结点
    45                     else {
    46                         SNode mn = m.next;
    47                         if (m.tryMatch(h)) // 帮助完成匹配
    48                             casHead(h, mn); //  弹出h和m
    49                         else // 失败
    50                             h.casNext(m, mn); // 剔除m
    51                     }
    52                 }
    53             }
    54         }

    awaitFulfill()

    在线程阻塞之前,设置到结点的waiter域,并且检查一次线程线程的中断状态,若中断则取消。如果执行结点处于栈顶,阻塞之前会自旋一会儿,说不定马上就有结点来匹配,这样就不用阻塞了。主循环检查顺序: 中断优先于正常返回,正常返回优先于超时。

     1         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
     2             final long deadline = timed ? System.nanoTime() + nanos : 0L; // 超时时间点
     3             Thread w = Thread.currentThread(); // 当前线程
     4             int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 自旋次数
     5             for (;;) {
     6                 if (w.isInterrupted()) // 检查中断,若中断,则取消此结点
     7                     s.tryCancel();
     8                 SNode m = s.match;
     9                 if (m != null) // 正常返回
    10                     return m;
    11                 if (timed) { // 检查超时
    12                     nanos = deadline - System.nanoTime();
    13                     if (nanos <= 0L) { // 若超时,取消此结点
    14                         s.tryCancel();
    15                         continue;
    16                     }
    17                 }
    18                 if (spins > 0) // 自旋
    19                     spins = shouldSpin(s) ? (spins - 1) : 0;
    20                 else if (s.waiter == null)
    21                     s.waiter = w; // 记录线程
    22                 else if (!timed)
    23                     LockSupport.park(this); // 阻塞线程
    24                 else if (nanos > spinForTimeoutThreshold)
    25                     LockSupport.parkNanos(this, nanos); // 带超时的阻塞
    26             }
    27         }

    clean()

    清理分3步

    1. 清理s结点,并判断s的next结点past,如果past也取消了,则跳过此结点,使得past变量指向下一个结点,到此为止,此为清理工作的最大深度
    2. 从head结点开始,依次跳过已经取消的结点,直到遇到未取消的结点(或者遇到past结点,或为空),重新设置head结点为p结点
    3. 从p到past结点,清理掉所有已经取消的结点
     1         void clean(SNode s) {
     2             s.item = null; // 清理s结点
     3             s.waiter = null;
     4 
     5             SNode past = s.next; // s的下一个结点past
     6             if (past != null && past.isCancelled()) // 如果past也取消了,则直接跳到past的下一个结点
     7                 past = past.next;
     8 
     9             SNode p;
    10             while ((p = head) != null && p != past && p.isCancelled()) // 从head结点开始,遍历清理已经取消的结点,直到遇到没有被取消的结点,并设置为新的head结点
    11                 casHead(p, p.next);
    12 
    13             while (p != null && p != past) { // 从p结点到past结点(但不包括past), 遍历清理所有已经取消的结点
    14                 SNode n = p.next;
    15                 if (n != null && n.isCancelled())
    16                     p.casNext(n, n.next);
    17                 else
    18                     p = n;
    19             }
    20         }

    TransferQueue

    公平模式

    内部类QNode

    属性

    1             volatile QNode next; // 指向下一个结点
    2             volatile Object item; // 存放数据,isData为false时,该节点为null, 为true时,匹配后,该节点会置为null
    3             volatile Thread waiter; // 控制线程的park/unpark
    4             final boolean isData; // 表示该结点是存数据还是取数据

    关键方法

     1             QNode(Object item, boolean isData) { // 构造方法
     2                 this.item = item;
     3                 this.isData = isData;
     4             }
     5 
     6             boolean casNext(QNode cmp, QNode val) { // CAS next域 cmp -> val
     7                 return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
     8             }
     9 
    10             boolean casItem(Object cmp, Object val) { // CAS offset域 cmp -> val
    11                 return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    12             }
    13 
    14             void tryCancel(Object cmp) { // 取消结点
    15                 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    16             }
    17 
    18             boolean isCancelled() { // 检查是否已取消
    19                 return item == this;
    20             }
    21 
    22             boolean isOffList() { // 检查结点是否离开了队列
    23                 return next == this;
    24             }

    属性

    1         transient volatile QNode head; // 头结点
    2         transient volatile QNode tail; // 尾结点
    3         transient volatile QNode cleanMe; // 当一个结点被标记取消时,恰巧又是最后也给结点,那么将cleanMe作为该结点的predecessor

    关键方法

     1         TransferQueue() { // 构造方法
     2             QNode h = new QNode(null, false); // 初始化 dummy node.
     3             head = h;
     4             tail = h;
     5         }
     6 
     7         void advanceHead(QNode h, QNode nh) { // CAS head域,h -> nh
     8             if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
     9                 h.next = h;
    10         }
    11 
    12         void advanceTail(QNode t, QNode nt) { // CAS tail域,t -> nt
    13             if (tail == t)
    14                 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    15         }
    16 
    17         boolean casCleanMe(QNode cmp, QNode val) { // CAS cleanMe域,cmp -> val
    18             return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    19         }

    transfer()

    基本的算法是在一个无限循环中,每次执行下面两种情况的其中一种:

    1. 如果当前队列为空,或者已经与当前结点模式相同的结点,尝试入队,并一直等待,直到遇到与之匹配(模式互补)的结点前来将其唤醒,并返回匹配结点的数据。
    2. 如果队列中包含与当前结点模式互补的结点,则尝试和对应的结点互相匹配,完成后,将等待结点出队,并返回匹配结点的数据。
    3. 在每个动作里面,都会检测并帮助其他线程来完成节点推进。
     1         E transfer(E e, boolean timed, long nanos) {
     2             QNode s = null;
     3             boolean isData = (e != null);
     4 
     5             for (;;) {
     6                 QNode t = tail;
     7                 QNode h = head;
     8                 if (t == null || h == null) // 当前线程看到未初始化的头尾结点
     9                     continue; // 自旋
    10 
    11                 if (h == t || t.isData == isData) { // 队列为空,或者包含相同模式的结点
    12                     QNode tn = t.next;
    13                     if (t != tail) // 过期数据
    14                         continue;
    15                     if (tn != null) { // 别的线程添加了新的结点,帮助更新tail域
    16                         advanceTail(t, tn);
    17                         continue;
    18                     }
    19                     if (timed && nanos <= 0) // 超时
    20                         return null;
    21                     if (s == null)
    22                         s = new QNode(e, isData); // 构造结点
    23                     if (!t.casNext(null, s)) // 连接失败
    24                         continue;
    25 
    26                     advanceTail(t, s); // 设置s为tail结点
    27                     Object x = awaitFulfill(s, e, timed, nanos); // 等待匹配
    28                     if (x == s) { // 如果取消,清理结点
    29                         clean(t, s);
    30                         return null;
    31                     }
    32 
    33                     if (!s.isOffList()) { // 如果s未离队
    34                         advanceHead(t, s); // 设置s为head结点
    35                         if (x != null)
    36                             s.item = s;
    37                         s.waiter = null;
    38                     }
    39                     return (x != null) ? (E) x : e;
    40 
    41                 } else { // 互补模式
    42                     QNode m = h.next;
    43                     if (t != tail || m == null || h != head)
    44                         continue; // 读取的是过期的值,继续循环
    45 
    46                     Object x = m.item;
    47                     if (isData == (x != null) || // m已经被匹配了
    48                             x == m || // m被取消
    49                             !m.casItem(x, e)) { // CAS失败
    50                         advanceHead(h, m); // h出队,m设置为head结点,重来
    51                         continue;
    52                     }
    53 
    54                     advanceHead(h, m); // 成功,推进头节点
    55                     LockSupport.unpark(m.waiter); // 唤醒等到线程
    56                     return (x != null) ? (E) x : e;
    57                 }
    58             }
    59         }

    awaitFulfill()

     1         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
     2             final long deadline = timed ? System.nanoTime() + nanos : 0L;
     3             Thread w = Thread.currentThread();
     4             int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
     5             for (;;) {
     6                 if (w.isInterrupted())
     7                     s.tryCancel(e);
     8                 Object x = s.item;
     9                 if (x != e)
    10                     return x;
    11                 if (timed) {
    12                     nanos = deadline - System.nanoTime();
    13                     if (nanos <= 0L) {
    14                         s.tryCancel(e);
    15                         continue;
    16                     }
    17                 }
    18                 if (spins > 0)
    19                     --spins;
    20                 else if (s.waiter == null)
    21                     s.waiter = w;
    22                 else if (!timed)
    23                     LockSupport.park(this);
    24                 else if (nanos > spinForTimeoutThreshold)
    25                     LockSupport.parkNanos(this, nanos);
    26             }
    27         }

    同TransferStack.

    clean()

    在任意时间点,只有最后入队的结点不能立即删除,因为考虑到无所并发,线程争用下沉到CPU指令级别(CAS),最后入队的结点同时会有CAS Tail的动作,所以不能同一时间点,对同一个元素有多个CAS操作,因此,如果是最后入队的结点,可以将删除操作滞后。先将cleanMe结点的next域指向改结点,等到有新的结点入队时,再考虑删除上一版本的结点,此时,已满足条件。

     1         void clean(QNode pred, QNode s) {
     2             s.waiter = null; // 置空waiter域
     3             while (pred.next == s) {
     4                 QNode h = head;
     5                 QNode hn = h.next;
     6                 if (hn != null && hn.isCancelled()) {
     7                     advanceHead(h, hn); // 推进head结点
     8                     continue;
     9                 }
    10                 QNode t = tail;
    11                 if (t == h)
    12                     return;
    13                 QNode tn = t.next;
    14                 if (t != tail)
    15                     continue;
    16                 if (tn != null) {
    17                     advanceTail(t, tn);
    18                     continue;
    19                 }
    20                 if (s != t) { // 如果s不是尾结点,直接将其删除
    21                     QNode sn = s.next;
    22                     if (sn == s || pred.casNext(s, sn))
    23                         return;
    24                 }
    25                 QNode dp = cleanMe;
    26                 if (dp != null) { // 尝试删除前一版本取消的结点,借助cleanMe结点
    27                     QNode d = dp.next;
    28                     QNode dn;
    29                     if (d == null || // d已经被删除
    30                             d == dp || // d已经出
    31                             !d.isCancelled() || // d没被取消
    32                             (d != t && // d not tail and
    33                                     (dn = d.next) != null && // 有后继结点
    34                                     dn != d && dp.casNext(d, dn))) // 删除d
    35                         casCleanMe(dp, null);
    36                     if (dp == pred)
    37                         return;
    38                 } else if (casCleanMe(null, pred))
    39                     return;
    40             }
    41         }

    行文至此结束。

    尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_sq.html

  • 相关阅读:
    ArrayList 和 Vector 的区别
    Redis在springboot中使用,读取字符串
    初始化Mysql
    Redis 安装
    React-脚手架
    React virtual DOM explained in simple English/简单语言解释React的虚拟DOM
    数据结构
    书单(18-19)
    算法复杂度
    otrs离线部署
  • 原文地址:https://www.cnblogs.com/aniao/p/aniao_sq.html
Copyright © 2011-2022 走看看