目录

1、xxl目录结构

1.1 xxl-job-admin

1.2 xxl-job-core

1.3 xxl-job-executor-samples

2、架构设计

2.1  整体架构设计

2.2 服务集成

3、启动源码分析


从官方将源码下载下来,当前发布版本为2.2.0,该版本下载的为最新版本2.2.1,。代码相差不大,接下来对xxl-job进行一步一步拆解

github地址    GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)

码云地址    xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

该框架有如下优点

1、使用层面

简单、好用,源码易读,修改简单

2、功能层面

1、有可视化界面进行操作,可以集中化管理任务

2、通过可视化可以对任务的管理,执行,调度,任务生命周期管理

3、任务执行支持手动和corn表达式定时触发

4、任务调度支持配置任务链以及任务执行失败的阻断策略

5、任务执行时间,执行信息等记录,可以更好地分析任务瓶颈以及执行分布时间

等等

1、xxl目录结构

xxl-job的目录如下

1.1 xxl-job-admin

该目录为调度中心源码,包括任务的管理,执行,调度,以及运行监控报表。喏,就下面这个

1.2 xxl-job-core

该代码为服务集成的核心源码包,包括重要注解,执行器等。

1.3 xxl-job-executor-samples

该包为服务集成测试demo,支持frameless,jfinal,spring,SpringBoot等多个框架集成。

2、架构设计

2.1  整体架构设计

通过服务注册的方式,将job注册到任务调度中心,通过调度中心进行统一的任务管理

2.2 服务集成

以SpringBoot集成xxl-job为例,集成直接运行demo即可,可以参考01.XXL-JOB这个文章。

1、配置文件

# web port spring web容器端口
server.port=8083
# no web
#spring.main.web-environment=false# log config
logging.config=classpath:logback.xml
### xxl-job注册中心地址
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job access token
xxl.job.accessToken=
### xxl-job 执行器名称
xxl.job.executor.appname=omsJob
### xxl-job executor 执行器ip
xxl.job.executor.ip=
### xxl-job executor 执行器端口
xxl.job.executor.port=9999
### xxl-job executor log-path 日志配置
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days 日志定时清理时间
xxl.job.executor.logretentiondays=30

2、初始化执行器

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;
}

3、使用xxl-job注解配置任务

/*** 1、简单任务示例(Bean模式)*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {XxlJobLogger.log("XXL-JOB, Hello World.");for (int i = 0; i < 5; i++) {XxlJobLogger.log("beat at:" + i);TimeUnit.SECONDS.sleep(2);}return ReturnT.SUCCESS;
}

4、启动成功后,服务会开启两个端口,一个是业务端口,一个是调度中心下发job执行的端口。(此处要优化,服务和job触发其实使用一个端口即可)

3、启动源码分析

1、服务启动流程

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;
}

服务主要启动是初始化XxlJobSpringExecutor这个bean对象,该对象定义了执行器xxlJobSpringExecutor的相关配置,如注册中心地址,服务提供地址,以及授权token等。该对象类图如下,

XxlJobSpringExecutor执行器继承XxlJobExecutor并实现Spring的ApplicationContextAware等类,对该bean进行了增强。主要核心类为XxlJobExecutor。

因为继承SmartInitializingSingleton,所以bean初始化完成之后会执行afterSingletonsInstantiated方法,该类主要为initJobHandlerMethodRepository这个方法,用于扫描xxl-job注解,进行任务加载和管理

// start
@Override
public void afterSingletonsInstantiated() {// 扫描xxl-job注解,进行任务加载和管理initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}
}
initJobHandlerMethodRepository方法主要如下

该方法主要是通过获取Spring管理的容器bean,然后扫描带有xxljob注解的方法,将他们保存在jobHandlerRepository对象中

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// 扫描Spring管理的beanString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {// 获取注解为XxlJob的方法,并保存在annotatedMethodsreturn AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}//获取方法属性,并存储在jobHandlerRepository对象中for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method method = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();if (xxlJob == null) {continue;}String name = xxlJob.value();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}// execute methodif (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}method.setAccessible(true);// init and destoryMethod initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");}}// registry jobhandler//将任务存储在jobHandlerRepository对象中,后续下发任务使用registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}
}

之后执行super.start(),执行父类XxlJobExecutor的start方法。该方法主要有日志初始化,日志清理任务初始化,RPC调用触发器回调线程启动,调度中心列表初始化以及执行器端口初始化。

public void start() throws Exception { // init logpath//初始化任务执行日志路径XxlJobFileAppender.initLogPath(logPath);// 日志定时清理任务JobLogFileCleanThread.getInstance().start(logRetentionDays);// 初始化触发器回调线程(用RPC回调调度中心接口)TriggerCallbackThread.getInstance().start();//初始化调度中心列表initAdminBizList( adminAddresses,  accessToken);// init executor-server  执行器端口启动initEmbedServer(address, ip, port, appname, accessToken);
}
 XxlJobFileAppender.initLogPath(logPath)和JobLogFileCleanThread.getInstance().start(logRetentionDays)主要对执行日志进行初始化,就不多解释了,直接往下看。
TriggerCallbackThread.getInstance().start();
public void start() {// 调度中心注册表会否为空if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callbacktriggerCallbackThread = new Thread(new Runnable() {@Overridepublic void run() {// 监听阻塞队列while(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// 组装callback返回的参数List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// 执行回调if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");}});triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");triggerCallbackThread.start();// retrytriggerRetryCallbackThread = new Thread(new Runnable() {@Overridepublic void run() {while(!toStop){try {retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");}});triggerRetryCallbackThread.setDaemon(true);triggerRetryCallbackThread.start();
}
 doCallback(callbackParamList)如下
/*** do callback, will retry if error* @param callbackParamList*/
private void doCallback(List<HandleCallbackParam> callbackParamList){boolean callbackRet = false;// 向所有的调度中心发送回调信息for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//本质上是调用注册中心的api/callback接口。记录调用结果。ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");callbackRet = true;break;} else {callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);}} catch (Exception e) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());}}if (!callbackRet) {appendFailCallbackFile(callbackParamList);}
}
adminBiz.callback(callbackParamList) 

调用注册中心api接口

@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
initAdminBizList( adminAddresses, accessToken); 初始化注册中心列表,用于后期和注册中心交互
//扫描xxl.job.admin.addresses配置,将他们加入注册中心列表adminBizList对象中。用于后期发送回调
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}
}
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);<核心>
//初始化xxljob执行器服务
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {//初始化ip和端口,如果没有ip则自动获取本地ipport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// 启动服务embedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);
}
embedServer.start(address, port, appname, accessToken); 本质上是一个Netty服务,标准的Netty服务启动,我们只看EmbedHttpServerHandler,Netty处理请求的handler
public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);//注册到调度中心startRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {if (e instanceof InterruptedException) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} else {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);}} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();
}
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz executorBiz; //执行器private String accessToken; //tokenprivate ThreadPoolExecutor bizThreadPool;//执行器线程池public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {this.executorBiz = executorBiz;this.accessToken = accessToken;this.bizThreadPool = bizThreadPool;}@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);String requestData = msg.content().toString(CharsetUtil.UTF_8);//获取请求数据String uri = msg.uri();HttpMethod httpMethod = msg.method();boolean keepAlive = HttpUtil.isKeepAlive(msg);String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invokebizThreadPool.execute(new Runnable() {@Overridepublic void run() {// 处理请求Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);// 格式化为JSONString responseJson = GsonTool.toJson(responseObj);// 写回客户端writeResponse(ctx, keepAlive, responseJson);}});}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken!=null&& accessToken.trim().length()>0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {//接收注册中心请求接口处理if ("/beat".equals(uri)) {return executorBiz.beat();} else if ("/idleBeat".equals(uri)) {IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);} else if ("/run".equals(uri)) { //注册中心执行接口TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);} else if ("/kill".equals(uri)) {KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);} else if ("/log".equals(uri)) {LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}
}
我们主要看下run方法的执行过程
@Override
public ReturnT<String> run(TriggerParam triggerParam) {// 根据jobid加载对应的job执行信息,第一次执行为nullJobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;//根绝jobThread获取job处理handlerString removeOldReason = null;// valid:jobHandler + jobThreadGlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); //获取任务类型if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//获取任务的执行器// 校验新老job是否一致,不一致将老的进行初始化。有可能任务更新。通过jobid获取的是老的if (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = newJobHandler; //将新处理handler赋值给老的if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}if (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategyif (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}} else {// just queue trigger}}// 如果jobThread 为null,则将任务信息注册到jobThreadRepository对象进行缓存,并启动线程if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}//将任务推送到自己现成的任务队列中区ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;
}
看下registJobThread方法,该方法主要是根据任务信息,创建一个jobThread,之后启动该线程。然后将其缓存到jobThreadRepository中。如果存在老的任务,则将老的任务停掉。
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);//创建新得jobThread对象newJobThread.start();//启动线程logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});//将新的jobThread放入map中,并弹出老的。JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {
//将老的停掉oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;
}

该线程执行如下,主要就是获取队列中的任务,然后通过新建FutureTask线程执行任务。之后将执行结果推到TriggerCallbackThread的队列中。通过TriggerCallbackThread推到任务调度中心,进行记录结果信息。

 @Override
public void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;ReturnT<String> executeResult = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)//弹出队列任务triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext.setXxlJobContext(new XxlJobContext(triggerParam.getLogId(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal()));// executeXxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {final TriggerParam triggerParamTmp = triggerParam;//创建futureTask线程任务,并执行FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {@Overridepublic ReturnT<String> call() throws Exception {return handler.execute(triggerParamTmp.getExecutorParams());}});futureThread = new Thread(futureTask);futureThread.start();//获取执行结果executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobLogger.log("<br>----------- xxl-job job execute timeout");XxlJobLogger.log(e);executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");} finally {futureThread.interrupt();}} else {// just executeexecuteResult = handler.execute(triggerParam.getExecutorParams());}if (executeResult == null) {executeResult = IJobHandler.FAIL;} else {executeResult.setMsg((executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)?executeResult.getMsg().substring(0, 50000).concat("..."):executeResult.getMsg());executeResult.setContent(null);    // limit obj size}XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}StringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {//结果添加到回调队列中if(triggerParam != null) {// callback handler infoif (!toStop) {// commonmTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));} else {// is killedReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

至此,整个XXlJOb的启动和接收请求的处理大致梳理完了。代码整体很简单。就不细致展开了。了解整个设计思想即可。

XXL-Job启动源码详解相关推荐

  1. 源码详解Android 9.0(P) 系统启动流程之SystemServer

    源码详解Android 9.0(P) 系统启动流程目录: 源码详解Android 9.0(P)系统启动流程之init进程(第一阶段) 源码详解Android 9.0(P)系统启动流程之init进程(第 ...

  2. cocos android-1,Cocos2D-Android-1之源码详解:5.Box2dTest

    Cocos2D-Android-1之源码详解:5.Box2dTest 发布时间:2020-08-06 06:19:28 来源:51CTO 阅读:398 作者:abab99 package org.co ...

  3. Android编程之Intent源码详解

    Intent源码详解,直接开始入题: Intent源码6700多行代码,但真正核心代码 就那么几百行,大部分都用来定义常量字符串了 先来看一下 public class Intent implemen ...

  4. AidLux“换脸”案例源码详解 (Python)

    "换脸"案例源码详解 (Python) faceswap_gui.py用于换脸,可与facemovie_gui.py身体互换源码(上一篇文章)对照观看 打开faceswap_gui ...

  5. 【JAVA秘籍心法篇-Spring】Spring XML解析源码详解

    [JAVA秘籍心法篇-Spring]Spring XML解析源码详解 所谓天下武功,无坚不摧,唯快不破.但有又太极拳法以快制慢,以柔克刚.武功外式有拳打脚踢,刀剑棍棒,又有内功易筋经九阳神功.所有外功 ...

  6. Laravel5.5源码详解 -- Laravel-debugbar及使用elementUI-ajax的注意事项

    Laravel5.5源码详解 – Laravel-debugbar 及使用elementUI - ajax的注意事项 关于laravel对中间件的处理,请参中间件考另文, Laravel5.5源码详解 ...

  7. Android AR开发实践之七:OpenGLES相机预览背景绘制源码详解

    Android AR开发实践之七:OpenGLES相机预览背景绘制源码详解 目录 Android AR开发实践之七:OpenGLES相机预览背景绘制源码详解 一.OpenGL ES渲染管线 1.基本处 ...

  8. AidLux “人脸测试”案例源码详解

    "人脸检测"案例源码详解 testface.py用于进行人脸检测 构建APP框架和添加主要控件 人脸关键点识别的方法 打开人脸测试案例 1.在VScode中进入代码编辑状态. 2. ...

  9. Rocksdb Compaction源码详解(二):Compaction 完整实现过程 概览

    文章目录 1. 摘要 2. Compaction 概述 3. 实现 3.1 Prepare keys 过程 3.1.1 compaction触发的条件 3.1.2 compaction 的文件筛选过程 ...

最新文章

  1. 李彦宏被泼水背后,这些python AI发展的大事你都了解吗?
  2. 个人站立会议-----20181216
  3. Warning: Missing charsets in String to FontSet conversion
  4. php-fpm.conf 配置文件详解
  5. 一个屌丝程序猿的人生(三十九)
  6. 安装mysql8避坑指南_2019 MySQL 8 安全安装避坑指南-Go语言中文社区
  7. 2023成都精密光学展览会
  8. 计算机基金经理排名,2019年基金经理排行_2017年一季度 基金经理排行榜大揭秘 规模 盈利 经验...
  9. 新版MacBookPro风扇狂转的问题
  10. 全向轮三轮小车的搭建(一)
  11. Excel根据单元格内容分类并插入空行的方法
  12. 持续集成在Hexo自动化部署上的实践
  13. CST结果导入MATLAB,请教大家可以将CST结果调入到MATLAB中进行计算吗
  14. 囚徒困境困境_社会困境两部恐怖电影主演
  15. 我看电影阿凡达(Avatar)
  16. AddressBook 地址簿 (电话簿) 访问与修改
  17. 支持向量机SMO算法
  18. 湖北智禾教育:淘宝店铺详情页提高转化率该怎么做
  19. android登陆按钮图片素材,Android精美登录界面设计
  20. CentOS7挂载CentOS7 everything安装光盘和设置yum源为挂载的光盘

热门文章

  1. 硬盘、U盘、软盘之比较
  2. win7计算机二进制,二进制-系统爱好者
  3. eclipse 上传代码到github,我花费了一个月测的。就这样了
  4. 怎么设置android投屏 桌面程序,手机怎么投屏到电脑?
  5. kafka的安装和使用(详细版)
  6. HTML之设置背景、边框、边距和补白
  7. USB移动硬盘WinPE启动盘的制作方法
  8. css怎么两线合并,【2人回答】CAD中如何把两条线合并成一条线?-3D溜溜网
  9. GetLastError函数封装显示具体错误信息
  10. 网赚小项目,聊天挣钱,打字聊天就能挣钱的方法