zoukankan      html  css  js  c++  java
  • 集合:ConcurrentHashMap

    为什么要用ConcurrentHashMap

    hashmap虽然线程不安全,但是使用Hashmap进行put操作可能会引起死循环(带环链表),在多线程的情况下不适合使用

    hashtable线程安全但是效率较低,无论度还是写都会对整个集合加锁

    1、构造方法

    (1)无参构造

     (2)有参构造

    构造方法:

    获取初始容量:

     sizeCtl:

    注意:以上这些构造方法中,都涉及到一个变量sizeCtl,这个变量是一个非常重要的变量,而且具有非常丰富的含义,它的值不同,对应的含义也不一样:

    • sizeCtl为0,代表数组未初始化,且数组的初始容量为16
    • sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值
    • sizeCtl为-1,表示数组正在进行初始化
    • sizeCtl小于0,并且不是-1,表示数组正在扩容,-(1+n),表示此时有n个线程正在共同完成数组的扩容操作

     2、添加元素保证线程安全

     final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());//获取hash值
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {//死循环
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)//做数组的初始化
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//取数组相应索引处的元素并判断是否为空,没有元素直接添加元素
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))//成功的话跳出循环,否则还会再次判断
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)//代表正在扩容,此时不允许添加元素(往一个数组中添加第一个元素)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) {//加锁保证此桶位的元素插入是安全的,不会影响其他桶位元素的操作,而hashTable锁的是整个数组,这里是只对桶位加锁
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {//循环遍历链表进行equals判断,插入元素
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {//红黑树插入元素
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }

    初始化数组:

     private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0)//初始化
                    Thread.yield(); // 有其他线程在做初始化的话就让出CPU的使用权
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//当前值与主存中的值进行比对,相等的话就把主存的值改为-1,
    设置失败的话就意味着you其他线程在进行初始化,就会再次执行循环,没有加锁但是能保证安全
    try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//sc大于0表示已经初始化这是给自己的初始值,否则初始容量为16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//创建存储元素的数组 table = tab = nt; sc = n - (n >>> 2);//n减去四分之一n,也就是乘以0.75,但是这里将乘法改为减法和位移,效率更高 } } finally { sizeCtl = sc;//记录数组的扩容阈值 } break; } } return tab; }

    总结:只给插入的桶位加锁,而不是整个数组

    获取集合的长度

     private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            if ((as = counterCells) != null ||//如果只有一个线程的话可以直接对basecount添加数值
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;//如果是多个线程可能会有线程对basecount添加操作不能成功,此时就需要一个数组协助
                if (as == null || (m = as.length - 1) < 0 ||//先计算出维护元素个数的数组位置,然后对value加1,如果多次尝试添加都没有成功就需要对数组扩容
    //最终的集合的长度是baseCount加上数组中所有value的和
    (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

    3、数组扩容

    数据迁移:

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;

    将数组分割为不同的块,最少一个线程负责16个元素的迁移,如果计算的数量大于16那么就负责这么多的元素,小于等于16就负责16个元素

     synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }

    迁移的时候加锁其他线程没有办法添加,保证数据不会丢失

    ConcurrentHashMap总结:

    数据结构:数组+链表+红黑树

    jdk1.8开始底结构变为哈希表(数组+链表+红黑树)

    并发原理:CAS+synchronized锁

    CAS:在数组的初始化的时候,用到了CAS算法,sizeCtl为-1,表示数组正在进行初始化,当一个线程已经把sizeCtl改为-1后,其他的线程就不能再对sizeCtl进行更改了,该线程也就不能进行数组的初始化了

    synchronized锁:插入数据的时候只会对桶位加锁,加锁保证此桶位的元素插入是安全的,不会影响其他桶位元素的操作,而hashTable锁的是整个数组,这里是只对桶位加锁

    迁移的时候加锁其他线程没有办法添加,保证数据不会丢失

    加锁对象:数组每个位置的头节点

    如插入数据的时候

    每个人都会有一段异常艰难的时光 。 生活的压力 , 工作的失意 , 学业的压力。 爱的惶惶不可终日。 挺过来的 ,人生就会豁然开朗。 挺不过来的 ,时间也会教你 ,怎么与它们握手言和 ,所以不必害怕的。 ——杨绛
  • 相关阅读:
    二分多重匹配(HDU5093)
    2-sat(and,or,xor)poj3678
    某个点到其他点的曼哈顿距离之和最小(HDU4311)
    第k最短路A*启发式搜索
    求树的直径和中心(ZOJ3820)
    并查集hdu4424
    map容器结构体离散化
    二维坐标系极角排序的应用(POJ1696)
    【进阶3-3期】深度广度解析 call 和 apply 原理、使用场景及实现(转)
    判断js数据类型的四种方法,以及各自的优缺点(转)
  • 原文地址:https://www.cnblogs.com/zhai1997/p/13602526.html
Copyright © 2011-2022 走看看