秒送 JADE动态线程池实践及原理浅析

一、背景及JADE介绍

秒送是健康即时零售业务新的核心流量场域,面对京东首页高流量曝光,我们对频道页整个技术架构方案进行升级,保障接口高性能、系统高可用。

动态线程池是频道应用的技术之一,我们通过3轮高保真压测最终初步确定了线程池的核心参数。但我们仍 面临一些保障系统稳定性问题:如何监控线程池运行状态?以及因流量飙升出现任务堆积和拒绝时能否实时报警,线程池核心参数能否做到不重启应用,动态调整即时生效?

经调研,业界成熟的动态线程池开源项目有 dynamic-tphippo4j,在京东内部应用比较广泛的方案是 JADE ,几种方案实现思路大致相同,感兴趣可自行了解。JADE 是由零售中台-研发架构组维护的项目,动态线程池是JADE的组件之一,其稳定性已得到广泛验证( 集团应用 300+,零售交易服务中台应用 250+ ,其中 0 级应用 130+),与JADE相辅相成的还有万象平台:是可视化的JADE管理端,集成配置、监控、审批等能力的JADE可视化平台,可以更高效的使用JADE组件,进一步提高工作效率。

实现效果

接入JADE和万象后,秒送线程池秒级监控效果如下: 实时监控线程池运行状态 以及 阈值报警





下面我们从实践到原理一探究竟。

二、JADE动态线程池+万象可视化平台接入实践

JADE动态线程池和万象整体流程图如下:应用中需要引入 JADE、DUCC和 PFinder SDK,通过 JADE创建线程池,线程池核心参数通过 万象平台配置,集成 DUCC 实现动态调参,即时生效。线程池运行状态监控通过 PFinder 实现秒级监控。







1、引入JADE POM依赖,jade从1.2.4版本开始支持万象


    com.jd.jade
    jade
    1.2.4
    com.jd.pfinder
    pfinder-profiler-sdk
    1.1.5-FINAL
    com.thoughtworks.xstream
    xstream
    1.4.19
    com.jd.purchase.config
    dbconfig-client-api
    1.0.8

2、创建 jade.properties 配置文件,并通过 Spring 加载该配置文件

注意名字不能修改,JADE初始化会从该命名文件中加载配置属性
# 万象平台环境配置
jade.wx.env=pre
# 以下为调试设置,线上环境无需配置
jade.log.level=debug
jade.meter.debug-enabled=true
Spring加载 JADE配置文件

    
        
            
            classpath:jade.properties
        
    
    
        UTF-8
    

3、 配置JADE启动类,负责 JADE 自定义初始化

如果不集成万象平台,则可以使用配置的DUCC空间配置和修改线程池参数。
【推荐】如果使用万象,万象会为 JDOS 应用默认创建一个 DUCC 空间,使用万象的 DUCC 进行配置和更新。
/**
 * @description:JADE配置类
 * @author: rongtao7
 * @date: 2024/4/5 1:09 下午
 */@Configurationpublic class JadeConfig {
    @Value("ucc://${ducc.application}:${ducc.token}@${ducc.hostPort}/v1/namespace/${ducc.namespace}/config/${ducc.config}/profiles/${ducc.profile}?longPolling=15000")
    private String duccUrl;
    @Value("${jade.wx.env}")
    private String wxEnv;
    @Bean
    public InitializeBean jadeInitBean() {
        InitializeBean initializeBean = new InitializeBean();
        // 注意这里,如果 uri 中 config 不是命名为 jade,则 name 属性需要设置为 jade
        ConfiguratorManager instance = new ConfiguratorManager();
        instance.addResource("jade", duccUrl);
        initializeBean.setConfigServiceProvider(instance);
        // 万象环境
        initializeBean.setWxEnv(wxEnv);
        return initializeBean;
    }}

4、使用 JADE 创建线程池,并通过 PFinder包装增强以支持 trace的传递

prestart() 用于预热核心线程
/**
 * 线程池配置类,集成JADE和万象平台
 */@Configurationpublic class TaskExecutePoolConfig {
    /**
     * 秒送频道线程池
     */
    @Bean
    public ExecutorService msChannelPagePool(){
        //JADE组件创建线程池
        ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder()
                .name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池名称
                .core(200) // 核心线程数
                .max(200) // 最大线程数
                .queue(100) // 设置队列长度,此队列支持动态调整
                .callerRuns() // 拒绝策略,内置监控、日志
                .keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时间
                .prestart() // 预初始化所有核心线程数
                .build();
        //  Pfinder增强
        return PfinderContext.executorServiceWrapper(threadPoolExecutor);
    }}

5、万象平台接入

1)创建万象环境:第一次接入需要创建预发和生产环境。





2)创建万象线程池组件





6、验证效果

线程池参数动态变更 - 万象,更新后可观测到如下日志,说明修改成功
update executor 'MS_CHANNEL_PAGE_POOL' corePoolSize from 500 to 50update executor 'MS_CHANNEL_PAGE_POOL' maxPoolSize from 500 to 200update executor 'MS_CHANNEL_PAGE_POOL' keepAliveTime from 60 to 120 in seconds
update executor 'MS_CHANNEL_PAGE_POOL' queueCapacity from 100 to 90
线程池监控 - PFinder,key格式为:executor.线程池名称.线程池状态(活跃/核心/最大线程数、队列大小、拒绝任务数)
注:应用需开启pfinder监控并且PFinder SDK 要和 agent版本兼容







线程池任务RT监控 & 线程池状态监控:







线程池队列参数配置异常报警:


以上几步操作,就完成了JADE和万象的动态线程池接入。下面从源码角度浅析一下原理。



三、原理源码浅析

动态线程池 核心本质 是对 JDKThreadPoolExecutor包装增强,集成 UMPPFinderDucc万象平台,以实现线程池的可视化管理、动态调参、监控报警能力。

线程池参数如何实现变更呢

线程池有4个关键参数,即: 核心线程数最大线程数队列大小存活时间4个。

核心、最大线程数、存活时间3个参数通过 JDK ThreadPoolExecutor 提供了 setCorePoolSizesetMaximumPoolSizesetKeepAliveTime 支持更新参数。
但队列长度 capacity 是不支持修改的,其使用 private final 修饰。JADE是通过 ResizeableLinkedBlockingQueue 实现队列长度可变,实现方式是继承 LinkedBlockingQueue通过反射修改队列长度

下面是JADE动态线程池简易原理图





从万象平台 更新参数开始,万象会将配置数据 保存MySQL数据库中,并通过 发布操作将更新的配置 推送到JADE的 DUCC集成模块 DuccConfigService Linstener 监听到配置变更后调用 ThreadPoolExecutorUpdater 更新线程池参数,更新参数是通过继承 JDK ThreadPoolExecutor 实现更新,以及通过 ResizeableLinkedBlockingQueue 修改队列长度。

JADE线程池监控能力通过 Meter监控点 及 MeterRegistry监控工厂集成 PFinderUMP实现。

了解基础原理后,从 JADE配置类初始化过程线程池创建过程,分别看一下源码实现。

> JADE 配置类初始化过程 - 源码探究

JADE InitBeanBase 注入了 Spring容器,并利用 Spring InitializingBean afterPropertiesSet() 执行自定义初始化逻辑。









JADE 自定义初始化逻辑 总共有8个初始化步骤,我们只需要关注其中几个即可。

public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener {
    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("jade init begin");
        //1.读取配置文件,设置@Val属性值
        initProperties();
        //2.初始化日志级别
        initLogLevel();
        //3.初始化零售DBConfig
        initDbConfig();
        //4.初始化DUCC
        initConfig();
        //5.初始化万象配置
        initWX();
        //6.初始化 jvm ump key
        initUmps();
        //7.初始化PFinderMeterRegistry监控工厂
        initMeter();
        //8.初始化JSF监听注册 JSF POOL
        initJsf();
        UST.record(getClass());
        log.info("jade init end");
    }}

1、 initProperties()用于读取 jade.properties配置文件,设置@Val属性值
从根目录读取 jade.properties配置文件, 名字不可变,否则获取不到。
public final class JadeConfigs {
    //从根目录读取 jade.properties
    private static synchronized Config initConfig() {
        //略...
        Object cfg = Thread.currentThread().getContextClassLoader().getResourceAsStream("jade.properties");
    }}
为Bean的 @Val注解标注的属性设置值,如果 jade.properties配置了则使用配置的,否则使用默认值。
public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener {
    //为@Val注解标注的属性设置值
    private void parseSpringValue(Config cfg) {
    
        //Spring PropertyPlaceholderHelper:解析和替换占位符的工具
        PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper("${", "}", ":", true);
        
        //反射获取所有字段
        for (Field f : FieldUtils.getAllFields(getClass())) {
            f.setAccessible(true);
            if (f.get(this) != null) { // may set explicitly
                continue;
            }
            
            //获取 @Val 注解
            Val valAnno = f.getAnnotation(Val.class);
            if (valAnno != null && StringUtils.isNotEmpty(valAnno.value())) {
                try {
                
                    //从Config(jade.properties) 配置文件读取属性值,没有则为默认值。
                    String actualVal = helper.replacePlaceholders(valAnno.value(), k -> {
                        String v = cfg.getString(k);
                        if (v == null) {
                            v = applicationContext.getEnvironment().getProperty(k);
                        } 
                        return v;
                    });
                    if (actualVal != null) {
                        Function parser = TYPE_PARSERS.get(f.getType());
                        if (parser != null) {
                            Object parsedVal = parser.apply(actualVal);
                            f.set(this, parsedVal);
                        } 
                    }
                } catch (Exception e) {
                    log.error("parse field {} error", f.getName());
                    throw e;
                }
            }
        }
    }}

2、initConfig() 初始化配置类中的 jade配置的 ducc,如果不集成万象,则使用这个 ducc配置。使用万象,则使用万象平台配置的 ducc
代码与万象初始化逻辑相同,参考下面的即可。



3、initWX() 初始化万象平台配置。
万象初始化流程主要有3步骤:1.拼接使用万象默认配置的 Ducc空间;2.启动监听;3.拉取配置更新 JADE组件
万象的默认Ducc空间格式为:通过应用名和环境Env的拼接:{ns: wxbizapps} {appName: diansong} {env: pre}
class WXInit {
    //万象初始化
    private void init0() {
    
        //1.万象默认的DUCC配置
        String duccHost = DuccResource.getDefautHost();
        Config config = JadeConfigs.getConfig();
        String app = config.getString("jade.wx.app", "jdos_wxbizapps");
        String token = config.getString("jade.wx.token", getDefaultDuccToken(duccHost));
        String ns = config.getString("jade.wx.ns", "wxbizapps");
        String cfg = config.getString("jade.wx.cfg", Env.getAppName());
        if (failOrLog(cfg, "jade.wx.cfg")) {
            return;
        }
        String env = initBean.getWxEnv();
        if (StringUtils.isEmpty(env)) {
            env = config.getString("jade.wx.env");
        }
        if (failOrLog(env, "jade.wx.env")) {
            return;
        }
        String currentApp = Env.getDeployAppName();
        if (failOrLog(currentApp, "current app name")) {
            return;
        }
        //DUCC URL拼接
        String url = String.format(DuccResource.URL_FORMAT, app, token, duccHost, ns, cfg, env, 1000 * 60, isRequired());
        log.info("connect to wanxiang via {}", url); // TODO: mark token
        
        //Resource Name jade-wx
        String resxName = "jade-wx";
        ConfiguratorManager cm = new ConfiguratorManager();
        cm.setApplication(currentApp);
        cm.addResource(resxName, url);
        cm.start();
        
        //2.启动监听Ducc jade-wx
        ConfigService configService = new DuccConfigService(cm);
        
        //3.从万象平台拉配置更新JADE组件
        configService.getConfig(resxName, JadeConfig.class); // TODO: not found, throws?
        
        UST.record(getClass());
    }}
启动监听 DUCC 调用 DuccConfigService init()初始化方法
public class DuccConfigService implements ConfigService {
    //构造方法,注入DUCC ConfiguratorManager
    public DuccConfigService(@NonNull ConfiguratorManager configuratorManager) {
        if (configuratorManager == null) {
            throw new NullPointerException("configuratorManager is marked non-null but is null");
        } else {
            //初始化
            this.init(configuratorManager);
        }
    }}



init()初始化方法中会启动万象DUCC的线程,并添加监听事件,监听Resource name 为 jade-wx的变化,变化后的回调函数通过 DuccConfigService.this.updateConfig(configuration)用来更新 JADE组件
//初始化方法private void init(ConfiguratorManager configuratorManager) {
    try {
        this.configuratorManager = configuratorManager;
        //1.启动Ducc线程
        if (!configuratorManager.isStarted()) {
            if (StringUtils.isNotEmpty(Env.getDeployAppName())) {
                System.setProperty("application.name", Env.getDeployAppName());
            }
            configuratorManager.start();
        }
        List resources = DuccUtil.getResources(configuratorManager);
        Iterator var3 = resources.iterator();
        while(var3.hasNext()) {
            final Resource resource = (Resource)var3.next();
            
            //2.Ducc添加监听事件,Name是:jade-wx
            configuratorManager.addListener(new ConfigurationListener() {
                public String getName() {
                    return resource.getName();
                }
                //回调函数更新JADE组件
                public void onUpdate(Configuration configuration) {
                    DuccConfigService.this.updateConfig(configuration);
                }
            });
        }
        UST.record(this.getClass());
    } catch (Throwable var5) {
        throw var5;
    }}

DuccConfigService更新方法调用 JadeConfiginit()方法,根据万象平台配置更新 JADE各个组件,包括动态线程池。
public class JadeConfig implements JadeConfigSupport, InitializingObject {
    public static void init(JadeConfigSupport cfg) {
        //JADE-日志组件 更新
        //JADE-动态线程池组件 更新
        ThreadPoolExecutorUpdater.update(cfg.getExecutorConfig());
        //JADE-本地缓存组件 更新
        //....
    }}

5、ThreadPoolExecutorUpdater 更新线程池参数核心类
核心、最大线程数、存活时间是通过继承 JDK ThreadPoolExecutor 实现更新的。
在核心类中,当调大核心线程数后,会调用 prestartAllCoreThreads() 对核心线程进行 预热,所以不必担心调大核心线程数后发生的“抖动”问题(实际是创建线程的开销)。
注意 coremax是一起更新的,否则可能会导致更改不生效的问题。

ThreadPoolExecutorUpdater 更新线程池主要有以下5个步骤。

updatePoolSize更新核心、最大线程数,注意需要一起同步更新,否则可能导致更新失败问题
setKeepAliveTime更新KeepAliveTime存活时间
setCapacity 反射修改队列容量
prestartAllCoreThreads() 预热核心线程数
updateRejectSetting() 更新拒绝策略
private static void update0(ExecutorConfigSupport.ExecutorSetting executorSetting, ThreadPoolExecutor executor) {
    //1.更新核心、最大线程数,注意需要一起同步更新,否则可能导致更新失败问题
    updatePoolSize(executorSetting, executor);
    //2.更新KeepAliveTime存活时间
    if (executorSetting.getKeepAliveSeconds() != null && executorSetting.getKeepAliveSeconds() != executor.getKeepAliveTime(TimeUnit.SECONDS)) {
        executor.setKeepAliveTime(executorSetting.getKeepAliveSeconds(), TimeUnit.SECONDS);
    }
    //3.更新队列
    if (executorSetting.getQueueCapacity() != null) {        
        if (executor.getQueue() instanceof LinkedBlockingQueue) {
            LinkedBlockingQueue currentQueue = (LinkedBlockingQueue) executor.getQueue();
            int currentQueueCapacity = ResizableLinkedBlockingQueue.getCapacity(currentQueue);
            if (executorSetting.getQueueCapacity() > 0 && executorSetting.getQueueCapacity() != currentQueueCapacity) {
                //反射修改队列数量,signalNotFull
                ResizableLinkedBlockingQueue.setCapacity(currentQueue, executorSetting.getQueueCapacity());
            } else if (executorSetting.getQueueCapacity() == 0) {
                //调整队列数量为0,注意丢任务风险。
                if (BooleanUtils.isTrue(executorSetting.getForceResizeQueue())) {
                    setWorkQueue(executor, new SynchronousQueue());
                } else {
                   // log    
                }
            }
        } 
        //else 省略
    }
    
    //4.预热核心线程数
    if (BooleanUtils.toBoolean(executorSetting.getPrestartAllCoreThreads()) && executor.getPoolSize() < executor.getCorePoolSize()) {
        int threads = executor.prestartAllCoreThreads();
    }
    //5.更新拒绝策略
    updateRejectSetting(executorSetting, executor);}



队列长度修改通过 ResizableLinkedBlockingQueue反射实现。
//可动态调整容量的 BlockingQueue //HACK: 内部直接继承自 LinkedBlockingQueue,通过反射修改其 private final capacity 字段public class ResizableLinkedBlockingQueue extends LinkedBlockingQueue {
    //反射设置队列大小
    static  void setCapacity(LinkedBlockingQueue queue, int capacity) {
        int oldCapacity = getCapacity(queue);
        FieldUtils.writeField(queue, FN_CAPACITY, capacity, true);
        int size = queue.size();
        //如果队列中的任务已经达到老队列容量限制,并且新的容量大于队列任务数
        if (size >= oldCapacity && capacity > size) {
            // thanks to https://www.cnblogs.com/thisiswhy/p/15457810.html
            MethodUtils.invokeMethod(queue, true, "signalNotFull");
        }
    }}
这里有一个细节,如果队列容量满了,当调整完队列数后,手动调用 signalNotFull发出队列非满通知,唤醒阻塞线程,可以继续向队列插入任务了。



> 创建 JADE 线程池 build() - 源码探究

以下是我们通过 JADE ThreadPoolExecutorBuilder 创建线程池的 Bean,核心逻辑在 build() 封装。

/**
 * 秒送频道页线程池
 */@Beanpublic ExecutorService msChannelPagePool(){
    ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder()
            .name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池名称
            .core(200) // 核心线程数
            .max(200) // 最大线程数
            .queue(1024) // 设置队列长度,此队列支持动态调整
            .callerRuns() // 快捷设置拒绝策略为丢弃,内置监控、日志
            .keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时间
            .prestart() // 预初始化所有核心线程数
            .build();
    return PfinderContext.executorServiceWrapper(threadPoolExecutor);}
build() 主要逻辑有3步,1.创建线程池 ,2.启动所有核心线程, 3.注册线程池监控点
public abstract class AbstractExecutorBuilder {    
    public synchronized E build() {
        //1.创建线程池
        this.executor = createExecutor();
        //2.启动所有核心线程
        if (this.prestartAllCoreThreads) {
            executor.prestartAllCoreThreads();
        }
        //3.创建监控
        initMonitor();
        return this.executor;
     }}
initMonitor()创建PFinder线程池监控,即 活跃线程数、核心/最大线程数,队列数量等。格式为: executor.线程池名.activeCount. (注意线程池一定要有名字)
gauge() 方法内部集成 PFinder,使用代码编程的方式进行 Gauge埋点,用于记录线程池的瞬时值指标:活动线程数、核心/最大、队列大小等。PFinder埋点方式详见PFinder文档。
public abstract class MeterRegistry {
    public List gaugeExecutor(String executorName, ThreadPoolExecutor executor) {
        String namePrefix = "executor." + executorName;
        return gaugeExecutor0(namePrefix, executor);
    }
    private List gaugeExecutor0(String namePrefix, ThreadPoolExecutor executor) {
        namePrefix += ".";
        List gauges = new ArrayList<>();
        if (getConfig().isThreadPoolAllMetricsEnabled()) {
            gauges.add(gauge(namePrefix + "taskCount", executor::getTaskCount));
            gauges.add(gauge(namePrefix + "completedTaskCount", executor::getCompletedTaskCount));
        }
        gauges.add(gauge(namePrefix + "activeCount", executor::getActiveCount));
        gauges.add(gauge(namePrefix + "corePoolSize", executor::getCorePoolSize));
        gauges.add(gauge(namePrefix + "maxPoolSize", executor::getMaximumPoolSize));
        gauges.add(gauge(namePrefix + "poolSize", executor::getPoolSize));
        gauges.add(gauge(namePrefix + "queueSize", () -> executor.getQueue().size()));
        //
        return gauges;
    }}

四、避坑指南

线程池必须有名字,监控依赖,并且不能重名。当系统有问题时也便于通过jstack等工具排查定位问题。
应用需开启pfinder监控并且PFinder SDK 要和 agent版本兼容
线程池创建后,线程不会立即启动,而是在有任务提交时才启动,启动的瞬间会因为创建线程的开销造成性能“抖动”,可以使用 prestartAllCoreThreads() 预热核心线程。
线程池的核心线程,默认是不会回收的,如果一个线程池活跃度长时间很低,建议调整核心线程数,过多的线程会浪费内存资源,影响系统稳定性。
FutureCompletableFuture异步任务使用线程池时设置合理的超时时间,避免因外部服务故障或网络等问题导致任务长时间阻塞,造成资源浪费,严重甚至拖垮整个线程池,导致线上问题。
同理,系统中请求外部Http请求时,必须设置超时时间,避免资源被长时间占用无法释放,影响系统性能和稳定性。


请使用浏览器的分享功能分享到微信等