分布式日志追寻ID实战
作者:京东物流 张小龙。
本文经过介绍分布式运用下各个场景的大局日志ID透传思路,以及介绍分布式日志追寻ID简略完结原理和实战作用,然后到达经过进步日志查询排查问题的功率。
布景。
开发排查体系问题用得最多的手法便是检查体系日志,信赖不少人都值过班当过小秘吧:给下。接口。和收支参吧,费事看看日志里的有没有反常。信息。啊等等,可是在并发大时运用日志定位问题仍是比较费事,因为很多的其他用户/其他线程的日志也一同输出穿行其间导致很难筛选出指定恳求的悉数相关日志,以及下流线程/服务对应的日志,乃至一些特别场景的收支参只打印了一些比如gis坐标、四级地址等没有单据信息的日志,使得日志定位起来十分不便利。
场景剖析。
自己地点组担任的体系首要是web运用,其间涉及到的恳求办法首要有:springmvc的servlet的http场景、jsf场景、MQ场景、resteasy场景、clover场景、easyjob场景,每一种场景都需求不同的办法进行logTr。ac。eId的透传,接下来逐一探析上述各个场景的透传计划。
在这之前咱们先要简略了解一下日志中透传和打印logTraceId的办法,一般咱们运用M。DC。进行logTraceId的透传与打印,可是根据MDC内部运用的是Thre。ad。Local所以只要本线程才有用,子线程服务的MDC里的值会丢掉,所以这儿咱们要么是在一切涉及到父子线程的当地以编码侵入式自行完结值的传递,要么便是经过覆写MDCAdapter:经过阿里的TransmittableThreadLocal来处理父子线程传递问题,而本文选用的是比较粗糙地以编码侵入式来处理此问题。
springmvc的servlet的http场景。
这个场景信赖咱们都现已烂熟到骨子里了,首要思路是经过拦截器的办法进行logTraceId的透传,新建一个类完结HandlerIn。te。rcept。or。
preHandle:在事务。处理器。处理恳求之前被调用,这儿完结logTraceId的设置与透传。
postHandle:在事务处理器处理恳求履行完结后,生成视图之前履行,这儿空完结就好。
afterCompletion:在Dispatche。rS。ervlet彻底处理完恳求后被调用,这儿用于铲除MDC的logTraceId。
Slf4jpubl。ic。class TraceInterceptor implements HandlerInterceptor { Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Excep。ti。on { try{ String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(traceId)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId()); } }catch (RuntimeException e){ log.error("mvc自定义log盯梢拦截器履行反常",e); } return true; } Override public void postHandle(。java。x.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAn。dVi。ew modelAndView) throws Exception { } Override public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception { try{ MDC.clear(); }catch (RuntimeException ex){ log.error("mvc自定义log盯梢拦截器履行反常",ex); } }}。
jsf场景。
信赖咱们关于jsf并不生疏,而jsf也支撑自定义filter,根据jsf过滤器的运转办法,能够经过装备大局过滤器(承继Abstrac。tF。ilter)的办法进行logTraceId的透传,需求留意的是jsf是在线程池中履行的所以一定要信赖音讯体中的logTraceId。
jsf顾客过滤器:首要从上下文环境中获取logTraceId并进行透传,完结代码如下。
Slf4jpublic class TraceIdGlobalJsfFilter extends AbstractFilter { Override public ResponseMessage invoke(RequestMessage requestMessage) { //设置traceId setAndGetTraceId(requestMessage); try{ return this.getNext().invoke(requestMessage); }finally { } } /** * 设置并回来traceId * pa。ram。requestMessage * return */ private void setAndGetTraceId(RequestMessage requestMessage) { try{ String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY); if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){ //假如filter和MDC都没有获取到则阐明有遗失,打印日志 if(log.i。sD。ebugEnabled()){ log.debug("jsf顾客自定义log盯梢拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));} } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) { //假如MDC没有,filter有,打印日志 if(log.isDebugEnabled()){ log.debug("jsf顾客自定义log盯梢拦截器预警,MDC没有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));} } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){ //假如MDC有,filter没有,阐明是源头现已有了,可是jsf是第一次调,透传 requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId); }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){ //MDC和fitler都有,可是并不持平,则存在问题打印日志 if(log.isDebugEnabled()){ log.debug("jsf顾客自定义log盯梢拦截器预警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));} } }catch (RuntimeException e){ log.error("jsf顾客自定义log盯梢拦截器履行反常",e); } }}。
jsf提供者过滤器:经过拿到顾客在音讯体中透传的logTraceId来完结,完结代码如下。
Slf4jpublic class TraceIdGlobalJ。sfP。roduce。rF。ilter extends AbstractFilter { Override public ResponseMessage invoke(RequestMessage requestMessage) { //设置traceId boolean isNeedCle。arM。dc = transferTraceId(requestMessage); try{ return this.getNext().invoke(requestMessage); }finally { if(isNeedClearMdc){ clear(); } } } /** * 设置并回来traceId * param requestMessage * return */ private boolean transferTraceId(RequestMessage requestMessage) { boolean isNeedClearMdc = false; try{ String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY); if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){ //假如filter和MDC都没有获取到,阐明存在遗失场景或是提供给外部体系调用的接口,打印日志进行调查 String traceId = TraceUtils.getTraceId(); MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId); requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId); if(log.isDebugEnabled()){ log.debug("jsf生产者自定义log盯梢拦截器预警,filter和MDC都没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));} isNeedClearMdc = true; } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) { //假如MDC没有,filter有,阐明是被调用方,需求透传下去 MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString()); isNeedClearMdc = true; } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){ //假如MDC有,filter没有,存在问题,打印日志 if(log.isDebugEnabled()){ log.debug("jsf生产者自定义log盯梢拦截器预警,MDC有filter没有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));} isNeedClearMdc = true; }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){ //MDC和fitler都有,可是并不持平,则信赖filter透传成果 TraceUtils.resetTraceId(logTraceIdObj.toString()); if(log.isDebugEnabled()){ log.debug("jsf生产者自定义log盯梢拦截器预警,MDC和fitler都有traceId,可是并不持平,jsf信息:{}", JSON.toJSONString(requestMessage));} } return isNeedClearMdc; }catch (RuntimeException e){ log.error("jsf生产者自定义log盯梢拦截器履行反常",e); return false; } } /** * 铲除MDC */ private void clear() { try{ MDC.clear(); }catch (RuntimeException e){ log.error("jsf生产者自定义log盯梢拦截器履行反常",e); } }}。
MQ场景。
提到MQ信赖咱们关于此就更不生疏了,此种场景首要经过在提供者发送音讯时拿到上下文中的logTraceId,将其以扩展信息的办法设置进音讯体中进行透传,而顾客则从音讯体中进行获取。
生产者:新建一个笼统类承继MessageProducer,覆写父类中的两个send办法(批量发送、单条发送),send办法中首要调用笼统加工音讯体的办法(logTraceId特点赋值)和日志打印,在子类中进行发送前对音讯体的加工处理,具体代码如下。
Slf4jpublic abstract class BaseTraceIdProducer extends MessageProducer { private static final String SEPARATOR_COMMA = ","; public BaseTraceIdProducer() { } public BaseTraceIdProducer(TransportManager transportManager) { super(transportManager); } /** * 获取音讯体-单个 * param messageContext * return */ protected abstract Message getMessage(MessageContext messageContext); /** 获取音讯体-批量 * * param messageContext * return */ protected abstract List。< Message >getMessages(MessageContext messageContext); /** * 填充音讯体上下文信息 * param message * param messageContext */ protected void fillContext(Message message,MessageContext messageContext) { if(message == null){ return; } if(StringUtils.isBlank(messageContext.getLogTraceId())){ String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY); messageContext.setLogTraceId(logTraceId); } if(StringUtils.isBlank(messageContext.getTo。pi。c())){ String topic = message.getTopic(); messageContext.setTopic(topic); } String businessId = message.getBusinessId(); messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId); } /** * traceId嵌入音讯体中 * param message */ protected void generateTraceIdIntoMessage(Message message){ if(message == null){ return; } try{ String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if(StringUtils.isBlank(logTraceId)){ logTraceId = TraceUtils.getTraceId(); MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId); } message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId); }catch (RuntimeException e){ log.error("jmq2自定义log盯梢拦截器履行反常",e); } } /** * 批量发送音讯-无回调 * param messages * param timeout * throws JMQException */ public void send(List。< Message >messages, int timeout) throws JMQException { MessageContext messageContext = new MessageContext(); messageContext.setMessages(messages); List。< Message >messageList = this.getMessages(messageContext); //打印日志,便利排查问题 printLog(messageContext); super.send(messageList, timeout); } /** * 单个发送音讯 * param message * param transaction * param。 < T >* return * throws JMQException */ public。 < T >T send(Message message, LocalTransaction。< T >transaction) throws JMQException { MessageContext messageContext = new MessageContext(); messageContext.setMessage(message); Message msg = this.getMessage(messageContext); //打印日志,便利排查问题 printLog(messageContext); return super.send(msg, transaction); } /** * 批量发送音讯-有回调 * param messages * param timeout * param callback * throws JMQException */ public void send(List。< Message >messages, int timeout, AsyncSendCallback callback) throws JMQException { MessageContext messageContext = new MessageContext(); messageContext.setMessages(messages); List。< Message >messageList = this.getMessages(messageContext); //打印日志,便利排查问题 printLog(messageContext); super.send(messageList, timeout, callback); } /** * 打印日志,便利排查问题 * param messageContext */ private void printLog(MessageContext messageContext) { if(messageContext==null){ return; } if(log.isInfoEnabled()){ log.info("MQ发送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());} }}Slf4jpublic class TraceIdEnvMessageProducer extends BaseTraceIdProducer { private static final String UAT_TRUE = String.valueOf(true); private boolean uat = false; public TraceIdEnvMessageProducer() { } public TraceIdEnvMessageProducer(TransportManager transportManager) { super(transportManager); } /** * 环境变量打标-单个音讯体 * param message */ private void convertUatMessage(Message message) { if (message != null) { message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE); } } /** * 音讯转化-批量音讯体 * param messageContext * return */ private List。< Message >convertMessages(MessageContext messageContext) { List。< Message >messages = messageContext.getMessages(); if (!CollectionUtils.isEmpty(messages)) { Iterator messageIterator = messages.iterator(); while(messageIterator.hasNext()) { Message message = (Message)messageIterator.next(); if(this.isUat()){ this.convertUatMessage(message); } super.generateTraceIdIntoMessage(message); super.fillContext(message,messageContext); } } return messageContext.getMessages(); } /** * 音讯转化-单个音讯体 * param messageContext * return */ private Message convertMessage(MessageContext messageContext){ Message message = messageContext.getMessage(); if(this.isUat()){ this.convertUatMessage(message); } super.generateTraceIdIntoMessage(message); super.fillContext(message,messageContext); return message; } protected Message getMessage(MessageContext messageContext) { if(log.isDebugEnabled()){ log.debug("current environment is UAT : {}", this.isUat());} return this.convertMessage(messageContext); } protected List。< Message >getMessages(MessageContext messageContext) { if(log.isDebugEnabled()){ log.debug("current environment is UAT : {}", this.isUat());} return this.convertMessages(messageContext); } public void setUat(boolean uat) { this.uat = uat; } boolean isUat() { return this.uat; }}。
顾客:新建一个笼统类承继MessageListener,覆写父类中的onMessage办法,首要进行设置日志traceId和消费完结后的traceId整理等,而在子类中进行一些自定义处理,具体代码如下。
Slf4jpublic abstract class BaseTraceIdMessageListener implements MessageListener { public BaseTraceIdMessageListener() { } public abstract void onMessageList(List。< Message >messages) throws Exception; Override public final void onMessage(List。< Message >messages) throws Exception { try{ if(CollectionUtils.isEmpty(messages)){ return; } //设置日志traceId setLogTraceId(messages); this.onMessageList(messages); //消费完后铲除traceId clear(); }catch (Exception e){ throw e; }finally { MDC.clear(); } } /** * 设置日志traceId * param messages */ private void setLogTraceId(List。< Message >messages) { try{ Message message = messages.get(0); String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY); if(StringUtils.isBlank(logTraceId)){ logTraceId = TraceUtils.getTraceId(); } MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId); }catch (RuntimeException e){ log.error("jmq2自定义log盯梢拦截器履行反常",e); } } /** * 铲除traceId */ private void clear() { try{ MDC.clear(); }catch (RuntimeException e){ log.error("jmq2自定义log盯梢拦截器履行反常",e); } }}Slf4jpublic abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{ private String uat; public TraceIdEnvMessageListener() { } public abstract void onMessages(List。< Message >var1) throws Exception; Override public void onMessageList(List。< Message >messages) throws Exception { Iterator iterator; Message message; if (this.getUat() != null && Boolean.valueOf(this.getUat())) { iterator = messages.iterator(); while(true) { while(iterator.hasNext()) { message = (Message)iterator.next(); if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) { this.onMessages(Arrays.asList(message)); } else { log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());} } return; } } else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) { iterator = messages.iterator(); while(true) { while(iterator.hasNext()) { message = (Message)iterator.next(); if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) { this.onMessages(Arrays.asList(message)); } else { log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());} } return; } } else { this.onMessages(messages); } } public void setUat(String uat) { if (!"true".equals(uat) && !"false".equals(uat)) { throw new IllegalArgumentException("uat 特点值只能为 true 或 false."); } else { this.uat = uat; } } public String getUat() { return this.uat; }}。
resteasy场景。
此场景类似于spinrg-mvc场景,也是http恳求,需求经过拦截器在音讯头中进行logTraceId的透传,首要有客户端拦截器,服务端:预处理拦截器、后置拦截器,代码如下。
ClientInterceptorProviderSlf4jpublic class ResteasyClientInterceptor implements ClientExecutionInterceptor { Override public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception { try{ String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); ClientRequest request = clientExecutionContext.getRequest(); String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY); if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){ //假如filter和MDC都没有获取到则阐明是调用源头 String traceId = TraceUtils.getTraceId(); TraceUtils.resetTraceId(traceId); request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId); } else if(StringUtils.isBlank(headerTraceId)){ //假如MDC有可是filter没有则需求传递 request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId); } }catch (RuntimeException e){ log.error("resteasy客户端log盯梢拦截器履行反常",e); } return clientExecutionContext.proceed(); }}Slf4jProviderServerInterceptorpublic class RestEasyPreInterceptor implements PreProcessInterceptor { Override public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException { try{ MultivaluedMap。< String, String >requestHeaders = request.getHttpHeaders().getRequestHeaders(); String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY); if(StringUtils.isNotBlank(headerTraceId)){ //假如filter则透传 TraceUtils.resetTraceId(headerTraceId); } }catch (RuntimeException e){ log.error("resteasy服务端log盯梢前置拦截器履行反常",e); } return null; }}Slf4jProviderServerInterceptorpublic class ResteasyPostInterceptor implements PostProcessInterceptor { Override public void postProcess(ServerResponse serverResponse) { try{ MDC.clear(); }catch (RuntimeException e){ log.error("resteasy服务端log盯梢后置拦截器履行反常",e); } }}。
clover场景。
clover的大体机制首要是在项目发动的时分扫描到带有注解HessianWebService的类进行服务注册并保持心跳。检测。,而clover端则经过servlet恳求办法进行使命的回调,一起承继AbstractScheduleTaskProcess办法的使命是以线程池的办法进行事务的处理。
根据上述原理咱们需求处理两个问题:1.新建一个类承继ServiceExporterServlet,并在web.xml装备中进行servlet装备,代码如下;
Slf4jpublic class ServiceExporterTraceIdServlet extends ServiceExporterServlet { Override public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException { try { String traceId = MDC.get("traceId"); if (StringUtils.isBlank(traceId)) { MDC.put("traceId", TraceUtils.getTraceId()); } } catch (Exception e) { log.error("clover恳求servlet履行反常", e); } try { super.service(req, res); } catch (Throwable e) { log.error("clover恳求servlet履行反常", e); throw e; }finally { try{ MDC.clear(); }catch (RuntimeException ex){ log.error("clover恳求servlet履行反常",ex); } } }}。
2.新建一个笼统类承继AbstractScheduleTaskProcess,在类中以编码办法进行父子线程的透传(可优化:经过覆写MDCAdapter:经过阿里的TransmittableThreadLocal来处理父子线程传递问题),一切使命均改为承继此类,要害代码如下。
try{ traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(traceId)) { log.warn("clover自定义log盯梢拦截器预警,mdc没有traceId"); } }catch (RuntimeException e){ log.error("clover自定义log盯梢拦截器履行反常",e); } final String logTraceId = traceId; while(iterator.hasNext()) { final List。< TcTask >list = (List。< TcTask >)iterator.next(); this.executor.。sub。mit(new Callable。< Object >() { public Object call() throws Exception { try{ if (StringUtils.isNotBlank(logTraceId)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId); } }catch (RuntimeException e){ log.error("clover自定义log盯梢拦截器履行反常",e); } Object var1; try { if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) { BaseTcTaskProcessWorker.logger.info("正在履行使命[" + this.getClass().getName() + "],条数:" + list.size() + "..."); } BaseTcTaskProcessWorker.this.executeTasks(list); if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) { BaseTcTaskProcessWorker.logger.info("履行使命[" + this.getClass().getName() + "],条数:" + list.size() + "成功!"); } var1 = null; } catch (Exception var5) { BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5); throw var5; } finally { try{ MDC.clear(); }catch (RuntimeException ex){ log.error("clover自定义log盯梢拦截器履行反常",ex); } latch.countDown(); } return var1; } }); }。
easyjob场景。
easyjob的大体机制是在项目发动的时分经过扫描完结接口Scheduler的类进行上报注册,一起发动一个acceptor(获取使命的线程池),而acceptor拉取到使命后会将父使命放进一个叫executor的线程池,子使命范进一个叫slowExecutor的线程池,咱们能够新建一个抽奖类完结接口ScheduleFlowTask,复用clover场景硬编码办法进行父子线程logTraceId的透传处理(可优化:经过覆写MDCAdapter:经过阿里的TransmittableThreadLocal来处理父子线程传递问题),示例代码如下。
。
Slf4jpublic abstract class AbstractEasyjobOnlyScheduleProcess。< T >implements ScheduleFlowTask { /** * EASYJOB渠道UMP监控key前缀 */ private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask."; /** * EASYJOB单个使命处理分布式锁前缀 */ private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "b。asic。_easyjob_single_task_lock_prefix_"; /** * 环境标识-开关装备进行环境阻隔 */ Value("${spring.profiles.active}") private String activeEnv; Value("${task.scene.mark}") private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc(); /** * easyJob维度线程池变量 */ private ThreadPoolExecutor easyJobExecutor; /** * easyJob维度服务器个数-分片个数 */ private volatile int easyJobLastThre。adC。ount = 0; /** * easyjob多线程称号 */ private static final String EASYJOB_THREAD_NAME = "dts.easyJobs"; /** * 子类的泛型。参数。类型 */ private Class。< T >argumentType; /** * 无参结构 */ public AbstractEasyjobOnlyScheduleProcess() { //设置子类泛型参数类型 argumentType = this.getArgumentType(); } Autowired private RedisHelper redisHelper; /** * 非task表扫描待处理的使命数据 * param taskServerParam * param curServer * return */ protected abstract List。< T >loadTasks(TaskServerParam taskServerParam, int curServer); /** * 事务处理笼统办法-单个 * param task */ protected abstract void doSingleTask(T task); /** * 事务处理笼统办法-批量 * param tasks */ protected abstract void doBatchTasks(List。< T >tasks); /** * 组装ump监控key * param prefix * param taskNameKey * return */ private String getUmpKey(String prefix,String taskNameKey) { StringBuffer umpKeyBuf = new StringBuffer(); umpKeyBuf.append(prefix).append(taskNameKey); return umpKeyBuf.toString(); } /** * easyjob渠道异步使命回调办法 * param scheduleContext * return * throws Exception */ Override public TaskResult doTask(ScheduleContext scheduleContext) throws Exception { String requestNo = TraceUtils.getTraceId(); try { String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(traceId)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo); } } catch (Exception e) { log.error("easyjob履行反常", e); } EasyJobTaskServerParam taskServerParam = null; CallerInfo callerinfo = null; try { //条件转化 taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext); String taskNameKey = getTaskNameKey(); String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey); callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true); //多服务器,而且非子使命,本次不履行,提交子使命 if (taskServerParam.getServerCount() >1 && !taskServerParam.isSubTask()) { submitSubTask(scheduleContext, taskServerParam,requestNo); return TaskResult.success(); } if (log.isInfoEnabled()) { log.info("恳求编号[{}],开端获取使命,使命ID[{}],使命称号[{}],履行参数[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));} TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam); List。< T >tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer()); if (log.isInfoEnabled()) { log.info("恳求编号[{}],获取使命ID[{}],使命称号[{}]共{}条", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());} if (CollectionUtils.isNotEmpty(tasks)) { if (log.isInfoEnabled()) { log.info("恳求编号[{}],开端履行使命,使命ID[{}],使命称号[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());} this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo); if (log.isInfoEnabled()) { log.info("恳求编号[{}],履行使命,使命ID[{}],使命称号[{}],履行数量[{}]完结....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());} } return TaskResult.success(); } catch (Exception e) { Profiler.functionError(callerinfo); if (log.isInfoEnabled()) { log.error("恳求编号[{}],使命履行失利,使命ID[{}],使命称号[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);} return TaskResult.fail(e.getMessage()); }finally { try{ MDC.clear(); }catch (RuntimeException ex){ log.error("easyjob履行反常",ex); } Profiler.registerInfoEnd(callerinfo); } } /** * 多分片提交子使命 * param scheduleContext 调度使命上下文参数 * param taskServerParam 调度使命参数 * param requestNo 调度使命参数 * return void */ private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException { log.info("恳求编号[{}],履行使命,使命ID[{}],使命称号[{}],子使命个数[{}],开端提交子使命", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount()); String jobClass = scheduleContext.getTaskGetResponse().getJobClass(); if (StringUtils.isBlank(jobClass)) { throw new RuntimeException("jobClass get error"); } for (int i = 0; i。 < taskServerParam.getServerCount(); i++) { Map< String, String >dataMap = scheduleContext.getParameters(); //提交子使命标识 dataMap.put("isSubTask", "true"); //给子使命进行编号 dataMap.put("curServer", String.valueOf(i)); //父使命称号传递子使命 dataMap.put("taskName", taskServerParam.getTaskName()); scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept()); } // 父使命等候子使命履行结束再更改状况,假如履行时刻超越等候时刻,抛反常 //scheduleContext.w。ai。tForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected()); log.info("恳求编号[{}],履行使命,使命ID[{}],使命称号[{}],子使命个数[{}],提交完结....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());} /** * 创立线程池,按装备参数履行task * param param 履行参数 * param tasks 使命调集 * param requestNoStr * return void */ private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List。< T >tasks,String requestNoStr) { int threadCount = param.getThreadCount(); synchronized (this) { if (this.easyJobExecutor == null) { this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME); this.easyJobLastThreadCount = threadCount; } else if (threadCount >this.easyJobLastThreadCount) { this.easyJobExecutor.set。Maxim。umPoolSize(threadCount); this.easyJobExecutor.setCorePoolSize(threadCount); this.easyJobLastThreadCount = threadCount; } else if (threadCount。 < this.easyJobLastThreadCount) { this.easyJobExecutor.setCorePoolSize(threadCount); this.easyJobExecutor.setMaximumPoolSize(threadCount); this.easyJobLastThreadCount = threadCount; } } List< List< T >>lists = Lists.partition(tasks, param.getExecuteCount()); final CountDownLatch latch = new CountDownLatch(lists.size()); final String requestNo = requestNoStr; for (final List。< T >list : lists) { this.easyJobExecutor.submit( new Callable。< Object >() { public Object call() throws Exception { try{ if (StringUtils.isNotBlank(requestNo)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo); } }catch (RuntimeException e){ log.error("easyjob自定义log盯梢拦截器履行反常",e); } try { if (log.isInfoEnabled()) { log.info("恳求编号[{}],正在履行使命,使命ID[{}],使命称号[{}],[{}],条数:[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());} executeTasks(list); if (log.isInfoEnabled()) { log.info("恳求编号[{}],履行使命,使命ID[{}],使命称号[{}],[{}],条数:[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());} } catch (Exception e) { log.error(e.getMessage(), e); throw e; } finally { try{ MDC.clear(); }catch (RuntimeException ex){ log.error("easyjob自定义log盯梢拦截器履行反常",ex); } latch.countDown(); } return null; } } ); } try { latch.await(); } catch (InterruptedException e) { throw new RuntimeException("interrupted when processing data access request in concurrency", e); } } /** * 获取使命称号 * return */ private String getTaskNameKey(){ StringBuffer keyBuf = new StringBuffer(); keyBuf.append(activeEnv) .append(Constants.SEPARATOR_UNDERLINE) .append(this.getClass().get。Sim。pleName()); return keyBuf.toString(); } protected void executeTasks(List。< T >taskList) { if(CollectionUtils.isEmpty(taskList)) { return; } this.doTasks(taskList); } /** * 事务处理笼统办法 * param list */ protected void doTasks(List。< T >list){ if(isDoBatchTasks()){ CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true); try { /** 开端履行各个子类真实事务逻辑 */ this.doBatchTasks(list); } catch(CommonBusinessException ex){ log.warn(ex.getMessage()); } catch (Exception e) { Profiler.functionError(info); log.error("使命处理失利,办法:{},使命:{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);} finally { Profiler.registerInfoEnd(info); } }else{ for (T task : list) { CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true); if(task == null) { continue; } String lockKey = ""; try { /** 开端履行各个子类真实事务逻辑 */ if (useConcurrentLock()) { lockKey = getLockKey(task); if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) { this.doSingleTask(task); }else{ lockKey = ""; log.warn("lockKey:{},加载失利,正在被其他用户确定,请重试!",lockKey);} } else { this.doSingleTask(task); } } catch(CommonBusinessException ex){ log.warn(ex.getMessage()); } catch (Exception e) { Profiler.functionError(info); log.error("使命处理失利,办法:{},使命:{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);} finally { Profiler.registerInfoEnd(info); if (StringUtils.isNotBlank(lockKey)) { redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey); } } } } } /** * 获取实体类的实践类型 * * return */ private Class。< T >getArgumentType() { return (Class。< T >) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } /** * 是否运用防并发锁 * 默许不运用,如需运用子类重写该办法 * return */ protected boolean useConcurrentLock() { return false; } /** * 根所注解获取LockKey,可被子类重写,进步功率 * * param businessObj 事务目标 * return concurrent lock key */ protected String getLockKey( T businessObj) { StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX); //若存在注解指定的防重字段,则运用这些字段组装防重Key,不然运用MQ事务主键防重 List。< ValueEntryInfo >valueEntries = getAnnotaionConcurrentKeys(businessObj); if (!CollectionUtils.isEmpty(valueEntries)) { for (ValueEntryInfo valueEntry : valueEntries) { lockKey.append(Constants.SEPARATOR_UNDERLINE); lockKey.append(valueEntry.getValue()); } } else { throw new CommonBusinessException(String.format("此使命处理需求加分布式锁,可是未设置锁key,所以不做事务处理,请检查,使命信息:%s",JSON.toJSONString(businessObj))); } return lockKey.toString(); } /** * 查找目标的ConccurentKey注解,获取防重字段,并排序回来 * * param businessObj 事务目标 * return 有序的事务字段值列表 */ private List。< ValueEntryInfo >getAnnotaionConcurrentKeys(T businessObj) { List。< ValueEntryInfo >valueEntries = new ArrayList。< ValueEntryInfo >(); Field[] fields = businessObj.getClass().getDeclaredFields(); for (int i = 0; i。 < fields.length; i++) { ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class); if (concurrentKey != null) { fields[i].setAccessible(true); Object fieldVal = null; try { ValueEntryInfo valueEntry = new ValueEntryInfo(); fieldVal = fields[i].get(businessObj); if (fieldVal != null) { valueEntry.setValue(String.format("%1$s", fieldVal)); valueEntry.setOrder(concurrentKey.order()); valueEntries.add(valueEntry); } } catch (IllegalAccessException e) { log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());} } } if (valueEntries.size() >1) { //排序ConcurrentKey Collections.sort(valueEntries, new Comparator。< ValueEntryInfo >() { Override public int compare(ValueEntryInfo o1, ValueEntryInfo o2) { if (o1.getOrder() >o2.getOrder()) { return 1; } else if (o1.getOrder() == o2.getOrder()) { return 0; } else { return -1; } } }); } return valueEntries; } protected List。< T >selectTasks(TaskServerParam taskServerParam, int curServer) { return this.loadTasks(taskServerParam, curServer); } /** * 获取select时的使命创立开端时刻 * param serverArg * return */ protected Date getCreateTimeF。rom。(String serverArg){ return null; } /** * 是否以批量办法处理使命 * return */ protected boolean isDoBatchTasks(){ return false; }}。
实战成果。
上述所述均为透传ID场景的原理和示例代码,实战作用如下图:调用jsf超时,跨体系检查日志进行排查,得知为慢sql引起。
上述大部分场景现已抽出一个通用jar包,具体运用。教程。见我的另一篇文章:分布式日志追寻ID运用教程。
审阅修改 黄宇。
内容来源:https://artdesignphuong.com/app-1/trực tiếp giải quần vợt wimbledon,https://chatbotjud-hml.saude.mg.gov.br/app-1/ice-casino-apk
(责任编辑:人文)