Netty-FastThreadLocal
在原生的ThreadLocal中有三个角色,Thread,ThreadLocal和ThreadLocalMap。分别对应了Netty的FastThreadLocalThread,FastThreadLocal和InternalThreadLocalMap。
众所周知, 原生的ThreadLocal效率不高有几个原因.
查找当前Thread绑定的变量, 是通过在Map中根据ThreadLocal的hash去查找的,如果能命中,那么效率还是可以, 但是再快还是不如数组下标访问。
当ThreadLocalMap命中但有hash冲突的时候,就很麻烦, 会做线性探测,一但触发会导致效率及低。
也就是说当hash的index被人占用后,会往后看是否有位置可能存放,如此往复,直到有空位为止.
Map扩容涉及到重新计算部分index, 最糟的情况还要挪动元素的位置
当线程执行完, 最好清掉所绑定的threadlocal变量, 不然会内存泄漏. 现实中, 往往会忘记清理.
Netty针对这些问题, 实现了满足原生ThreadLocal语义的实现, 更加高效.
Netty实现了java.lang.Thread子类FastThreadLocalThread来保证能够使用FastThreadLocal,FastThreadLocalThread持有一个InternalThreadLocalMap实例,类似于JDK的ThreadLocalMap,InternalThreadLocalMap存储了这个线程持有的所有线程私有变量,以FastThreadLocal为键,对应的变量为值。
线程封装
// 在EventLoop中的Executor在生成线程的时候借助threadFactory
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
// DefaultThreadFactory
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
...
}
Runnable
// 在Runnable之上再封装
public void run() {
try {
r.run();
} finally {
// 移除绑定在该线程上的所有variables
FastThreadLocal.removeAll();
}
}
Thread
// 可以看到针对Thread的增强,添加了InternalThreadLocalMap
public class FastThreadLocalThread extends Thread {
// 类似Thread原生的ThreadLocalMap
private InternalThreadLocalMap threadLocalMap;
public FastThreadLocalThread() { }
...
构造函数
...
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
}
InternalThreadLocalMap
1563865785219.png
初始化
public static final Object UNSET = new Object();
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
private static Object[] newIndexedVariableTable() {
// 初始化一个32长度的Object数组,并填满,作为保存线程绑定的变量之用,之后不够会扩容
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
// 将object数组保存到indexedVariables
this.indexedVariables = indexedVariables;
}
扩容
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
// 如果当前index已经超过了indexedVariables的长度,那么在这里进行扩容
// 新的长度扩容为下一个2的幂,也就是当前的2倍,但要保证是2的幂
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
// 将老的元素拷贝到新的数组下
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
// 将新增的部分填充object对象
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
// 将value设置到index位置
newArray[index] = value;
// 更新indexedVariables为新数组
indexedVariables = newArray;
}
get
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
// 如果是FastThreadLocalThread封装过的线程,那么返回netty的map
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
// 否则返回原生的threadlocalmap
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
setIndexedVariable
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
// 如果当前能放下,则将value添加到index位置
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
// 扩容并set
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
remove
// 清空该线程绑定的所有threadlocal变量,不管是netty的还是原生的.
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
slowThreadLocalMap.remove();
}
}
removeIndexedVariable
public Object removeIndexedVariable(int index) {
// 重置index位置的元素,并返回
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
FastThreadLocal
1563870865100.png
1563866750196.png
初始化
public FastThreadLocal() {
// 拿到InternalThreadLocalMap的下一个index
// 而这个index代表当前最后一个变量位置
index = InternalThreadLocalMap.nextVariableIndex();
}
Set
public final void set(V value) {
// 首先你要set的value不能是空的object对象
if (value != InternalThreadLocalMap.UNSET) {
set(InternalThreadLocalMap.get(), value);
} else {
// 如果set的value是空object,那么进行remove
// 意味着要删除该线程绑定的FastThreadLocal变量
remove();
}
}
// 总结,当跟线程绑定的theadlocal变量设置时,会设置两个地方
// 1. 将该FastThreadLocal和value,写入该线程的ThreadLocalMap中。
// 2. 将FastThreadLocal记录到VariablesToRemove中
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
if (value != InternalThreadLocalMap.UNSET) {
if (threadLocalMap.setIndexedVariable(index, value)) {
// 添加到快速删除列表,优化删除
addToVariablesToRemove(threadLocalMap, this);
}
} else {
// 同上
remove(threadLocalMap);
}
}
addToVariablesToRemove
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 拿到index为0位置的Set,而这个set记录了所有当前保存的ThreadLocal变量列表用于快速删除
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
// 如果位置0没有,则新建一个
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 将新的变量加到这个variablesToRemove里面
variablesToRemove.add(variable);
}
remove
public final void remove() {
// 拿到当前线程绑定的InternalThreadLocalMap做remove
remove(InternalThreadLocalMap.getIfSet());
}
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
// 移除该该线程绑定的FastThreadLocal变量
Object v = threadLocalMap.removeIndexedVariable(index);
// 再在VariablesToRemove列表中移除该FastThreadLocal
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
// 如果真实删除了变量,那么回调这里,子类去实现
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
removeFromVariablesToRemove
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 拿到variablesToRemoveIndex的删除列表
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
// 如果没有,那么不用继续
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
// 如果有,那么外面删除了元素的话,还需要同步删除这个列表
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
get
public final V get() {
// 拿到当前线程绑定的InternalThreadLocalMap做get
return get(InternalThreadLocalMap.get());
}
public final V get(InternalThreadLocalMap threadLocalMap) {
// 拿到该FastThreadLocal变量在map中的位置,去获取可能的value
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 到这里说明还没有设置变量
return initialize(threadLocalMap);
}
// 这里相当于初始化一个value,并设置到map和remove列表中,至于初始化一个怎样的value,子类去决定
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
// 子类去决定初始值
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}
removeAll
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
...
try {
// 这里定位到苦心经营的variablesToRemove列表
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
// 遍历该列表的所有FastThreadLocal, 调用remove,删除对应的threadlocal变量
tlv.remove(threadLocalMap);
}
}
} finally {
// 清空线程绑定的所有变量
InternalThreadLocalMap.remove();
}
}












网友评论