一、背景及JADE介绍
秒送是健康即时零售业务新的核心流量场域,面对京东首页高流量曝光,我们对频道页整个技术架构方案进行升级,保障接口高性能、系统高可用。
动态线程池是频道应用的技术之一,我们通过3轮高保真压测最终初步确定了线程池的核心参数。但我们仍 面临一些保障系统稳定性问题:如何监控线程池运行状态?以及因流量飙升出现任务堆积和拒绝时能否实时报警,线程池核心参数能否做到不重启应用,动态调整即时生效?
经调研,业界成熟的动态线程池开源项目有 dynamic-tp 和 hippo4j,在京东内部应用比较广泛的方案是 JADE ,几种方案实现思路大致相同,感兴趣可自行了解。JADE 是由零售中台-研发架构组维护的项目,动态线程池是JADE的组件之一,其稳定性已得到广泛验证( 集团应用 300+,零售交易服务中台应用 250+ ,其中 0 级应用 130+),与JADE相辅相成的还有万象平台:是可视化的JADE管理端,集成配置、监控、审批等能力的JADE可视化平台,可以更高效的使用JADE组件,进一步提高工作效率。
实现效果
接入JADE和万象后,秒送线程池秒级监控效果如下: 实时监控线程池运行状态 以及 阈值报警。

下面我们从实践到原理一探究竟。
二、JADE动态线程池+万象可视化平台接入实践
JADE动态线程池和万象整体流程图如下:应用中需要引入 JADE、DUCC和 PFinder SDK,通过 JADE创建线程池,线程池核心参数通过 万象平台配置,集成 DUCC 实现动态调参,即时生效。线程池运行状态监控通过 PFinder 实现秒级监控。

1、引入JADE POM依赖,jade从1.2.4版本开始支持万象
com.jd.jade jade 1.2.4 com.jd.pfinder pfinder-profiler-sdk 1.1.5-FINAL com.thoughtworks.xstream xstream 1.4.19 com.jd.purchase.config dbconfig-client-api 1.0.8
2、创建
jade.properties
配置文件,并通过
Spring
加载该配置文件。
# 万象平台环境配置 jade.wx.env=pre # 以下为调试设置,线上环境无需配置 jade.log.level=debug jade.meter.debug-enabled=true
Spring加载
JADE配置文件
classpath:jade.properties UTF-8
3、 配置JADE启动类,负责 JADE 自定义初始化 。
JDOS
应用默认创建一个
DUCC
空间,使用万象的
DUCC
进行配置和更新。/**
* @description:JADE配置类
* @author: rongtao7
* @date: 2024/4/5 1:09 下午
*/@Configurationpublic class JadeConfig {
@Value("ucc://${ducc.application}:${ducc.token}@${ducc.hostPort}/v1/namespace/${ducc.namespace}/config/${ducc.config}/profiles/${ducc.profile}?longPolling=15000")
private String duccUrl;
@Value("${jade.wx.env}")
private String wxEnv;
@Bean
public InitializeBean jadeInitBean() {
InitializeBean initializeBean = new InitializeBean();
// 注意这里,如果 uri 中 config 不是命名为 jade,则 name 属性需要设置为 jade
ConfiguratorManager instance = new ConfiguratorManager();
instance.addResource("jade", duccUrl);
initializeBean.setConfigServiceProvider(instance);
// 万象环境
initializeBean.setWxEnv(wxEnv);
return initializeBean;
}}
4、使用
JADE
创建线程池,并通过
PFinder包装增强以支持
trace的传递
prestart()
用于预热核心线程/**
* 线程池配置类,集成JADE和万象平台
*/@Configurationpublic class TaskExecutePoolConfig {
/**
* 秒送频道线程池
*/
@Bean
public ExecutorService msChannelPagePool(){
//JADE组件创建线程池
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder()
.name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池名称
.core(200) // 核心线程数
.max(200) // 最大线程数
.queue(100) // 设置队列长度,此队列支持动态调整
.callerRuns() // 拒绝策略,内置监控、日志
.keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时间
.prestart() // 预初始化所有核心线程数
.build();
// Pfinder增强
return PfinderContext.executorServiceWrapper(threadPoolExecutor);
}}
5、万象平台接入
1)创建万象环境:第一次接入需要创建预发和生产环境。

2)创建万象线程池组件

6、验证效果
update executor 'MS_CHANNEL_PAGE_POOL' corePoolSize from 500 to 50update executor 'MS_CHANNEL_PAGE_POOL' maxPoolSize from 500 to 200update executor 'MS_CHANNEL_PAGE_POOL' keepAliveTime from 60 to 120 in seconds update executor 'MS_CHANNEL_PAGE_POOL' queueCapacity from 100 to 90



以上几步操作,就完成了JADE和万象的动态线程池接入。下面从源码角度浅析一下原理。
三、原理源码浅析
动态线程池 的
核心本质 是对
JDK的
ThreadPoolExecutor包装增强,集成
UMP、
PFinder、
Ducc、
万象平台,以实现线程池的可视化管理、动态调参、监控报警能力。
线程池参数如何实现变更呢 ?
线程池有4个关键参数,即:
核心线程数、
最大线程数、
队列大小、
存活时间4个。
JDK ThreadPoolExecutor 提供了
setCorePoolSize 、
setMaximumPoolSize 和
setKeepAliveTime 支持更新参数。
capacity
是不支持修改的,其使用
private final 修饰。JADE是通过
ResizeableLinkedBlockingQueue 实现队列长度可变,实现方式是继承
LinkedBlockingQueue,
通过反射修改队列长度。下面是JADE动态线程池简易原理图:

从万象平台
更新参数开始,万象会将配置数据
保存到
MySQL数据库中,并通过
发布操作将更新的配置
推送到JADE的
DUCC集成模块
DuccConfigService ,
Linstener 监听到配置变更后调用
ThreadPoolExecutorUpdater 更新线程池参数,更新参数是通过继承
JDK 的
ThreadPoolExecutor 实现更新,以及通过
ResizeableLinkedBlockingQueue 修改队列长度。
JADE线程池监控能力通过
Meter监控点 及
MeterRegistry监控工厂集成
PFinder和
UMP实现。
了解基础原理后,从 JADE配置类初始化过程 及 线程池创建过程,分别看一下源码实现。
> JADE
配置类初始化过程 - 源码探究
JADE
InitBeanBase 注入了
Spring容器,并利用
Spring
InitializingBean
afterPropertiesSet() 执行自定义初始化逻辑。

JADE 自定义初始化逻辑 总共有8个初始化步骤,我们只需要关注其中几个即可。
public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener{ @Override public void afterPropertiesSet() throws Exception { log.info("jade init begin"); //1.读取配置文件,设置@Val属性值 initProperties(); //2.初始化日志级别 initLogLevel(); //3.初始化零售DBConfig initDbConfig(); //4.初始化DUCC initConfig(); //5.初始化万象配置 initWX(); //6.初始化 jvm ump key initUmps(); //7.初始化PFinderMeterRegistry监控工厂 initMeter(); //8.初始化JSF监听注册 JSF POOL initJsf(); UST.record(getClass()); log.info("jade init end"); }}
1、initProperties()用于读取jade.properties配置文件,设置@Val属性值
jade.properties配置文件,
名字不可变,否则获取不到。public final class JadeConfigs {
//从根目录读取 jade.properties
private static synchronized Config initConfig() {
//略...
Object cfg = Thread.currentThread().getContextClassLoader().getResourceAsStream("jade.properties");
}}
@Val注解标注的属性设置值,如果
jade.properties配置了则使用配置的,否则使用默认值。public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener{ //为@Val注解标注的属性设置值 private void parseSpringValue(Config cfg) { //Spring PropertyPlaceholderHelper:解析和替换占位符的工具 PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper("${", "}", ":", true); //反射获取所有字段 for (Field f : FieldUtils.getAllFields(getClass())) { f.setAccessible(true); if (f.get(this) != null) { // may set explicitly continue; } //获取 @Val 注解 Val valAnno = f.getAnnotation(Val.class); if (valAnno != null && StringUtils.isNotEmpty(valAnno.value())) { try { //从Config(jade.properties) 配置文件读取属性值,没有则为默认值。 String actualVal = helper.replacePlaceholders(valAnno.value(), k -> { String v = cfg.getString(k); if (v == null) { v = applicationContext.getEnvironment().getProperty(k); } return v; }); if (actualVal != null) { Function parser = TYPE_PARSERS.get(f.getType()); if (parser != null) { Object parsedVal = parser.apply(actualVal); f.set(this, parsedVal); } } } catch (Exception e) { log.error("parse field {} error", f.getName()); throw e; } } } }}
2、initConfig()初始化配置类中的jade配置的ducc,如果不集成万象,则使用这个ducc配置。使用万象,则使用万象平台配置的ducc。
3、initWX()
初始化万象平台配置。
Ducc空间;2.启动监听;3.拉取配置更新
JADE组件class WXInit {
//万象初始化
private void init0() {
//1.万象默认的DUCC配置
String duccHost = DuccResource.getDefautHost();
Config config = JadeConfigs.getConfig();
String app = config.getString("jade.wx.app", "jdos_wxbizapps");
String token = config.getString("jade.wx.token", getDefaultDuccToken(duccHost));
String ns = config.getString("jade.wx.ns", "wxbizapps");
String cfg = config.getString("jade.wx.cfg", Env.getAppName());
if (failOrLog(cfg, "jade.wx.cfg")) {
return;
}
String env = initBean.getWxEnv();
if (StringUtils.isEmpty(env)) {
env = config.getString("jade.wx.env");
}
if (failOrLog(env, "jade.wx.env")) {
return;
}
String currentApp = Env.getDeployAppName();
if (failOrLog(currentApp, "current app name")) {
return;
}
//DUCC URL拼接
String url = String.format(DuccResource.URL_FORMAT, app, token, duccHost, ns, cfg, env, 1000 * 60, isRequired());
log.info("connect to wanxiang via {}", url); // TODO: mark token
//Resource Name jade-wx
String resxName = "jade-wx";
ConfiguratorManager cm = new ConfiguratorManager();
cm.setApplication(currentApp);
cm.addResource(resxName, url);
cm.start();
//2.启动监听Ducc jade-wx
ConfigService configService = new DuccConfigService(cm);
//3.从万象平台拉配置更新JADE组件
configService.getConfig(resxName, JadeConfig.class); // TODO: not found, throws?
UST.record(getClass());
}}
DUCC 调用
DuccConfigService init()初始化方法public class DuccConfigService implements ConfigService {
//构造方法,注入DUCC ConfiguratorManager
public DuccConfigService(@NonNull ConfiguratorManager configuratorManager) {
if (configuratorManager == null) {
throw new NullPointerException("configuratorManager is marked non-null but is null");
} else {
//初始化
this.init(configuratorManager);
}
}}
init()初始化方法中会启动万象DUCC的线程,并添加监听事件,监听Resource name 为
jade-wx的变化,变化后的回调函数通过
DuccConfigService.this.updateConfig(configuration)用来更新
JADE组件//初始化方法private void init(ConfiguratorManager configuratorManager) {
try {
this.configuratorManager = configuratorManager;
//1.启动Ducc线程
if (!configuratorManager.isStarted()) {
if (StringUtils.isNotEmpty(Env.getDeployAppName())) {
System.setProperty("application.name", Env.getDeployAppName());
}
configuratorManager.start();
}
List resources = DuccUtil.getResources(configuratorManager);
Iterator var3 = resources.iterator();
while(var3.hasNext()) {
final Resource resource = (Resource)var3.next();
//2.Ducc添加监听事件,Name是:jade-wx
configuratorManager.addListener(new ConfigurationListener() {
public String getName() {
return resource.getName();
}
//回调函数更新JADE组件
public void onUpdate(Configuration configuration) {
DuccConfigService.this.updateConfig(configuration);
}
});
}
UST.record(this.getClass());
} catch (Throwable var5) {
throw var5;
}}
DuccConfigService更新方法调用
JadeConfig 的
init()方法,根据万象平台配置更新
JADE各个组件,包括动态线程池。public class JadeConfig implements JadeConfigSupport, InitializingObject {
public static void init(JadeConfigSupport cfg) {
//JADE-日志组件 更新
//JADE-动态线程池组件 更新
ThreadPoolExecutorUpdater.update(cfg.getExecutorConfig());
//JADE-本地缓存组件 更新
//....
}}
5、ThreadPoolExecutorUpdater
更新线程池参数核心类
JDK ThreadPoolExecutor 实现更新的。prestartAllCoreThreads() 对核心线程进行
预热,所以不必担心调大核心线程数后发生的“抖动”问题(实际是创建线程的开销)。 core和
max是一起更新的,否则可能会导致更改不生效的问题。
ThreadPoolExecutorUpdater 更新线程池主要有以下5个步骤。
updatePoolSize更新核心、最大线程数,注意需要一起同步更新,否则可能导致更新失败问题setKeepAliveTime更新KeepAliveTime存活时间setCapacity 反射修改队列容量prestartAllCoreThreads() 预热核心线程数updateRejectSetting() 更新拒绝策略private static void update0(ExecutorConfigSupport.ExecutorSetting executorSetting, ThreadPoolExecutor executor) {
//1.更新核心、最大线程数,注意需要一起同步更新,否则可能导致更新失败问题
updatePoolSize(executorSetting, executor);
//2.更新KeepAliveTime存活时间
if (executorSetting.getKeepAliveSeconds() != null && executorSetting.getKeepAliveSeconds() != executor.getKeepAliveTime(TimeUnit.SECONDS)) {
executor.setKeepAliveTime(executorSetting.getKeepAliveSeconds(), TimeUnit.SECONDS);
}
//3.更新队列
if (executorSetting.getQueueCapacity() != null) {
if (executor.getQueue() instanceof LinkedBlockingQueue) {
LinkedBlockingQueue currentQueue = (LinkedBlockingQueue) executor.getQueue();
int currentQueueCapacity = ResizableLinkedBlockingQueue.getCapacity(currentQueue);
if (executorSetting.getQueueCapacity() > 0 && executorSetting.getQueueCapacity() != currentQueueCapacity) {
//反射修改队列数量,signalNotFull
ResizableLinkedBlockingQueue.setCapacity(currentQueue, executorSetting.getQueueCapacity());
} else if (executorSetting.getQueueCapacity() == 0) {
//调整队列数量为0,注意丢任务风险。
if (BooleanUtils.isTrue(executorSetting.getForceResizeQueue())) {
setWorkQueue(executor, new SynchronousQueue());
} else {
// log
}
}
}
//else 省略
}
//4.预热核心线程数
if (BooleanUtils.toBoolean(executorSetting.getPrestartAllCoreThreads()) && executor.getPoolSize() < executor.getCorePoolSize()) {
int threads = executor.prestartAllCoreThreads();
}
//5.更新拒绝策略
updateRejectSetting(executorSetting, executor);}
ResizableLinkedBlockingQueue反射实现。//可动态调整容量的 BlockingQueue //HACK: 内部直接继承自 LinkedBlockingQueue,通过反射修改其 private final capacity 字段public class ResizableLinkedBlockingQueueextends LinkedBlockingQueue { //反射设置队列大小 static void setCapacity(LinkedBlockingQueue queue, int capacity) { int oldCapacity = getCapacity(queue); FieldUtils.writeField(queue, FN_CAPACITY, capacity, true); int size = queue.size(); //如果队列中的任务已经达到老队列容量限制,并且新的容量大于队列任务数 if (size >= oldCapacity && capacity > size) { // thanks to https://www.cnblogs.com/thisiswhy/p/15457810.html MethodUtils.invokeMethod(queue, true, "signalNotFull"); } }}
signalNotFull发出队列非满通知,唤醒阻塞线程,可以继续向队列插入任务了。
> 创建JADE线程池build()- 源码探究
以下是我们通过 JADE ThreadPoolExecutorBuilder 创建线程池的 Bean,核心逻辑在 build() 封装。
/**
* 秒送频道页线程池
*/@Beanpublic ExecutorService msChannelPagePool(){
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder()
.name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池名称
.core(200) // 核心线程数
.max(200) // 最大线程数
.queue(1024) // 设置队列长度,此队列支持动态调整
.callerRuns() // 快捷设置拒绝策略为丢弃,内置监控、日志
.keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时间
.prestart() // 预初始化所有核心线程数
.build();
return PfinderContext.executorServiceWrapper(threadPoolExecutor);}
build() 主要逻辑有3步,1.创建线程池 ,2.启动所有核心线程, 3.注册线程池监控点public abstract class AbstractExecutorBuilder {
public synchronized E build() {
//1.创建线程池
this.executor = createExecutor();
//2.启动所有核心线程
if (this.prestartAllCoreThreads) {
executor.prestartAllCoreThreads();
}
//3.创建监控
initMonitor();
return this.executor;
}}
initMonitor()创建PFinder线程池监控,即 活跃线程数、核心/最大线程数,队列数量等。格式为:
executor.线程池名.activeCount. (注意线程池一定要有名字)gauge() 方法内部集成
PFinder,使用代码编程的方式进行
Gauge埋点,用于记录线程池的瞬时值指标:活动线程数、核心/最大、队列大小等。PFinder埋点方式详见PFinder文档。public abstract class MeterRegistry{ public List gaugeExecutor(String executorName, ThreadPoolExecutor executor) { String namePrefix = "executor." + executorName; return gaugeExecutor0(namePrefix, executor); } private List gaugeExecutor0(String namePrefix, ThreadPoolExecutor executor) { namePrefix += "."; List gauges = new ArrayList<>(); if (getConfig().isThreadPoolAllMetricsEnabled()) { gauges.add(gauge(namePrefix + "taskCount", executor::getTaskCount)); gauges.add(gauge(namePrefix + "completedTaskCount", executor::getCompletedTaskCount)); } gauges.add(gauge(namePrefix + "activeCount", executor::getActiveCount)); gauges.add(gauge(namePrefix + "corePoolSize", executor::getCorePoolSize)); gauges.add(gauge(namePrefix + "maxPoolSize", executor::getMaximumPoolSize)); gauges.add(gauge(namePrefix + "poolSize", executor::getPoolSize)); gauges.add(gauge(namePrefix + "queueSize", () -> executor.getQueue().size())); // return gauges; }}
四、避坑指南
prestartAllCoreThreads() 预热核心线程。Future、
CompletableFuture异步任务使用线程池时设置合理的超时时间,避免因外部服务故障或网络等问题导致任务长时间阻塞,造成资源浪费,严重甚至拖垮整个线程池,导致线上问题。