zoukankan      html  css  js  c++  java
  • JUC之ConcurrentLinkedDeque源码分析

    前言

      我们知道Queue是一种具有FIFO特点的数据结构,元素只能在队首进行“入队”操作,在队尾进行“出队”操作。

      而Deque(double-ended queue这种数据结构,是一种双端队列,也就是说可以在任意一端进行 “入队”,也可以在任意一端进行 “出队”:

      

    Queue接口定义:

      它的接口比较简单,一共只有三种类型的操作:入队、出队、读取。

       

      每种操作类型,都给出了两种方法,区别就是其中一种操作在队列的状态不满足某些要求时,会抛出异常;另一种,则直接返回特殊值(如null)

    Deque接口定义

      Queue接口的所有方法Deque都具备,只不过队首/队尾都可以进行“出队”和“入队”操作:

      

      除此之外,Deque还可以当作“栈”来使用,我们知道“栈”是一种具有“LIFO”特点的数据结构。Deque提供了pushpoppeek这三个栈方法,一般实现这三个方法时,可以利用已有方法,即有如下映射关系:

      

     一、ConcurrentLinkedDeque简介

      ConcurrentLinkedDeque是JDK1.7时,JUC引入的集合工具。ConcurrentLinkedDeque作为双端队列,可以当作“栈”来使用,并且高效地支持并发环境。

      ConcurrentLinkedDeque和ConcurrentLinkedQueue一样,采用了无锁算法,底层基于自旋+CAS的方式实现。

      

    二、源码分析

    (1) 构造器

    // 空构造器
    public ConcurrentLinkedDeque() {
        head = tail = new Node<E>(null);  // 头部和尾部都为空节点
    }
    // 通过集合,构造队列
    public ConcurrentLinkedDeque(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {    //循环
            checkNotNull(e);   // 空检查
            Node<E> newNode = new Node<E>(e); //创建节点
            if (h == null)   // 第一次循环,创建第一个节点
                h = t = newNode;
            else {  // 在队尾插入元素
                t.lazySetNext(newNode);
                newNode.lazySetPrev(t);
                t = newNode;
            }
        }
        initHeadTail(h, t);  // 最后设置头部和尾部节点
    }

    (2)属性

    private transient volatile Node<E> head;   // 头节点
    private transient volatile Node<E> tail;   // 尾结点
    private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;  //终止节点
    private static final int HOPS = 2;    //移除节点时更新链表属性的阀值

    head/tail 的不变性:

    1. 第一个节点总是能以O(1)的时间复杂度从head通过prev链接到达;

    2. 最后一个节点总能以O(1)的时间复杂度tail通过next链接到达;

    3. 所有live节点(item不为null),都能从第一个节点通过调用 succ() 方法遍历可达;

    4. 所有live节点(item不为null的节点),都能从最后一个节点通过调用 pred() 方法遍历可达;

    5. head/tail 不能为 null;

    6. head 节点的 next 域不能引用到自身;

    7. head/tail 不会是GC-unlinked节点(但它可能是unlink节点)。

    head/tail的可变性:

    1. head/tail 节点的 item 域可能为 null,也可能不为 null;

    2. head/tail 节点可能从first/last/tail/head 节点访问时不可达;

    3. tail 节点的 next 域可以引用到自身。

    PREV_TERMINATOR / NEXT_TERMINATOR 终止节点

    • PREV_TERMINATOR:prev的终止节点,next指向自身,即 PREV_TERMINATOR.next = PREV_TERMINATOR。在 first 节点出列后,会把first.next指向自身(first.next=first),然后把prev设为 PREV_TERMINATOR
    • NEXT_TERMINATOR:next的终止节点,prev指向自身,即 NEXT_TERMINATOR.pre = NEXT_TERMINATOR。在 last 节点出列后,会把last.prev指向自身(last.prev=last),然后把next设为 NEXT_TERMINATOR

    (3)核心方法

    先看下数据结构代码

     static final class Node<E> {
            volatile Node<E> prev;  // 前驱节点
            volatile E item;   // 当前节点数据
            volatile Node<E> next;  //后继节点
            Node() { 
            }
            Node(E item) {
                UNSAFE.putObject(this, itemOffset, item);
            }
         // CAS 设置当前节点数据的值
    boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }
         // CAS 新增下个节点内容
    void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
         // CAS 修改下个节点内容
    boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }      // CAS 设置上个节点 void lazySetPrev(Node<E> val) { UNSAFE.putOrderedObject(this, prevOffset, val); }      boolean casPrev(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long prevOffset; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; prevOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("prev")); itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

      一个双链表结构,每入队一个元素就是插入一个Node类型的结点。字段head指向队列头,tail指向队列尾,通过Unsafe来CAS操作字段值以及Node对象的字段值。

      

      需要特别注意的是ConcurrentLinkedDeque包含两个特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。 这两个字段初始时都指向一个值为null的空结点,这两个字段在结点删除时使用。

        


     入队操作 

      双端队列与普通队列的区别是:双端队列既可以在“队尾”插入元素,也可以在“队首”插入元素。

      ConcurrentLinkedDeque的入队方法有很多:addFirst(e)addLast(e)offerFirst(e)offerLast(e)

    public void addFirst(E e) { linkFirst(e); }  // 添加到首位
    public void addLast(E e) { linkLast(e); }  //添加到末尾
    public boolean offerFirst(E e) {  
        linkFirst(e);
        return true;
    }
    public boolean offerLast(E e) {
        linkLast(e);
        return true;
    }

      可以看到,队入队”其实就是调用了linkFirst(e)方法,而队入队”是调用了 linkLast(e)方法。

    linkFirst 方法

    private void linkFirst(E e) {
            checkNotNull(e); // 空检查
            final Node<E> newNode = new Node<E>(e);  //创建新节点
            restartFromHead:
            for (;;)
                for (Node<E> h = head, p = h, q;;) {
              // 前驱节点 != null && 前驱的前驱节点 != null
    if ((q = p.prev) != null && (q = (p = q).prev) != null) p = (h != (h = head)) ? h : q; // 说明head被修改,返回head重新查找 else if (p.next == p) // 自连接节点,不能从p开始查找,退出重新循环 continue restartFromHead; else { // p 是第一个node,则更新新节点next指向p newNode.lazySetNext(p); // CAS piggyback
                // 尝试更新p的前驱指向新节点,更新失败则重新循环更新 if (p.casPrev(null, newNode)) { //新节点入队成功 if (p != h) // hop two nodes at a time casHead(h, newNode); //将新节点设置为头节点 return; } }
            
    // 执行到此处说明CAS操作失败,有其它线程也在队首插入元素
        } 
    }

    linkLast方法和上面类似,不在赘述。


     出队操作

      ConcurrentLinkedDeque的出队一样分为队首、队尾两种情况:removeFirst()pollFirst()removeLast()pollLast()

    public E removeFirst() { return screenNullResult(pollFirst()); }
    public E removeLast() { return screenNullResult(pollLast()); }
    // 移除第一个节点
    public E pollFirst() {
       // first() 找到第一个节点, succ()返回下一个节点
    for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; // 节点数据 if (item != null && p.casItem(item, null)) { // CAS 将当前节点数据清空 unlink(p); //取消当前节点连接 return item; //返回节点数据 } } return null; }
    // 移除最后一个节点
    public E pollLast() {
       // last() 找到最后一个节点, pred() 返回上一节点
    for (Node<E> p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }

      上面涉及到的方法first()、last()、succ()、pred()

    // 返回首节点
    Node<E> first() { restartFromHead: for (;;)
           // 从head开始往前找
    for (Node<E> h = head, p = h, q;;) { if ((q = p.prev) != null && (q = (p = q).prev) != null)
                // 如果head被修改则返回新的head重新查找,否则继续往前(pred)查找 p = (h != (h = head)) ? h : q; else if (p == h || casHead(h, p)) // 找到的节点不是head节点,CAS修改head return p; else continue restartFromHead; } } // 返回尾结点 Node<E> last() { restartFromTail: for (;;) for (Node<E> t = tail, p = t, q;;) { if ((q = p.next) != null && (q = (p = q).next) != null) p = (t != (t = tail)) ? t : q; else if (p == t || casTail(t, p)) return p; else continue restartFromTail; } } // 返回后继节点 final Node<E> succ(Node<E> p) { Node<E> q = p.next; return (p == q) ? first() : q; } // 返回前驱节点 final Node<E> pred(Node<E> p) { Node<E> q = p.prev; return (p == q) ? last() : q; }

    CAS 修改节点的 item 为 null(即 “逻辑删除-logical deletion”),然后调用unlink(p)方法解除节点链接,最后返回 item。unlink(p)是移除节点的主方法

    void unlink(Node<E> x) {
            final Node<E> prev = x.prev; 
            final Node<E> next = x.next;
            if (prev == null) {  // 前驱为空,表示是第一个节点
                unlinkFirst(x, next);
            } else if (next == null) { // 后继为空,表示是最后一个节点
                unlinkLast(x, prev);
            } else {  // 中间节点
                Node<E> activePred, activeSucc;
                boolean isFirst, isLast;
                int hops = 1;
            // 从被删除节点往前找到第一个有效前驱节点
                for (Node<E> p = prev; ; ++hops) {
                    if (p.item != null) { // 找到有效节点
                        activePred = p;
                        isFirst = false;
                        break;
                    }
                    Node<E> q = p.prev;
                    if (q == null) {  // 已经到了头部了
                        if (p.next == p)  // 发现自链接,直接返回
                            return;
                        activePred = p;
                        isFirst = true;
                        break;
                    }
                    else if (p == q)  //同样是自链接
                        return;
                    else  // 更新循环指针
                        p = q;
                }
    
                // 从被删除节点往后找到第一个有效后继节点
                for (Node<E> p = next; ; ++hops) {
                    if (p.item != null) {  // 找到有效节点
                        activeSucc = p;
                        isLast = false;
                        break;
                    }
                    Node<E> q = p.next;
                    if (q == null) {  // 已经到队尾了
                        if (p.prev == p)  // 发现自链接,直接返回
                            return;
                        activeSucc = p;
                        isLast = true;
                        break;
                    }
                    else if (p == q) //自链接
                        return;
                    else
                        p = q; // 更新循环指针
                }
            // 如果已经积累了超过临界值的逻辑删除节点,或者是内部节点删除,我们需要进一步处理unlink / gc-unlink
    if (hops < HOPS && (isFirst | isLast)) return;         // 移除有效前驱和后继节点之间的那些节点(都是逻辑删除的节点),包括x节点本身,就是使 有效前驱 和 后继节点相连 skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc);
            // 如果更新的开头或者结尾,那么就可以尝试进行gc-unlink if ((isFirst | isLast) &&
                   // 确保前驱和后继的状态没有被改变
                  (activePred.next == activeSucc) && (activeSucc.prev == activePred) &&
                  (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) {
              // 确保x节点不能从head/tail节点被访问 updateHead();
    updateTail(); x.lazySetPrev(isFirst ? prevTerminator() : x); // 前驱终结节点 x.lazySetNext(isLast ? nextTerminator() : x); // 后继终结节点 } } }

    unLinkFirst方法

    // 从first开始往后找到第一个有效节点,直到找到或者到达队列的最后一个节点为止,并把first的直接后继指向该有效节点:
    // 1) 如果first的后继本身就是有效节点,不做任何处理
    // 2) 否则往后依次找到第一个有效节点,并把first的后继指向该有效节点
    private
    void unlinkFirst(Node<E> first, Node<E> next) { for (Node<E> o = null, p = next, q;;) {
         // p是有效节点 || p是最后一个节点
    if (p.item != null || (q = p.next) == null) {
           // 第一次循环,o为null。从第二次循环开始,p是o的后继有效节点,并且p还没有断开o,将first后继指向该有效节点p
    if (o != null && p.prev != p && first.casNext(next, p)) { skipDeletedPredecessors(p);
              // 确保first还是第一个节点,没有被其他线程改变状态,并且它和它的后继节点p是直接相连接的,这种关系没有被破坏
    if (first.prev == null && (p.next == null || p.item != null) && p.prev == first) { updateHead(); // Ensure o is not reachable from head updateTail(); // Ensure o is not reachable from tail o.lazySetNext(o); o.lazySetPrev(prevTerminator()); } } return; } else if (p == q) return; else { o = p; p =
    q; } } }

    这里以pollFirst出队方法为例,其他方法逻辑都一样。

      先通过first()拿到队列头部的第一个节点,如果是活动节点(item不为null),则直接将item置为null,即完成了删除节点的第一步逻辑删除,

      然后执行unlink方法执行删除节点的第二unlinking、

      第三步GC-unlinking,unlink方法针对节点在不同的位置按不同的逻辑处理  

        ①如果出队的节点是队列的第一个节点,则执行unlinkFirst;

        ②如果是队列的最后一个节点,则执行unlinkLast,③否则表示是内部节点,执行unlink本身的通用节点逻辑。

    unlinkFirst的逻辑其实就分两个部分:

        ①实现从被移除节点p开始往后(队尾)找到第一个有效节点,直到找到或者到达队列的最后一个节点为止,并把p的直接后继指向该有效节点(如果本身不是其后继节点的话),其中的skipDeletedPredecessors方法实现将刚刚找到的后继节点的前驱也指向节点p,即完成它们的互联,这一步就是所谓的unlinking,使队列的活动节点无法访问被删除的节点;

        ②第二部分就是实现GC-unlinking了,通过updateHead、updateTail使被删除的节点无法从head/tail可达,最后让被删除节点后继自连接,前驱指向前向终结节点。

      如果是内部节点出队,执行unlink本身:

        先找到被删除节点x的有效前驱和后继节点,并记录它们中间的已经被逻辑删除的节点个数,如果已经积累了超过阈值的节点个数,或者是内部节点删除,

        我们需要进一步处理unlink/gc-unlink

          ①首先使被删除节点的有效前驱节点和后继节点互联,就相当于导致活动节点不会访问到中间已经被逻辑删除的节点(unlinking);

          ②若第①步导致重新链接到了对头或队尾,则通过updateHead、updateTail使被删除的节点无法从head/tail可达,最后让被删除节点自连接或者执行终结节点(GC-unlinking)。


    总结 

      ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。

      由于队列本身是一种双链表结构,所以虽然算法看起来很简单,但其实需要考虑各种并发的情况,实现复杂度较高,并且ConcurrentLinkedDeque不具备实时的数据一致性,实际运用中,如果需要一种线程安全的栈结构,可以使用ConcurrentLinkedDeque。

    参考: https://segmentfault.com/a/1190000016284649

  • 相关阅读:
    mysql修改数据表名
    HDU 5742 It's All In The Mind (贪心)
    HDU 5752 Sqrt Bo (数论)
    HDU 5753 Permutation Bo (推导 or 打表找规律)
    HDU 5762 Teacher Bo (暴力)
    HDU 5754 Life Winner Bo (博弈)
    CodeForces 455C Civilization (并查集+树的直径)
    CodeForces 455B A Lot of Games (博弈论)
    CodeForces 455A Boredom (DP)
    HDU 4861 Couple doubi (数论 or 打表找规律)
  • 原文地址:https://www.cnblogs.com/FondWang/p/12148360.html
Copyright © 2011-2022 走看看