前言
在高并发的环境下,当我们使用一个公共的变量时如果不加锁会出现并发问题,例如SimpleDateFormat,但是加锁的话会影响性能,对于这种情况我们可以使用ThreadLocal。ThreadLocal是将公共变量copy一份到线程私有内存中以消除并发问题,ThreadLocal是JDK内部提供的高效解决并发问题的工具类之一,本文介绍ThreadLocal的重要方法的源码实现以及相关问题的分析。
数据结构
由上图可以看出,在Thread中维护了一个Entry的列表,Entry存储的是公共变量的copy,这个列表是由ThreadLocal维护,每次从ThreadLocal中获取的是一个公共变量的副本,所以避免了并发问题,接下来在看实现方法之前我们先看一下上面提到的类的定义。
变量
首先来看一下ThreadLocal重要的成员变量。
1 public class ThreadLocal<T> { 2 private final int threadLocalHashCode = nextHashCode(); 3 4 private static AtomicInteger nextHashCode = 5 new AtomicInteger(); 6 7 private static final int HASH_INCREMENT = 0x61c88647; 8 9 private static int nextHashCode() { 10 return nextHashCode.getAndAdd(HASH_INCREMENT); 11 } 12 }
threadLocalHashCode是ThreadLocal实例的ID,其主要用于做散列(后面会讲到),从上面可以看出来threadLocalHashCode是所有ThreadLocal贡献的原子变量nextHashCode加上一个固定的HASH_INCREMENT生成的。为什么
HASH_INCREMENT的值是0x61c88647?
前面说了,ThreadLocal是使用散列做存储的,而这个数字可以让生成出来的ThreadLocal的ID较为均匀地分布在大小为2的N次方的数组中。
接下来看一下ThreadLocal中真正存储数据的Entry类。
1 static class ThreadLocalMap { 2 3 static class Entry extends WeakReference<ThreadLocal<?>> { 4 5 Object value; 6 7 Entry(ThreadLocal<?> k, Object v) { 8 super(k); 9 value = v; 10 } 11 } 12 13 private static final int INITIAL_CAPACITY = 16; 14 15 private Entry[] table; 16 17 private int size = 0; 18 19 private int threshold; 20 }
ThreadLocalMap是ThreadLocal的内部类,Entry继承WeakReference,前面提到Entry是K-V形式,key就是WeakReference的成员变量referent,value就是Entry的value。我们可以发现,key是一个弱引用,它的生命周期到下一个GC就结束了,为什么要这样设计呢?在后面的内存泄漏中会提到。
table是一个大小为2的N次方的数组,threshold是数组扩容的临界点(与HashMap一个作用),默认是数组的大小的2/3,数组每次扩容是将长度扩大1倍。
方法
ThreadLocal重要的方法如下:
1 public T get(); //获取值 2 3 public void set(T value); //设置值 4 5 public void remove(); //删除值
首先来看set()方法:
1 public void set(T value) { 2 Thread t = Thread.currentThread(); 3 ThreadLocalMap map = getMap(t); //获取当前Thread中的ThreadLocalMap 4 if (map != null) 5 map.set(this, value); //设置值 6 else 7 createMap(t, value); //创建一个ThreadLocalMap 8 }
在Thread类中有 ThreadLocal.ThreadLocalMap threadLocals = null; 这样的成员变量,就是用来存储当前线程设置的值,所有的ThreadLocal都是在操作这个成员变量。
接下来看一下map.set()这个方法:
1 private void set(ThreadLocal<?> key, Object value) { 2 Entry[] tab = table; 3 int len = tab.length; 4 int i = key.threadLocalHashCode & (len - 1); // 计算hash散列 5 6 // 从i开始遍历列表 7 for (Entry e = tab[i]; 8 e != null; 9 e = tab[i = nextIndex(i, len)]) { 10 ThreadLocal<?> k = e.get(); 11 // 找到了key值 12 if (k == key) { 13 e.value = value; 14 return; 15 } 16 // key=null表示该key值已被回收 17 if (k == null) { 18 replaceStaleEntry(key, value, i);19 return; 20 } 21 } 22 // 当遍历到一个可以插入数据的空位置时 23 tab[i] = new Entry(key, value); 24 int sz = ++size; 25 if (!cleanSomeSlots(i, sz) // 清理列表中已经被GC回收的 26 && sz >= threshold) // 判断是否需要扩容 27 rehash();// 扩容并重新计算hash散列 28 }
ThreadLocal通过遍历Entry数组,如果当前key已存在则覆盖,没有则新增,如果在遍历过程中遇到已经被GC回收的key,则会清除掉无效的key对应的值。
从上面的循环可以看出来,ThreadLocal采用hash散列的线性探测存储,这种方式实现简单但是无法存储大量的数据,所以不建议用ThreadLocal存储大量的数据。这里还有一个问题是,nextIndex()方法是循环的获取数组下标,所以如果table满的就会导致无限循环,所以threshold的值是小于table的大小并且每次set之后都会清理一次数组的无效数据。
我们先看一下cleanSomeSlots()方法:
1 private boolean cleanSomeSlots(int i, int n) { 2 boolean removed = false; 3 Entry[] tab = table; 4 int len = tab.length; 5 do { 6 i = nextIndex(i, len); 7 Entry e = tab[i]; 8 if (e != null && e.get() == null) { 9 n = len; 10 removed = true; 11 i = expungeStaleEntry(i); // 从下标i开始清除数组中的无效元素 12 } 13 } while ( (n >>>= 1) != 0); // 该循环会循环log2(n)次 14 return removed; 15 }
该方法是通过线性查找,从下标为i开始查找已经被GC清理的key对应的值,一般情况下会查找log2(n)次,expungeStaleEntry()是真正的清理方法,源码如下:
1 private int expungeStaleEntry(int staleSlot) { 2 Entry[] tab = table; 3 int len = tab.length; 4 5 // 清理对应数据 6 tab[staleSlot].value = null; 7 tab[staleSlot] = null; 8 size--; 9 10 // 重新计算hash散列,直到遇见null 11 Entry e; 12 int i; 13 for (i = nextIndex(staleSlot, len); 14 (e = tab[i]) != null; 15 i = nextIndex(i, len)) { 16 ThreadLocal<?> k = e.get(); 17 if (k == null) { 18 e.value = null; 19 tab[i] = null; 20 size--; 21 } else { 22 int h = k.threadLocalHashCode & (len - 1); 23 if (h != i) { 24 tab[i] = null; 25 while (tab[h] != null) 26 h = nextIndex(h, len); 27 tab[h] = e; 28 } 29 } 30 } 31 return i; 32 }
前面提到,ThreadLocal存储数据的方式是使用线性探测法,所以expungeStaleEntry()方法在清理掉一个数据之后会将该下标之后的所有非null位置重新计算一次hash散列,经过这样的操作之后,数组中的元素最终会符合hash散列的要求,如果不重新计算一次hash散列,那么最终数组结果可能不符合hash散列的要求,比如:元素a、b计算后得到存储位置冲突,通过线性探测法,最终结果是元素a在下标0,元素b在下标1,如果删除a之后,b就应该放在下标0而不是下标1上。
我们在回过头来看看map.set()方法中的replaceStaleEntry()方法:
1 private void replaceStaleEntry(ThreadLocal<?> key, Object value, 2 int staleSlot) { 3 Entry[] tab = table; 4 int len = tab.length; 5 Entry e; 6 // slotToExpunge用来记录清除无效key的开始位置,初始值等于staleSlot,staleSlot的值是无效key的下标 7 int slotToExpunge = staleSlot; 8 // 向前查找无效的key 9 for (int i = prevIndex(staleSlot, len); 10 (e = tab[i]) != null; 11 i = prevIndex(i, len)) 12 if (e.get() == null) 13 slotToExpunge = i; 14 15 // 向后查找无效的key,直到遍历到一个有效key 16 for (int i = nextIndex(staleSlot, len); 17 (e = tab[i]) != null; 18 i = nextIndex(i, len)) { 19 ThreadLocal<?> k = e.get(); 20 21 // 如果在查找的过程中找到了我们需要找的key,则将无效key与该值替换 22 if (k == key) { 23 e.value = value; 24 25 tab[i] = tab[staleSlot]; 26 tab[staleSlot] = e; 27 28 // 判断在向前查找无效key的过程中有没有找到 29 if (slotToExpunge == staleSlot) 30 slotToExpunge = i; 31 // 清理无效key 32 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 33 return; 34 } 35 // 如果向前没有查找到无效key并且当前key为无效key 36 if (k == null && slotToExpunge == staleSlot) 37 slotToExpunge = i; 38 } 39 40 // 将需要插入的数据插入到staleSlot位置 41 tab[staleSlot].value = null; 42 tab[staleSlot] = new Entry(key, value); 43 44 // 如果向前查找到了无效key或者向后查找到了无效key,则清理无效key 45 if (slotToExpunge != staleSlot) 46 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 47 }
该方法时在set数据值遍历到了一个无效key执行的方法,ThreadLocal认为,出现无效key的位置附近也会出现无效key,所以在清理该无效key的时候回查找附近连续的无效key。
值得注意的是,代码中找到了我们需要找的key的时候,将无效key与有效key交换的原因是,清除方法expungeStaleEntry()只能够清除连续的无效的key,如果向前没有找到无效key,向后找到了无效key的情况下,会出现如下情况:
SS是staleSlot,SE是slotToExpunge,如果不交换,则无法清理到SS位置的无效key,而且需要找的key后面同样也可能会出现无效key,同样无法清除到。
除了这种情况,还有一种情况是向前找到了无效key,会出现如下情况:
这种情况下,交换了SS跟需要找的key之后会出现无法清理SS位置的无效key的请款,所以代码中不止调用了一次expungeStaleEntry(),还会调用cleanSomeSlots(),源码在前面已经分析过了,这样就可以把SS以及之后的无效key都清掉。
以上set()方法的源码就分析完了,接下来是get()方法,get()方法是调用ThreadLocalMap的getEntry()方法,所以我们直接分析getEntry()方法:
1 private Entry getEntry(ThreadLocal<?> key) { 2 int i = key.threadLocalHashCode & (table.length - 1); 3 Entry e = table[i]; 4 if (e != null && e.get() == key) 5 return e; 6 else 7 return getEntryAfterMiss(key, i, e); // hash散列之后的位置不是需要找的key,即发生了hash碰撞 8 } 9 10 private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { 11 Entry[] tab = table; 12 int len = tab.length; 13 14 while (e != null) { 15 ThreadLocal<?> k = e.get(); 16 if (k == key) 17 return e; 18 if (k == null) 19 expungeStaleEntry(i); // 清理无效key 20 else 21 i = nextIndex(i, len); // 向后遍历 22 e = tab[i]; 23 } 24 return null; 25 }
get()方法较为简单,这里不做过多赘述,接下来是remove()方法,同样该方法是直接调用的ThreadLocalMap的remove()方法,源码如下:
1 private void remove(ThreadLocal<?> key) { 2 Entry[] tab = table; 3 int len = tab.length; 4 int i = key.threadLocalHashCode & (len-1); 5 for (Entry e = tab[i]; 6 e != null; 7 e = tab[i = nextIndex(i, len)]) { 8 if (e.get() == key) { 9 e.clear();// 将Entry的key置为null 10 expungeStaleEntry(i); // 清理该key 11 return; 12 } 13 } 14 }
为什么使用弱引用
弱引用可以防止内存泄漏。试想,如果一个线程中的ThreadLocal已经不需要了,所以将指向ThreadLocal的强引用删掉,希望GC清理调它。如果Entry是强引用的话GC就无法清理该对象,因为还存在一个Thread->ThreadLocalMap->Entry->ThreadLocal的强引用链,导致整个Entry都不会被回收,从而发生内存泄漏。
线程池与ThreadLocal共有引发的问题
为了方便分析,先假设线程池是固定大小的,当我们在从线程池中拿了一个线程运行如下逻辑:
1 if(threadLocal.get()!=null){ 2 //doSomething 3 }else{ 4 threadLocal.set(data); 5 }
或许你在最开始程序可以正常运行,但是后面你会发现代码 threadLocal.get() 永远都不为null,这是因为线程池中的线程时复用的,所以Thread就不会被回收,Thread不回收那么对应ThreadLocal也不会被回收,所以最后就出现了上述问题。在使用Tomcat等使用线程池的技术时需要注意这一点。