xxl-job 原理:
quarts的缺点:
随机负载(for update );不能分片
阻塞处理策略:
分片原理:
for (int i = 0; i < group.getRegistryList().size(); i++) {// 同时给多个客户端发送命令processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}
在代码中可以通过工具类 获得 当前 是第几个分片 n 执行任务:
取数据需要自己从SQL 中处理,保证同一条数据 不会被不同的执行器取到就可以
在 查询数据的时候,将 n 作为 参数
select * from my_job where mod(id,n) = #{n} 作为当前分片要执行的任务
select * from XXL_JOB_QRTZ_TRIGGER_INFO where mod(sha1(id),3) = 1 ;select * from XXL_JOB_QRTZ_TRIGGER_INFO where mod(id,3) = 1 ;
原理:
执行器 怎么 将 线程的运行日志发送给 调度器? 执行器 会将 执行任务日志放到 自己的队列里,有一个线程会消费这个队列,并通过 http 请求 传给 调度器,调度器去 更新 日志表。
1.
调度中心(注册中心): 当一个job 可以执行的时候,调度中心 通过 http 请求 将 任务 传给 某个 worker注册:HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();key 是 appName ;value 是: ip 端口 ;while 循环 ------> JobScheduleHelper : select * from xxl_job_lock where lock_name = 'schedule_lock' for update------> 系统当前时间 大于 jobInfo.getTriggerNextTime()------> 触发------> 更新下一次的触发时间jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());jobInfo.setTriggerNextTime(nextValidTime.getTime()); UPDATE xxl_job_infoSETtrigger_last_time = #{triggerLastTime},trigger_next_time = #{triggerNextTime},trigger_status = #{triggerStatus}WHERE id = #{id} while 循环 里的伪代码如下:try {Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;// 注册中心(调度中心) 有多台机器,防止多个机器 同时给 一个 执行器发送 http 请求,此处需要加 悲观锁,// 同一时刻只能 有 一个 注册中心,给某一个 worker 发送任务select * from xxl_job_lock where lock_name = 'schedule_lock' for update // 获取 job 集合for (XxlJobInfo jobInfo: scheduleList) {// 触发 ThreadPoolExecutor 线程池 中 去执行任务(向客户端发送请求)------> 分片的话,一个时间 选择一台机器发送请求// 更新 xxl_job_info 中 ,下一次任务的执行时间(long 类型的)}} catch(Exception e){} finally(){{// commitif (conn != null) {try {// 释放锁conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}} 调度中心: 通过 http 向 客户端发送 请求后(URI 为 /run)
客户端接收到请求后:
客户端在收到 服务端的执行任务指令后如何操作的?① 客户端引入了 xxl-job-core 包,这个包在 spring bean注册完之后,会有一个回调函数,将 bean 中 含有 @xxlJob 注解 的Component 和 method 方法 组装成一个 MethodJobHandler 对象同事以 @xxlJob 上的value 为key ,MethodJobHandler 实例 为value 放到一个 ConcurrentHashMap 中② 服务端的执行任务指令过来,会根据 服务端传来的 xxlJob 注解上的value 从 ConcurrentHashMap 获取对应的 Handler 对象③ 注册并开启一个工作线程,执行任务,并在 jobHandler 方法里 写入执行结果(不是return success ) ,而是调用官方的方法,通过 InhertableTheadLocal 将执行结果写入到线程私有变量里,并将finally 代码块将执行结果 推到 回调线程里 ④ 回调线程 反馈任务运行结果,服务端接受到结果后根据日志id 更新日志 表
客户端在服务启动的时候,将被xxlJob 标记的method 和相关Component 组成对象,并保存到map里,供调用(有点类似策略模式)
// 伪代码如下:xxl job core 项目 在 bean 注册到容器 里后,有一个回调方法(通过实现 SmartInitializingSingleton 接口实现,重写方法)回调方法的作用:获取 容器中 的bean ,主要是 方法被 @XxlJob 注解标记的 bean, // bean spring 中的bean ,executeMethod 被 xxlJob 标注的方法,registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));// name 是 Bean 里 加在方法上的 XxlJob 注解public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}map 结构如下:private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
反射调用目标方法:
MethodJobHandler extends IJobHandler 中有如下方法:@Overridepublic void execute() throws Exception {Class<?>[] paramTypes = method.getParameterTypes();if (paramTypes.length > 0) {method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types} else {// 反射调用 被 xxlJob 标注的目标方法// method 是目标方法,target 是目标方法所在的beanmethod.invoke(target);}}
③ 注册并开启 工作线程
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);// 开启新线程 newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});// 获取上传一次的 旧线程JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {// 通过变量 让线程 终止(线程执行完也算是终止)oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}
工作线程:
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;// 封装了Bean 和目标方法 ,这样反射就可以 调用目标方法private IJobHandler handler; private LinkedBlockingQueue<TriggerParam> triggerQueue;private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID// 通过一个变量 让一个线程 结束运行,运行完就算线程销毁了(不会旧线程一直 while 循环)private volatile boolean toStop = false;private String stopReason;private boolean running = false; // if running jobprivate int idleTimes = 0; // idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;// 每个线程 都有自己的私有 队列this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());}public IJobHandler getHandler() {return handler;}/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}/*** kill job thread** @param stopReason*/public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}/*** is running job* @return*/public boolean isRunningOrHasQueue() {return running || triggerQueue.size()>0;}
重点看下执行任务的 run 方法:
// 没有终止就一直运行while(!toStop){try{// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job context//底层: InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>()// 线程私有变量的传递 ; 在自定义的 jobHandler 方法里写的日志 要用官方的日志 组件 ,会将 日志内容写入到日志文件里// eg:他获取文件名称的方法 String logFileName = xxlJobContext.getJobLogFileName();XxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());// 有过期时间用 FutureTask (这个可以设置超时时间)if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// 反射调用 目标方法handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// 没有过期时间// just execute// 反射调用 目标方法handler.execute();}}catch{}finally{if(triggerParam != null) {// callback handler infoif (!toStop) {// 往回调线程里 push 消息,通知执行完结果TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),// 这个是在哪里设置的值? 在 自己写的任务 xxJobHandler 方法里(底层 调用 InheritableThreadLocal 获取当前线程的私有属性,然后设置成功吗)// 代码如下:// if (exitValue == 0) {// default success//} else {// XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");//}XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job running, killed]" ));}}}}
④ 回调线程 通过http 请求 反馈工作 线程 执行结果
说明:没有执行完任务后直接回调,而是 放到一个队列里,批量 反馈,减少了网络开销
主要属性和方法:
LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>()// 添加执行结果public static void pushCallBack(HandleCallbackParam callback){getInstance().callBackQueue.add(callback);logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());}
@Overridepublic void run() {// normal callbackwhile(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {// 通过 http 进行回调doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");}});继续进入:@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);}
1. 注册中心收到 请求后的操作:
api/callback
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {// 线程池callbackThreadPool.execute(new Runnable() {@Overridepublic void run() {for (HandleCallbackParam handleCallbackParam: callbackParamList) {ReturnT<String> callbackResult = callback(handleCallbackParam);logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);}}});return ReturnT.SUCCESS;}
callBack 方法如下:
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {// valid log itemXxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());if (log == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");}if (log.getHandleCode() > 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc}// handle msgStringBuffer handleMsg = new StringBuffer();if (log.getHandleMsg()!=null) {handleMsg.append(log.getHandleMsg()).append("<br>");}if (handleCallbackParam.getHandleMsg() != null) {handleMsg.append(handleCallbackParam.getHandleMsg());}// success, update log ;(说明:insert log 是 在服务端向客户端发送请求的时候插入的)log.setHandleTime(new Date());log.setHandleCode(handleCallbackParam.getHandleCode());log.setHandleMsg(handleMsg.toString());XxlJobCompleter.updateHandleInfoAndFinish(log);return ReturnT.SUCCESS;}
调度日志的显示:
点击 页面 执行日志,执行了两部操作:
首先从 调度器获取 日志id(日志文件的文件名 日志id.log ),日志所在执行器服务器信息 ② 通过页面 ajax 请求 访问执行器 日志文件(配置文件里配置的路径)
代码如下:
① 页面 获取日志id,执行器服务器信息 如下:
说明:
文件名称 和 日志id 有关:(每次执行完一个任务,就生成一个文件,而不是 所有日志文件堆积在一起 )
log filename: logPath/yyyy-MM-dd/9999.log
② 读取日志文件(说明:如果 调度中心和 执行器不在一台服务器上,得修改源码,将本地读取日志文件改为 远程读取日志文件)
@RequestMapping("/logDetailCat")@ResponseBodypublic ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){try {ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(executorAddress);// 读取日志文件的内容ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);// is endif (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) {XxlJobLog jobLog = xxlJobLogDao.load(logId);if (jobLog.getHandleCode() > 0) {logResult.getContent().setEnd(true);}}return logResult;} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage());}}
读取方法:
@Overridepublic ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum) {//获取日志名称,说明 9999 是 日志id // log filename: logPath/yy yy-MM-dd/9999.logString logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId);// 读取日志 LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum);return new ReturnT<LogResult>(logResult);}
如何获取日志文件名称?
public static String makeLogFileName(Date triggerDate, int logId) {// filePath/yyyy-MM-ddSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // avoid concurrent problem, can not be staticFile logFilePath = new File(getLogPath(), sdf.format(triggerDate));if (!logFilePath.exists()) {logFilePath.mkdir();}// filePath/yyyy-MM-dd/9999.logString logFileName = logFilePath.getPath().concat(File.separator)// 拼接 logId 为日志名称.concat(String.valueOf(logId))// 日志后缀.concat(".log");return logFileName;}
读取日志的方法:
public static LogResult readLog(String logFileName, int fromLineNum){// valid log fileif (logFileName==null || logFileName.trim().length()==0) {return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);}// 创建文件对象File logFile = new File(logFileName);if (!logFile.exists()) {return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);}// read fileStringBuffer logContentBuffer = new StringBuffer();int toLineNum = 0;LineNumberReader reader = null;try {//reader = new LineNumberReader(new FileReader(logFile));reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));String line = null;while ((line = reader.readLine())!=null) {toLineNum = reader.getLineNumber(); // [from, to], start as 1if (toLineNum >= fromLineNum) {// 读完一行就换行logContentBuffer.append(line).append("\n");}}} catch (IOException e) {logger.error(e.getMessage(), e);} finally {if (reader != null) {try {reader.close();} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 返回结果集LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);return logResult;}
xxl-job 原理:相关推荐
- XXL-JOB核心源码解读及时间轮原理剖析
你好,今天我想和你分享一下XXL-JOB的核心实现.如果你是XXL-JOB的用户,那么你肯定思考过它的实现原理:如果你还未接触过这个产品,那么可以通过本文了解一下. XXL-JOB的架构图(2.0版本 ...
- 说完了 xxl-job 的执行器原理,再来聊聊调度中心是如何调度任务的
前言 在上一篇 xxl-job 执行器原理分析 一文中,我们提到了 xxl-job 框架中包含了两个核心模块:调度中心 和 执行器, 其中调度中心主要负责 任务的调度 , 而执行器负责 任务的执行, ...
- xxl-job的使用及简述原理
文章目录 前言 1. 介绍 2. 部署篇 2.1. 初始化数据库 2.2. 部署调度中心 2.2.1 集群部署 2.3. 部署执行器 2.3.1 集群部署 3. 使用篇 3.1. 设置执行器 3.2. ...
- XxlJob(二) 负载均衡用法及实现原理详解
目录 一.配置一个应用执行器 二.同一台机器上模拟负载均衡 1. 环境准备 2. 触发任务,选择轮询策略 3. 机器实例动态伸缩 三.负载均衡原理解析 1. 根据应用名查找地址列表 2. Exec ...
- xxl-job源码—调度器/执行器工作原理
目录 一.架构图 1.1 功能架构图 2.2 任务调度工作原理 二.ER图 三.调度器 3.1 启动过程时序图 3.2 启动过程核心代码解析 3.2.1 启动初始化 3.2.2 执行器健康检查 3.2 ...
- xxl-job(v2.1.0 Release)执行器端的执行器自动注册原理
(一)xxl-job介绍以及搭建过程 (二)xxl-job执行器端的执行器自动注册原理 (三)xxl-job调度器端的执行器自动注册原理 (四)xxl-job任务管理以及调度器端任务手动执行的原理 ( ...
- UUID的使用及其原理
今天敲项目要用UUID,想起之前老师告诉UUID的使用,但没说具体的生成逻辑,于是我进行了百度 首先,UUID的使用: //生成随机的UUID String uuid = UUID.randomUUI ...
- etcd 笔记(01)— etcd 简介、特点、应用场景、常用术语、分布式 CAP 理论、分布式原理
1. etcd 简介 etcd 官网定义: A highly-available key value store for shared configuration and service discov ...
- git原理及常见使用方法
Git 原理入门-来自阮一峰 Git 是最流行的版本管理工具,也是程序员的必备技能之一. 即使天天使用它,很多人也未必了解它的原理.Git 为什么可以管理版本?git add.git commit这些 ...
- 微机原理—定时计数控制接口
别看题目很高深,其实就是很简单的定时器和计数器而已. 通常用手机定个闹钟,就是定时器的使用. 工厂里通过传送带上安装传感器,传感器传输给计算机的信号用来计数. 这是一些很简单的应用,通过很小的一个芯片 ...
最新文章
- CodeGen CreateFile实用程序
- 专家解读下一代互联网创新模式,核心技术是根本
- 调用天气预报Web Service
- Oracle 常用查询
- js代码判断身份证号合法性
- input输入框汇总
- JEECG Framework 3.4.1 beta 版本发布
- 阿里云虚拟主机的使用,附幸运券领取
- php fpm高并发,php-fpm 高并发、502解决方案
- 对于无线网络经常掉线的问题
- 正弦函数泰勒展开c语言,C++ 学习笔记_0012_函数(泰勒展开式、三角函数表)
- ipad4使用教程 ipad mini使用技巧
- Java代码混淆工具-ProGuard
- AVR单片机及其编译软件
- 直流有刷电机H桥正反转调速原理及Matlab/Simulink仿真
- 解决Choregraphe无法连接虚拟Nao机器人(报错显示无法连接至NAOqi)以及Choregraphe闪退问题
- 航空航天科学与工程专业术语翻译
- host 修改rancher_搭建Rancher
- Python写得好,壁纸无烦恼!
- 苹果cms8.x 命令执行漏洞本地攻击演示
热门文章
- pdf转cad怎么弄_还在为cad转pdf烦恼吗?教你CAD批量转pdf
- 初级官方卡刷包精简 添加万能ROOT
- web项目接入指纹识别+识别过程信息推送
- 在桌面计算机找不到光盘驱动,驱动程序存放在Windows7系统光盘的哪 – 手机爱问...
- vb连接mysql教程视频_VB连接MYSQL数据的方法
- Windows10中同时安装MySQL5和MySQL8
- python之类和对象
- H.264 学习建议
- python机械数据分析_记一次小机器的 Python 大数据分析
- IDEA破解码(至2099)