用这4招优雅的实现SpringBoot异步线程间数据传递

程序员蜗牛 2024-01-13 11:02:43

你好,我是蜗牛!

在实际开发中需要在父子线程之间传递一些数据,比如用户登录信息使用ThreadLocal存放保证线程隔离,代码如下:

/** * @author 公众号:程序员蜗牛g * @description 用户上下文信息 */public class UserUtils{    private static  final  ThreadLocal<String> userLocal=new ThreadLocal<>();    public static  String getUserId(){        return userLocal.get();    }    public static void setUserId(String userId){        userLocal.set(userId);    }    public static void clear(){        userLocal.remove();    }}

那么子线程想要获取这个userId如何做呢?

1. 手动设置

代码如下:

public void handlerAsync(){    //1. 获取父线程的userId        String userId = UserUtils.getUserId();        CompletableFuture.runAsync(()->{            //2. 设置子线程的值,复用            UserUtils.setUserId(userId);            log.info("子线程的值:{}",UserUtils.getUserId());        });}

这样子每次开异步线程都需要手动设置,重复代码太多,看了头疼!

2. 线程池设置TaskDecorator

TaskDecorator是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。

那我们该如何去使用?代码如下:

/** *@author公众号:程序员蜗牛g *@description上下文装饰器*/publicCustomTaskDecoratorimplementsTaskDecorator {@OverridepublicRunnabledecorate(Runnablerunnable) {StringrobotId =UserUtils.getUserId();System.out.println(robotId);return()-> {try{//将主线程的请求信息,设置到子线程中UserUtils.setUserId(robotId);//执行子线程,这一步不要忘了runnable.run();}finally{//线程结束,清空这些信息,否则可能造成内存泄漏UserUtils.clear();}        };}}

TaskDecorator需要结合线程池使用,实际开发中异步线程建议使用线程池,只需要在对应的线程池配置一下,代码如下:

@Bean(name="asyncServiceExecutor")publicExecutor asyncServiceExecutor() {log.info("start asyncServiceExecutor----------------");//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();    //使用可视化运行状态的线程池ThreadPoolTaskExecutor executor =newVisiableThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());//增加线程池修饰类executor.setTaskDecorator(newCustomTaskDecorator());//增加MDC的线程池修饰类//executor.setTaskDecorator(new MDCTaskDecorator());    //执行初始化executor.initialize();log.info("end asyncServiceExecutor------------");returnexecutor;}

此时业务代码就不需要去设置子线程的值,直接使用即可,代码如下:

public voidhandlerAsync(){log.info("父线程的用户信息:{}",UserUtils.getUserId());//执行异步任务,需要指定的线程池CompletableFuture.runAsync(()->

log.info("子线程的用户信息:{}", UserUtils.getUserId()),

taskExecutor);}

来看一下结果,如下图:

这里使用的是CompletableFuture执行异步任务,使用@Async这个注解同样是可行的。

注意:无论使用何种方式,都需要指定线程池

3. InheritableThreadLocal

InheritableThreadLocal虽然能够实现父子线程间的复用,但是在线程池中使用会存在复用的问题

这种方案使用也是非常简单,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:

/** *@author公众号:程序员蜗牛g*@description用户上下文信息*/

publicUserUtils{privatestatic  final  InheritableThreadLocal<String>  threadLocal=newInheritableThreadLocal<>();publicstatic  Stringget(){returnthreadLocal.get();}publicstaticvoidset(StringuserId){        threadLocal.set(userId);}publicstaticvoidclear(){        threadLocal.remove();}}

4. TransmittableThreadLocal

TransmittableThreadLocal是由阿里开发的一个线程变量传递工具包,解决了InheritableThreadLocal只能再new Thread的时候传递本地变量,无法应用到线程池的问题。可以应用来作链路追踪,传递变量等用途,下面我们来了解一下原理。

使用起来也是非常简单,添加依赖如下:

<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.14.2</version></dependency>

UserUtils改造代码如下:

/** *@author公众号:程序员蜗牛g *@description用户上下文信息*/publicUserUtils{privatestatic  finalTransmittableThreadLocal<String> threadLocal=newTransmittableThreadLocal<>();publicstatic  Stringget(){returnthreadLocal.get();}publicstaticvoidset(StringuserId){        threadLocal.set(loginVal);}publicstaticvoidclear(){        threadLocal.remove();}}

TransmittableThreadLocal原理

TransmittableThreadLocal继承自InheritableThreadLocal,因此它可以在创建线程的时候将值传递给子线程,那么怎么确保使用线程池的时候也有效呢?我们来看一下源码

1、构造方法

publicTransmittableThreadLocal(){this(false);}publicTransmittableThreadLocal(booleandisableIgnoreNullValueSemantics){//是否忽略null值set,默认falsethis.disableIgnoreNullValueSemantics =disableIgnoreNullValueSemantics;}

2、set方法

/** * {@inheritDoc} */@Overridepublicfinalvoidset(Tvalue) {if(!disableIgnoreNullValueSemantics &&value==null) {// may set null to remove valueremove();    }else{super.set(value);addThisToHolder();    }}

先看addThisToHolder方法

@SuppressWarnings("unchecked")private voidaddThisToHolder() {if(!holder.get().containsKey(this)) {holder.get().put((TransmittableThreadLocal<Object>)this,null);

// WeakHashMap supports null value.}}

属性holder又是什么呢?

1、final static修饰的变量,只会存在一份

2、使用了WeakHashMap,弱引用,方便垃圾回收

3、key就是TransmittableThreadLocal对象

remove方法

/** * {@inheritDoc} */@Overridepublicfinalvoidremove() {removeThisFromHolder();super.remove();}

4、get方法

/** * {@inheritDoc} */@Overridepublicfinal Tget() {Tvalue =super.get();if(disableIgnoreNullValueSemantics ||value !=null)addThisToHolder();returnvalue;}

5、当我们使用线程池时,需要使用TtlRunnable.get(runnable)对runnable进行包装,或者使用TtlExecutors.getTtlExecutor(executor)对执行器进行包装,才能使线程池的变量传递起效果,那么我们就接着看一下源码的执行流程

TtlExecutors.getTtlExecutor(executor)

publicstaticExecutor getTtlExecutor(@Nullable Executor executor){if(TtlAgent.isTtlAgentLoaded()||null==executor ||executor instanceofTtlEnhanced) {returnexecutor;}//包装执行器return newExecutorTtlWrapper(executor,true);}ExecutorTtlWrapper(@NonNull Executor executor,booleanidempotent){this.executor =executor;this.idempotent =idempotent;}public voidexecute(@NonNull Runnable command){//实际上也是通过TtlRunnable对原runnable进行包装executor.execute(TtlRunnable.get(command,false,idempotent));}

可以看到,两种方式原理一样,我们直接看TtlRunnable.get()

publicstaticTtlRunnable get(@Nullable Runnable runnable,booleanreleaseTtlValueReferenceAfterRun,booleanidempotent){if(null==runnable)return null;if(runnable instanceofTtlEnhanced) {if(idempotent)return(TtlRunnable)runnable;else throw newIllegalStateException("Already TtlRunnable!");}//返回TtlRunnablereturn newTtlRunnable(runnable,releaseTtlValueReferenceAfterRun);}

构建TtlRunnable

privateTtlRunnable(@NonNull Runnable runnable,booleanreleaseTtlValueReferenceAfterRun){//原子引用this.capturedRef =newAtomicReference<Object>(capture());this.runnable =runnable;this.releaseTtlValueReferenceAfterRun =releaseTtlValueReferenceAfterRun;}

capture捕获父线程的ttl

publicstatic Objectcapture(){return newSnapshot(captureTtlValues(),captureThreadLocalValues());}privatestatic

HashMap<TransmittableThreadLocal<Object>,Object>captureTtlValues(){HashMap<TransmittableThreadLocal<Object>,Object>ttl2Value =

newHashMap<TransmittableThreadLocal<Object>,Object>();//遍历了所有holderfor(TransmittableThreadLocal<Object>threadLocal :holder.get().keySet()) {// copyValue实际上调用了TransmittableThreadLocal的get方法获取线程存储的变量值ttl2Value.put(threadLocal,threadLocal.copyValue());}returnttl2Value;}privatestaticHashMap<ThreadLocal<Object>,Object>captureThreadLocalValues(){finalHashMap<ThreadLocal<Object>,Object>threadLocal2Value =

newHashMap<ThreadLocal<Object>,Object>();for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry :

threadLocalHolder.entrySet()) {final ThreadLocal<Object>threadLocal =entry.getKey();finalTtlCopier<Object>copier =entry.getValue();threadLocal2Value.put(threadLocal,copier.copy(threadLocal.get()));}returnthreadLocal2Value;}

再看TtlRunnable的run方法

public voidrun(){//获取Snapshot对象,里面存储了父线程的值final Objectcaptured =capturedRef.get();if(captured ==null||releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured,null)) {throw newIllegalStateException("TTL value reference is released after run!");}//传入capture方法捕获的ttl,然后在子线程重放,也就是调用ttl的set方法,//这样就会把值设置到当前的线程中去,最后会把子线程之前存在的ttl返回final Objectbackup =replay(captured);try{//调用原runnable的runrunnable.run();}finally{// restore(backup);}}

总结

上述列举了4种方案,蜗牛这里推荐方案2和方案4,其中两种方案的缺点非常明显,实际开发中也是采用的方案2或者方案4。

最后说一句(求关注!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

0 阅读:0

程序员蜗牛

简介:一个大厂程序员