quarts的缺点:

随机负载(for  update );不能分片

阻塞处理策略:

分片原理:

 for (int i = 0; i < group.getRegistryList().size(); i++) {// 同时给多个客户端发送命令processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}

在代码中可以通过工具类 获得 当前 是第几个分片 n  执行任务:

取数据需要自己从SQL 中处理,保证同一条数据 不会被不同的执行器取到就可以

在 查询数据的时候,将  n 作为 参数

select * from my_job  where  mod(id,n) = #{n}  作为当前分片要执行的任务

select  * from   XXL_JOB_QRTZ_TRIGGER_INFO  where mod(sha1(id),3) = 1  ;select  * from   XXL_JOB_QRTZ_TRIGGER_INFO  where mod(id,3) = 1  ;

原理:

 执行器 怎么 将  线程的运行日志发送给 调度器? 执行器 会将 执行任务日志放到 自己的队列里,有一个线程会消费这个队列,并通过 http 请求 传给 调度器,调度器去 更新 日志表。

1.

 调度中心(注册中心): 当一个job 可以执行的时候,调度中心 通过 http 请求 将 任务 传给 某个 worker注册:HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();key  是 appName ;value  是: ip 端口 ;while 循环  ------> JobScheduleHelper : select * from xxl_job_lock where lock_name = 'schedule_lock' for update------> 系统当前时间 大于  jobInfo.getTriggerNextTime()------>  触发------>   更新下一次的触发时间jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());jobInfo.setTriggerNextTime(nextValidTime.getTime());  UPDATE xxl_job_infoSETtrigger_last_time = #{triggerLastTime},trigger_next_time = #{triggerNextTime},trigger_status = #{triggerStatus}WHERE id = #{id}   while 循环 里的伪代码如下:try  {Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;// 注册中心(调度中心) 有多台机器,防止多个机器 同时给 一个 执行器发送 http 请求,此处需要加 悲观锁,// 同一时刻只能 有 一个 注册中心,给某一个  worker 发送任务select * from xxl_job_lock where lock_name = 'schedule_lock' for update // 获取 job 集合for (XxlJobInfo jobInfo: scheduleList) {// 触发   ThreadPoolExecutor 线程池 中 去执行任务(向客户端发送请求)------>   分片的话,一个时间 选择一台机器发送请求// 更新 xxl_job_info 中 ,下一次任务的执行时间(long 类型的)}} catch(Exception e){} finally(){{// commitif (conn != null) {try {// 释放锁conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}}                                       调度中心:  通过 http 向 客户端发送 请求后(URI 为  /run)

客户端接收到请求后:

客户端在收到 服务端的执行任务指令后如何操作的?① 客户端引入了  xxl-job-core 包,这个包在 spring bean注册完之后,会有一个回调函数,将 bean 中 含有 @xxlJob 注解  的Component 和 method 方法 组装成一个 MethodJobHandler 对象同事以 @xxlJob 上的value 为key ,MethodJobHandler 实例 为value 放到一个 ConcurrentHashMap 中② 服务端的执行任务指令过来,会根据 服务端传来的 xxlJob 注解上的value 从 ConcurrentHashMap 获取对应的 Handler 对象③ 注册并开启一个工作线程,执行任务,并在 jobHandler 方法里 写入执行结果(不是return success ) ,而是调用官方的方法,通过 InhertableTheadLocal 将执行结果写入到线程私有变量里,并将finally 代码块将执行结果 推到  回调线程里    ④  回调线程 反馈任务运行结果,服务端接受到结果后根据日志id 更新日志 表

客户端在服务启动的时候,将被xxlJob 标记的method 和相关Component 组成对象,并保存到map里,供调用(有点类似策略模式)

// 伪代码如下:xxl job  core  项目 在 bean 注册到容器 里后,有一个回调方法(通过实现 SmartInitializingSingleton 接口实现,重写方法)回调方法的作用:获取 容器中 的bean ,主要是  方法被   @XxlJob 注解标记的  bean, // bean  spring 中的bean ,executeMethod 被 xxlJob 标注的方法,registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));// name  是  Bean 里 加在方法上的 XxlJob 注解public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}map 结构如下:private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

反射调用目标方法:

MethodJobHandler extends IJobHandler 中有如下方法:@Overridepublic void execute() throws Exception {Class<?>[] paramTypes = method.getParameterTypes();if (paramTypes.length > 0) {method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types} else {// 反射调用  被 xxlJob 标注的目标方法// method 是目标方法,target 是目标方法所在的beanmethod.invoke(target);}}

③ 注册并开启 工作线程

 public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);// 开启新线程 newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});// 获取上传一次的 旧线程JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);  // putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {// 通过变量 让线程 终止(线程执行完也算是终止)oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}

工作线程:

public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;// 封装了Bean 和目标方法 ,这样反射就可以 调用目标方法private IJobHandler handler;  private LinkedBlockingQueue<TriggerParam> triggerQueue;private Set<Long> triggerLogIdSet;      // avoid repeat trigger for the same TRIGGER_LOG_ID// 通过一个变量 让一个线程 结束运行,运行完就算线程销毁了(不会旧线程一直 while 循环)private volatile boolean toStop = false;private String stopReason;private boolean running = false;    // if running jobprivate int idleTimes = 0;         // idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;// 每个线程 都有自己的私有 队列this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());}public IJobHandler getHandler() {return handler;}/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}/*** kill job thread** @param stopReason*/public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}/*** is running job* @return*/public boolean isRunningOrHasQueue() {return running || triggerQueue.size()>0;}

重点看下执行任务的 run 方法:

// 没有终止就一直运行while(!toStop){try{// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job context//底层:  InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>()// 线程私有变量的传递  ; 在自定义的  jobHandler 方法里写的日志 要用官方的日志 组件  ,会将 日志内容写入到日志文件里// eg:他获取文件名称的方法  String logFileName = xxlJobContext.getJobLogFileName();XxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());// 有过期时间用 FutureTask  (这个可以设置超时时间)if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// 反射调用 目标方法handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// 没有过期时间// just execute//  反射调用 目标方法handler.execute();}}catch{}finally{if(triggerParam != null) {// callback handler infoif (!toStop) {//  往回调线程里 push 消息,通知执行完结果TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),// 这个是在哪里设置的值? 在  自己写的任务 xxJobHandler   方法里(底层 调用  InheritableThreadLocal 获取当前线程的私有属性,然后设置成功吗)// 代码如下://  if (exitValue == 0) {// default success//} else {//    XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");//}XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job running, killed]" ));}}}}

④ 回调线程 通过http 请求 反馈工作 线程 执行结果

说明:没有执行完任务后直接回调,而是  放到一个队列里,批量 反馈,减少了网络开销

主要属性和方法:

 LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>()// 添加执行结果public static void pushCallBack(HandleCallbackParam callback){getInstance().callBackQueue.add(callback);logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());}
 @Overridepublic void run() {// normal callbackwhile(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {// 通过 http 进行回调doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");}});继续进入:@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);}

1.  注册中心收到 请求后的操作:

api/callback

public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {// 线程池callbackThreadPool.execute(new Runnable() {@Overridepublic void run() {for (HandleCallbackParam handleCallbackParam: callbackParamList) {ReturnT<String> callbackResult = callback(handleCallbackParam);logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);}}});return ReturnT.SUCCESS;}

callBack 方法如下:

private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {// valid log itemXxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());if (log == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");}if (log.getHandleCode() > 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc}// handle msgStringBuffer handleMsg = new StringBuffer();if (log.getHandleMsg()!=null) {handleMsg.append(log.getHandleMsg()).append("<br>");}if (handleCallbackParam.getHandleMsg() != null) {handleMsg.append(handleCallbackParam.getHandleMsg());}// success, update    log  ;(说明:insert log 是 在服务端向客户端发送请求的时候插入的)log.setHandleTime(new Date());log.setHandleCode(handleCallbackParam.getHandleCode());log.setHandleMsg(handleMsg.toString());XxlJobCompleter.updateHandleInfoAndFinish(log);return ReturnT.SUCCESS;}

调度日志的显示:

点击 页面 执行日志,执行了两部操作:

首先从 调度器获取 日志id(日志文件的文件名 日志id.log ),日志所在执行器服务器信息  ② 通过页面 ajax 请求 访问执行器 日志文件(配置文件里配置的路径)

代码如下:

① 页面 获取日志id,执行器服务器信息 如下:

说明:

文件名称 和  日志id 有关:(每次执行完一个任务,就生成一个文件,而不是 所有日志文件堆积在一起 )

log filename: logPath/yyyy-MM-dd/9999.log

② 读取日志文件(说明:如果 调度中心和 执行器不在一台服务器上,得修改源码,将本地读取日志文件改为 远程读取日志文件)

@RequestMapping("/logDetailCat")@ResponseBodypublic ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){try {ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(executorAddress);// 读取日志文件的内容ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);// is endif (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) {XxlJobLog jobLog = xxlJobLogDao.load(logId);if (jobLog.getHandleCode() > 0) {logResult.getContent().setEnd(true);}}return logResult;} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage());}}

读取方法:

    @Overridepublic ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum) {//获取日志名称,说明  9999 是 日志id    // log filename: logPath/yy yy-MM-dd/9999.logString logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId);// 读取日志 LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum);return new ReturnT<LogResult>(logResult);}

如何获取日志文件名称?

 public static String makeLogFileName(Date triggerDate, int logId) {// filePath/yyyy-MM-ddSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");  // avoid concurrent problem, can not be staticFile logFilePath = new File(getLogPath(), sdf.format(triggerDate));if (!logFilePath.exists()) {logFilePath.mkdir();}// filePath/yyyy-MM-dd/9999.logString logFileName = logFilePath.getPath().concat(File.separator)// 拼接  logId 为日志名称.concat(String.valueOf(logId))//  日志后缀.concat(".log");return logFileName;}

读取日志的方法:

 public static LogResult readLog(String logFileName, int fromLineNum){// valid log fileif (logFileName==null || logFileName.trim().length()==0) {return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);}// 创建文件对象File logFile = new File(logFileName);if (!logFile.exists()) {return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);}// read fileStringBuffer logContentBuffer = new StringBuffer();int toLineNum = 0;LineNumberReader reader = null;try {//reader = new LineNumberReader(new FileReader(logFile));reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));String line = null;while ((line = reader.readLine())!=null) {toLineNum = reader.getLineNumber();     // [from, to], start as 1if (toLineNum >= fromLineNum) {// 读完一行就换行logContentBuffer.append(line).append("\n");}}} catch (IOException e) {logger.error(e.getMessage(), e);} finally {if (reader != null) {try {reader.close();} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 返回结果集LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);return logResult;}

xxl-job 原理:相关推荐

  1. XXL-JOB核心源码解读及时间轮原理剖析

    你好,今天我想和你分享一下XXL-JOB的核心实现.如果你是XXL-JOB的用户,那么你肯定思考过它的实现原理:如果你还未接触过这个产品,那么可以通过本文了解一下. XXL-JOB的架构图(2.0版本 ...

  2. 说完了 xxl-job 的执行器原理,再来聊聊调度中心是如何调度任务的

    前言 在上一篇 xxl-job 执行器原理分析 一文中,我们提到了 xxl-job 框架中包含了两个核心模块:调度中心 和 执行器, 其中调度中心主要负责 任务的调度 , 而执行器负责 任务的执行, ...

  3. xxl-job的使用及简述原理

    文章目录 前言 1. 介绍 2. 部署篇 2.1. 初始化数据库 2.2. 部署调度中心 2.2.1 集群部署 2.3. 部署执行器 2.3.1 集群部署 3. 使用篇 3.1. 设置执行器 3.2. ...

  4. XxlJob(二) 负载均衡用法及实现原理详解

    目录 一.配置一个应用执行器 二.同一台机器上模拟负载均衡 1. 环境准备 2. 触发任务,选择轮询策略 3. 机器实例动态伸缩 三.负载均衡原理解析 1.  根据应用名查找地址列表 ​2. Exec ...

  5. xxl-job源码—调度器/执行器工作原理

    目录 一.架构图 1.1 功能架构图 2.2 任务调度工作原理 二.ER图 三.调度器 3.1 启动过程时序图 3.2 启动过程核心代码解析 3.2.1 启动初始化 3.2.2 执行器健康检查 3.2 ...

  6. xxl-job(v2.1.0 Release)执行器端的执行器自动注册原理

    (一)xxl-job介绍以及搭建过程 (二)xxl-job执行器端的执行器自动注册原理 (三)xxl-job调度器端的执行器自动注册原理 (四)xxl-job任务管理以及调度器端任务手动执行的原理 ( ...

  7. UUID的使用及其原理

    今天敲项目要用UUID,想起之前老师告诉UUID的使用,但没说具体的生成逻辑,于是我进行了百度 首先,UUID的使用: //生成随机的UUID String uuid = UUID.randomUUI ...

  8. etcd 笔记(01)— etcd 简介、特点、应用场景、常用术语、分布式 CAP 理论、分布式原理

    1. etcd 简介 etcd 官网定义: A highly-available key value store for shared configuration and service discov ...

  9. git原理及常见使用方法

    Git 原理入门-来自阮一峰 Git 是最流行的版本管理工具,也是程序员的必备技能之一. 即使天天使用它,很多人也未必了解它的原理.Git 为什么可以管理版本?git add.git commit这些 ...

  10. 微机原理—定时计数控制接口

    别看题目很高深,其实就是很简单的定时器和计数器而已. 通常用手机定个闹钟,就是定时器的使用. 工厂里通过传送带上安装传感器,传感器传输给计算机的信号用来计数. 这是一些很简单的应用,通过很小的一个芯片 ...

最新文章

  1. CodeGen CreateFile实用程序
  2. 专家解读下一代互联网创新模式,核心技术是根本
  3. 调用天气预报Web Service
  4. Oracle 常用查询
  5. js代码判断身份证号合法性
  6. input输入框汇总
  7. JEECG Framework 3.4.1 beta 版本发布
  8. 阿里云虚拟主机的使用,附幸运券领取
  9. php fpm高并发,php-fpm 高并发、502解决方案
  10. 对于无线网络经常掉线的问题
  11. 正弦函数泰勒展开c语言,C++ 学习笔记_0012_函数(泰勒展开式、三角函数表)
  12. ipad4使用教程 ipad mini使用技巧
  13. Java代码混淆工具-ProGuard
  14. AVR单片机及其编译软件
  15. 直流有刷电机H桥正反转调速原理及Matlab/Simulink仿真
  16. 解决Choregraphe无法连接虚拟Nao机器人(报错显示无法连接至NAOqi)以及Choregraphe闪退问题
  17. 航空航天科学与工程专业术语翻译
  18. host 修改rancher_搭建Rancher
  19. Python写得好,壁纸无烦恼!
  20. 苹果cms8.x 命令执行漏洞本地攻击演示

热门文章

  1. pdf转cad怎么弄_还在为cad转pdf烦恼吗?教你CAD批量转pdf
  2. 初级官方卡刷包精简 添加万能ROOT
  3. web项目接入指纹识别+识别过程信息推送
  4. 在桌面计算机找不到光盘驱动,驱动程序存放在Windows7系统光盘的哪 – 手机爱问...
  5. vb连接mysql教程视频_VB连接MYSQL数据的方法
  6. Windows10中同时安装MySQL5和MySQL8
  7. python之类和对象
  8. H.264 学习建议
  9. python机械数据分析_记一次小机器的 Python 大数据分析
  10. IDEA破解码(至2099)