zoukankan      html  css  js  c++  java
  • 死磕ConcurrentHashMap 1.8源码解析

    JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本。

    // 最大容量:2^30=1073741824
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    // 默认初始值,必须是2的幕数
    private static final int DEFAULT_CAPACITY = 16;
    //
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    //
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // 
    private static final float LOAD_FACTOR = 0.75f;
    // 链表转红黑树阀值,> 8 链表转换为红黑树
    static final int TREEIFY_THRESHOLD = 8;
    //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
    static final int UNTREEIFY_THRESHOLD = 6;
    //
    static final int MIN_TREEIFY_CAPACITY = 64;
    //
    private static final int MIN_TRANSFER_STRIDE = 16;
    //
    private static int RESIZE_STAMP_BITS = 16;
    // 2^15-1,help resize的最大线程数
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    // 32-16=16,sizeCtl中记录size大小的偏移量
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    // forwarding nodes的hash值
    static final int MOVED     = -1; 
    // 树根节点的hash值
    static final int TREEBIN   = -2; 
    // ReservationNode的hash值
    static final int RESERVED  = -3; 
    // 可用处理器数量
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    table:用来存放Node节点数据的,默认为null,默认大小为16的数组,每次扩容时大小总是2的幂次方;
    nextTable:扩容时新生成的数据,数组为table的两倍;
    Node:节点,保存key-value的数据结构;
    ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动
    sizeCtl:控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同
                  负数代表正在进行初始化或扩容操作
                 -1代表正在初始化
                 -N 表示有N-1个线程正在进行扩容操作
                 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

    Node

    Node是ConcurrentHashMap存储结构的基本单元,继承于HashMap中的Entry,用于存储数据,源代码如下

    static class Node<K,V> implements Map.Entry<K,V> {
        //链表的数据结构
        final int hash;
        final K key;
        //val和next都会在扩容时发生变化,所以加上volatile来保持可见性和禁止重排序
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        //不允许更新value  
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        //用于map中的get()方法,子类重写
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

     put 方法

     public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
        /** Implementation for put and putIfAbsent */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());//两次hash,减少hash冲突,可以均匀分布
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                // 1)这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 2)如果i位置没有数据,就直接无锁插入
                   //利用CAS操作将元素插入到Hash表中
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED) //3)如果在进行扩容,则先进行扩容操作
                    tab = helpTransfer(tab, f);
                else {   // 4) 如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
                    V oldVal = null;
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) { //表示该节点是链表结构  TreeBin的hash 是 -2 ??
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    //这里涉及到相同的key进行put就会覆盖原先的value
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) { //插入链表尾部
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) { //红黑树结构
                                Node<K,V> p;
                                binCount = 2;
                                //红黑树结构旋转插入
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {  //如果链表的长度大于8时就会进行红黑树的转换
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount); //统计size,并且检查是否需要扩容
            return null;
        }

    这个put的过程很清晰,对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述。

    1. 如果没有初始化就先调用initTable()方法来进行初始化过程
    2. 如果没有hash冲突就直接CAS插入
    3. 如果还在进行扩容操作就先进行扩容
    4. 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
    5. 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
    6. 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

    initTable

      private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0) //表示其他线程已经在初始化了或者扩容了,则直接让出自己的CPU时间片
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  //通过cas操作,将sizeCtl替换为-1,标识当前线程抢占到了初始化资格
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }

     初始化方法initTable()的关键就在于sizeCtl,该值默认为0,如果在构造函数时有参数传入该值则为2的幂次方。该值如果 < 0,表示有其他线程正在初始化,则必须暂停该线程。如果线程获得了初始化的权限则先将sizeCtl设置为-1,防止有其他线程进入,最后将sizeCtl设置0.75 * n,表示扩容的阈值。

    helpTransfer

     。。。

     get

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); //计算两次hash
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
            if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
            //查找,查找到就返回
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

    ConcurrentHashMap的get操作的流程很简单,也很清晰,可以分为三个步骤来描述

    1. 计算hash值,定位到该table索引位置,如果是首节点符合就返回
    2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
    3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null。

     get没有加锁的话,ConcurrentHashMap是如何保证读到的数据不是脏数据的呢?

     Node的元素val和指针next是用volatile修饰的,在多线程环境下线程A修改因为hash冲突修改结点的val或者新增节点的时候是对线程B可见的。

    • 在1.8中ConcurrentHashMap的get操作全程不需要加锁,这也是它比其他并发集合比如hashtable、用Collections.synchronizedMap()包装的hashmap;安全效率高的原因之一。
    • get操作全程不需要加锁是因为Node的成员val是用volatile修饰的和数组用volatile修饰没有关系。
    • 数组用volatile修饰主要是保证在数组扩容的时候保证可见性。

    Size

    ConcurrentHashMap的size()方法返回的是一个不精确的值,因为在进行统计的时候有其他线程正在进行插入和删除操作。当然为了这个不精确的值,ConcurrentHashMap也是操碎了心。

    为了更好地统计size,ConcurrentHashMap提供了baseCount、counterCells两个辅助变量和一个CounterCell辅助内部类。

     public int size() {
            long n = sumCount();
            return ((n < 0L) ? 0 :
                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
        }
        final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    //遍历,所有counter求和
                    if ((a = as[i]) != null)
                        sum += a.value;     
                }
            }
            return sum;
        }

    https://www.cnblogs.com/loren-Yang/p/7466111.html

     https://blog.csdn.net/weixin_38426554/article/details/96482064

    https://www.cnblogs.com/fanguangdexiaoyuer/p/10733236.html#_label2_1

    https://www.jianshu.com/p/77fda250bddf

    https://blog.csdn.net/chenssy/article/details/73521950

    待啃。。。。

  • 相关阅读:
    TP框架实现分页及条件查询
    tp框架连贯操作
    php查询
    php修改数据
    php增加数据处理
    php删除数据
    php怎么访问数据库
    php查询
    克隆及加载类
    php静态成员和接口
  • 原文地址:https://www.cnblogs.com/dingpeng9055/p/11733698.html
Copyright © 2011-2022 走看看