zoukankan      html  css  js  c++  java
  • ConcurrentHashMap小结

    ConcurrentHashMap小结

    ConcurrentHashMap与HashTable

    • 都是线程安全的.
    • HashTable:对getput相关操作设加锁synchronized,相当于给整个哈希表加锁,多线程访问时只有一个线程可以访问和操作数据,其他线程阻塞,性能差.
    • ConcurrentHashMap:有多把锁,每个锁锁一小段数据,不同的数据段之间可以并发访问,提高了效率.

    JDK1.7的版本

    结构图:

    Segment[] Segment 1 ... Segment 2
    HashEntry[] HashEntry1.1 HashEntry1.2 ... HashEntry2.1
    • Segment[]数组 和 HashEntry[]组成,为数组+链表形式.

    成员变量

    final Segment<K,V>[] segments; // 数组
    transient Set<K> keySet; // key集合
    transient Set<Map.Entry<K,V>> entrySet; // 对应的一个个键值对
    

    Segment是一个内部类,代表一个个数组元素.

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile HashEntry<K,V>[] table; // 放数据的桶
        transient int count;
        transient int modCount;
        transient int threshold;
        final float loadFactor;
        
    }
    

    HashEntry为一个个键值对.

    static final class HashEntry<K,V>{
        final int hash;
        final K key;
        volatile V value; // 保证获取时的可见性
        volatile HashEntry<K,V> next; // 指向链表的下一个元素
    }
    

    注:

    • 采用分段锁技术.
    • 不像HashTable对putget方法做同步处理.
    • 每个线程占用锁访问一个Segment时,不影响其他的Segment.
    • 默认初始容量为16个Segment,负载因子为0.75,并发线程为16,对应16个线程可并发执行.

    构造器

    • 若用户不指定则使用默认值,初始容量为16,负载因子为0.75,并发线程数为16.
    • Segment数组的容量为大于并发线程数的2的幂次。(2的幂次方便扩容时定位Segment的位置)

    put方法

    // 添加元素
    // 1. 若为null,则抛出异常
    // 2. 若非null,则确保Segment大小足够
    // 3. 添加键值对
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null) // 不允许value为空
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask; // 通过哈希值获得在数组的位置。
        if ((s = (Segment<K,V>)UNSAFE.getObject
             (segments, (j << SSHIFT) + SBASE)) == null)
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
    
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value); // 尝试加锁,若失败则自旋重试,超过MAX_SCAN_RETRIES则改为阻塞锁获取,从而保证成功
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    

    流程:

    1. 将当前Segment中的table通过key的hashcode定位到HashEntry。
    2. 遍历HashEntry,若不为空则判断是否相等,相等则直接覆盖旧值。
    3. 若为空,则创建新的HashEntry并加入到Segment中,并判断是否需要扩容。
    4. 解除获得的Segment的锁。

    get方法

    public V get(Object key) {
        Segment<K,V> s;
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
    

    流程:

    1. 通过key的进行hash获得对应的Segment。
    2. 若不为null,则再次hash定位到具体的元素HashEntry上。
    3. 若不为空,则返回获取的元素;否则直接返回null。

    注:

    • valuevolatile的,所以获取时不用加锁,每次获取的是最新值。

    JDK1.8的版本

    改进:

    • JDK1.7在查询时遍历链表,效率低。
    • 抛弃了之前的Segment的分段锁,采用CAS+synchronized的方式保证并发的安全性。
    • 底层 == > 使用数组+链表/红黑树实现。

    几个重要的字段

    private static final int DEFAULT_CAPACITY = 16;
    private static final float LOAD_FACTOR = 0.75f; // 负载因子
    
    static final int TREEIFY_THRESHOLD = 8; // 从链表转换成红黑树的元素个数的阈值,即>=8时转换为红黑树
    static final int UNTREEIFY_THRESHOLD = 6; // 从红黑树转换为链表的阈值
    static final int MIN_TREEIFY_CAPACITY = 64; // 最小表容量,过小导致过多的结点在一起
    
    private static final int MIN_TRANSFER_STRIDE = 16;
    static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的数量
    

    构造函数

    // 默认的table大小为16
    public ConcurrentHashMap() {
    }
    
    // 指定默认大小
    // 若大于最大容量,则设为最大容量
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                    MAXIMUM_CAPACITY :
                    tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    

    关键域

    // 用来存储键值对的结点
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash; // 结点对应的hash值
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }
    
    // 红黑树的结点
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;
        boolean red;
    }
    
    // TreeNode用在红黑树的头结点
    // TreeBin用来指向TreeNode和根结点
    // 同时持有读写锁,使得写者阻塞直到读者完成
    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // 阻塞状态
        static final int WRITER = 1; // 持有写锁
        static final int WAITER = 2; // 等待写锁
        static final int READER = 4; // 设置读锁的增量
    }
    

    核心方法

    扩容方法

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    流程:

    1. 若有线程在扩容,则当前线程等待。
    2. 否则创建容量为默认大小的桶数组。
    3. 可用的大小是容量的0.75。

    put方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                                new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                        (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                                value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                            value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    put的整体流程:

    1. 首先检查key和value是否为null,若为空,则抛出异常。
    2. 重复下列动作,直到退出循环
    3. 检查桶数组table是否初始化,若没有初始化,则先初始化,再尝试插入键值对。
    4. 检查键值对所对应的桶table位置是否为空,为空则利用CAS操作在该位置上插入键值对,并退出循环。
    5. 若非空,则表示出现碰撞,则判断是否在扩容
    6. 若在扩容,则协助扩容。
    7. 若不在扩容,则synchronized锁定桶table数组的首个结点。
    8. 从桶数组的首个结点向后遍历,若遇到相等的结点,说明已经插入,则直接退出循环。
    9. 否则找到最后一个结点,将新的键值对插入其后(尾插入),退出循环。
    10. 若是红黑树,则调用红黑树的插入方法,并退出循环。
    11. 最后检查一下链表是否需要转换成红黑树,若需要,则进行转换。

    参考:

  • 相关阅读:
    广义表的创建和遍历
    dev c++ Boost库的安装
    NAT模式
    vmware桥接模式
    smb与samba
    利用Linux的Samba服务模拟NT域
    使用samba进行共享文件操作步骤
    安装chrome
    使用虚拟机上网第二步
    TCP协议三次握手过程分析
  • 原文地址:https://www.cnblogs.com/truestoriesavici01/p/13214026.html
Copyright © 2011-2022 走看看