买药秒送 JADE动态线程池实践及原理浅析
一、布景及JADE介绍。
买药秒送是健康即时零售事务新的中心流量场域,面对京东主页高流量曝光,咱们对频道页整个技能架构计划进行晋级,保证。接口。高功能、体系高可用。
动态线程池是买药频道运用的技能之一,咱们经过3轮高保真压测终究开端确认了线程池的中心。参数。。但咱们仍面对一些保证体系安稳性问题:怎么监控线程池运转状况?以及因流量飙升呈现使命堆积和回绝时能否实时报警,线程池中心参数能否做到不重启运用,动态调整即时收效?
经调研,业界老练的动态线程池开源项目有 dynamic-tp 和 hippo4j,在京东内部运用比较广泛的计划是 JADE ,几种计划完成思路大致相同,感兴趣可自行了解。JADE 是由零售中台-研制架构组保护的项目,动态线程池是JADE的组件之一,其安稳性已得到广泛验证(集团运用 300+,零售买卖服务中台运用 250+ ,其间 0 级运用 130+),与JADE相得益彰的还有万象渠道:是可视化的JADE办理端,集成装备、监控、批阅等才能的JADE可视化渠道,能够更高效的运用JADE组件,进一步进步工作效率。
完成作用。
接入JADE和万象后,买药秒送线程池秒级监控作用如下:实时监控线程池运转状况以及阈值报警。
下面咱们从实践到原理一探终究。
二、JADE动态线程池+万象可视化渠道接入实践。
JADE动态线程池和万象全体流程图如下:运用中需求引进 JADE、DUCC和 PFinder。 SD。K,经过JADE创立线程池,线程池中心参数经过万象渠道装备,集成 DUCC 完成动态调参,即时收效。线程池运转状况监控经过 PFinder 完成秒级监控。
1、引进JADE POM依靠,jade从1.2.4版别开端支撑万象。
< !-- JADE 核心包,最新版本与发布说明-- >< dependency >< groupId >com.jd.jade。< /groupId >< artifactId >jade。< /artifactId >< version >1.2.4。< /version >< /dependency >< !-- 引用 PFinder SDK 库 -- >< dependency >< groupId >com.jd.pfinder。< /groupId >< artifactId >pfinder-profiler-sdk。< /artifactId >< version >1.1.5-FINAL。< /version >< /dependency >< !-- JADE 配置目前基于 XStream 进行 XML 格式序列化,若通过非 DBConfig 动态调参,需自行引入 -- >< dependency >< groupId >com.thoughtw。or。ks.xstream。< /groupId >< artifactId >xstream。< /artifactId >< version >1.4.19。< /version >< /dependency >< !-- JADE 尚未完全解除 dbconfig 依赖(主要是 ConfigBase 基类及 XmlConfigService 接口),非交易应用,需自行引入 dbconfig-client-api 精简包,交易应用一般已直接引用 dbconfig-client 实现包 -- >< dependency >< groupId >com.jd.purchase.config。< /groupId >< artifactId >dbconfig-client-a。pi。< /artifactId >< version >1.0.8。< /version >< /dependency >
2、创立jade.properties装备文件,并经过Spring加载该装备文件。
•留意姓名不能修正,JADE初始化会从该命名文件中加载装备特点。
# 万象渠道环境装备 jade.wx.env=pre # 以下为调试设置,线上环境无需装备 jade.log.level=debug jade.me。te。r.debug-enabled=true。
•Spring加载JADE装备文件。
< property name="locations" >< list >< !--JADE配置-- >< value >classpath:jade.properties。< /value >< /list >< /property >< property name="fileEncoding" >< value >U。TF。-8。< /value >< /property >< /bean >
3、装备JADE发动类,担任 JADE 自定义初始化 。
•假如不集成万象渠道,则能够运用装备的DUCC空间装备和修正线程池参数。
•【。引荐。】假如运用万象,万象会为JDOS运用默许创立一个DUCC空间,运用万象的DUCC进行装备和更新。
/** * descrip。ti。on:JADE装备类 * author: rongtao7 * date: 2024/4/5 1:09 下午 */ Configuration publ。ic。class J。ad。eConfig { Value("ucc://${ducc.application}:${ducc.token}${ducc.hostPort}/v1/namesp。ac。e/${ducc.namespace}/config/${ducc.config}/profiles/${ducc.profile}?longPolling=15000") private String duccUrl; Value("${jade.wx.env}") private String wxEnv; Bean public InitializeBean jadeInitBean() { InitializeBean initializeBean = new InitializeBean(); // 留意这儿,假如 uri 中 config 不是命名为 jade,则 name 特点需求设置为 jade ConfiguratorManager instance = new ConfiguratorManager(); instance.addResource("jade", duccUrl); initializeBean.setConfigServiceProvider(instance); // 万象环境 initializeBean.setWxEnv(wxEnv); return initializeBean; } }。
4、运用JADE创立线程池,并经过PFinder包装增强以支撑trace的传递。
•prestart()用于预热中心线程。
/** * 线程池装备类,集成JADE和万象渠道 */ Configuration public class TaskExecutePoolConfig { /** * 买药秒送频道线程池 */ Bean public ExecutorService msChannelPagePool(){ //JADE组件创立线程池 ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder() .name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池称号 .core(200) // 中心线程数 .max(200) // 最大线程数 .queue(100) // 设置行列长度,此行列支撑动态调整 .callerRuns() // 回绝战略,内置监控、日志 .keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时刻 .prestart() // 预初始化一切中心线程数 .build(); // Pfinder增强 return PfinderContext.executorServiceWrapper(threadPoolExecutor); } }。
5、万象渠道接入。
1)创立万象环境:第一次接入需求创立预发和出产环境。
2)创立万象线程池组件。
6、验证作用。
•线程池参数动态改变 - 万象,更新后可观测到如下日志,阐明修正成功。
update executor 'MS_CHANNEL_PAGE_POOL' corePoolSize from 500 to 50 update executor 'MS_CHANNEL_PAGE_POOL' maxPoolSize from 500 to 200 update executor 'MS_CHANNEL_PAGE_POOL' keepAliveTime from 60 to 120 in seconds update executor 'MS_CHANNEL_PAGE_POOL' queueCapacity from 100 to 90。
•线程池监控 - PFinder,key格局为:executor.线程池称号.线程池状况(活泼/中心/最大线程数、行列巨细、回绝使命数)。
•注:运用需敞开pfinder监控而且PFinder SDK 要和 agent版别兼容。
•线程池使命RT监控 & 线程池状况监控:
•线程池行列参数装备反常报警:
以上几步操作,就完成了JADE和万象的动态线程池接入。下面从源码视点浅析一下原理。
三、原理源码浅析。
动态线程池的中心实质是对JDK的ThreadPoolExecutor包装增强,集成UMP、PFinder、Ducc、万象渠道,以完成线程池的可视化办理、动态调参、监控报警才能。
线程池参数怎么完成改变呢?
线程池有4个要害参数,即:中心线程数、最大线程数、行列巨细、存活时刻4个。
•中心、最大线程数、存活时刻3个参数经过JDK ThreadPoolExecutor供给了setCorePoolSize、set。Maxim。umPoolSize和setKeepAliveTime支撑更新参数。
•但行列长度capacity是不支撑修正的,其运用private final润饰。JADE是经过ResizeableLinkedBlockingQueue完成行列长度可变,完成办法是承继LinkedBlockingQueue,经过反射修正行列长度。
下面是JADE动态线程池简易原理图:
从万象渠道更新参数开端,万象会将装备数据保存到MySQL数据库中,并经过发布操作将更新的装备推送到JADE的DUCC集成模块DuccConfigService,Linstener监听到装备改变后调用ThreadPoolExecutorUpdater更新线程池参数,更新参数是经过承继JDK的ThreadPoolExecutor完成更新,以及经过ResizeableLinkedBlockingQueue修正行列长度。
JADE线程池监控才能经过Meter监控点 及MeterRegistry监控工厂集成PFinder和UMP完成。
了解根底原理后,从JADE装备类初始化进程及线程池创立进程,别离看一下源码完成。
>JADE装备类初始化进程 - 源码探求。
JADEInitBeanBase注入了Spring容器,并运用SpringInitializingBeanafterPropertiesSet()履行自定义初始化逻辑。
JADE 自定义初始化逻辑总共有8个初始化过程,咱们只需求重视其间几个即可。
public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener。< ContextRefreshedEvent >{ Override public void afterPropertiesSet() throws Exception { log.info("jade init begin"); //1.读取装备文件,设置Val特点值 initProperties(); //2.初始化日志等级 initLogLevel(); //3.初始化零售DBConfig initDbConfig(); //4.初始化DUCC initConfig(); //5.初始化万象装备 initWX(); //6.初始化 jvm ump key initU。mps。(); //7.初始化PFinderMeterRegistry监控工厂 initMeter(); //8.初始化JSF监听注册 JSF POOL initJsf(); UST.record(getClass()); log.info("jade init end"); } }。
1、initProperties()用于读取jade.properties装备文件,设置Val特点值。
•从根目录读取jade.properties装备文件,姓名不可变,不然获取不到。
public final class JadeConfigs { //从根目录读取 jade.properties private static synchronized Config initConfig() { //略... Object cfg = Thread.currentThread().getContextClassLoader().getResourceAsStream("jade.properties"); } }。
•为Bean的Val注解标示的特点设置值,假如jade.properties装备了则运用装备的,不然运用默许值。
public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener。< ContextRefreshedEvent >{ //为Val注解标示的特点设置值 private void pa。rs。eSpringValue(Config cfg) { //Spring PropertyPlaceholderHelper:解析和替换占位符的东西 PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper("${", "}", ":", true); //反射获取一切字段 for (Field f : FieldUtils.getAllFields(getClass())) { f.setAccessible(true); if (f.get(this) != null) { // may set explicitly continue; } //获取 Val 注解 Val valAnno = f.getAnnotation(Val.class); if (valAnno != null && StringUtils.isNotEmpty(valAnno.value())) { try { //从Config(jade.properties) 装备文件读取特点值,没有则为默许值。 String actualVal = helper.replacePlaceholders(valAnno.value(), k ->{ String v = cfg.getString(k); if (v == null) { v = applicationContext.getEnvironment().getProperty(k); } return v; }); if (actualVal != null) { Function。< String, ? >parser = TYPE_PARSERS.get(f.getType()); if (parser != null) { Object parsedVal = parser.apply(actualVal); f.set(this, parsedVal); } } } catch (Exception e) { log.error("parse field {} error", f.getName()); throw e;} } } } }。
2、initConfig()初始化装备类中的jade装备的ducc,假如不集成万象,则运用这个ducc装备。运用万象,则运用万象渠道装备的ducc。
•代码与万象初始化逻辑相同,参阅下面的即可。
3、initWX()初始化万象渠道装备。
•万象初始化流程首要有3过程:1.拼接运用万象默许装备的Ducc空间;2.发动监听;3.拉取装备更新JADE组件。
•万象的默许Ducc空间格局为:经过运用名和环境Env的拼接:{ns:wxbizapps} {appName:diansong} {env:pre}。
class WXInit { //万象初始化 private void init0() { //1.万象默许的DUCC装备 String duccHost = DuccResource.getDefautHost(); Config config = JadeConfigs.getConfig(); String app = config.getString("jade.wx.app", "jdos_wxbizapps"); String token = config.getString("jade.wx.token", getDefaultDuccToken(duccHost)); String ns = config.getString("jade.wx.ns", "wxbizapps"); String cfg = config.getString("jade.wx.cfg", Env.getAppName()); if (failOrLog(cfg, "jade.wx.cfg")) { return; } String env = initBean.getWxEnv(); if (StringUtils.isEmpty(env)) { env = config.getString("jade.wx.env"); } if (failOrLog(env, "jade.wx.env")) { return; } String currentApp = Env.getDeployAppName(); if (failOrLog(currentApp, "current app name")) { return; } //DUCC URL拼接 String url = String.format(DuccResource.URL_FORMAT, app, token, duccHost, ns, cfg, env, 1000 * 60, isRequired()); log.info("connect to wanxiang via {}", url); // TODO: mark token //Resource Name jade-wx String resxName = "jade-wx"; ConfiguratorManager cm = new ConfiguratorManager(); cm.setApplication(currentApp); cm.addResource(resxName, url); cm.start(); //2.发动监听Ducc jade-wx ConfigService configService = new DuccConfigService(cm); //3.从万象渠道拉装备更新JADE组件 configService.getConfig(resxName, JadeConfig.class); // TODO: not found, throws? UST.record(getClass());} }。
• 发动监听DUCC调用DuccConfigService init()初始化办法。
public class DuccConfigService implements ConfigService { //结构办法,注入DUCC ConfiguratorManager public DuccConfigService(NonNull ConfiguratorManager configuratorManager) { if (configuratorManager == null) { throw new NullPointerException("configuratorManager is marked non-null but is null"); } else { //初始化 this.init(configuratorManager); } } }。
•init()初始化办法中会发动万象DUCC的线程,并增加监听事情,监听Resource name 为jade-wx的改变,改变后的回调函数经过DuccConfigService.this.updateConfig(configuration)用来更新JADE组件。
//初始化办法 private void init(ConfiguratorManager configuratorManager) { try { this.configuratorManager = configuratorManager; //1.发动Ducc线程 if (!configuratorManager.isStarted()) { if (StringUtils.isNotEmpty(Env.getDeployAppName())) { System.setProperty("application.name", Env.getDeployAppName()); } configuratorManager.start(); } List。< Resource >resources = DuccUtil.getResources(configuratorManager); Iterator var3 = resources.iterator(); while(var3.hasNext()) { final Resource resource = (Resource)var3.next(); //2.Ducc增加监听事情,Name是:jade-wx configuratorManager.addListener(new ConfigurationListener() { public String getName() { return resource.getName(); } //回调函数更新JADE组件 public void onUpdate(Configuration configuration) { DuccConfigService.this.updateConfig(configuration); } }); } UST.record(this.getClass()); } catch (Throwable var5) { throw var5; } }。
•DuccConfigService更新办法调用JadeConfig的init()办法,依据万象渠道装备更新JADE各个组件,包含动态线程池。
public class JadeConfig implements JadeConfigSupport, InitializingObject { public static void init(JadeConfigSupport cfg) { //JADE-日志组件 更新 //JADE-动态线程池组件 更新 ThreadPoolExecutorUpdater.update(cfg.getExecutorConfig()); //JADE-本地缓存组件 更新 //.... } }。
5、ThreadPoolExecutorUpdater更新线程池参数中心类。
•中心、最大线程数、存活时刻是经过承继JDK ThreadPoolExecutor完成更新的。
•在中心类中,当调大中心线程数后,会调用prestartAllCoreThreads()对中心线程进行预热,所以不用忧虑调大中心线程数后产生的“颤动”问题(实践是创立线程的开支)。
•留意core和max是一同更新的,不然或许会导致更改不收效的问题。
ThreadPoolExecutorUpdater更新线程池首要有以下5个过程。
•updatePoolSize更新中心、最大线程数,留意需求一同同步更新,不然或许导致更新失利问题。
•setKeepAliveTime更新KeepAliveTime存活时刻。
•setCapacity反射修正行列容量。
•prestartAllCoreThreads()预热中心线程数。
•updateRejectSetting()更新回绝战略。
private static void update0(ExecutorConfigSupport.ExecutorSetting executorSetting, ThreadPoolExecutor executor) { //1.更新中心、最大线程数,留意需求一同同步更新,不然或许导致更新失利问题 updatePoolSize(executorSetting, executor); //2.更新KeepAliveTime存活时刻 if (executorSetting.getKeepAliveSeconds() != null && executorSetting.getKeepAliveSeconds() != executor.getKeepAliveTime(TimeUnit.SECONDS)) { executor.setKeepAliveTime(executorSetting.getKeepAliveSeconds(), TimeUnit.SECONDS); } //3.更新行列 if (executorSetting.getQueueCapacity() != null) { if (executor.getQueue() instanceof LinkedBlockingQueue) { LinkedBlockingQueue currentQueue = (LinkedBlockingQueue) executor.getQueue(); int currentQueueCapacity = ResizableLinkedBlockingQueue.getCapacity(currentQueue); if (executorSetting.getQueueCapacity() >0 && executorSetting.getQueueCapacity() != currentQueueCapacity) { //反射修正行列数量,signalNotFull ResizableLinkedBlockingQueue.setCapacity(currentQueue, executorSetting.getQueueCapacity()); } else if (executorSetting.getQueueCapacity() == 0) { //调整行列数量为0,留意丢使命危险。 if (BooleanUtils.isTrue(executorSetting.getForceResizeQueue())) { setWorkQueue(executor, new SynchronousQueue()); } else { // log } } } //else 省掉 } //4.预热中心线程数 if (BooleanUtils.toBoolean(executorSetting.getPrestartAllCoreThreads()) && executor.getPoolSize()。 < executor.getCorePoolSize()) { int threads = executor.prestartAllCoreThreads(); } //5.更新拒绝策略 updateRejectSetting(executorSetting, executor); }
•行列长度修正经过ResizableLinkedBlockingQueue反射完成。
//可动态调整容量的 BlockingQueue //HACK: 内部直接承继自 LinkedBlockingQueue,经过反射修正其 private final capacity 字段 public class ResizableLinkedBlockingQueue。< E >extends LinkedBlockingQueue。< E >{ //反射设置行列巨细 static。 < E >void setCapacity(LinkedBlockingQueue。< E >queue, int capacity) { int ol。dC。apacity = getCapacity(queue); FieldUtils.writeField(queue, FN_CAPACITY, capacity, true); int size = queue.size(); //假如行列中的使命现已达到老行列容量约束,而且新的容量大于行列使命数 if (size >= oldCapacity && capacity >size) { // thanks to https://www.cnblogs.com/thisiswhy/p/15457810.html MethodUtils.invokeMethod(queue, true, "signalNotFull"); } } }。
•这儿有一个细节,假如行列容量满了,当调整完行列数后,手动调用signalNotFull宣布行列非满告诉,唤醒堵塞线程,能够持续向行列刺进使命了。
>创立JADE线程池build()- 源码探求。
以下是咱们经过 JADE ThreadPoolExecutorBuilder 创立线程池的 Bean,中心逻辑在 build() 封装。
/** * 秒送频道页线程池 */ Bean public ExecutorService msChannelPagePool(){ ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder() .name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池称号 .core(200) // 中心线程数 .max(200) // 最大线程数 .queue(1024) // 设置行列长度,此行列支撑动态调整 .callerRuns() // 方便设置回绝战略为丢掉,内置监控、日志 .keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时刻 .prestart() // 预初始化一切中心线程数 .build(); return PfinderContext.executorServiceWrapper(threadPoolExecutor); }。
•build()首要逻辑有3步,1.创立线程池 ,2.发动一切中心线程, 3.注册线程池监控点。
public abstract class AbstractExecutorBuilder{ public synchronized E build() { //1.创立线程池 this.executor = createExecutor(); //2.发动一切中心线程 if (this.prestartAllCoreThreads) { executor.prestartAllCoreThreads(); } //3.创立监控 initMonitor(); return this.executor; } }。
•initMonitor()创立PFinder线程池监控,即 活泼线程数、中心/最大线程数,行列数量等。格局为:executor.线程池名.activeCount.(留意线程池必定要有姓名)。
•gauge()办法内部集成PFinder,运用代码。编程。的办法进行Gauge埋点,用于记载线程池的瞬时值目标:活动线程数、中心/最大、行列巨细等。PFinder埋点办法详见PFinder文档。
public abstract class MeterRegistry。< C extends MeterRegistryConfig >{ public List。< Gauge >gaugeExecutor(String executorName, ThreadPoolExecutor executor) { String namePrefix = "executor." + executorName; return gaugeExecutor0(namePrefix, executor); } private List。< Gauge >gaugeExecutor0(String namePrefix, ThreadPoolExecutor executor) { namePrefix += "."; List。< Gauge >gauges = new ArrayList。< >(); if (getConfig().isThreadPoolAllMetricsEnabled()) { gauges.add(gauge(namePrefix + "taskCount", executor::getTaskCount)); gauges.add(gauge(namePrefix + "completedTaskCount", executor::getCompletedTaskCount)); } gauges.add(gauge(namePrefix + "activeCount", executor::getActiveCount)); gauges.add(gauge(namePrefix + "corePoolSize", executor::getCorePoolSize)); gauges.add(gauge(namePrefix + "maxPoolSize", executor::getMaximumPoolSize)); gauges.add(gauge(namePrefix + "poolSize", executor::getPoolSize)); gauges.add(gauge(namePrefix + "queueSize", () ->executor.getQueue().size())); // return gauges; } }。
四、避坑攻略。
•线程池有必要有姓名,监控依靠,而且不能重名。当体系有问题时也便于经过jstack等东西排查定位问题。
•运用需敞开pfinder监控而且PFinder SDK 要和 agent版别兼容。
•线程池创立后,线程不会当即发动,而是在有使命提交时才发动,发动的瞬间会由于创立线程的开支形成功能“颤动”,能够运用prestartAllCoreThreads()预热中心线程。
•线程池的中心线程,默许是不会收回的,假如一个线程池活泼度长时刻很低,主张调整中心线程数,过多的线程会糟蹋内存资源,影响体系安稳性。
•Future、CompletableFuture异步使命运用线程池时设置合理的超时时刻,防止因外部服务毛病或。网络。等问题导致使命长时刻堵塞,形成资源糟蹋,严峻乃至拖垮整个线程池,导致线上问题。
•同理,体系中恳求外部Http恳求时,有必要设置超时时刻,防止资源被长时刻占用无法开释,影响体系功能和安稳性。
审阅修改 黄宇。
内容来源:https://havascm.com/app-1/kqxs onip,http://chatbotjud-teste.saude.mg.gov.br/app-1/89-bet-win-paga
(责任编辑:人文)