zoukankan      html  css  js  c++  java
  • JAVA 数据结构 ConcurrentHashMap

    ConcurrentHashMap

    相对于HashMap,这个数据结构多了一个Concurrent,属于java.util.concurrent
    虽然名字看上去一样,但是不要被这个迷惑了。HashMap允许null,ConcurrentHashMap不允许null

    这个数据结构涉及很多知识,散列算法,链表结构,Java内存模型。
    这次只关注源码,因为JDK8中都有不同的实现,所以分1.7,1.8 来分析。

    JDK1.7 ConcurrentHashMap

    ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。

    
    static final class HashEntry<K,V> {
            final K key;                       // 声明 key 为 final 型
            final int hash;                   // 声明 hash 值为 final 型
            volatile V value;                 // 声明 value 为 volatile 型
            volatile HashEntry<K,V> next;      // 声明 next 为 volatile 型
    
            HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
                this.key = key;
                this.hash = hash;
                this.next = next;
                this.value = value;
            }
       .....
    

    Segment类

    Segment继承与ReentrantLock,使得Segment对象可以用来充当锁。每个Segment负责管理一个HashEntry数组【table】,table的每一个数组参与就是散列表 的一个

    count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。
    理论上:ConcurrentHashMap使用了分段锁技术,每当一个线程占用锁访问一个Segment,不会影响到其他的Segment。

    put方法

    public V put(K key, V value) {
           Segment<K,V> s;
           if (value == null)
               throw new NullPointerException();
           int hash = hash(key);
           int j = (hash >>> segmentShift) & segmentMask;
           if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
               s = ensureSegment(j);
           return s.put(key, hash, value, false);
       }
    

    先通过key 定位到Segment,在到具体的Segment中操作put

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    

    volatile关键字并不能保证并发的安全性,所以put的时候还是加锁处理。
    先走trylock,如果没拿到锁,就scanAndLockForPut自旋获取锁。

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
                HashEntry<K,V> first = entryForHash(this, hash);
                HashEntry<K,V> e = first;
                HashEntry<K,V> node = null;
                int retries = -1; // negative while locating node
                while (!tryLock()) {
                    HashEntry<K,V> f; // to recheck first below
                    if (retries < 0) {
                        if (e == null) {
                            if (node == null) // speculatively create node
                                node = new HashEntry<K,V>(hash, key, value, null);
                            retries = 0;
                        }
                        else if (key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {
                        lock();
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {
                        e = first = f; // re-traverse if entry changed
                        retries = -1;
                    }
                }
                return node;
            }
    

    1 进入循环trylock(),尝试自旋锁,如果retries大于MAX_SCAN_RETRIES,就获取阻塞锁。

    put步骤
    1. 先找Segment,然后定位到segment中的HashEntry。
    2. 遍历HashEntry,如果不为空就判断传入的keyhe 遍历的key是否相等,相等就覆盖old value。
    3. 为空就新建entry,并加入segment,而且会先判断有没有超阈,有的话就扩容。
    4. finally unlock().

    get方法

    public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }
    
    get步骤
    1. 先把key hash计算后找到segment,再经过hash计算出具体的位置。
    2. 由于value是volatile,所以每次拿都是最新值。

    get全程无锁


    看完了1.7的,已经开始仰望大佬们了,没想到1.8 又调整了


    JDK1.8 ConcurrentHashMap

    • 改动

    HashEntry换成了Node,而且没有了segment

     static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            volatile V val;
            volatile Node<K,V> next;
    
            Node(int hash, K key, V val, Node<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.val = val;
                this.next = next;
            }
    
            public final K getKey()       { return key; }
            public final V getValue()     { return val; }
            public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
            public final String toString(){ return key + "=" + val; }
            public final V setValue(V value) {
                throw new UnsupportedOperationException();
            }
    ...
    
    

    put方法

    public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    

    put里面包含了几个有趣的内容,Node<k,v>[],casTabAt,synchronized,treeifyBin,所以大概我们知道1.8里面的ConcurrentHashMap用到了CAS,Tree,synchronized。

    • 计算key=>hash
    • 判断要不要初始化。
    • 定位到的node节点,如果为空就准备把数据存入,并使用CAS机制。
    • (fh = f.hash) == MOVED,启动扩容
    • 如果没有满足上面任一逻辑,就进入synchronized,被锁定的就是我们的目标位置。
      1. 没有任何数据,就直接保存。
      2. 链表就保存到链表尾部。
      3. 如果是TreeBin,就保存到树里面
    • 如果元素超过阈值,就转红黑树

    get方法

    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            int h = spread(key.hashCode());
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    
    
    • 计算key的hash值,来查找元素位置
      1. 如果在桶上面,就直接返回
      2. 如果是红黑树,就进入树的模式查找
      3. 如果是链表,就遍历链表

    个人观点:

    1.7和1.8 相比下,新版本还是很有创意的,使用了红黑树来保证查询的效率,放弃了Reentrantlock而使用了synchronized,放弃了segment。使用了CAS机制,在保证线程安全的情况下,降低了锁的粒度。

  • 相关阅读:
    《c++ templates》学习笔记(8)——第十一章 模板实参演绎
    关于Decorator模式
    《c++ templates》学习笔记(11)——第十章 实例化
    《c++ templates》学习笔记(5)——第六章 模板实战
    《c++ templates》学习笔记(9)——第十二章 特化与重载
    关于windows Power Shell (1)
    VC版的IniFile
    《c++ templates》学习笔记(6)——第七章 模板术语
    《c++ templates》学习笔记(4)——第五章,技巧性基础知识
    boost::test
  • 原文地址:https://www.cnblogs.com/dreamtaker/p/13404665.html
Copyright © 2011-2022 走看看