美文网首页
FastThreadLocal

FastThreadLocal

作者: Pillar_Zhong | 来源:发表于2019-08-01 17:11 被阅读0次

Netty-FastThreadLocal

在原生的ThreadLocal中有三个角色,Thread,ThreadLocal和ThreadLocalMap。分别对应了Netty的FastThreadLocalThread,FastThreadLocal和InternalThreadLocalMap。

众所周知, 原生的ThreadLocal效率不高有几个原因.

  • 查找当前Thread绑定的变量, 是通过在Map中根据ThreadLocal的hash去查找的,如果能命中,那么效率还是可以, 但是再快还是不如数组下标访问。

  • 当ThreadLocalMap命中但有hash冲突的时候,就很麻烦, 会做线性探测,一但触发会导致效率及低。

    也就是说当hash的index被人占用后,会往后看是否有位置可能存放,如此往复,直到有空位为止.

  • Map扩容涉及到重新计算部分index, 最糟的情况还要挪动元素的位置

  • 当线程执行完, 最好清掉所绑定的threadlocal变量, 不然会内存泄漏. 现实中, 往往会忘记清理.

Netty针对这些问题, 实现了满足原生ThreadLocal语义的实现, 更加高效.

Netty实现了java.lang.Thread子类FastThreadLocalThread来保证能够使用FastThreadLocal,FastThreadLocalThread持有一个InternalThreadLocalMap实例,类似于JDK的ThreadLocalMap,InternalThreadLocalMap存储了这个线程持有的所有线程私有变量,以FastThreadLocal为键,对应的变量为值。

线程封装

// 在EventLoop中的Executor在生成线程的时候借助threadFactory
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    this.threadFactory = threadFactory;
}

// DefaultThreadFactory
public Thread newThread(Runnable r) {
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
    ...
}

Runnable

// 在Runnable之上再封装
public void run() {
    try {
        r.run();
    } finally {
        // 移除绑定在该线程上的所有variables
        FastThreadLocal.removeAll();
    }
}

Thread

// 可以看到针对Thread的增强,添加了InternalThreadLocalMap
public class FastThreadLocalThread extends Thread {

    // 类似Thread原生的ThreadLocalMap
    private InternalThreadLocalMap threadLocalMap;

    public FastThreadLocalThread() { }

    ...
    构造函数
    ...

    public final InternalThreadLocalMap threadLocalMap() {
        return threadLocalMap;
    }

    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
        this.threadLocalMap = threadLocalMap;
    }
}

InternalThreadLocalMap

1563865785219.png

初始化

public static final Object UNSET = new Object();

private InternalThreadLocalMap() {
    super(newIndexedVariableTable());
}

private static Object[] newIndexedVariableTable() {
    // 初始化一个32长度的Object数组,并填满,作为保存线程绑定的变量之用,之后不够会扩容
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;
}

UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
    // 将object数组保存到indexedVariables
    this.indexedVariables = indexedVariables;
}

扩容

private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    // 如果当前index已经超过了indexedVariables的长度,那么在这里进行扩容
    // 新的长度扩容为下一个2的幂,也就是当前的2倍,但要保证是2的幂
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;

    // 将老的元素拷贝到新的数组下
    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    // 将新增的部分填充object对象
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    // 将value设置到index位置
    newArray[index] = value;
    // 更新indexedVariables为新数组
    indexedVariables = newArray;
}

get

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    // 如果是FastThreadLocalThread封装过的线程,那么返回netty的map
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        // 否则返回原生的threadlocalmap
        return slowGet();
    }
}

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

setIndexedVariable

public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    // 如果当前能放下,则将value添加到index位置
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        // 扩容并set
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

remove

// 清空该线程绑定的所有threadlocal变量,不管是netty的还是原生的.
public static void remove() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        ((FastThreadLocalThread) thread).setThreadLocalMap(null);
    } else {
        slowThreadLocalMap.remove();
    }
}

removeIndexedVariable

public Object removeIndexedVariable(int index) {
    // 重置index位置的元素,并返回
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object v = lookup[index];
        lookup[index] = UNSET;
        return v;
    } else {
        return UNSET;
    }
}

FastThreadLocal

1563870865100.png 1563866750196.png

初始化

public FastThreadLocal() {
    // 拿到InternalThreadLocalMap的下一个index
    // 而这个index代表当前最后一个变量位置
    index = InternalThreadLocalMap.nextVariableIndex();
}

Set

public final void set(V value) {
    // 首先你要set的value不能是空的object对象
    if (value != InternalThreadLocalMap.UNSET) {        
        set(InternalThreadLocalMap.get(), value);
    } else {
        // 如果set的value是空object,那么进行remove
        // 意味着要删除该线程绑定的FastThreadLocal变量
        remove();
    }
}

// 总结,当跟线程绑定的theadlocal变量设置时,会设置两个地方
// 1. 将该FastThreadLocal和value,写入该线程的ThreadLocalMap中。
// 2. 将FastThreadLocal记录到VariablesToRemove中
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            // 添加到快速删除列表,优化删除
            addToVariablesToRemove(threadLocalMap, this);
        }
    } else {
        // 同上
        remove(threadLocalMap);
    }
}

addToVariablesToRemove

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    // 拿到index为0位置的Set,而这个set记录了所有当前保存的ThreadLocal变量列表用于快速删除
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    // 如果位置0没有,则新建一个
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }
    // 将新的变量加到这个variablesToRemove里面
    variablesToRemove.add(variable);
}

remove

public final void remove() {
    // 拿到当前线程绑定的InternalThreadLocalMap做remove
    remove(InternalThreadLocalMap.getIfSet());
}

public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    
    // 移除该该线程绑定的FastThreadLocal变量
    Object v = threadLocalMap.removeIndexedVariable(index);
    // 再在VariablesToRemove列表中移除该FastThreadLocal
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            // 如果真实删除了变量,那么回调这里,子类去实现
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

removeFromVariablesToRemove

private static void removeFromVariablesToRemove(
        InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {

    // 拿到variablesToRemoveIndex的删除列表
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);

    // 如果没有,那么不用继续
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        return;
    }

    @SuppressWarnings("unchecked")
    // 如果有,那么外面删除了元素的话,还需要同步删除这个列表
    Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
    variablesToRemove.remove(variable);
}

get

public final V get() {
    // 拿到当前线程绑定的InternalThreadLocalMap做get
    return get(InternalThreadLocalMap.get());
}

public final V get(InternalThreadLocalMap threadLocalMap) {
    // 拿到该FastThreadLocal变量在map中的位置,去获取可能的value
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    
    // 到这里说明还没有设置变量
    return initialize(threadLocalMap);
}

// 这里相当于初始化一个value,并设置到map和remove列表中,至于初始化一个怎样的value,子类去决定
private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        // 子类去决定初始值
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }

    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

removeAll

public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    ...

    try {
        // 这里定位到苦心经营的variablesToRemove列表
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                // 遍历该列表的所有FastThreadLocal, 调用remove,删除对应的threadlocal变量
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        // 清空线程绑定的所有变量
        InternalThreadLocalMap.remove();
    }
}

相关文章

网友评论

      本文标题:FastThreadLocal

      本文链接:https://www.haomeiwen.com/subject/kajcdctx.html