zoukankan      html  css  js  c++  java
  • Netty中FastThreadLocal源码分析(转)

    转自https://blog.csdn.net/Z_ChenChen/article/details/90733986

    Netty中使用FastThreadLocal替代JDK中的ThreadLocal 【JAVA】ThreadLocal源码分析,其用法和ThreadLocal 一样,只不过从名字FastThreadLocal来看,其处理效率要比JDK中的ThreadLocal要高

    在类加载的时候,先初始化了一个静态成员:

    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
    1
    实际上FastThreadLocal的操作都是通过对InternalThreadLocalMap的操作来实现的,

    而InternalThreadLocalMap是UnpaddedInternalThreadLocalMap的子类,UnpaddedInternalThreadLocalMap的定义比较简单:

    class UnpaddedInternalThreadLocalMap {
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal();
    static final AtomicInteger nextIndex = new AtomicInteger();
    Object[] indexedVariables;
    int futureListenerStackDepth;
    int localChannelReaderStackDepth;
    Map<Class<?>, Boolean> handlerSharableCache;
    IntegerHolder counterHashCode;
    ThreadLocalRandom random;
    Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
    Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
    StringBuilder stringBuilder;
    Map<Charset, CharsetEncoder> charsetEncoderCache;
    Map<Charset, CharsetDecoder> charsetDecoderCache;
    ArrayList<Object> arrayList;

    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
    this.indexedVariables = indexedVariables;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    可以看到在类加载时,会初始化一个泛型为InternalThreadLocalMap的JDK的ThreadLocal对象作为其静态成员slowThreadLocalMap ,还有一个原子化的Integer静态成员nextIndex

    InternalThreadLocalMap的定义如下:

    public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(InternalThreadLocalMap.class);
    private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
    private static final int STRING_BUILDER_INITIAL_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.initialSize", 1024);
    private static final int STRING_BUILDER_MAX_SIZE;
    public static final Object UNSET = new Object();
    private BitSet cleanerFlags;
    1
    2
    3
    4
    5
    6
    7
    InternalThreadLocalMap的nextVariableIndex方法:

    public static int nextVariableIndex() {
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
    nextIndex.decrementAndGet();
    throw new IllegalStateException("too many thread-local indexed variables");
    } else {
    return index;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    这是一个CAS滞后自增操作,获取nextIndex自增前的值,那么variablesToRemoveIndex初始化时就是0,且恒为0,nextIndex此时变成了1

    FastThreadLocal对象的初始化:

    private final int index = InternalThreadLocalMap.nextVariableIndex();

    public FastThreadLocal() {
    }
    1
    2
    3
    4
    由上面可知,index成员恒等于nextVariableIndex的返回值,nextIndex 的CAS操作保障了每个FastThreadLocal对象的index是不同的

    首先看到set方法:

    public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    if (this.setKnownNotUnset(threadLocalMap, value)) {
    this.registerCleaner(threadLocalMap);
    }
    } else {
    this.remove();
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    只要set的value不是InternalThreadLocalMap.UNSET,会先调用InternalThreadLocalMap的get方法:

    public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    return thread instanceof FastThreadLocalThread ? fastGet((FastThreadLocalThread)thread) : slowGet();
    }
    1
    2
    3
    4
    判断当前线程是否是FastThreadLocalThread,是则调用fastGet,否则调用slowGet
    FastThreadLocalThread是经过包装后的Thread:

    public class FastThreadLocalThread extends Thread {
    private final boolean cleanupFastThreadLocals;
    private InternalThreadLocalMap threadLocalMap;

    public FastThreadLocalThread() {
    this.cleanupFastThreadLocals = false;
    }

    public FastThreadLocalThread(Runnable target) {
    super(FastThreadLocalRunnable.wrap(target));
    this.cleanupFastThreadLocals = true;
    }

    public FastThreadLocalThread(ThreadGroup group, Runnable target) {
    super(group, FastThreadLocalRunnable.wrap(target));
    this.cleanupFastThreadLocals = true;
    }

    public FastThreadLocalThread(String name) {
    super(name);
    this.cleanupFastThreadLocals = false;
    }

    public FastThreadLocalThread(ThreadGroup group, String name) {
    super(group, name);
    this.cleanupFastThreadLocals = false;
    }

    public FastThreadLocalThread(Runnable target, String name) {
    super(FastThreadLocalRunnable.wrap(target), name);
    this.cleanupFastThreadLocals = true;
    }

    public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
    super(group, FastThreadLocalRunnable.wrap(target), name);
    this.cleanupFastThreadLocals = true;
    }

    public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
    super(group, FastThreadLocalRunnable.wrap(target), name, stackSize);
    this.cleanupFastThreadLocals = true;
    }

    public final InternalThreadLocalMap threadLocalMap() {
    return this.threadLocalMap;
    }

    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
    this.threadLocalMap = threadLocalMap;
    }

    public boolean willCleanupFastThreadLocals() {
    return this.cleanupFastThreadLocals;
    }

    public static boolean willCleanupFastThreadLocals(Thread thread) {
    return thread instanceof FastThreadLocalThread && ((FastThreadLocalThread)thread).willCleanupFastThreadLocals();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    如果看过我之前写的ThreadLocal源码分析,看到这就明白,JDK的ThreadLocal中很重要的一点是在Thread类中有一个ThreadLocalMap类型的成员,每个线程都维护这一张ThreadLocalMap,通过ThreadLocalMap来和ThreadLocal对象产生映射关系;而这里和JDK同理绑定的就是InternalThreadLocalMap。

    fastGet方法:

    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
    thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }

    return threadLocalMap;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    这里也和JDK的ThreadLocal类似,判断FastThreadLocalThread 线程的threadLocalMap成员是否为null,若是null,则先创建一个InternalThreadLocalMap实例:

    private InternalThreadLocalMap() {
    super(newIndexedVariableTable());
    }
    1
    2
    3
    先调用newIndexedVariableTable方法:

    private static Object[] newIndexedVariableTable() {
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;
    }
    1
    2
    3
    4
    5
    创建了一个大小为32的数组,并且用UNSET这个Object填充了整个数组,然后调用UnpaddedInternalThreadLocalMap的构造,令indexedVariables成员保存该数组

    再来看slowGet方法:

    private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = (InternalThreadLocalMap)slowThreadLocalMap.get();
    if (ret == null) {
    ret = new InternalThreadLocalMap();
    slowThreadLocalMap.set(ret);
    }

    return ret;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    可以看到,其实这里为了提高效率,并没有直接使用JDK的ThreadLocal,而是给当前非FastThreadLocalThread线程绑定了一个ThreadLocal对象,避免直接使用JDK的ThreadLocal效率低。

    回到FastThreadLocal的set方法,在取得到了当前线程的InternalThreadLocalMap成员后,调用setKnownNotUnset方法:

    private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    if (threadLocalMap.setIndexedVariable(this.index, value)) {
    addToVariablesToRemove(threadLocalMap, this);
    return true;
    } else {
    return false;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    首先调用了InternalThreadLocalMap的setIndexedVariable方法:

    public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = this.indexedVariables;
    if (index < lookup.length) {
    Object oldValue = lookup[index];
    lookup[index] = value;
    return oldValue == UNSET;
    } else {
    this.expandIndexedVariableTableAndSet(index, value);
    return true;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    因为index是不可更改的常量,所以这里有两种情况:
    当indexedVariables这个Object数组的长度大于index时,直接将value放在indexedVariables数组下标为index的位置,返回oldValue是否等于UNSET,若是不等于UNSET,说明已经set过了,直进行替换,若是等于UNSET,还要进行后续的registerCleaner
    当indexedVariables这个Object数组的长度小于等于index时,调用expandIndexedVariableTableAndSet方法扩容

    expandIndexedVariableTableAndSet方法:

    private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = this.indexedVariables;
    int oldCapacity = oldArray.length;
    int newCapacity = index | index >>> 1;
    newCapacity |= newCapacity >>> 2;
    newCapacity |= newCapacity >>> 4;
    newCapacity |= newCapacity >>> 8;
    newCapacity |= newCapacity >>> 16;
    ++newCapacity;
    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    this.indexedVariables = newArray;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    如果读过HashMap源码的话对上述的位运算操作因该不陌生,这个位运算产生的newCapacity的值是大于oldCapacity的最小的二的整数幂
    【Java】HashMap中的tableSizeFor方法
    然后申请一个newCapacity大小的数组,将原数组的内容拷贝到新数组,并且用UNSET填充剩余部分,还是将value放在下标为index的位置,用indexedVariables保存新数组。

    setIndexedVariable成立后,setKnownNotUnset继续调用addToVariablesToRemove方法:

    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set variablesToRemove;
    if (v != InternalThreadLocalMap.UNSET && v != null) {
    variablesToRemove = (Set)v;
    } else {
    variablesToRemove = Collections.newSetFromMap(new IdentityHashMap());
    threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    }

    variablesToRemove.add(variable);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    上面说过variablesToRemoveIndex恒为0,调用InternalThreadLocalMap的indexedVariable方法:

    public Object indexedVariable(int index) {
    Object[] lookup = this.indexedVariables;
    return index < lookup.length ? lookup[index] : UNSET;
    }
    1
    2
    3
    4
    由于variablesToRemoveIndex恒等于0,所以这里判断indexedVariables这个Object数组是否为空,若是为空,则返回第0个元素,若不是则返回UNSET

    在addToVariablesToRemove中,接着对indexedVariables的返回值进行了判断,
    判断不是UNSET,并且不等于null,则说明是set过的,然后将刚才的返回值强转为Set类型
    若上述条件不成立,创建一个IdentityHashMap,将其包装成Set赋值给variablesToRemove,然后调用InternalThreadLocalMap的setIndexedVariable方法,这里就和上面不一样了,上面是将value放在下标为index的位置,而这里是将Set放在下标为0的位置。

    看到这,再结合上面来看,其实已经有一个大致的想法了,一开始在set时,是将value放在InternalThreadLocalMap的Object数组下标为index的位置,然后在这里获取下标为0的Set,说明value是暂时放在下标为index的位置,然后判断下标为0的位置有没有Set,若是有,取出这个Set ,将当前FastThreadLocal对象放入Set中,则说明这个Set中存放的是FastThreadLocal集合
    那么就有如下关系:


    回到FastThreadLocal的set方法,在setKnownNotUnset成立后,调用registerCleaner方法:

    private void registerCleaner(InternalThreadLocalMap threadLocalMap) {
    Thread current = Thread.currentThread();
    if (!FastThreadLocalThread.willCleanupFastThreadLocals(current) && !threadLocalMap.isCleanerFlagSet(this.index)) {
    threadLocalMap.setCleanerFlag(this.index);
    }
    }
    1
    2
    3
    4
    5
    6
    willCleanupFastThreadLocals的返回值在前面FastThreadLocalThread的初始化时就确定了,看到isCleanerFlagSet方法:

    public boolean isCleanerFlagSet(int index) {
    return this.cleanerFlags != null && this.cleanerFlags.get(index);
    }
    1
    2
    3
    cleanerFlags 是一个BitSet对象,在InternalThreadLocalMap初始化时是null,
    若不是第一次的set操作,则根据index,获取index在BitSet对应位的值

    这里使用BitSet,使其持有的位和indexedVariables这个Object数组形成了一一对应关系,每一位都是0和1代表当前indexedVariables的对应下标位置的使用情况,0表示没有使用对应UNSET,1则代表有value

    在上面条件成立的情况下,调用setCleanerFlag方法:

    public void setCleanerFlag(int index) {
    if (this.cleanerFlags == null) {
    this.cleanerFlags = new BitSet();
    }

    this.cleanerFlags.set(index);
    }
    1
    2
    3
    4
    5
    6
    7
    逻辑比较简单,判断cleanerFlags是否初始化,若没有,则立即初始化,再将cleanerFlags中对应index位的值设为1;

    这里通过registerCleaner直接标记了所有set了value的下标可,为以后的removeAll 清除提高效率。

    下来看FastThreadLocal的get方法:

    public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(this.index);
    if (v != InternalThreadLocalMap.UNSET) {
    return v;
    } else {
    V value = this.initialize(threadLocalMap);
    this.registerCleaner(threadLocalMap);
    return value;
    }
    }

    和上面一样,先取得当前线程持有的InternalThreadLocalMap ,调用indexedVariable方法,根据当前FastThreadLocal的index定位,判断是否是UNSET(set过),若没有set过则和JDK一样调用initialize先set:

    private V initialize(InternalThreadLocalMap threadLocalMap) {
    Object v = null;

    try {
    v = this.initialValue();
    } catch (Exception var4) {
    PlatformDependent.throwException(var4);
    }

    threadLocalMap.setIndexedVariable(this.index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
    }

    initialValue()方法就是对外提供的,需要手动覆盖:

    protected V initialValue() throws Exception {
    return null;
    }
    1
    2
    3
    后面的操作就和set的逻辑一样。

    remove方法:

    public final void remove() {
    this.remove(InternalThreadLocalMap.getIfSet());
    }

    getIfSet方法:

    public static InternalThreadLocalMap getIfSet() {
    Thread thread = Thread.currentThread();
    return thread instanceof FastThreadLocalThread ? ((FastThreadLocalThread)thread).threadLocalMap() : (InternalThreadLocalMap)slowThreadLocalMap.get();
    }

    和上面的get方法思路相似,只不过在这里如果获取不到不会创建
    然后调用remove重载:

    public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap != null) {
    Object v = threadLocalMap.removeIndexedVariable(this.index);
    removeFromVariablesToRemove(threadLocalMap, this);
    if (v != InternalThreadLocalMap.UNSET) {
    try {
    this.onRemoval(v);
    } catch (Exception var4) {
    PlatformDependent.throwException(var4);
    }
    }

    }
    }

    先检查threadLocalMap是否存在,若存在才进行后续操作:
    调用removeIndexedVariable方法:

    public Object removeIndexedVariable(int index) {
    Object[] lookup = this.indexedVariables;
    if (index < lookup.length) {
    Object v = lookup[index];
    lookup[index] = UNSET;
    return v;
    } else {
    return UNSET;
    }
    }

    和之前的setIndexedVariable逻辑相似,只不过现在是把index位置的元素设置为UNSET

    接着调用removeFromVariablesToRemove方法:

    private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    if (v != InternalThreadLocalMap.UNSET && v != null) {
    Set<FastThreadLocal<?>> variablesToRemove = (Set)v;
    variablesToRemove.remove(variable);
    }
    }

    之前说过variablesToRemoveIndex恒为0,在Object数组中下标为0存储的Set<FastThreadLocal<?>>集合(不为UNSET情况下),从集合中,将当前FastThreadLocal移除掉
    最后调用了onRemoval方法,该方法需要由用户去覆盖:

    protected void onRemoval(V value) throws Exception {
    }
    1
    2
    removeAll方法,是一个静态方法:

    public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap != null) {
    try {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    if (v != null && v != InternalThreadLocalMap.UNSET) {
    Set<FastThreadLocal<?>> variablesToRemove = (Set)v;
    FastThreadLocal<?>[] variablesToRemoveArray = (FastThreadLocal[])variablesToRemove.toArray(new FastThreadLocal[0]);
    FastThreadLocal[] var4 = variablesToRemoveArray;
    int var5 = variablesToRemoveArray.length;

    for(int var6 = 0; var6 < var5; ++var6) {
    FastThreadLocal<?> tlv = var4[var6];
    tlv.remove(threadLocalMap);
    }
    }
    } finally {
    InternalThreadLocalMap.remove();
    }

    }
    }

    首先获取当前线程的InternalThreadLocalMap,若是存在继续后续操作:
    通过indexedVariable方法,取出Object数组中下标为0的Set集合(如果不是UNSET情况下),将其转换为FastThreadLocal数组,遍历这个数组调用上面的remove方法。

  • 相关阅读:
    SpringBoot启动流程分析(六):IoC容器依赖注入
    SpringBoot启动流程分析(五):SpringBoot自动装配原理实现
    SpringBoot启动流程分析(四):IoC容器的初始化过程
    Razor语法大全
    VS快捷方式小技巧
    DataTable 修改列名 删除列 调整列顺序
    更改DataTable列名方法
    log4net使用详解
    C#使用Log4Net记录日志
    经典SQL语句大全
  • 原文地址:https://www.cnblogs.com/ffaiss/p/11158783.html
Copyright © 2011-2022 走看看