美文网首页
异步线程的变量传递

异步线程的变量传递

作者: Cheava | 来源:发表于2019-03-24 11:41 被阅读0次

设想场景

假如我们需要跟踪某条请求的所有后台日志,其中这些日志的埋点有同步的,也有异步的,甚至是使用Reactor的,那这个时候,我们应该怎么跟踪?这个在分布式服务和微服务下叫全链路监控-APM,现在我们就在单机环境下即同一jvm下说明这个问题。

同步线程

SLF4J 日志框架提供了一个 MDC(Mapped Diagnostic Contexts) 工具类

public class Main {

    private static final String KEY = "requestId";
    private static final Logger logger = LoggerFactory.getLogger(Main.class);
    
    public static void main(String[] args) {

        // 入口传入请求ID
        MDC.put(KEY, UUID.randomUUID().toString());
        
        // 打印日志
        logger.debug("log in main thread 1");
        logger.debug("log in main thread 2");
        logger.debug("log in main thread 3");

        // 出口移除请求ID
        MDC.remove(KEY);
    }
}

具体可以参考如何快速过滤出一次请求的所有日志

异步线程

由于 Logback 的 MDC 实际上是一个 ThreadLocal 的实现,因此,当异步执行产生线程切换时,需要将 MDC 保存的信息进行切换。
Spring 中有一个可用的线程装饰器TaskDecorator,这个是 Spring Core 4.3 版本才加入的接口,通过实现这个接口,可以自己控制传播那些变量

/**
 * 解决异步执行时MDC内容延续的问题
 */
public class MDCTaskDecorator implements TaskDecorator {
    
    @Override
    public Runnable decorate(Runnable runnable) {
        return new MDCContinueRunableDecorator(runnable);
    }
    
    /**
     * 执行线程装饰器
     */
    protected class MDCContinueRunableDecorator implements Runnable {
        
        private final Runnable delegate;
        
        protected final Map<String, String> logContextMap;
        
        public MDCContinueRunableDecorator(Runnable runnable) {
            this.delegate = runnable;
            this.logContextMap = MDC.getCopyOfContextMap();
        }
        
        @Override
        public void run() {
            MDC.setContextMap(this.logContextMap);
            this.delegate.run();
            MDC.clear();
        }
    }
}

然后,需要自定义实现一个 TaskExecutor,替换 Spring 提供的默认实现,代码如下。

 /**
     * 自定义线程池
     * <p>
     * 用于线程切换时的MDC延续
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(maxPoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setTaskDecorator(new MDCTaskDecorator());
        executor.setThreadNamePrefix("MDCAdaptTaskExcutor-");
        executor.initialize();
        return executor;
    }

只要异步处理使用了自定义的 TaskExecutor ,即可实现上下文的自动传递。

Reactor

spring5引入webflux,其底层是基于reactor,那么reactor如何进行上下文变量的传播呢?官方提供了Context对象来替代threadlocal。

其特性如下:

  • 类似map的kv操作,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable,即同一个key,后面put不会覆盖
  • 提供getOrDefault,getOrEmpty方法
  • Context与作用链上的每个Subscriber绑定
  • 通过subscriberContext(Context)来访问
  • Context的作用是自底向上

实例

设置及读取

    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

这里从最底部的subscriberContext设置message值为World,然后flatMap里头通过subscriberContext来访问。

自底向上

    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                //NOTE 这个subscriberContext设置的太高了
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }
复制代码

由于这个例子的subscriberContext设置的太高了,不能作用在flatMap里头的Mono.subscriberContext()

不可变

    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                //这里返回了一个新的,因此上面的设置失效了
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

subscriberContext永远返回一个新的

多个连续的subscriberContext

    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

operator只会读取离它最近的一个context

flatMap间的subscriberContext

    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

flatMap读取离它最近的context

flatMap中的subscriberContext

    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

这里第一个flatMap无法读取第二个flatMap内部的context

具体可以参考聊聊reactor异步线程的变量传递

相关文章

  • 异步线程的变量传递

    设想场景 假如我们需要跟踪某条请求的所有后台日志,其中这些日志的埋点有同步的,也有异步的,甚至是使用Reactor...

  • 浅析Handler消息传递机制

    Android的异步消息处理机制:Handler消息传递机制。 1、Message Message是在线程之间传递...

  • Jmeter中不同线程组的参数传递

    Jmeter跨线程组参数传递 引言:Jmeter中再同一个线程组中,参数传递可以使用 ${变量名} 的方式去传递参...

  • AsyncTask源码解析

    AsyncTask 执行轻量级的异步任务,将结果传递给主线程,主线程根据结果更新UI. 使用 AsyncTask创...

  • 系统编程--11.27

    1.threading.local 作用:用来解决线程传递参数比较麻烦的问题 2.异步

  • ThreadLocal学习总结

    ThreadLocal提供线程内变量的存取操作,方便在同个线程里面,数据的获取。减少函数或者组件的公共变量传递的复...

  • [Python系列]Python多线程

    背景:说到多线程,我们会想到的是:异步编程、同步(锁)、共享变量、线程池等等,那么Python里面多线程是如何实现...

  • AsyncLogging.h

    异步日志 该类 AsyncLogging 成员变量 MutexLock mutex_;互斥量,线程安全的添加日志。...

  • 线程本地变量及传递ThreadLocal,Inheritable

    在实际开发中,我们经常需要传递一些上下文变量,有些是线程独立的,有些可能需要传递到子线程,甚至是线程池中,比如,分...

  • 安卓编程技巧总结(3) 进程与线程处理

    多进程 线程的管理 线程使用 线程锁 进程使用 进程间通信时注意URI传递 7.带返回值的异步任务Callable...

网友评论

      本文标题:异步线程的变量传递

      本文链接:https://www.haomeiwen.com/subject/dyayvqtx.html