zoukankan      html  css  js  c++  java
  • ConcurrentLinkedQueue

           ConcurrentLinkedQueue是一种非阻塞的线程安全队列,与阻塞队列LinkedBlockingQueue相对应,ConcurrentLinkedQueue同样也是使用链表实现的FIFO队列,但不同的是它没有使用任何锁机制,而是用CAS来实现线程安全。

    1,成员变量

    1 //头结点,transient表示该成员变量不会被序列化,volatile表示该变量的可见性和有序性
    2 // head永远不会为null,它也不含数据域
    3 //head.next是它本身,其他任何活动的节点通过succ方法,都能找到head节点
    4 private transient volatile Node<E> head;
    5 //可能的尾结点,该节点仅仅是一个优化,在O(1)的时间复杂度内查找尾结点
    6 //最好还是使用head.next在O(n)的时间复杂度内找到节点
    7 private transient volatile Node<E> tail;

      head和tail作为链表的首尾节点存在,说明ConcurrentLinkedQueue使用双向链表实现的,改双向链表存储着全部数据,但是head和tail都被transient修饰,不会被序列化,由此可以推断,        ConcurrentLinkedQueue应当实现了writeObject和readObject序列化方法来完成序列化。 

     1 private void writeObject(java.io.ObjectOutputStream s) throws java.io.Exception {
     2     s.defaultWriteObject();
     3     //从头遍历节点,写入流
     4     for(Node<E> p = first(); p != null; p = succ(p)) {
     5         Object item = p.item;
     6         if(item != null) {
     7             s.writeObject(item);
     8         }
     9         //写入null作为结束符
    10         s.writeObject(null);
    11     }
    12 }
    13 
    14 private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
    15     s.defaultReadObject();
    16     //读取元素直到读取到结束符null
    17     Node<E> h = null;
    18     Object item;
    19     while((item = s.readObject()) != null) {
    20         @SuppressWarnings("unchecked")
    21         Node<E> newNode = new Node<E>((E)item);
    22         if(h == null) {
    23             h = t = newNode;
    24         } else {
    25             t.lazySetNext(newNode);
    26             t.newNode;
    27         }
    28     }
    29     if(h == null) {
    30         h = t = new Node<E>(null);
    31     }
    32     head = h;
    33     tail = t;
    34 }

    2,UNSAFE和CAS在ConcurrentLinkedQueue里的应用 

      UNSAFE是java提供的一个不安全的操作类,他可以通过直接操作内存来灵活操作java对象,

     1 static {
     2     try {
     3         //获取UNSAFE对象,只有jre的类才能使用此种方式获取
     4         UNSAFE = sun.misc.Unsafe.getUnsafe();
     5         Class<?> k = ConcurrentLinkedQueue.class;
     6         //获取head字段在ConcurrentLinkedQueue勒种内存地址偏移量
     7         headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
     8         //获取tail字段在ConcurrentLinkedQueue类中的内存地址偏移量
     9         tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
    10     } catch (Exception e) {
    11         throw new Error();
    12     }
    13 }

           ConcurrentLinkedQueue的CAS操作针对的是head和tail,其节点实现类的CAS操作则针对的是next(下一个节点)和item(数据域)。 

    1 //比较交换尾结点,判断tail和cmp是否一致,如果一直,那么设置tail为val,返回true,否则false
    2 private boolean casTail(Node<E> cmp, Node<E> val) {
    3     return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    4 }
    5 //比较交换头结点,判断head和cmp是否一致,如果一致,那么设置head为val,返回true,否则false
    6 private boolean casHead(Node<E> cmp. Node<E> val) {
    7     return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    8 }

    3,succ方法 

           succ方法用于保证取出next节点时的安全性,避免出现循环闭合

    1 final Node<E> succ(Node<E> p) {
    2     Node<E> next = p.next;
    3     //当p == next时说明链表发生了闭合,需要从head重新开始
    4     return (p == next) ? head : next;
    5 }

    4,add/offer方法 

           add和offer是ConcurrentLinkedQueue提供的入队方法,add方法调用的是offer。

     1 public boolean offer(E e) {
     2     checkNotNull(e);
     3     final Node<E> newNode = new Node<E>(e);
     4     for(Node<E> t = tail, p = t;;) {
     5         Node<E> q = p.next;
     6         if(q == null) {
     7             //p.next为null说明p为最后一个节点,调用casNext,如果p依然是最后一个节点
     8             //那么设置newNode为p的next节点,newNode也就是新的尾结点
     9             if(p.casNext(null, newNode)) {
    10                 //p不为tail时,设置tail节点为newNode
    11                 if(p != t) {
    12                     casTail(t, newNode);
    13                 }
    14                 return true;
    15             }
    16         } else if(p == q) {
    17             //p等于1是个特殊情况,说明该链表发生了闭合,如果t依然还是tail,那么需要重新从head寻找尾结点
    18             p = (t != (t = tail)) ? t : q;
    19         } else {
    20             //根据p节点的状态是否被其它线程改变,来决定p重新指向tail节点或者next节点
    21             p = (p != t && t != (t = tail)) ? t : q;
    22         }
    23     }
    24 }

           ConcurrentLinkedQueue的同步非阻塞算法使用循环+CAS来实现,这一类的源码阅读不能按照线性代码执行的思维去思考,而是应该用类似状态机的思路去理解。 

    1)       在确定达到执行目的前,循环不会终止

    2)       非线程安全的全局变量要用局部变量引用以保证初始状态。

    3)       由于全局变量坑呢过被其它线程修改,在使用对应局部变量时,要验证是否合法。

    4)       最终赋值要用CAS方法以保证原子性,避免线程发生不期望的修改。

    变量含义:

    1)       p节点的期望值为最后一个节点

    2)       newNode是新节点,期望添加到p节点之后

    3)       q节点为p节点的后继节点,可能为null,也可能因为多线程修改而不为空(指向新节点)

    4)       t节点为代码执行开始时的tail节点(成员变量),也可能因为多线程修改了tail节点从而和tail几点不一致。

    执行目的:

    1)       newNode作为新节点,需要插入到最后一个节点的next位置,如果它成为了最后一个节点,那么把它设置为尾结点

    2)       需要注意的是,多线程环境下,再多个插入同时进行,不保证节点顺序与执行顺序的一致性,当然,这不影响执行成功。

    状态解析:

    1)       该插入算法,是以p节点的状态判断为核心

    2)       当p节点的下一个节点为null时,说明没有后继节点,此时执行p.casNext(null,newNode),如果失败,那么说明其他线程在之前的瞬间修改了p.next,此时就需要从头开始再找一次尾结点,如果成功,则执行目的达到,循环体可以结束。

    3)       当p节点和q节点相等,这时链表因为发生了闭合,这时一个特殊的情况,产生的原因有很多种,但本质上是因为保证效率导致意外情况,tail作为尾结点的引用可以在O(1)的时间复杂内可以找到,但是,tail是可变的,所以其next可能指向它本身(比如重新设置casTail代码可能还没执行)。所以,如果t不是tail,那么使用tail重新计算,如果依然是tail,那么需要重置p为head,从头开始遍历链表,虽然复杂度o(n),但是能保证以正确的方式找到队尾。

    4)       如果以上情况都不满足,那么判断p是否还是队尾,不满足则设置为队尾,满足则p重新指向p.next,这里可能产生疑惑,队尾tail节点的next不应该是null吗。

    其实,tail是一个优化算法,不代表真正的队尾,它有三种状态:

    a)       初始化时,它是head

    b)       奇数次插入时,它是队尾

    c)        偶数次插入时,它是队尾的前一个节点。

    由此可知,p==q一定发生在q!=null的时候

    5)       合理需要注意以下代码

    1 if(p != t) casTail(t, newNode)

      在q == null的时候,说明p应当为最后一个节点,如果p != t,那么说明tail并不是尾结点,而是尾结点的前驱节点,此时需要重新设置tail为newNode,之后,tail会指向真正的尾结点。正是这句代码导致了奇数次插入时tail是队尾,偶数次是队尾前的一个节点。 

    5,poll/peek方法

           poll用于取出队头节点,peek用于查看队头节点,原理和add/offer类似,不多赘述。

    6,size方法和isEmpty

           size方法用于获取队列的长度,该方法效率极低,需要遍历所有节点,因为offer和poll都没有维护队列的长度。,而isEmpty时间复杂度比size要高很多,因此,尽量不要使用size()而是使用isEmpty。

  • 相关阅读:
    jenkins之配置构建执行
    神器sublime02-编写python
    神器sublime02-连接github提交代码
    神器sublime01-基础使用
    阿里云部署opms用于练习自动化
    Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势
    修改Flume-NG的hdfs sink解析时间戳源码大幅提高写入性能
    Hbase 布隆过滤器BloomFilter介绍
    Hadoop创始人Doug Cutting寄语2017:五种让开源项目成功的方法
    HBase在数据统计应用中的使用心得
  • 原文地址:https://www.cnblogs.com/guanghe/p/13494544.html
Copyright © 2011-2022 走看看