zoukankan      html  css  js  c++  java
  • 跳表ConcurrentSkipListMap学习笔记

    一、介绍

    ConcurrentSkipListMap是一个内部使用跳表,并且支持排序并发的一个Map,是线程安全的。一般很少会被用到,也是一个比较偏门的数据结构。

    简单介绍下跳表

    跳表是一种允许在一个有顺序的序列中进行快速查询的数据结构。

    在普通的顺序链表中查询一个元素,需要从链表头部开始一个一个节点进行遍历,然后找到节点。

    跳表可以解决这种查询时间过长,跳表是一种使用”空间换时间”的概念用来提高查询效率的链表

    ConcurrentSkipListMap 和 ConcurrentHashMap 的主要区别:

    a.底层实现方式不同。ConcurrentSkipListMap底层基于跳表。ConcurrentHashMap底层基于Hash桶和红黑树。

    b.ConcurrentHashMap不支持排序。ConcurrentSkipListMap支持排序。

    二、使用场景

    Redis中的有序集合底层就是跳表,所以其也是开发排行榜功能时可以考虑使用的。

    跳跃表的应用场景大概是这样的:

    1. 有序
    2. 频繁插入删除
    3. 频繁的查找
    对于有序来说,数组和普通链表都可以通过排序算法来实现,在排序复杂度上不相上下。
    链表插入删除上性能较好,数组在查找上性能较好,所以都不能满足我们的要求。
    跳跃表则是在插入删除和查找的性能上做了折中,复杂度为log(n)

    三、跳跃表结构  

    为了更好的支持插入和删除,所以采用链表的形式,可以看到图片中最下面一行是一个有序的链表。但如果只是一个单一的链表,查找时复杂度为O(n),性能太差。

    在有序数组中,我们查找时用的是二分查找,一次查找可以排除一半元素的遍历。在数组中之所以可以用二分查找,是因为我们能够快速的以O(1)的复杂度定位到中间的位置,但是链表只能是O(N)。所以跳跃表采取空间换时间的方式,既然你找不到中间点,或者三分之一点等中间位置,那么我可以通过多增加一个节点来指向中间位置,这样你也能够快速的定位到中间的位置,然后一定程度的减少你遍历元素的个数,提高效率。图中有多层,相邻的两层,采用的都是这样的思想。

    这个图一目了然,很容易就可以让大家了解跳表的思想。至于我们应该添加多少层额外的链表,给什么位置的节点添加索引才能更好的优化检索和插入的效率,就是我希望通过阅读源码找寻的.

    四、节点

    通过上图,可以发现有两种节点类型,第一种是最下层的,与普通链表相似,第二种是除了最后一层以外的其他索引层节点,有两个指针。
    但是在jdk源码中还存在一种节点,是索引层的头节点,还维护了其层数信息,下面先给出源码注释中的跳表样例。
         * Head nodes          Index nodes
         * +-+    right        +-+                      +-+
         * |2|---------------->| |--------------------->| |->null
         * +-+                 +-+                      +-+
         *  | down              |                        |
         *  v                   v                        v
         * +-+            +-+  +-+       +-+            +-+       +-+
         * |1|----------->| |->| |------>| |----------->| |------>| |->null
         * +-+            +-+  +-+       +-+            +-+       +-+
         *  v              |    |         |              |         |
         * Nodes  next     v    v         v              v         v
         * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
         * | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
         * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+

    接着我们来分别看下它们在源码中是如何定义的。

    // Node是最底层的链表的节点,包括键值对和指向下一个节点的指针
    static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K,V> next;
    // 至于为什么需要两个构造函数,后面源码会有解释
        Node(K key, Object value, Node<K,V> next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
        Node(Node<K,V> next) {
            this.key = null;
            this.value = this;
            this.next = next;
        }
        // ...配套method
    }
    
    // 索引节点结构
    // 存储了两个指针,分别指向右边和下边的节点
    // 索引节点的value为链表节点
    static class Index<K,V> {
        final Node<K,V> node;
        final Index<K,V> down;
        volatile Index<K,V> right;
    
        Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
            this.node = node;
            this.down = down;
            this.right = right;
        }
        // ...配套method
    }
    
    // 索引层的头节点结构
    // 在索引节点的基础上添加了表示层数的level变量
    static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }

    五、构造函数

    // Compartor接口用来指定key的排序规则
    public ConcurrentSkipListMap() {
        this.comparator = null;
        initialize();
    }
    public ConcurrentSkipListMap(Comparator<? super K> comparator) {
        this.comparator = comparator;
        initialize();
    }
    // 还有两个传入map和sortedMap的构造函数
    private void initialize() {
        keySet = null;// 内部类
        entrySet = null;// 内部类
        values = null;// 内部类
        descendingMap = null;// 内部类
    // private static final Object BASE_HEADER = new Object();
    // 从注释给出的图来看,这个head应该是一直处于第一层的头节点
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                                  null, null, 1);
    }

    六、findPredecessor()

    // findPredecessor
    // 回想一下前面那个跳表的结构,该方法就是根据key,先从head往右找,然后往下找
    //然后再往右再往下,知道找到比key小的节点
    private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException();
        for (;;) {
            for (Index<K,V> q = head, r = q.right, d;;) {
          //q为第一层的head节点, r=q.right说明先向右遍历
    // 从head头节点点向后遍历,右边节点不为空 if (r != null) { Node<K,V> n = r.node; K k = n.key; //说明被删除了,更新q.next; 在remove()方法中,n.casValue(v, null)将节点的值置空;置空后,在这里进行索引节点的删除 if (n.value == null) {
                //unlink操作为将原本的q->r->r.next 转换为q->r.next
    if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; }
              //如果r的节点没被删除,就执行到这里  
             //如果key大于当前的k,则向后移一个节点                
              //head索引节点也向后移一个节点
    if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } // 失败了就从下一层开始         
            //如果索引节点的右边没有节点了,则向下移动
            
            //向下移动后如果还是索引层,则继
    续向后  //如果是节点,直接返回了
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }

    举例: 比如我要找最接近节点6的节点 (暂时是5)

     步骤分析:

    ①外部for循环主要是一直查找临近节点,内部for循环大概分4部分

    ②内部循环开始,从头节点开始,当前节点头层1,r是右节点(没有右节点),d暂时未赋值;

    ③进入(代码3),d指向2层1节点,q(当前节点)替换为2层1节点,r表示2层7节点;

    ④内循环进入(代码1),n表示2层7节点的node,如果n.value=null,表示被删除了,需要更新一下q.next

    ⑤cpr()函数比较key和n的值(2层7)<0,进入代码3,向下走,进入1层1节点(当前节点q),r为1层4节点,再次内部循环

    ⑥假设没有并发,进入代码2判断,n为1层4节点,cpr(key,4)>0,进入代码2,当前节点替换成1层4节点(q),r右节点为1层7节点;重新内循环

    ⑦,n为右节点值k=7,代码2判断 cpr(key,7)<0,进入代码3和代码4,替换当前节点q为0层4节点,r表示0层5节点,继续内循环;

    ⑧代码2判断,cpr(key,右节点k=5)>0,当前节点替换成0层5节点,右节点为0层6节点,重新内循环;

    ⑨再次进入代码2判断,key=右节点6的值k,进入代码3, d表示当前节点q(0层节点5)的下节点=null,直接返回当前节点q,即0层节点5

      说明 key=6, q表示当前节点,r表示右节点,d表示下节点,  n是指向r节点的node的临时变量, k表示n的值;  cpr()函数主要是进行CAS比较大小; 

     总之 : findPredecessor() 是一个向右和向下的循环查找过程,并且有内部并发修改的判断和刷新数据的逻辑。

    七、put()

    // 与一般的map一样,通过put插入键值对,,key,value不能为空
    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }
    
    // doPut
    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;   // 要被添加的Node
        if (key == null)
            throw new NullPointerException();
        // key的比较方法
        Comparator<? super K> cmp = comparator;
        outer: for (;;) { // 因为cas操作可能失败,套了个无限循环
        // findPrecessor返回小于key但最接近key的节点,不存在则为链表的头节点,下面也会介绍
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    // b--->n--->f
                    if (n != b.next)
                        break;
                    if ((v = n.value) == null) {   // n is deleted
                    // 简单来说就是b.next = f,但是考虑了cas
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n) // b is deleted
                        break;
                    // 如果key大于n.key
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                    // 说明有并发插入,继续向后遍历一个节点
                        b = n;
                        n = f;
                        continue;
                    }
                    // 如果key相等
                    if (c == 0) {
                        // 没有指定putIfAbsent的话,通过cas替换value并返回新的value
                        // 指定了putIfabsent则返回原有value值
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; //cas失败,重试
                    }
                    // else c < 0;则位置正确,插入就行
                }
                // 创建新的Node节点
                z = new Node<K,V>(key, value, n);
                // 更新链表b->n->f ==> b->z->n->f
                if (!b.casNext(n, z))
                    break;     
                break outer;
            }
        }
    
        int rnd = ThreadLocalRandom.nextSecondarySeed();
        // 0x80000001转换为二进制1000...0001
        // 看起来似乎添加层数是有一定随机性的
        // rnd为0xxx...xxx0时可以进入
        if ((rnd & 0x80000001) == 0) { 
            int level = 1, max;
            // 有多少个1,level递增多少次
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            if (level <= (max = h.level)) {
            // 如果level比现在的层数小,则在新增加的节点z上
            // 建立level个索引节点,忘记了可以回上面看看索引节点和其他节点区别
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else {
            // 如果level大于层数,则level设为层数+1
                level = max + 1; 
                // 构造索引节点数组
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                //为新建的节点z创造level个索引节点
                //下标从1开始
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                //用于判断并发修改,不考虑
                //正确情况下该分支的level>当前层数
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    for (int j = oldLevel+1; j <= level; ++j)
                    //为每层生成一个headIndex
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    //更新最上层的headIndex指针
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }
            
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                // h为最新的头节点
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    // r为前面的idxs[x]
                    if (r != null) {
                        Node<K,V> n = r.node;
            // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                    // 没并发的情况下,idxs数组里都是新增的key
                    // c应该=0
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }
                // j开始时为新跳表层数
                //insertionLevel为旧跳表,经过后面的几个j--才会进入该分支
                    if (j == insertionLevel) {
                        //将t插入q,r之间
                        if (!q.link(r, t))
                            break; // restart
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }
    
                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                //向下一层
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }
    

    小结: put操作先是找到key节点,如果存在就替换掉value的值;如果不存在就需要new一个新的节点,然后调整层级等操作

    八、get()

    public V get(Object key) {
        return doGet(key);
    }
    
    private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            //findPredecessor找到离key最近的,小于key的node
            // 没有并发的情况下,要么是n,要么没有要找的key
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                //key与n.key相同
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                //并发导致key>n.key,说明findPredecessor找的位置不对
                //往后位移一个节点,重新开始
                b = n;
                n = f;
            }
        }
        return null;
    }

     举例: 找到上面key=6的节点

    ①内循环,先找到最接近key的节点b; n表示b的后继

    ②如果n==null,跳出外循环,返回null结果

    ③f表示n的后继,此时如果n不为b的后继,表示并发修改情况,读的数据不一致,需要重新外部循环;

    ④v表示n的值,如果v=null,表示n节点被删除了,需要执行一下helpDelete操作,重新外部循环;

    ⑤如果b的值为null,或者v的指向自己节点n(b的后继值指向自己节点),表示b被删除; 重新外部循环;

    ⑥cpr(key,n.key),没有并发情况时,6==6,表示n就是key节点,用新的V值来代替n的Value值,并返回;

    ⑦如果比较值<0,则表示有并发操作,此时n的key值可能是7.8.9之类的值,跳出外循环,找不到key节点,返回null;

    ⑧如果比较值>0,表示n节点不是需要找的key, 重新进行外部for循环,往后查找;

    小结: get方法还是比较简单的,findPrecessor方法找到的就是小于key但是最接近key的节点,所以key如果存在,必然是findPrecessor找到的节点的下一个节点,而代码中为了考虑并发带来的修改,还要做很多其他的判断。

    九、remove()

    public V remove(Object key) {
        return doRemove(key, null);
    }
    
    final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
        // findPrecessor找到比key小但是最近的node
        //不考虑并发的话,如果存在key那就是n=b.next
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                //考虑一些并发修改的问题
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                // 如果key<n,说明key不存在,退出最外层循环
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                //c>0说明findPre方法找到的节点过期了,重新找
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                // c==0
                if (value != null && !value.equals(v))
                    break outer;
                //通过cas将节点n的value设为null,就是key对应的节点
                //findPrecessor方法会在读到value为null的值时进行删除
                if (!n.casValue(v, null))
                    break;
                //n添加删除标志并且更新b.next指针
                //appendMarker创建一个key为null,value为自己的节点
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                //删除成功并且更新b.next后进入该分支
                //findPredecessor会在value==null时,更新next指针,实现删除
                    findPredecessor(key, cmp);      // clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

     举例:删除key=6节点

    ①同样是外循环+内循环,先找到最接近key的节点b, n表示b的后继节点
    ②如果n节点为null,返回null
    ③f表示n的下一个节点,如果读取不一致或者n被删除,b被删除情况,需要重新外部循环;
    ④cpr(key,n.key)比较大小,如果<0,跳出外部循环,返回null
    ⑤如果>0,表示n不是要找的节点,继续向后遍历,重新内部循环;
    ⑥剩下情况就是==0, CAS删除v,并返回执行结果,如果失败,则跳出内循环,继续外循环;
    ⑦n.appendMarker(f)表示n添加删除标志;并更新b,next指针,从n更新到f; 失败的话重新找key节点;
    ⑧成功的话,找key最近的节点,findPredecessor会在value==null时,更新next指针,实现删除;如果head没有right节点,尝试降低层的高度;
    ⑨返回被删除的值

    参考:

    http://hollischuang.gitee.io/tobetopjavaer/#/ ,

    https://www.jianshu.com/p/7fce19e8fe52

  • 相关阅读:
    转:常用svn命令
    如何识别网页类型(wap页面还是wise页面)
    [转]手机web HTML头信息解释和viewport meta标签解释
    网页正文抽取
    python 去除不可见的控制字符
    11_MySQL_分页查询
    10_MySQL DQL_子查询(嵌套的select)
    静态函数和实例化方法
    GET 和 POST 方法的区别
    C# .NET 开发心得
  • 原文地址:https://www.cnblogs.com/coloz/p/12894146.html
Copyright © 2011-2022 走看看