父子线程变量传递一般都用JDK的InheritableThreadLocal
,当遇到了线程池就麻烦了,因为线程池的线程是复用的,线程变量应该从哪传递呢?
[TOC]
背景
我们线上有一个核心业务指标突然掉了,如图所示,于是开始排查原因:
排查思路
快速恢复
通过我们的归因分析,发现我们的系统从某一个时刻开始线程变量取错了,如图所示:
在这一时刻,我们改动了线程池热配置,将核心线程数提高至最大线程数,如图所示:
于是我们重启了所有线上机器,指标恢复了
问题定位
指标恢复后,我们开始排查具体原因,我们系统用了一个自定义的线程池组件,可以通过热配置中心来更改核心线程数,从而实现线程池的热配置。
这是我们设置线程变量的代码,我们通过InheritableThreadLocal
创建了一个静态变量TL_APPCODE
,初始化的时候将appcode放入TL_APPCODE
和MDC
中,使用的时候依次从TL_APPCODE
和MDC
中取用appcode。
private static InheritableThreadLocal<String> TL_APPCODE = new InheritableThreadLocal<>();
/**
* 获取 appcode
*/
public static String getAppcode() {
String result = TL_APPCODE.get();
if (StringUtils.isBlank(result)) {
result = MDC.get(NM_APPCODE);
}
return result;
}
/**
* 设置 appcode
*/
public static void setAppcode(String appcode) {
if (StringUtils.isNotBlank(appcode)) {
TL_APPCODE.set(appcode);
QTracer.addKVAnnotation(NM_APPCODE, appcode);
MDC.put(NM_APPCODE, appcode);
}
}
我们在父线程中调用setAppcode()
方法设置线程变量TL_APPCODE
,在子线程中调用getAppcode()
方法获取TL_APPCODE
。
InheritableThreadLocal (Java SE 21 & JDK 21) (oracle.com)
相较于
ThreadLocal
类,JDK的InheritableThreadLocal
类可以完成父线程到子线程的值传递。
但我们有一部分流程从子线程中并没有取到父线程设置的TL_APPCODE
,原来是因为我们用了线程池,但并没有在线程池组件中对父子线程变量的传递做处理。
这是线程池组件的使用代码示例,从组件中获取一个ExecutorService线程池实例,然后通过CompletableFuture的supplyAsync方法调用异步代码,从而实现线程池的使用。
ExecutorService tgqExecutor = configuredExecutorManager.getExecutor(ExecuteType.tgq_rule_query.invoker);
CompletableFuture<TgqInfoVO> tgqFuture = CompletableFuture.supplyAsync(() -> tgqBizService.queryTgqInfo(qry), tgqExecutor);
其中getExecutor()
方法如下所示,getAndCached()
方法通过线程池配置取线程池实例:
public ExecutorService getExecutor(String threadPoolName) {
Preconditions.checkArgument(StringUtils.isNotBlank(threadPoolName), "threadPoolName is null");
ExecutorService result = Optional.ofNullable(EXECUTOR_MAP.get(threadPoolName))
.orElseGet(() -> {
ThreadPoolConfig defaultConfig = ConfigResolver.makeDefault(threadPoolName);
return getAndCached(defaultConfig);
});
int queueSize = THREAD_POOL_MAP.get(threadPoolName).getQueue().size();
QMonitor.recordSize(threadPoolName + "_queue_size", queueSize);
return result;
}
通过ThreadPoolWrapper的create方法创建ThreadPoolExecutor线程池实例,然后通过ThreadPoolWrapper的wrap方法进行修饰,最后返回对应的线程池实例。
public ExecutorService getAndCached(@NotNull ThreadPoolConfig config) {
ThreadPoolExecutor executor = Optional.ofNullable(THREAD_POOL_MAP.get(config.getName()))
.map(e -> cas(e, config))
.orElseGet(() -> {
ThreadPoolExecutor newExecutor = ThreadPoolWrapper.create(config);
ExecutorService newExecutorService = ThreadPoolWrapper.wrap(config.getName(), newExecutor);
ThreadPoolExecutor originExecutor = THREAD_POOL_MAP.putIfAbsent(config.getName(), newExecutor);
if (Objects.nonNull(originExecutor)) {
newExecutorService.shutdown();
return originExecutor;
} else {
EXECUTOR_MAP.putIfAbsent(config.getName(), newExecutorService);
return newExecutor;
}
});
QMonitor.recordSize("thread_active_" + config.getName(), executor.getActiveCount());
int size = executor.getQueue().size();
QMonitor.recordMany(config.getName() + "_queue_size", 1, size);
return EXECUTOR_MAP.get(config.getName());
}
下面分别是create()
创建方法和wrap
修饰方法,其中wrap
修饰包括了Qtracer的修饰和MetricThreadPoolExecutor的修饰。
QTracer是我们的链路工具,为了在使用线程池的时候子线程能使用父线程的trace,我们通过QTracer的wrap方法对线程池进行修饰,让线程池带有trace上下文。
public static ThreadPoolExecutor create(ThreadPoolConfig config) {
return create(config.getName(), config.getCore(), config.getMax(), config.getQueueSize(), new RejectedAbortPolicy());
}
public static ThreadPoolExecutor create(String name, int core, int max, int queues, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> workQueue = queues == 0 ? new SynchronousQueue<>() : new LimitedBlockingQueueWrapper<>(queues);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(core, max, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workQueue, new NamedThreadFactory(name), rejectedExecutionHandler);
//启动核心线程
startCoreThread(threadPoolExecutor);
return threadPoolExecutor;
}
public static ExecutorService wrap(String threadPoolName, ThreadPoolExecutor threadPool) {
return MoreExecutors.listeningDecorator(QTracer.wrap(MetricThreadPoolExecutor.wrap(threadPoolName, threadPool)));
}
MetricThreadPoolExecutor在任务提交的时候对Runnable
和Callable
做了处理,包括线程间MDC的传递等。
public class MetricThreadPoolExecutor implements ExecutorService {
private final String threadPoolName;
private final ExecutorService delegate;
public static MetricThreadPoolExecutor wrap(String threadPoolName, ExecutorService delegate) {
return new MetricThreadPoolExecutor(threadPoolName, delegate);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(EnhancedCallable.wrap(threadPoolName, task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(EnhancedRunnable.wrap(threadPoolName, task), result);
}
}
自定义的EnhancedRunnable
,对MDC处理,Callable同理
public class EnhancedRunnable extends NamedRunnable {
private final Map mdcContext = MDC.getCopyOfContextMap();
...
@Override
protected void doRun() {
long start = System.currentTimeMillis();
try {
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
ThreadMetrics.threadWaitTime(super.getThreadName(), createTime);
super.doRun();
} catch (Exception e) {
// 防止异常没有被catch而被线程吞掉
ThreadMetrics.errorFor(super.getThreadName());
throw e;
} finally {
ThreadMetrics.threadExecuteTime(super.getThreadName(), start);
ThreadMetrics.threadTotalTime(super.getThreadName(), createTime);
MDC.clear();
}
}
}
可以看到,我们的线程池组件无法对我们上面系统创建的InheritableThreadLocal
做传递处理。
解决方案
我们可以使用阿里的开源组件TransmittableThreadLocal
,从而实现线程池中传递线程变量。
TransmittableThreadLocal:在使用线程池等会池化复用线程的执行组件情况下,提供
ThreadLocal
值的传递功能,解决异步执行时上下文传递的问题。
JDK
的InheritableThreadLocal
类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal
值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal
值传递到 任务执行时。本库提供的
TransmittableThreadLocal
类继承并加强InheritableThreadLocal
类,解决上述的问题。
这个组件提供三种使用方法:
- 创建线程传递:类似
InheritableThreadLocal
,不使用线程池,在创建子线程的时候由父线程传递。 - 在线程池中传递:
- 通过
TtlRunnable
和TtlCallable
修饰传入线程池的Runnable
和Callable
。 - 通过
TtlExecutors
修饰接口Executor
,ExecutorService
,ScheduledExecutorService
。
- 通过
- 通过Java Agent修饰JDK线程池实现类。
由于我们多个系统统一使用了线程池组件,而且系统内已有多个线程变量,所以我们采用第二个方法通过TtlExecutors
修饰接口ExecutorService
,代码如下所示:
ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// =====================================================
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();
思考
为什么在更改核心线程数后才发现此问题?
虽然我们使用了InheritableThreadLocal
,可以实现将父线程的线程变量传递给子线程,但我们的线程池组件并没有对线程变量传递做任何处理,也就是说我们系统内的InheritableThreadLocal
一直是失效状态,那为什么之前一直没发现此问题呢?
可以看获取线程变量的代码,在我们从线程变量TL_APPCODE
中没有取到值的情况下,会从MDC中取值兜底。
/**
* 获取 appcode
*/
public static String getAppcode() {
String result = TL_APPCODE.get();
if (StringUtils.isBlank(result)) {
result = MDC.get(NM_APPCODE);
}
return result;
}
MDC (SLF4J javadoc) 是 Mapped Diagnostic Context 的缩写,在 SLF4J 日志框架中,它是一种提供线程级别的日志记录上下文的机制。MDC 可以用来存储每个线程独有的信息,这些信息可以在整个线程执行的过程中随时被日志语句访问和记录。
使用 MDC 的主要目的是为了在多线程环境下,能够方便地将相关的日志信息关联起来。例如,在处理Web请求时,你可能希望将所有与特定请求相关的日志条目都标记上一个唯一的请求ID。通过使用 MDC,你可以在请求开始时将请求ID放入 MDC,然后在整个请求处理过程中的任何地方,都可以在日志消息中包含这个请求ID。
如果只使用MDC,还是需要我们从父线程传递到子线程,而我们的线程池组件已经对MDC做了传递处理。
public class EnhancedRunnable extends NamedRunnable {
private final Map mdcContext = MDC.getCopyOfContextMap();
...
@Override
protected void doRun() {
long start = System.currentTimeMillis();
try {
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
...
总结:
我们自定义的线程变量InheritableThreadLocal
只有在用户请求阶段(dubbo用户线程)才会赋值。
当我们系统启动时,主线程(非dubbo用户线程)创建了一批核心子线程,核心子线程从主线程获取到的线程变量为空。这些核心子线程永远取不到线程变量。子线程会使用MDC的线程变量。
当 使用线程数超过核心线程数 或 直接更改核心线程数,我们的主线程(dubbo用户线程)有赋值线程变量,主线程创建的核心子线程能从主线程获取到线程变量。这些核心子线程永远取到当初创建它的父线程的线程变量。子线程使用错误的线程变量。
由于用户请求总数较少,单机核心线程配置数比较大,机器数比较多且负载均衡,一般情况下使用线程数不会超过核心线程数,而且我们很少改动核心线程数,因此一直没有发现。