如何进行ThreadLocal源码分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
ThreadLocal是线程本地变量,ThreadLocal为每一个线程创建一个单独的变量副本ThreadLocalMap,所以每个线程修改自己变量副本不会影响其它的线程。区别于线程同步,我们知道线程同步是为了解决多线程下共享变量的安全问题,而ThreadLocal是为了解决线程内部数据传递问题。一个线程内部可以有多个ThreadLocal,但是它门维护线程的同一个ThreadLocalMap变量,共用同一个Entry数组。
ThreadLocal数据结构:
每个线程内部有一个ThreadLocalMap属性,ThreadLocal通过维护该属性来保证单个线程内部数据共享。ThreadLocalMap内部有一个entry数组,该数组是key,value型结构,key为当前ThreadLocal的弱引用,value用于存放具体的值,类型为一个泛型结构,支持各种数据变量。ThredLocalMap内Entry数组的下标值也是通过 key.threadLocalHashCode & (数组长度 - 1)来确定的,只不过这个threadLocalHashCode 是通过AutomicLong每次递增0x61c88647来确定的,这可以尽量减少hash碰撞。不同于HashMap,ThreadLocalMap内部只维护了一个Entry数组,所以当发生hash冲突的时候,ThreadLocalMap会将发生hash冲突的Entry放在当前key对应数组下标后面第一个为空的数组槽位内。ThreadLocal的扩容阈值默认为数组大小的 2/3。因为Entry的key为当前threadlcoal的弱引用,所以在发生gc的时候容易导致key被回收,但是此时value为强引用,所以这种情况会导致内存溢出。但是,当我们调用threadlocal的set,get,remove方法的时候,ThreadLocalMap内都会发生回收过期key的操作,不过这种回收是一种抽样回收,可能并不能回收所有的过期key。而且在执行set方法回收的时候,可能发生扩容,这时候的扩容判断是当前数组的长度的1/2。Entry数组默认初始化长度为16。
public class ThreadLocalTest { private static final ThreadLocal<String> threadLocal = new ThreadLocal(); private static String str = null; public static void print1() { System.out.println("打印方法1输出:" + threadLocal.get()); } public static void print2() { System.out.println("打印方法2输出:" + str); } public static void main(String[] args) { //线程1 new Thread(() -> { threadLocal.set("线程1设置的str1"); str = "线程1设置的str2"; //睡5秒钟 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //睡5秒钟后打印,此时第2个线程早已执行完 print1(); print2(); }).start(); //线程2 new Thread(() -> { threadLocal.set("线程2设置的str1"); str = "线程2设置的str2"; //直接打印 print1(); print2(); }).start(); }}
运行结果:
打印方法1输出:线程2设置的str1打印方法2输出:线程2设置的str2打印方法1输出:线程1设置的str1打印方法2输出:线程2设置的str2
根据运行结果分析出,使用ThreadLocal的存储的变量在多线程不存在线程安全问题,常规创建的属性在多线程下存在线程安全问题。
ThreadLocal中使用了斐波那契散列法,来保证哈希表的离散度。可以保证 nextHashCode 生成的哈希值,均匀的分布在 2 的幂次方上。具体的数学问题不在这里深究。
private final int threadLocalHashCode = nextHashCode();private static AtomicInteger nextHashCode = new AtomicInteger();//十进制1640531527=0.618*2^32,这个值是黄金分割率*2^32private static final int HASH_INCREMENT = 0x61c88647;//每次调用该方法,hashcode值就会递增HASH_INCREMENTprivate static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT);}
//用于计算数组下标的值,table.length - 1转二进制有N个1,那么//key.threadLocalHashCode & (table.length - 1)的值就是threadLocalHashCode的低N位int i = key.threadLocalHashCode & (table.length - 1);
public void set(T value) { //获取当前线程 Thread t = Thread.currentThread(); //根据当前线程获取ThreadLocalMap ThreadLocalMap map = getMap(t); //如果map为空则创建一个,否则设置属性值 if (map != null) //key为当前thread的引用则设置该值 map.set(this, value); else //map为空则创建当前线程的ThreadMap并和当前线程绑定 createMap(t, value);}
private void set(ThreadLocal<?> key, Object value) { //将初始化后的当前数组赋值给临时数组tab Entry[] tab = table; //获取当前临时tab数组长度 int len = tab.length; //计算当前key对应的数组下标 int i = key.threadLocalHashCode & (len-1); //从当前下标开始循环往后遍历,如果当前数组槽为空,则直接跳出循环,如果不为空,则进行key的判断 //因为ThreadLocalMap的结构只是数组,没有链表,当key发生冲突, //不同的key定位到相同的数组下标的时候,会往后寻找第一个下标为null //的槽或者第一个key位过期key的槽,并将entry放入并赋值 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { //对应下标为i的槽位为空的时候才会走到循环里面的逻辑 //获取key ThreadLocal<?> k = e.get(); //CASE1:如果key相同,替换value并跳出循环 if (k == key) { e.value = value; return; } //CASE2:如果key为空,说明key已经过期了,当前下标对应的槽可以被使用 if (k == null) { //替换过期key的逻辑 replaceStaleEntry(key, value, i); return; } } //如果当前下标下的数组槽为空,占用该槽位并赋值 tab[i] = new Entry(key, value); //递增数组大小 int sz = ++size; //没有清理到数据,且size大小达到了扩容阈值 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash();}
给当前key找数组槽位的时候,找到的下标对应的key为过期的key的时候,执行替换操作
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { //数组列表 Entry[] tab = table; //数组长度 int len = tab.length; //临时变量 Entry e; //需要清理的数据的开始下标,默认为当前staleSlot int slotToExpunge = staleSlot; //从当前staleSlot向前查找,找对应数组槽下的entry,直到碰到空的槽则退出循环 for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) //如果在查找过程中,碰到key为过期key的情况,更新需要清理的数据的开始下标 if (e.get() == null) slotToExpunge = i; //从当前staleSlot向后查找,找对应数组槽下的entry,直到碰到空的槽则退出循环 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { //获取当前元素的key ThreadLocal<?> k = e.get(); //如果key相同,则替换value,迁移数据位置 if (k == key { e.value = value; //将过期的tab[staleSlot]放到找到的i下标下 tab[i] = tab[staleSlot]; //当前staleSlot下标下的槽替换为当前的entry,数据的位置被优化了 tab[staleSlot] = e; //条件成立说明向前过程中并没有找到过期的key if (slotToExpunge == staleSlot) //修改需要清理数据的开始下标为替换数据后的下标 slotToExpunge = i; //清理数据 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } //k==null说明循环过程中未找到匹配的key //slotToExpunge == staleSlot说明向前遍历过程中未找到过期的key if (k == null && slotToExpunge == staleSlot) //可以将循环向后查找的i指向slotToExpunge,因为在向后查找的过程中没有找到相同的key //该段期间没必要处理了 slotToExpunge = i; } //走到这里说明循环向后查找的过程中,没有找到相同的key //直接使用当前下标并赋值 tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); //条件成立,说明在向前向后遍历中,slotToExpunge被改变了 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);}
为什么有while ( (n >>>= 1) != 0),这样不是可能清理不了所有数据吗?是的,ThreadLocal的设计行就是部分清除,类似于抽样,避免清理所有影响性能。
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; //执行清理,可能会迁移数据 i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed;}
扩容之前,进行一次全面的清理操作
private void rehash() { expungeStaleEntries(); if (size >= threshold - threshold / 4) resize();}
扩容逻辑,比较简单,数组变大两倍,旧数据迁移到新数组,如果key已经过期的,则直接将value也设置为空。这里需要注意的时候,清理过程中扩容的阈值是原数组容量的 1/2, size >= threshold - threshold / 4,我们直到threashold = 2 / 3 * length, 所以转化后size >= 3 / 4 * (2 / 3) * length。
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); //如果对应的key已经回收 if (k == null) { //value设置为空 e.value = null; // Help the GC } else { //进行数据迁移,如果存在冲突,则放到计算出来的下标的后方第一个不为null的槽 int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } //重新设置扩容阈值 setThreshold(newLen); size = count; table = newTab;}
当我们调用threadLocal的get方法的时候,首先会调用getMap方法,该方法根据当前线程获取当前线程的ThreadLocal.ThreadLocalMap threadLocals属性,如果非空,再获取对应的ThreadLocal的ThreadLocalMap 里面的entry,根据entry获取对应的value,这个过程会调用expungestaleEntry方法,清空key为空的hash槽的值,并将key不为空的且通过key的hash值计算出来的下标发生过向后偏移的entry移动到更靠近计算出来的下标值的后面的某个空的槽内。如果getMap返回空,说明我们可能没用调用ThreadLocal的set方法的情况下调用了get方法,那么创建一个ThreadLocalMap,初始化entry数组,设置扩容阈值,并设置对应的ThreadLocal的hash槽的值为空。
public T get() { //获取当前线程 Thread t = Thread.currentThread(); //取出当前线程的ThreadLocalMap属性 ThreadLocalMap map = getMap(t); //如果当前线程的ThreadLocalMap不为空 if (map != null) { //获取ThreadLocalMap的Entry数组 ThreadLocalMap.Entry e = map.getEntry(this); //如果数组不为空,取出value值返回 if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue();}
//获取thread的threadLocals属性ThreadLocalMap getMap(Thread t) { return t.threadLocals;}
//获取ThreadLocalMap的entry数组对应下标的数据private Entry getEntry(ThreadLocal<?> key) { //计算下标 int i = key.threadLocalHashCode & (table.length - 1); //获取对应下标数据 Entry e = table[i]; if (e != null && e.get() == key) return e; //如果取不到,为什么有这种情况? //从put方法中我们知道,threadlocalMap不同于hashMap //内部只有数组,数组的每个hash槽下只有一个entry值 //如果在put的时候发现对应hash槽的值不为空,且key不相同 //则往后找第一个为空的hash槽,讲entry放入该hash槽 else return getEntryAfterMiss(key, i, e);}
//从对应下标往后循环查找,这里有个特殊的地方nextIndex//该方法:从对应下标往后循环返回下标,如果超出数组长度,//则从0下标开始继续往后循环返回下标private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; //循环遍历 while (e != null) { ThreadLocal<?> k = e.get(); //case1:key值相同,返回对应的entry if (k == key) return e; //case2:发现对应entry数组下标下的key为空,清理 if (k == null) expungeStaleEntry(i); //case3:key不为空但key不相同,数组下标往后推进 else i = nextIndex(i, len); //返回下一个下标值对应的entry e = tab[i]; } return null;}
//从对应下标往后循环,如果超出数组长度,则从0下标开始继续往后循环//返回具体下标值private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0);}
从当前staleSlot开始循环清理过期key对应的entry数组内的值;如果key不为空且当前线程对应的threadlocal的hash值计算出来的下标发生过迁移,说明之前在put的时候,在对应下标下发生过hash冲突,将当前下标下的entry数组对应的值置为null,并将当前下标下的entry值移动到更接近通过hash值计算出来的下标之后的某个空的槽中。循环在进行下标右移的过程中,如果碰到对应下标下的槽数据为空,则退出循环。该方法在执行的时候会将本该在staleSlot位置的key对应的变量移动到该位置或更靠近该位置的后方。避免remove方法遍历的时候出现null导致清理不到的情况。
private int expungeStaleEntry(int staleSlot) { //将全局entry数组赋值给临时tab Entry[] tab = table; //临时entry数组当前长度 int len = tab.length; //设置对应数组下标下的entry的value为空 tab[staleSlot].value = null; //设置对应entry为空 tab[staleSlot] = null; //entry数组全局长度-1 size--; Entry e; int i; //从当前下标往后循环遍历,直到对应的下标下槽内数据为空跳出循环 for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { //获取对应下标下当前entry对应的key ThreadLocal<?> k = e.get(); //如果key为空则清理entry的value和设置当前数组对应entry为空 if (k == null) { e.value = null; tab[i] = null; size--; //如果key不为空 } else { //计算获取对应的下标,这个本该是存放entry的位置,但是可能由于hash冲突,put的时候向后偏移了 int h = k.threadLocalHashCode & (len - 1); //条件成立说明在put的时候计算出来的下标发生过hash冲突 //数据向后偏移过,而且 h < i if (h != i) { //将当前下标下entry设置为空 tab[i] = null; //从计算出来的下标h循环向后获取一个对应entry为空的下标值 //该下标下存放当前entry while (tab[h] != null) //这个新计算出来的h的值更靠近计算获取的下标 h = nextIndex(h, len); //将entry放在对应下标 tab[h] = e; } } //返回进行处理过后的起点下标i return i;}
private T setInitialValue() { //获取一个空值 T value = initialValue(); Thread t = Thread.currentThread(); //获取当前线程的ThreadMap ThreadLocalMap map = getMap(t); //如果不为空,则将当前空值注入 if (map != null) map.set(this, value); else //否则创建这个ThreadMap并和当前Thread绑定 createMap(t, value); return value;}
remove方法也很简单,就是将key的引用设置为null,然后找到key所对应的数组槽位,执行清理操作。
在ThreadLocal使用完毕后,执行remove方法防止内存溢出。
public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this);}
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } }}
public void clear() { this.referent = null;}
上面说完了ThreadLocal的问题,可以看出,ThreadLocal只能在单个线程内部传递参数,无法在子父线程间传递参数。
但是InheritableThreadLocal的出现解决了这个问题。
public class InheriTableThreadLocalTest { private static final InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) { threadLocal.set("主线程设置值"); new Thread(() -> { System.out.println(threadLocal.get()); }).start(); }}
分析InheritableThreadLocal类,发现继承于ThreadLocal,但是在createMap,getMap的时候维护的是inheritableThreadLocals
public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); }}
在线程初始化的代码init方法中,有这么一段逻辑:
如果父线程的inheritThreadLocals不为空,则调用ThreadLocal.createInheritedMap方法,该方法传递了父线程的inheritableThreadLocals
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
再看看ThreadLocal.createInheritedMap方法,子线程在创建的时候,将父线程的inheritableThreadLocals复制了过来保存在了自己的inheritableThreadLocals中。
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { return new ThreadLocalMap(parentMap);}
private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } }}
看完上述内容,你们掌握如何进行ThreadLocal源码分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。