文章目录

  • 1. 概念
    • 1. 定时任务的基本概念
    • 2. 定时任务的使用场景
    • 3. 原生定时任务缺陷有哪些缺陷?
    • 4. 基于当前 XXL-JOB 我们能做什么?
  • 2. 系统架构和整理流程
    • 2.1. 设计思想
    • 2.2. 架构图
    • 2.3. 执行流程
  • 3. 启动流程
    • 3.1. 服务器启动
    • 3.2. 客户端启动
  • 4. 服务注册
    • 1. 任务执行器
    • 2. 调度中心
  • 5. 主动触发
    • 1. 调度中心
    • 2. 任务执行器
  • 6. 自动触发
    • 1. 自动触发逻辑
    • 2. 时间轮线程
  • 7. 设计亮点
    • 1. 路由策略
    • 2. 注册中心
    • 3. 全异步化 & 轻量级
      • 1. 调度中心
      • 2. 任务执行器
      • 3. 异步化
      • 4. 轻量级
    • 4. 时间轮算法
      • 1. 是什么
      • 2. xxl-job实现

1. 概念

1. 定时任务的基本概念

程序为解决一个信息处理任务而预先编制的工作执行方案,这就是定时任务,核心组成如下:

  • 执行器:负责管理应用运行时环境,用于调度定时任务。
  • 任务:任务执行的流程,是一个类,具体的业务。
  • 触发器:按照某种时间规则,执行具体的调度任务。

2. 定时任务的使用场景

日常开发中,定时任务主要分为如下两种使用场景:

时间驱动:

  • 对账单、日结
  • 营销类短信
  • MQ定时检查生产失败的消息

数据驱动:

  • 异步数据交换
  • 数据同步

3. 原生定时任务缺陷有哪些缺陷?

分布式技术应用的时代,原生定时任务的缺陷显得更为突出。结合传统项目与分布式微服务的架构,思考总结如下,欢迎各位大神给与补充:

  • 不支持集群多节点部署,需要自己实现避免任务重复执行的问题。
  • 不支持分片任务,处理有序数据时,多机器分片执行任务处理不同数据。
  • 不支持动态调整,不重启服务的情况下修改任务的参数。
  • 没有报警机制,当任务失败后没有报警机制通知。
  • 不支持生命周期统一管理,如不重启服务情况下关闭、启动任务。
  • 不支持失败重试,出现异常后任务终结,不能根据状态控制任务重新执行。
  • 无法统计任务数据,当任务数据量大的时候,对于任务执行情况无法高效的统计执行情况。

4. 基于当前 XXL-JOB 我们能做什么?

  • 执行器 HA(分布式):天生支持任务分布式执行,无需自己实现。任务"执行器"支持集群部署,可保证任务执行 HA;
  • 调度中心 HA(中心式):调度中心相当于传统调度任务的触发器,调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心 HA;

2. 系统架构和整理流程

https://www.xuxueli.com/xxl-job/

2.1. 设计思想

  • 将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
  • 将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
  • 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

2.2. 架构图

2.3. 执行流程

3. 启动流程

3.1. 服务器启动


首先找到配置类 XxlJobAdminConfig, 可以发现该类实现 InitializingBean接口,这里直接看 afterPropertiesSet方法即可。

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {// ---------------------- XxlJobScheduler ----------------------private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;// 初始化xxljob调度器xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}...
}
public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start// 初始化触发器线程池JobTriggerPoolHelper.toStart();// admin registry monitor run/*** 30秒执行一次,维护注册表信息, 判断在线超时时间90s* 1. 删除90s未有心跳的执行器节点;jobRegistry* 2. 获取所有的注册节点,更新到jobGroup(执行器)*/JobRegistryHelper.getInstance().start();// admin fail-monitor run 运行事变监视器,主要失败发送邮箱,重试触发器JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )// 将丢失主机调度日志置为失败JobCompleteHelper.getInstance().start();// admin log report start  统计一些失败成功报表JobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper )/*** 调度器执行任务(两个线程 + 线程池执行调度逻辑)* 1. 调度线程50s执行一次;查询5s秒内执行的任务,并按照不同逻辑执行* 2. 时间轮线程每1秒执行一次;时间轮算法,并向前跨一个时刻;*/JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}

3.2. 客户端启动


这里我们看XxlJobSpringExecutor,实现了 SmartInitializingSingleton 接口,实现该接口的当spring容器初始完成,调用afterSingletonsInstantiated()方法。紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean。实现DisposableBean当实例bean摧毁时调用destroy()方法。

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);// start@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method) 初始化调度器资源管理器/*** ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();* handle名; Handler->MethodJobHandler(反射 Object、Bean、initMethod、destroyMethod)*/initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super start 启动try {super.start();} catch (Exception e) {throw new RuntimeException(e);}}
}

再看super.start()

 public void start() throws Exception {// init logpath 初始化日志目录,用来存储调度日志执行指令到磁盘XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client 初始化admin链接路径存储集合// 在AdminBizClient设置好addressUrl+accessTokeninitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThread 清除过期日志(30天)// 根据存储路径目录的日志(目录名为时间),根据其目录时间进行删除,1天跑一次,守护线程JobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThread 回调调度中心任务执行状态TriggerCallbackThread.getInstance().start();// init executor-server  执行内嵌服务/*** 1. 使用netty开放端口,等待服务端调用* 2. 维护心跳时间到服务端(心跳30S)* 3. 向服务端申请剔除服务*/initEmbedServer(address, ip, port, appname, accessToken);}

4. 服务注册

1. 任务执行器

com.xxl.job.core.thread.ExecutorRegistryThread#start

    public void start(final String appname, final String address){...registryThread = new Thread(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);// 遍历所有的调度中心for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {// 休眠30s,每30s执行一次if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry remove// 线程终止后,主动断开连接try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;...break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}...}});// 设置为守护线程registryThread.setDaemon(true);registryThread.setName("xxl-job, executor ExecutorRegistryThread");registryThread.start();}

再来看看其RPC调用,采用的是HTTP传输协议,并采用了JSON作为序列化。

 @Overridepublic ReturnT<String> registry(RegistryParam registryParam) {return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);}// 可以再细看 com.xxl.job.core.util.XxlJobRemotingUtil,postBody采用就是Http协议,GsonTool将对象转成JSON。

2. 调度中心

再看看调度中心如何接收任务执行器请求的;
JobApiController就为SpringMVC的Controller,负责接收请求映射

  @RequestMapping("/{uri}")@ResponseBody@PermissionLimit(limit=false)public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {// validif (!"POST".equalsIgnoreCase(request.getMethod())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mapping/*** 1. 更新调度日志状态;* 2. 当执行器执行成功并且存在有子任务时,触发执行子任务*/if ("callback".equals(uri)) {List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);}// 服务注册else if ("registry".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);}// 服务下线else if ("registryRemove".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}}
 public ReturnT<String> registry(RegistryParam registryParam) {// valid  校验if (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute 异步注册registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() { //更新修改时间int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret < 1) {//说明暂未数据,才新增            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh  空实现freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;}

5. 主动触发

1. 调度中心

触发地址:com.xxl.job.admin.controller.JobInfoController#triggerJob

 @RequestMapping("/trigger")@ResponseBody//@PermissionLimit(limit = false)public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {// force cover job param 设置默认值if (executorParam == null) {executorParam = "";}// 触发器类型,手动 ,重试次数,'执行器任务分片参数,格式如 1/2',任务参数,机器地址JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);return ReturnT.SUCCESS;}
   public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool  获取线程池ThreadPoolExecutor triggerPool_ = fastTriggerPool;// 获取超时次数AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);// 一分钟内超时10次,则采用慢触发器执行if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// triggertriggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do trigger // 执行触发器XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-map  更新成为下一分钟long minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now; // 当达到下一分钟则清除超时任务jobTimeoutCountMap.clear();}// incr timeout-count-maplong cost = System.currentTimeMillis()-start;if (cost > 500) {       // ob-timeout threshold 500ms// 执行时间超过500ms,则记录执行次数AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}});}

注意当触发器在一分钟内超时10次,则采用慢触发器执行

 private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param 获取阻塞处理策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy// 获取路由策略,默认firstExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id  保存执行日志XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-paramTriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address 获取触发器执行地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {// 路由策略为分配广播if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {// 根据设置的路由策略,执行路由器,获取返回结果,这里用到了策略模式/***  1. ExecutorRouteFirst (第一个)固定选择第一个机器*  2. ExecutorRouteLast (最后一个)*  3. ExecutorRouteRound (轮询), 通过Map记录任务的执行次数进行取模*  4. ExecutorRouteRandom (随机)*  5. ExecutorRouteConsistentHash (一致性hash),每个jobId都会hash到指定的机器上,每次都会构建虚拟节点*  6. ExecutorRouteLFU (最不频繁使用,1天的使用频繁), 通过Map存储每个jobId在每个地址的使用次数,拿到最少使用地址;*  7. ExecutorRouteLRU (最近最久未使用), 通过LinkedHashMap accessOrder进行实现,其内部通过双向链表实现*  8. ExecutorRouteFailover(故障转移) 通过顺序遍历执行器地址,进行心跳检查*  9. ExecutorRouteBusyover(忙碌转移) 照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;*/routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {// 已经获取到任务执行器地址,通过HTTP进行调度triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) {triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-infojobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();jobLog.setTriggerCode(triggerResult.getCode());jobLog.setTriggerMsg(triggerMsgSb.toString());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());}

2. 任务执行器

com.xxl.job.core.server.EmbedServer#process()

 private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri == null || uri.trim().length() == 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken != null&& accessToken.trim().length() > 0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {switch (uri) {case "/beat":return executorBiz.beat();case "/idleBeat":IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);// 触发执行器case "/run":TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill":KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log":LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}

JobThread执行调度逻辑
com.xxl.job.core.thread.JobThread#start()

 @Overridepublic void run() {// inittry {// 执行初始化方法(初始化连接池等信息,一个job只能执行一次)handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)// 从队列中获取调度日志triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log" 写入log文件String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());// 设置了超时就异步线程处理(FutureTask设置超时时间)if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just execute// 反射,invoke handler; 没设置超时时间,则立刻执行触发器handler.execute();}// 记录执行日志// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {// 空闲执行次数超过30次,且队列没任务,则删除并终止线程if (idleTimes > 30) {if(triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}}/*** 当任务调度有异常时,捕捉异常,通过XxlJobHelper.handleFail(errorMsg)设置失败;* 所以当JobHandler处理业务逻辑时,记得抛出异常*/catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler info// 添加回调队列if (!toStop) {// commonmTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}

6. 自动触发


1. 自动触发逻辑

com.xxl.job.admin.core.thread.JobScheduleHelper#start()
scheduleThread定时线程

scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {try {// 保证5秒执行一次TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)// 每秒处理20个任务,200个线程int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);//获取任务调度锁表内数据信息,加写锁(分布式锁)preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();//获取当前时间后5秒,同时最多负载的分页数List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、push time-ringfor (XxlJobInfo jobInfo: scheduleList) {// time-ring jump(时间轮)if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// 1、misfire match/*** 调度过期策略:*   - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;*   - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;*/MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );}// 2、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 1、trigger// 执行触发器JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );// 2、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again 下次触发时间在当前时间往后5秒范围内if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring second 获取下次执行秒int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ring 添加到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 未来五秒以内执行的所有任务添加到ringData// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ring 添加到时间轮pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger info 更新执行时间和上次执行时间到数据库for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// commitif (conn != null) {try {conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}long cost = System.currentTimeMillis()-start;// Wait seconds, align secondif (cost < 1000) {  // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();

2. 时间轮线程

com.xxl.job.admin.core.thread.JobScheduleHelper#start()
ringThread时间轮线程

// ring thread(时间轮线程)ringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}try {// second dataList<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND);// 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );if (ringItemData.size() > 0) {// do triggerfor (int jobId: ringItemData) {// do trigger// 执行触发器;逻辑就跟主动触发是一致的了。JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();

7. 设计亮点

1. 路由策略

  1. 路由策略使用了 策略设计模式,根据选择的策略去获取对应的调度中心地址;
  2. 支持了首个、最后、随机、轮询、一致性hash、LRU、LFU、故障转移、忙碌转移、分配广播;

2. 注册中心

  1. 续期线程每30秒对任务执行器进行续期
  2. 过期线程每30秒把90未续期的任务执行器移除;

3. 全异步化 & 轻量级

1. 调度中心

  1. 调度任务:线程定时获取要执行的任务,并交给调度线程池异步调用;
  2. 心跳: 新开线程清理过期的任务执行器;
  3. 失败任务:线程重试并告警;

2. 任务执行器

  1. 执行任务: 每个job任务都有各自jobThread从队列中获取;
  2. 回调: 有两个线程 回调和重试线程,负责向xxlAdmin回调任务执行状态;
  3. 心跳: 新开线程每隔30s进行续期

3. 异步化

  1. 异步调用:交给线程池进行异步调用任务给任务执行器
  2. 异步执行:任务执行器每个job都有各自的线程,并异步回调给xxlAdmin;

4. 轻量级

  1. 架构上非常的轻,基本通过Mysql实现了分布式锁、注册中心、任务调度等功能,只需依赖Mysql + Java;
  2. 在全异步化的基础上,单个JOB一次运行平均耗时基本在 “10ms” 之内(基本为一次请求的网络开销),可以保证使用有限的线程支撑大量的JOB并发运行;
  3. 官方文档表示,在理论的调度中心下,单机能支撑5000任务并发;
  4. 如何提高性能:1. 机器上;2. 不同业务进行区分; 3. 修改源码(不同的xxl-job集群处理不同的job)

4. 时间轮算法

1. 是什么

  1. 时间轮方案将现实生活中的时钟概念引入到软件设计中,主要思路是定义一个时钟周期(比如时钟的12小时)和步长(比如时钟的一秒走一次),当指针每走一步的时候,会获取当前时钟刻度上挂载的任务并执行。
  2. 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
  3. 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
  4. 每个槽对应一个环形链表存储该时间应该被执行的任务
  5. 需要一个线程去驱动指针运转,获取到期任务

2. xxl-job实现

  1. xxl-job的时间环只会存储之后5s内执行的任务,使用一个Map<Interger, List>进行存储;
  2. Map的key为执行时间的秒数%60,value为这个秒执行的jobIdList;
  3. 时间轮线程每1秒执行一次,从时间轮从获取到jobIdList,最后进行调度任务;

xxl-job源码解析(技术分享)相关推荐

  1. MyBatis 源码分析-技术分享

    2019独角兽企业重金招聘Python工程师标准>>> MyBatis 源码分析 MyBatis的主要成员 Configuration MyBatis所有的配置信息都保存在Confi ...

  2. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

  3. Android技术栈--HashMap和ArrayMap源码解析

    1 总览 WARNING!!:本文字数较多,内容较为完整并且部分内容难度较大,阅读本文需要较长时间,建议读者分段并耐心阅读. 本文会对 Android 中常用的数据结构进行源码解析,包括 HashMa ...

  4. H.264压缩技术之视频基础(foundation of learning video)——Matlab源码解析

    前言 为了后续能更好的理解,I帧编码与P帧编码,所以笔者先对数字视频中的一些基础概念进行铺垫.后续比较复杂的帧内预测,与帧间预测理解起来就会相对容易些. 关于Matlab中h.264的main函数部分 ...

  5. Android技术栈(五)HashMap(包括红黑树)与ArrayMap源码解析

    1 总览 本文会对 Android 中常用HashMap(有红黑树)和ArrayMap进行源码解析,其中 HashMap 源码来自 Android Framework API 28 (JDK=1.8) ...

  6. openGauss数据库源码解析系列文章—— AI技术之“自调优”

    上一篇介绍了第七章执行器解析中"7.6 向量化引擎"及"7.7 小结"的相关内容,本篇我们开启第八章 AI技术中"8.1 概述"及" ...

  7. VVeboTableView 源码解析

    原文链接:http://www.jianshu.com/p/78027a3a2c41 最近在看一些 iOS 性能优化的文章,我找到了 VVeboTableView 这个框架.严格来说这个不属于框架,而 ...

  8. Laravel5.2之Filesystem源码解析(下)

    2019独角兽企业重金招聘Python工程师标准>>> 说明:本文主要学习下\League\Flysystem这个Filesystem Abstract Layer,学习下这个pac ...

  9. ConcurrentHashMap源码解析(1)

    此文已由作者赵计刚授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 注:在看这篇文章之前,如果对HashMap的层不清楚的话,建议先去看看HashMap源码解析. http:/ ...

最新文章

  1. Android开发之使用BroadcastReceiver实现开机自动启动(源代码分享)
  2. 可能大家都能跑通的ignite的HelloWorld
  3. k8s集群DNS无法解析问题的处理过程
  4. 一个对象,数组去重的方法
  5. WPF 左侧菜单样式
  6. iOS多线程:『pthread、NSThread』详尽总结
  7. table中的td内容过长显示为固定长度,多余部分用省略号代替
  8. 20145239杜文超 《Java程序设计》第7周学习总结
  9. Power Desiner逆向生成pdm
  10. 什么样的人适合学编程?
  11. 前端实现——html2pdf功能(完成)
  12. 8款逆天的在线实用工具
  13. java ssl 双向认证_Java实现SSL双向认证的方法
  14. rasp 系统_一类PHP RASP实现
  15. 移动地图定位软件完成了
  16. 倡议书格式范文_写倡议书的格式及范文
  17. 读“王东升 新时空 硅碳融合的产业革命”拙见
  18. LightSensor传感器
  19. 网易云音乐解除灰色小工具 - 资源
  20. jquery easyui中文培训文档

热门文章

  1. 【react+umi】国际化配置:浏览器默认英文,如何让工程默认语言为中文?
  2. 一文了解什么是嵌入式?
  3. 拿图就走系列之《深入理解java虚拟机》
  4. 车金融|我在M公司的那两年
  5. 如何设置计算机自动连接宽带,宽带连接怎么设置,怎么设置宽带自动连接
  6. 服务端高并发分布式架构演进
  7. Visio中旋转文本框与箭头平行
  8. 软件测试论文参考文献
  9. 使用HTML实现树形结构
  10. 《深入理解分布式事务》,初识分布式......