你好,我是蜗牛!
在实际开发中需要在父子线程之间传递一些数据,比如用户登录信息使用ThreadLocal存放保证线程隔离,代码如下:
/** * @author 公众号:程序员蜗牛g * @description 用户上下文信息 */public class UserUtils{ private static final ThreadLocal<String> userLocal=new ThreadLocal<>(); public static String getUserId(){ return userLocal.get(); } public static void setUserId(String userId){ userLocal.set(userId); } public static void clear(){ userLocal.remove(); }}
那么子线程想要获取这个userId如何做呢?
1. 手动设置
代码如下:
public void handlerAsync(){ //1. 获取父线程的userId String userId = UserUtils.getUserId(); CompletableFuture.runAsync(()->{ //2. 设置子线程的值,复用 UserUtils.setUserId(userId); log.info("子线程的值:{}",UserUtils.getUserId()); });}
这样子每次开异步线程都需要手动设置,重复代码太多,看了头疼!
2. 线程池设置TaskDecoratorTaskDecorator是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。
那我们该如何去使用?代码如下:
/** *@author公众号:程序员蜗牛g *@description上下文装饰器*/publicCustomTaskDecoratorimplementsTaskDecorator {@OverridepublicRunnabledecorate(Runnablerunnable) {StringrobotId =UserUtils.getUserId();System.out.println(robotId);return()-> {try{//将主线程的请求信息,设置到子线程中UserUtils.setUserId(robotId);//执行子线程,这一步不要忘了runnable.run();}finally{//线程结束,清空这些信息,否则可能造成内存泄漏UserUtils.clear();} };}}
TaskDecorator需要结合线程池使用,实际开发中异步线程建议使用线程池,只需要在对应的线程池配置一下,代码如下:
@Bean(name="asyncServiceExecutor")publicExecutor asyncServiceExecutor() {log.info("start asyncServiceExecutor----------------");//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池ThreadPoolTaskExecutor executor =newVisiableThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());//增加线程池修饰类executor.setTaskDecorator(newCustomTaskDecorator());//增加MDC的线程池修饰类//executor.setTaskDecorator(new MDCTaskDecorator()); //执行初始化executor.initialize();log.info("end asyncServiceExecutor------------");returnexecutor;}
此时业务代码就不需要去设置子线程的值,直接使用即可,代码如下:
public voidhandlerAsync(){log.info("父线程的用户信息:{}",UserUtils.getUserId());//执行异步任务,需要指定的线程池CompletableFuture.runAsync(()->
log.info("子线程的用户信息:{}", UserUtils.getUserId()),
taskExecutor);}
来看一下结果,如下图:
这里使用的是CompletableFuture执行异步任务,使用@Async这个注解同样是可行的。
注意:无论使用何种方式,都需要指定线程池
3. InheritableThreadLocalInheritableThreadLocal虽然能够实现父子线程间的复用,但是在线程池中使用会存在复用的问题
这种方案使用也是非常简单,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:
/** *@author公众号:程序员蜗牛g*@description用户上下文信息*/
publicUserUtils{privatestatic final InheritableThreadLocal<String> threadLocal=newInheritableThreadLocal<>();publicstatic Stringget(){returnthreadLocal.get();}publicstaticvoidset(StringuserId){ threadLocal.set(userId);}publicstaticvoidclear(){ threadLocal.remove();}}
4. TransmittableThreadLocalTransmittableThreadLocal是由阿里开发的一个线程变量传递工具包,解决了InheritableThreadLocal只能再new Thread的时候传递本地变量,无法应用到线程池的问题。可以应用来作链路追踪,传递变量等用途,下面我们来了解一下原理。
使用起来也是非常简单,添加依赖如下:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.14.2</version></dependency>
UserUtils改造代码如下:
/** *@author公众号:程序员蜗牛g *@description用户上下文信息*/publicUserUtils{privatestatic finalTransmittableThreadLocal<String> threadLocal=newTransmittableThreadLocal<>();publicstatic Stringget(){returnthreadLocal.get();}publicstaticvoidset(StringuserId){ threadLocal.set(loginVal);}publicstaticvoidclear(){ threadLocal.remove();}}
TransmittableThreadLocal原理TransmittableThreadLocal继承自InheritableThreadLocal,因此它可以在创建线程的时候将值传递给子线程,那么怎么确保使用线程池的时候也有效呢?我们来看一下源码
1、构造方法
publicTransmittableThreadLocal(){this(false);}publicTransmittableThreadLocal(booleandisableIgnoreNullValueSemantics){//是否忽略null值set,默认falsethis.disableIgnoreNullValueSemantics =disableIgnoreNullValueSemantics;}
2、set方法
/** * {@inheritDoc} */@Overridepublicfinalvoidset(Tvalue) {if(!disableIgnoreNullValueSemantics &&value==null) {// may set null to remove valueremove(); }else{super.set(value);addThisToHolder(); }}
先看addThisToHolder方法
@SuppressWarnings("unchecked")private voidaddThisToHolder() {if(!holder.get().containsKey(this)) {holder.get().put((TransmittableThreadLocal<Object>)this,null);
// WeakHashMap supports null value.}}
属性holder又是什么呢?
1、final static修饰的变量,只会存在一份
2、使用了WeakHashMap,弱引用,方便垃圾回收
3、key就是TransmittableThreadLocal对象
remove方法
/** * {@inheritDoc} */@Overridepublicfinalvoidremove() {removeThisFromHolder();super.remove();}
4、get方法
/** * {@inheritDoc} */@Overridepublicfinal Tget() {Tvalue =super.get();if(disableIgnoreNullValueSemantics ||value !=null)addThisToHolder();returnvalue;}
5、当我们使用线程池时,需要使用TtlRunnable.get(runnable)对runnable进行包装,或者使用TtlExecutors.getTtlExecutor(executor)对执行器进行包装,才能使线程池的变量传递起效果,那么我们就接着看一下源码的执行流程
TtlExecutors.getTtlExecutor(executor)
publicstaticExecutor getTtlExecutor(@Nullable Executor executor){if(TtlAgent.isTtlAgentLoaded()||null==executor ||executor instanceofTtlEnhanced) {returnexecutor;}//包装执行器return newExecutorTtlWrapper(executor,true);}ExecutorTtlWrapper(@NonNull Executor executor,booleanidempotent){this.executor =executor;this.idempotent =idempotent;}public voidexecute(@NonNull Runnable command){//实际上也是通过TtlRunnable对原runnable进行包装executor.execute(TtlRunnable.get(command,false,idempotent));}
可以看到,两种方式原理一样,我们直接看TtlRunnable.get()
publicstaticTtlRunnable get(@Nullable Runnable runnable,booleanreleaseTtlValueReferenceAfterRun,booleanidempotent){if(null==runnable)return null;if(runnable instanceofTtlEnhanced) {if(idempotent)return(TtlRunnable)runnable;else throw newIllegalStateException("Already TtlRunnable!");}//返回TtlRunnablereturn newTtlRunnable(runnable,releaseTtlValueReferenceAfterRun);}
构建TtlRunnable
privateTtlRunnable(@NonNull Runnable runnable,booleanreleaseTtlValueReferenceAfterRun){//原子引用this.capturedRef =newAtomicReference<Object>(capture());this.runnable =runnable;this.releaseTtlValueReferenceAfterRun =releaseTtlValueReferenceAfterRun;}
capture捕获父线程的ttl
publicstatic Objectcapture(){return newSnapshot(captureTtlValues(),captureThreadLocalValues());}privatestatic
HashMap<TransmittableThreadLocal<Object>,Object>captureTtlValues(){HashMap<TransmittableThreadLocal<Object>,Object>ttl2Value =
newHashMap<TransmittableThreadLocal<Object>,Object>();//遍历了所有holderfor(TransmittableThreadLocal<Object>threadLocal :holder.get().keySet()) {// copyValue实际上调用了TransmittableThreadLocal的get方法获取线程存储的变量值ttl2Value.put(threadLocal,threadLocal.copyValue());}returnttl2Value;}privatestaticHashMap<ThreadLocal<Object>,Object>captureThreadLocalValues(){finalHashMap<ThreadLocal<Object>,Object>threadLocal2Value =
newHashMap<ThreadLocal<Object>,Object>();for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry :
threadLocalHolder.entrySet()) {final ThreadLocal<Object>threadLocal =entry.getKey();finalTtlCopier<Object>copier =entry.getValue();threadLocal2Value.put(threadLocal,copier.copy(threadLocal.get()));}returnthreadLocal2Value;}
再看TtlRunnable的run方法
public voidrun(){//获取Snapshot对象,里面存储了父线程的值final Objectcaptured =capturedRef.get();if(captured ==null||releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured,null)) {throw newIllegalStateException("TTL value reference is released after run!");}//传入capture方法捕获的ttl,然后在子线程重放,也就是调用ttl的set方法,//这样就会把值设置到当前的线程中去,最后会把子线程之前存在的ttl返回final Objectbackup =replay(captured);try{//调用原runnable的runrunnable.run();}finally{// restore(backup);}}
总结
上述列举了4种方案,蜗牛这里推荐方案2和方案4,其中两种方案的缺点非常明显,实际开发中也是采用的方案2或者方案4。
最后说一句(求关注!)如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。
关注公众号:woniuxgg,在公众号中回复:笔记 就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!