zoukankan      html  css  js  c++  java
  • ThreadLocal源码分析

    当访问共享的可变数据时,通常需要使用同步。一种避免同步的方式就是不共享数据,仅在单线程内部访问数据,就不需要同步。该技术称之为线程封闭。
    当数据封装到线程内部,即使该数据不是线程安全的,也会实现自动线程安全性。

    维持线程封闭性可以通过Ad-hoc线程封闭、栈封闭来实现,一种更加规范的方法是使用ThreadLocal类。ThreadLocal类提供线程局部变量,通过get、set等方法访问变量,为每个使用该变量的线程创建一个独立的副本。

    一、ThreadLocal使用案例

    案例中只开启了一个线程threadA,展示了在线程内部设置、获取、清除局部变量。

    public class ThreadLocalTest {
        // 初始化ThreadLocal变量
        static ThreadLocal<String> localVariable = new ThreadLocal<>();
    
        static void print(String str) {
            // 打印当前线程本地内存中的变量值
            System.out.println(str + ": " + localVariable.get());
            // 清除当前线程本地内存中的变量
            localVariable.remove();
        }
    
        public static void main(String[] args) {
            // 创建线程A
            Thread threadA = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 设置线程A中的本地变量的值
                    localVariable.set("threadA localVariable");
                    print("threadA");
                    // 获取线程A中的本地变量的值
                    System.out.println("threadA remove after: " + localVariable.get());
                }
            });
            // 启动线程
            threadA.start();
        }
    }
    
    运行结果:
    threadA: threadA localVariable
    threadA remove after: null
    

    案例二:线程唯一标识符生成器,为每个调用ThreadId.get()方法的线程创建id。

    public class ThreadId {
        // 下一个要被分配的线程id
        private static final AtomicInteger nextId = new AtomicInteger(0);
    
        // 线程局部变量
        private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
            @Override
            protected Integer initialValue() {
                return nextId.getAndIncrement();
            }
        };
    
        // 返回当前线程唯一的id
        public static int get() {
            return threadId.get();
        }
    }
    

    二、ThreadLocal类的实现原理

    在Thread类中有一个threadLocals成员变量,其类型是ThreadLocalMap,默认情况下为null。

    ThreadLocal.ThreadLocalMap threadLocals = null;
    

    当某线程首次调用ThreadLocal变量的get或set方法时,会进行对象创建。在线程退出时,当前线程的threadLocals变量被清空。

    private void exit() {
        ...
        threadLocals = null;
        inheritableThreadLocals = null;
        ...
    }
    

    每个线程的局部变量不是存放于ThreadLocal实例中,而是存放于线程的threadLocals变量,即线程内存空间中。threadLocals变量本质上是Map数据结构,可以存放多个ThreadLocal变量键值对。

    【助解】ThreadLocal类可以看出一个外壳,线程中调用某ThreadLocal变量的set方法可以将变量值放入到该线程的threadLocals变量中,数据格式是<当前线程中该ThreadLocal变量的this引用,变量值>。当调用线程调用ThreadLocal变量的get方法时,从当前线程的threadLocals变量中取出key(引用)对应的value值。

    1. ThreadLocal类核心方法--set()

    将ThreadLocal变量的当前线程副本的值设置为指定value值。

    public void set(T value) {
        // 获取调用方法的当前线程
        Thread t = Thread.currentThread();
        // 获取当前线程自身的threadLocals变量
        ThreadLocalMap map = getMap(t);
        if (map != null) 
            // map不为空,则设置
            map.set(this, value);
        else 
            // map为空,说明第一次调用,初始化线程的threadLocals变量
            createMap(t, value);
    }
    
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    

    线程的threadLocals变量,即ThreadLocal.ThreadLocalMap,是HashMap结构,它的key是当前ThreadLocal的实例对象引用,value值是该ThreadLocal实例对象调用set方法设置的值。

    2. ThreadLocal类核心方法--T get()

    返回ThreadLocal变量在当前线程副本中的值。如果当前线程中没有该变量的值,返回值会被首次初始化为initialValue()方法的值。

    public T get() {
        // 获取当前线程以及其threadLocals变量
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        // 如果threadLocals变量不为空
        if (map != null) {
            // 根据当前ThreadLocal对象应用获取Entry,存在则直接返回value值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // threadLocals为空,初始化当前线程threadLocals变量
        return setInitialValue();
    }
    
    // threadLocals存在,设置初始值;不存在,初始化threadLocals变量
    private T setInitialValue() {
        // 返回当前ThreadLocal变量的当前线程初始值
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }
    
    protected T initialValue() {
        return null;
    }
    

    3. ThreadLocal类核心方法--void remove()

    当前线程threadLocals变量存在的话,删除当前线程的ThreadLocal实例对象。

    public void remove() {
        ThreadLocal.ThreadLocalMap var1 = this.getMap(Thread.currentThread());
        if (var1 != null) {
            var1.remove(this);
        }
    }
    

    三、ThreadLocal.ThreadLocalMap结构分析

    ThreadLocal类图

    ThreadLocal是一种存储变量与线程绑定的方式,在每个线程中用自己的ThreadLocalMap安全隔离变量,实现线程封闭。

    ThreadLocalMap是ThreadLocal内的一个Map实现,没有实现任何接口,仅用于线程内部存储ThreadLocal变量值。

    static class ThreadLocalMap { ... }
    
    static class Entry extends WeakReference<ThreadLocal<?>> {
        Object value; // ThreadLocal变量值
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    

    底层是Entry数组,Entry的key为ThreadLocal,value是线程的该ThreadLocal变量值。Entry内部类继承了WeakReference类。当entry.get()方法得到的ThreadLocal引用为空,表示该key不再被引用,此时Entry对象视为【过期】,在数组中删除。

    ThreadLocalMap类的字段

    private Entry[] table; // 表,必须为2的幂次方大小
    private int size = 0; // 初始Entry数
    private int threshold; // resize操作,元素个数阈值
    // 负载因子固定为2/3
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }
    

    构造方法:

    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        // 初始化大小为16的Entry数组
        table = new Entry[INITIAL_CAPACITY];
        // 取模对应数组索引
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue); // 插入元素
        size = 1; // 更新size
        setThreshold(INITIAL_CAPACITY); // 设置resize阈值,此时为10
    }
    

    1. ThreadLocalMap类之hashcode的计算

    ThreadLocal变量与当前线程绑定,在HashMap中作为key,通过threadLocalHashCode值来查找。

    // 自定义hashcode,可以用来解决同一线程连续构造ThreadLocal对象引起的冲突。
    private final int threadLocalHashCode = nextHashCode()
    
    // 下一个hashCode值,原子更新,初始值为0
    private static AtomicInteger nextHashCode = new AtomicInteger();
    
    // 返回下一个hashcode
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    
    // 魔数:相邻两个hashcode之间的偏移
    // 对2的幂次方大小的表,产生近似最优的hash值
    private static final int HASH_INCREMENT = 0x61c88647;
    

    2. ThreadLocalMap类之set()方法

    设置ThreadLocal变量值。

    private void set(ThreadLocal<?> key, Object value) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1); // 计算索引位置
    
        // 开放地址法
        for (Entry e = tab[i];
             e != null; // entry不为空
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get(); // 获取Entry的key--ThreadLocal
    
            // 如果当前Entry的key与形参key相等,更新value值
            if (k == key) {
                e.value = value;
                return;
            }
            // 如果当前Entry的key为空,说明已过期,做清理!
            if (k == null) {
                // 清理过期Entry,继续探索放置位置
                replaceStaleEntry(key, value, i); 
                return;
            }
        }
    
        // 若没有找到对应key,则在空位置创建Entry
        tab[i] = new Entry(key, value);
        int sz = ++size; // 更新size
        // 清理一些过期的位置,判断是否需要扩容
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    

    从set方法中,可以看出ThreadLocalMap中哈希冲突解决方法是开放地址法,而不是HashMap等采用链地址法。
    前后索引位置--循环

    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
    private static int prevIndex(int i, int len) {
        return ((i - 1 >= 0) ? i - 1 : len - 1);
    }
    

    (1) replaceStaleEntry()方法

    清理过期Entry,设置输入键值对。

    // staleSlot:key == null的位置
    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                   int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;
    
        // 从前一个位置开始向前寻找过期Entry,直到entry不为空
        // 不断向前移动清理位置
        int slotToExpunge = staleSlot; // 清理元素的最开始位置
        for (int i = prevIndex(staleSlot, len); 
             (e = tab[i]) != null;  // entry不为空
             i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i; 
    
        // 从后一个位置向后探索
        for (int i = nextIndex(staleSlot, len);
             (e = tab[i]) != null; // entry不为空
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
    
            // 如果找到key,需要将它与过期位置元素做交换,来维持哈希表顺序。
            if (k == key) {
                e.value = value; // 更新value
    
                tab[i] = tab[staleSlot]; // 元素交换
                tab[staleSlot] = e; 
    
                // 如果相等,表示向前遍历时没有找到key为null的元素
                // 前面语句进行了元素交换,此时位置i之前的元素均不需要清理
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i; // 更新清理开始位置
                // 从slotToExpunge位置清理过期Entry(key == null)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }
    
            // 如果遍历中找到key为null的元素,并且向前没有找到key为null的位置
            // 更新清理开始位置slotToExpunge为当前位置i
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }
    
        // 如果没找到key,新建Entry发在staleSlot位置
        tab[staleSlot].value = null; // 清理动作
        tab[staleSlot] = new Entry(key, value);
    
        // 如果除了staleSlot位置,还有其他位置元素要清理(key == null的Entry)
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }
    

    【注】slotToExpunge变量记录着元素清理的最开始位置。

    (2)cleanSomeSlots方法

    尝试扫描一些过期元素的位置,当添加新元素、清理其他过期元素时被调用。

    从i位置开始扫描,i位置不是过期元素。而参数n用来限制扫描次数,如果过期Entry没有发现,可以扫描log(n)次,log(n)的设定出于性能的考虑。

    private boolean cleanSomeSlots(int i, int n) {
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        do {
            i = nextIndex(i, len); // 从下一个位置开始
            Entry e = tab[i];
            // 遍历到key==null的Entry
            if (e != null && e.get() == null) {
                n = len; // 重置n
                removed = true; // 标志有清理元素
                i = expungeStaleEntry(i); // 清理
            }
        } while ( (n >>>= 1) != 0); // log(n) 限制--对数次
        return removed;
    }
    

    (3)expungeStaleEntry(int staleSlot) 方法

    作用:从staleSlot位置开始,清理key为null的Entry,直到entry为null的位置。过程中,遇到的entry不为空,并且entry.get()不为空的元素,进行rehash重新确定该元素位置。

    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
    
        // 清理staleSlot位置的Entry对象
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;
    
        // rehash直到entry为null
        Entry e;
        int i;
        for (i = nextIndex(staleSlot, len);
             (e = tab[i]) != null; // entry不为null
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            if (k == null) { // 过期Entry,处理方式同staleSlot
                e.value = null;
                tab[i] = null;
                size--;
            } else {
                // rehash计算出当前Entry索引
                int h = k.threadLocalHashCode & (len - 1);
                if (h != i) { // 不相等,说明原位置是探测出的。
                    tab[i] = null; // 原位置对象置空
    
                    // 如果h索引位置不为null,向后探测,直到找到null位置
                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        return i; // i位置 (entry = tab[i]) == null
    }
    

    3. ThreadLocalMap类之getEntry()方法

    根据ThreadLocal获取对应Entry对象。

    private Entry getEntry(ThreadLocal<?> key) {
        // 根据hashcode计算直接hash索引
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        // 如果是找到并且是有效Entry对象,直接返回
        if (e != null && e.get() == key)
            return e;
        else // 不是目标Entry
            return getEntryAfterMiss(key, i, e);
    }
    
    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;
    
        while (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == key) // 找到目标Entry
                return e;
            if (k == null) // 清理过期Entry
                expungeStaleEntry(i);
            else // 当前位置是有效Entry,索引右移
                i = nextIndex(i, len);
            e = tab[i];
        }
        return null; // 找不到,返回null
    }
    

    4. ThreadLocalMap类之remove()方法

    移除ThreadLocal变量对应的Entry对象。

    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        // 遍历找到指定key的Entry对象
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                e.clear(); // 清理引用
                expungeStaleEntry(i); // 清理过期Entry
                return;
            }
        }
    }
    

    5. ThreadLocalMap类之rehash()方法

    private void rehash() {
        // 清理table中所有过期Entry对象。
        expungeStaleEntries();
    
        // 如果元素个数超过阈值的3/4时,进行扩容
        // 注:阈值本身是数组长度的2/3
        if (size >= threshold - threshold / 4)
            resize();
    }
    // 扩容至原容量的2倍
    private void resize() {
        Entry[] oldTab = table;
        int oldLen = oldTab.length;
        int newLen = oldLen * 2;
        Entry[] newTab = new Entry[newLen];
        int count = 0;
        // 遍历老表,设置新表
        for (int j = 0; j < oldLen; ++j) {
            Entry e = oldTab[j];
            if (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == null) { // 过期Entry,清空value
                    e.value = null; // 利于垃圾回收
                } else {
                    // 计算索引
                    int h = k.threadLocalHashCode & (newLen - 1);
                    // 如果直接hash索引位置不为空,继续向后探索
                    while (newTab[h] != null)
                        h = nextIndex(h, newLen);
                    newTab[h] = e;
                    count++;
                }
            }
        }
        setThreshold(newLen); // 更新阈值
        size = count;
        table = newTab;
    }
    

    四、ThreadLocal内存泄漏问题

    每个线程的ThreadLocal变量都存放在该线程的threadLocals变量中,如果当前线程一直不退出,这些ThreadLocal变量会一直存在,因此可能会导致内存泄漏。通过调用ThreadLocal类的remove方法避免这一问题。

    参考博客的见解

    ThreadLocalMap中采用ThreadLocal弱引用作为Entry的key,如果一个ThreadLocal没有外部强引用来引用它,下一次系统GC时,这个ThreadLocal必然会被回收,ThreadLocalMap中就会出现key为null的Entry。

    ThreadLocal类的set、get、remove方法都可能触发对key为null的Entry清理操作。expungeStaleEntry方法会清空Entry及其value,Entry会在下次GC被回收。

    如果当前线程一直在运行,并且一直不执行get、set、remove方法,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreadLocalMap -> Entry -> value,导致这些key为null的Entry的value永远无法回收,造成内存泄漏。

    参考资料:

    1. 《Java并发编程实战》
    2. 《Java并发编程之美》
    3. https://blog.csdn.net/v123411739/article/details/78698834
    版权声明:本文为博主原创文章,未经博主允许不得转载。
  • 相关阅读:
    浅谈缓存管理
    [Architecture Pattern] Database Migration (上)
    knockout.js的学习笔记4
    cenOS5.5安装oracle10g(傻瓜篇)
    SportsStore:管理 — 精通ASP.NET MVC 3
    ASP.NET MVC下的异步Action的定义和执行原理
    基于EF 4.3.1 Code First的领域驱动设计实践案例
    asp.net缓存
    《Pro ASP.NET MVC 3 Framework》部署
    Asp.Net MVC 3.0【Hello World!】
  • 原文地址:https://www.cnblogs.com/dtyy/p/10664140.html
Copyright © 2011-2022 走看看