zoukankan      html  css  js  c++  java
  • 理解ConcurrentHashMap1.8源码

    ConcurrentHashMap源码分析

    其实ConcurrentHashMap我自己已经看过很多遍了,但是今天在面试阿里的时候自己在描述ConcurrentHashMap发现自己根本讲不清楚什么是ConcurrentHashMap,以及里面是怎么实现的,搞的我突然发现自己什么都不懂,所以我想要再次的来分析一下这个源码,完全理解ConcurrentHashMap,而不是以为自己懂了,实际上自己不懂。

    首先我们看一下put方法,put方法会调用到putVal方法上面。

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
    	  //如果put进去的是个链表,这个参数表示链表的大小
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
    			  //初始化链表
                tab = initTable();
    			//如果这个槽位没有数据
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {		
    				//使用CAS将这个新的node设置到hash桶里面去
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
    			//帮助迁移
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
    				//获取锁
                V oldVal = null;
                synchronized (f) {
    					//双重检查锁
                    if (tabAt(tab, i) == f) {
    						//如果hash值大于等于0,那么代表这个节点里的数据是链表
                        if (fh >= 0) {
                            binCount = 1;
    							//每次遍历完后binCount加1,表示链表长度
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
    								//如果hash值和key值都相同,那么覆盖,break结束循环
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
    								//下一个节点为null,说明遍历到尾节点了,那么直接在尾节点设值一个新的值
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
    						 //如果是红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
    					  //如果链表个数大于8,那么就调用这个方法
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    解释一下上面的源码做了什么:

    1. 首先做一下判断,不允许key和value中任意一个为空,否则抛出异常
    2. 计算key的hash值,然后遍历table数组
    3. 如果table数组为null或为空,那么就调用initTable做初始化
    4. 为了保证可见性,会使用tab去table数组里获取数据,如果没有数据,那么用casTabAt通过CAS将新Node设置到table数组里。(注:这里也体现了和hashmap不一样的地方,hashmap直接通过数据拿就好了, 这个获取数据和设值都要保证可见性和线程安全性)
    5. 如果当前槽位所对应的hash值是MOVED,说明当前的table正在扩容迁移节点,那么就调用helpTransfer帮助迁移
    6. 走到这里,说明这个槽位里面的元素不止一个,有很多个,所以给头节点加上锁
    7. 如果当前的hash所对应的的槽位不是空的,并且hash值大于等于0,那么就说明这个槽位里面的对象是一个链表,那么就遍历链表
      1. 如果所遍历的链表里面有元素的hash值并且key和当前要插入的数据的是一样的,那么就覆盖原来的值
      2. 如果遍历到最后的节点都没有元素和要插入的值key是一样的,那么就新建一个Node节点,插入到链表的最后
      3. 每遍历一个节点就把binCount+1
    8. 如果当前的节点是TreeBin,那么说明该槽位里面的数据是红黑树,那么调用相应方法插入数据
    9. 最后如果binCount已经大于或等于8了,那么就调用treeifyBin

    接下来我们先看initTable 方法,再看treeifyBin和helpTransfer

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
    			//一开始的时候sizeCtl为0
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
    			//将sizeCtl用CAS设置成-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
    						//因为sc一开始为0,所以n取DEFAULT_CAPACITY为16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    						//将table赋值为大小为16的Node数组
                        table = tab = nt;
    						//将sc的设置为总容量的75%,如果 n 为 16 的话,那么这里 sc = 12
                        sc = n - (n >>> 2);
                    }
                } finally {
    					//最后将sizeCtl设置为sc的值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    
    

    这个方法里面初始化了一个很重要的变量sizeCtl,初始值为总容量的75%,table初始化为一个容量为16的数组

    下面我们在看看treeifyBin方法

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
    			//如果数据的长度小于64,那么调用tryPresize进行扩容
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
    			//如果这个槽位里面的元素是链表
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {			
    				//给链表头加上锁
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
    					 //遍历链表,然后初始化红黑树对象
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
    						//给tab槽位为index的元素设置新的对象
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
    

    treeifyBin这个方法里面并不是只是将链表转化为红黑树,而是当tab的长度大于64的时候才会将链表转成红黑树,否则的话,会调用tryPresize方法。

    然后我们进入到tryPresize方法里面看看,tryPresize传入的参数是当前tab数组长度的两倍。

    private final void tryPresize(int size) {
    		//原本传进来的size已经是两倍了,这里会再往上取最近的 2 的 n 次方
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
    			// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
    				//一开始进来的时候sc是大于0的
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
    				//将SIZECTL设置为一个很大的复数
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
    
    

    这个方法里面,会对tab数据进行校验,如果没有初始化的话会重新进行初始化大小,如果是第一次进来的话会将SIZECTL设置成一个很大的复数,然后调用transfer方法,传如当前的tab数据和null。

    接着我们来看transfer方法,这个方法比较长,主要的扩容和转移节点都在这个方法里面实现,我们将这个长方法分成代码块,一步步分析:

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    		//如果当前tab数组长度为16
        int n = tab.length, stride;
    		//那么(n >>> 3) / NCPU  = 0 小于MIN_TRANSFER_STRIDE
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    			//将stride设置为 16 
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
    			//如果n是16,那么nextTab就是一个容量为32的空数组
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
    			//将transferIndex赋值为16
            transferIndex = n;
        }
    		...
    }
    

    这个代码块主要是做nextTable、transferIndex 、stride的赋值操作。

    ...
    //初始化nextn为32
    int nextn = nextTab.length;
    //新建一个ForwardingNode对象,里面放入长度为32的nextTab数组
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    //初始化bound为0
    for (int i = 0, bound = 0;;) {
    	...
    }
    

    下面的代码会全部包裹在这个for循环里面,所以我们来分析一下这个for循环里面的代码

    for (int i = 0, bound = 0;;) {
    		
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
    				//将nextIndex设置为transferIndex,一开始16
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
    				//一开始的时候nextIndex是和stride相同,那么nextBound为0,TRANSFERINDEX也为0
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
    					//这里bound也直接为0
                    bound = nextBound;
    					//i = 15
                    i = nextIndex - 1;
                    advance = false;
                }
            }
    		...
    }
    

    这个方法是为了设置transferIndex这个属性,transferIndex一开始是原tab数组的长度,每次会向前移动stride大小的值,如果transferIndex减到了0或小于0,那么就设置I等于-1,i在下面的代码会说到。

    for (int i = 0, bound = 0;;) {
    		...
    		//在上面一段代码块中,如果transferIndex已经小于等于0了,就会把i设置为-1
    		if (i < 0 || i >= n || i + n >= nextn) {
    		    int sc;
    				//表示迁移已经完成
    		    if (finishing) {
    					//将nextTable置空,表示不需要迁移了
    		        nextTable = null;
    					//将table设置为新的数组
    		        table = nextTab;
    					//sizeCtl设置为n的 1.5倍
    		        sizeCtl = (n << 1) - (n >>> 1);
    		        return;
    		    }
    		    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    		        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
    		            return;
    		        // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
    		         // 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
    		        finishing = advance = true;
    		        i = n; // recheck before commit
    		    }
    		}
    ...
    }
    

    这个方法是用来表示已经迁移完毕了,可以退出。

    for (int i = 0, bound = 0;;) {
    	...
    	//如果该槽位没有元素,那么直接把tab的i槽位设置为fwd
    	else if ((f = tabAt(tab, i)) == null)
    	    advance = casTabAt(tab, i, null, fwd);
    	//说明这个槽位已经有其他线程迁移过了
    	else if ((fh = f.hash) == MOVED)
    	    advance = true; // already processed
    	//走到这里,说明tab的这个槽位里面有数据,那么我们需要获得槽位的头节点的监视器锁
    	else {
    	    synchronized (f) {	
    			if (tabAt(tab, i) == f) {
    				...
    			} 
    		  }
    	}
    	...
    }
    

    在这个代码块中,i会从最后一个元素一个个往前移动,然后根据i这个index来判断tab里面槽位的情况。

    下面的代码我们来分析监视器锁里面的内容:

    synchronized (f) {
    	if (tabAt(tab, i) == f) {
    		//fh是当前节点的hash值
    		if (fh >= 0) {
    		    int runBit = fh & n;
    			//lastRun设置为头节点
    		    Node<K,V> lastRun = f;
            // 需要将链表一分为二,
            //   找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
            //   lastRun 之前的节点需要进行克隆,然后分到两个链表中
    		    for (Node<K,V> p = f.next; p != null; p = p.next) {
    		        int b = p.hash & n;
    		        if (b != runBit) {
    		            runBit = b;
    		            lastRun = p;
    		        }
    		    }
    		    if (runBit == 0) {
    		        ln = lastRun;
    		        hn = null;
    		    }
    		    else {
    		        hn = lastRun;
    		        ln = null;
    		    }
    		    for (Node<K,V> p = f; p != lastRun; p = p.next) {
    		        int ph = p.hash; K pk = p.key; V pv = p.val;
    		        if ((ph & n) == 0)
    		            ln = new Node<K,V>(ph, pk, pv, ln);
    		        else
    		            hn = new Node<K,V>(ph, pk, pv, hn);
    		    }
    			//其中的一个链表放在新数组的位置 i
    		    setTabAt(nextTab, i, ln);
    			//另一个链表放在新数组的位置 i+n
    		    setTabAt(nextTab, i + n, hn);
    			//将原数组该位置处设置为 fwd,代表该位置已经处理完毕
    			//其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
    		    setTabAt(tab, i, fwd);
    			//advance 设置为 true,代表该位置已经迁移完毕
    		    advance = true;
    		}
    		//下面红黑树的迁移和上面差不多
    		else if (f instanceof TreeBin) {
    			....
    		}
    	} 
    }
    

    这个方法主要是将头节点里面的链表拆分成两个链表,然后设置到新的数组中去,再给老的数组设置为fwd,表示这个节点已经迁移过了。

    到这里transfer方法已经分析完毕了。
    这里我再举个例子,让大家根据透彻的明白多线程之间是怎么进行迁移工作的。

    我们假设stride还是默认的16,第一次进来nextTab为null,但是tab的长度为32。
    
    一开始的赋值:
    1. n会设置成32,并且n只会赋值一次,代表被迁移的数组长度 
    2. nextTab会被设置成一个大小为64的数组,并塞入到新的ForwardingNode对象中去。
    3. transferIndex会被赋值为32
    
    进入循环:
    	初始化i为0,bound为0;
    	第一次循环:
    		1. 由于advance初始化为true,所以会进入到while循环中,循环出来后,transferIndex会被设置成16,bound被设置成16,i设置成31。这里你可能会问
    		2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
    
    	第二次循环:
    		1. --i,变为30,--i >= bound成立,并将advance设置成false
    		2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
    	。。。
    	第十六次循环:
    		1. --i,变为15,将transferIndex设置为0,bound也设置为0,i设置为15
    		2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
    	第三十二次循环:
    		1. 这个时候--i等于-1,并且(nextIndex = transferIndex) <= 0成立,那么会将i设置为-1,advance设置为false
    		2. 会把SIZECTL用CAS设置为原来的值加1,然后设置finishing为true
    
    	第三十三次循环:
    		1. 由于finishing为true,那么nextTable设置为null,table设置为新的数组值,sizeCtl设置为旧tab的长度的1.5倍
    
  • 相关阅读:
    Effective Java 第三版——26. 不要使用原始类型
    Effective Java 第三版——25. 将源文件限制为单个顶级类
    Effective Java 第三版——24. 优先考虑静态成员类
    Effective Java 第三版——23. 优先使用类层次而不是标签类
    Effective Java 第三版——22. 接口仅用来定义类型
    Effective Java 第三版——21. 为后代设计接口
    Effective Java 第三版——20. 接口优于抽象类
    Effective Java 第三版——19. 如果使用继承则设计,并文档说明,否则不该使用
    Effective Java 第三版——18. 组合优于继承
    Effective Java 第三版——17. 最小化可变性
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11406557.html
Copyright © 2011-2022 走看看