Quartz 集成到Spring

Spring-quartz 工程

Spring 在spring-context-support.jar 中直接提供了对Quartz 的支持。

可以在配置文件中把JobDetail、Trigger、Scheduler 定义成Bean。

定义Job

<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><property name="name" value="my_job_1"/><property name="group" value="my_group"/><property name="jobClass" value="com.leon.quartz.MyJob1"/><property name="durability" value="true"/>
</bean>

定义Trigger

<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean"><property name="name" value="my_trigger_1"/><property name="group" value="my_group"/><property name="jobDetail" ref="myJob1"/><property name="startDelay" value="1000"/><property name="repeatInterval" value="5000"/><property name="repeatCount" value="2"/>
</bean>

定义Scheduler

<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="simpleTrigger"/><ref bean="cronTrigger"/></list></property>
</bean>

既然可以在配置文件配置,当然也可以用@Bean 注解配置。在配置类上加上@Configuration 让Spring 读取到。

public class QuartzConfig {@Beanpublic JobDetail printTimeJobDetail(){return JobBuilder.newJob(MyJob1.class).withIdentity("leonJob").usingJobData("leon", "职位更好的你").storeDurably().build();}@Beanpublic Trigger printTimeJobTrigger() {CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");return TriggerBuilder.newTrigger().forJob(printTimeJobDetail()).withIdentity("quartzTaskService").withSchedule(cronScheduleBuilder).build();}
}

运行spring-quartz 工程的com.leon.quartz.QuartzTest

package com.leon.quartz;import org.quartz.*;
import org.quartz.impl.StdScheduler;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;/***  单元测试类*/
public class QuartzTest {private static Scheduler scheduler;public static void main(String[] args) throws SchedulerException {// 获取容器ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");// 从容器中获取调度器scheduler = (StdScheduler) ac.getBean("scheduler");// 启动调度器scheduler.start();}}

动态调度的实现

springboot-quartz 工程

传统的Spring 方式集成,由于任务信息全部配置在xml 文件中,如果需要操作任务或者修改任务运行频率,只能重新编译、打包、部署、重启,如果有紧急问题需要处理,会浪费很多的时间。

有没有可以动态调度任务的方法?比如停止一个Job?启动一个Job?修改Job 的触发频率?

读取配置文件、写入配置文件、重启Scheduler 或重启应用明显是不可取的。

对于这种频繁变更并且需要实时生效的配置信息,我们可以放到哪里?

ZK、Redis、DB tables。

并且,我们可以提供一个界面,实现对数据表的轻松操作。

配置管理

这里我们用最简单的数据库的实现。

问题1:建一张什么样的表?参考JobDetail 的属性。

CREATE TABLE `sys_job` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',`job_name` varchar(512) NOT NULL COMMENT '任务名称',`job_group` varchar(512) NOT NULL COMMENT '任务组名',`job_cron` varchar(512) NOT NULL COMMENT '时间表达式',`job_class_path` varchar(1024) NOT NULL COMMENT '类路径,全类型',`job_data_map` varchar(1024) DEFAULT NULL COMMENT '传递map 参数',`job_status` int(2) NOT NULL COMMENT '状态:1 启用0 停用',`job_describe` varchar(1024) DEFAULT NULL COMMENT '任务功能描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

数据操作与任务调度

操作数据表非常简单,SSM 增删改查。

但是在修改了表的数据之后,怎么让调度器知道呢?

调度器的接口:Scheduler

在我们的需求中,我们需要做的事情:

1、新增一个任务

2、删除一个任务

3、启动、停止一个任务

4、修改任务的信息(包括调度规律)

因此可以把相关的操作封装到一个工具类中。com.leon.demo.util.SchedulerUtil

package com.leon.demo.util;import com.alibaba.fastjson.JSONObject;
import com.leon.demo.config.MyJobFactory;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;import java.util.Map;/*** Quartz工具类**/
public class SchedulerUtil {private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);/*** 新增定时任务* @param jobClassName 类路径* @param jobName 任务名称* @param jobGroupName 组别* @param cronExpression Cron表达式* @param jobDataMap 需要传递的参数* @throws Exception*/public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {// 通过SchedulerFactory获取一个调度器实例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();// 启动调度器scheduler.start();// 构建job信息JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass()).withIdentity(jobName, jobGroupName).build();// JobDataMap用于传递任务运行时的参数,比如定时发送邮件,可以用json形式存储收件人等等信息if (StringUtils.isNotEmpty(jobDataMap)) {JSONObject jb = JSONObject.parseObject(jobDataMap);Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");for (Map.Entry<String, Object> m:dataMap.entrySet()) {jobDetail.getJobDataMap().put(m.getKey(),m.getValue());}}// 表达式调度构建器(即任务执行的时间)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);// 按新的cronExpression表达式构建一个新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName).withSchedule(scheduleBuilder).startNow().build();try {scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {logger.info("创建定时任务失败" + e);throw new Exception("创建定时任务失败");}}/*** 停用一个定时任务* @param jobName 任务名称* @param jobGroupName 组别* @throws Exception*/public static void jobPause(String jobName, String jobGroupName) throws Exception {// 通过SchedulerFactory获取一个调度器实例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));}/*** 启用一个定时任务* @param jobName 任务名称* @param jobGroupName 组别* @throws Exception*/public static void jobresume(String jobName, String jobGroupName) throws Exception {// 通过SchedulerFactory获取一个调度器实例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));}/*** 删除一个定时任务* @param jobName 任务名称* @param jobGroupName 组别* @throws Exception*/public static void jobdelete(String jobName, String jobGroupName) throws Exception {// 通过SchedulerFactory获取一个调度器实例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));}/*** 更新定时任务表达式* @param jobName 任务名称* @param jobGroupName 组别* @param cronExpression Cron表达式* @throws Exception*/public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {try {SchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);// 表达式调度构建器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 按新的cronExpression表达式重新构建triggertrigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();// 按新的trigger重新设置job执行scheduler.rescheduleJob(triggerKey, trigger);} catch (SchedulerException e) {System.out.println("更新定时任务失败" + e);throw new Exception("更新定时任务失败");}}/*** 检查Job是否存在* @throws Exception*/public static Boolean isResume(String jobName, String jobGroupName) throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);Boolean state = scheduler.checkExists(triggerKey);return state;}/*** 暂停所有任务* @throws Exception*/public static void pauseAlljob() throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseAll();}/*** 唤醒所有任务* @throws Exception*/public static void resumeAlljob() throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler sched = sf.getScheduler();sched.resumeAll();}/*** 获取Job实例* @param classname* @return* @throws Exception*/public static BaseJob getClass(String classname) throws Exception {try {Class<?> c = Class.forName(classname);return (BaseJob) c.newInstance();} catch (Exception e) {throw new Exception("类["+classname+"]不存在!");}}}

容器启动与Service 注入

容器启动

因为任务没有定义在ApplicationContext.xml 中,而是放到了数据库中,SpringBoot 启动时,怎么读取任务信息?

或者,怎么在Spring 启动完成的时候做一些事情?

创建一个类,实现CommandLineRunner 接口,实现run 方法。

从表中查出状态是1 的任务,然后构建。

Service 类注入到Job 中

Spring Bean 如何注入到实现了Job 接口的类中?

例如在TestTask3 中,需要注入ISysJobService,查询数据库发送邮件。

如果没有任何配置,注入会报空指针异常。

原因:

因为定时任务Job 对象的实例化过程是在Quartz 中进行的,而Service Bean 是由Spring 容器管理的,Quartz 察觉不到Service Bean 的存在,所以无法将Service Bean装配到Job 对象中。

分析:

Quartz 集成到Spring 中,用到SchedulerFactoryBean,其实现了InitializingBean方法,在唯一的方法afterPropertiesSet()在Bean 的属性初始化后调用。

调度器用AdaptableJobFactory 对Job 对象进行实例化。所以,如果我们可以把这个JobFactory 指定为我们自定义的工厂的话,就可以在Job 实例化完成之后,把Job纳入到Spring 容器中管理。

解决这个问题的步骤:

1、定义一个AdaptableJobFactory,实现JobFactory 接口,实现接口定义的newJob 方法,在这里面返回Job 实例

2、定义一个MyJobFactory,继承AdaptableJobFactory。使用Spring 的AutowireCapableBeanFactory,把Job 实例注入到容器中。

@Component
public class MyJobFactory extends AdaptableJobFactory {@Autowiredprivate AutowireCapableBeanFactory capableBeanFactory;protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {Object jobInstance = super.createJobInstance(bundle);capableBeanFactory.autowireBean(jobInstance);return jobInstance;}
}

3、指定Scheduler 的JobFactory 为自定义的JobFactory。com.leon.demo.config.InitStartSchedule 中:

scheduler.setJobFactory(myJobFactory);
package com.leon.demo.config;import com.alibaba.fastjson.JSONObject;
import com.leon.demo.entity.SysJob;
import com.leon.demo.service.ISysJobService;
import com.leon.demo.util.BaseJob;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 这个类用于启动SpringBoot时,加载作业。run方法会自动执行。** 另外可以使用 ApplicationRunner**/
@Component
public class InitStartSchedule implements CommandLineRunner {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate ISysJobService sysJobService;@Autowiredprivate MyJobFactory myJobFactory;@Overridepublic void run(String... args) throws Exception {/*** 用于程序启动时加载定时任务,并执行已启动的定时任务(只会执行一次,在程序启动完执行)*///查询job状态为启用的HashMap<String,String> map = new HashMap<String,String>();map.put("jobStatus", "1");List<SysJob> jobList= sysJobService.querySysJobList(map);if( null == jobList || jobList.size() ==0){logger.info("系统启动,没有需要执行的任务... ...");}// 通过SchedulerFactory获取一个调度器实例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();// 如果不设置JobFactory,Service注入到Job会报空指针scheduler.setJobFactory(myJobFactory);// 启动调度器scheduler.start();for (SysJob sysJob:jobList) {String jobClassName=sysJob.getJobName();String jobGroupName=sysJob.getJobGroup();//构建job信息JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");for (Map.Entry<String, Object> m:dataMap.entrySet()) {jobDetail.getJobDataMap().put(m.getKey(),m.getValue());}}//表达式调度构建器(即任务执行的时间)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());//按新的cronExpression表达式构建一个新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName).withSchedule(scheduleBuilder).startNow().build();// 任务不存在的时候才添加if( !scheduler.checkExists(jobDetail.getKey()) ){try {scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {logger.info("\n创建定时任务失败"+e);throw new Exception("创建定时任务失败");}}}}public static BaseJob getClass(String classname) throws Exception{Class<?>  c= Class.forName(classname);return (BaseJob)c.newInstance();}
}

考虑这么一种情况:

正在运行的Quartz 节点挂了,而所有人完全不知情……

Quartz 集群部署

springboot-quartz 工程

为什么需要集群?

1、防止单点故障,减少对业务的影响

2、减少节点的压力,例如在10 点要触发1000 个任务,如果有10 个节点,则每个节点之需要执行100 个任务

集群需要解决的问题?

1、任务重跑,因为节点部署的内容是一样的,到10 点的时候,每个节点都会执行相同的操作,引起数据混乱。比如跑批,绝对不能执行多次。

2、任务漏跑,假如任务是平均分配的,本来应该在某个节点上执行的任务,因为节点故障,一直没有得到执行。

3、水平集群需要注意时间同步问题

4、Quartz 使用的是随机的负载均衡算法,不能指定节点执行

所以必须要有一种共享数据或者通信的机制。在分布式系统的不同节点中,我们可以采用什么样的方式,实现数据共享?

两两通信,或者基于分布式的服务,实现数据共享。

例如:ZK、Redis、DB。

在Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了。

同样的问题:建什么表?哪些字段?依旧使用系统自带的11 张表。

集群配置与验证

quartz.properties 配置。

四个配置:集群实例ID、集群开关、数据库持久化、数据源信息

注意先清空quartz 所有表、改端口、两个任务频率改成一样

验证1:先后启动2 个节点,任务是否重跑

验证2:停掉一个节点,任务是否漏跑

Quartz 调度原理

问题:

1、Job 没有继承Thread 和实现Runnable,是怎么被调用的?通过反射还是什么?

2、任务是什么时候被调度的?是谁在监视任务还是监视Trigger?

3、任务是怎么被调用的?谁执行了任务?

4、任务本身有状态吗?还是触发器有状态?

看源码的入口

Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

获取调度器实例

读取配置文件

public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {// 读取quartz.properties 配置文件initialize();}// 这个类是一个HashMap,用来基于调度器的名称保证调度器的唯一性SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(getSchedulerName());// 如果调度器已经存在了if (sched != null) {// 调度器关闭了,移除if (sched.isShutdown()) {schedRep.remove(getSchedulerName());} else {// 返回调度器return sched;}}// 调度器不存在,初始化sched = instantiate();return sched;
}

instantiate()方法中做了初始化的所有工作:

// 存储任务信息的JobStore
JobStore js = null;
// 创建线程池,默认是SimpleThreadPool
ThreadPool tp = null;
// 创建调度器
QuartzScheduler qs = null;
// 连接数据库的连接管理器
DBConnectionManager dbMgr = null;
// 自动生成ID
// 创建线程执行器,默认为DefaultThreadExecutor
ThreadExecutor threadExecutor;

创建线程池(包工头)

830 行和839 行,创建了一个线程池,默认是配置文件中指定的SimpleThreadPool。

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();

SimpleThreadPool 里面维护了三个list,分别存放所有的工作线程、空闲的工作线程和忙碌的工作线程。我们可以把SimpleThreadPool 理解为包工头。

private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

tp 的runInThread()方法是线程池运行线程的接口方法。参数Runnable 是执行的任务内容。

取出WorkerThread 去执行参数里面的runnable(JobRunShell)。

WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);

WorkerThread(工人)

WorkerThread 是SimpleThreadPool 的内部类, 用来执行任务。我们把WorkerThread 理解为工人。在WorkerThread 的run 方法中,执行传入的参数runnable任务:

runnable.run();

创建调度线程(项目经理)

1321 行,创建了调度器QuartzScheduler:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

在QuartzScheduler 的构造函数中,创建了QuartzSchedulerThread,我们把它理解为项目经理,它会调用包工头的工人资源,给他们安排任务。

并且创建了线程执行器schedThreadExecutor , 执行了这个QuartzSchedulerThread,也就是调用了它的run 方法。

// 创建一个线程,resouces 里面有线程名称
this.schedThread = new QuartzSchedulerThread(this, resources);
// 线程执行器
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//执行这个线程,也就是调用了线程的run 方法
schedThreadExecutor.execute(this.schedThread);

点开QuartzSchedulerThread 类,找到run 方法,这个是Quartz 任务调度的核心方法:

public void run() {int acquiresFailed = 0;// 检查scheuler 是否为停止状态while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {// 检查是否为暂停状态while (paused && !halted.get()) {try {// wait until togglePause(false) is called...// 暂停的话会尝试去获得信号锁,并wait 一会sigLock.wait(1000L);} catch (InterruptedException ignore) {}// reset failure counter when paused, so that we don't// wait again after unpausingacquiresFailed = 0;}if (halted.get()) {break;}}// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..// 从JobStore 获取Job 持续失败,sleep 一下if (acquiresFailed > 1) {try {long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);Thread.sleep(delay);} catch (Exception ignore) {}}// 从线程池获取可用的线程int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {// 获取需要下次执行的triggers// idleWaitTime: 默认30s// availThreadCount:获取可用(空闲)的工作线程数量,总会大于1,因为该方法会一直阻塞,直到有工作线程空闲下来。// maxBatchSize:一次拉取trigger 的最大数量,默认是1// batchTimeWindow:时间窗口调节参数,默认是0// misfireThreshold: 超过这个时间还未触发的trigger,被认为发生了misfire,默认60s// 调度线程一次会拉取NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个triggers,默认情况下,会拉取未来30s、过去60s 之间还未fire 的1 个triggertriggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());// 省略…………// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {// 触发Trigger,把ACQUIRED 状态改成EXECUTING// 如果这个trigger 的NEXTFIRETIME 为空,也就是未来不再触发,就将其状态改为COMPLETE// 如果trigger 不允许并发执行(即Job 的实现类标注了@DisallowConcurrentExecution),则将状态变为BLOCKED,否则就将状态改为WAITINGList<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);// 省略…………continue;}}// 循环处理Triggerfor (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result = bndles.get(i);TriggerFiredBundle bndle = result.getTriggerFiredBundle();Exception exception = result.getException();// 省略…………JobRunShell shell = null;try {// 根据trigger 信息实例化JobRunShell(implements Runnable),同时依据JOB_CLASS_NAME 实例化Job,随后我们将JobRunShell 实例丢入工作线。shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}// 执行JobRunShell 的run 方法if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// 省略…………

JobRunShell 的作用

JobRunShell instances are responsible for providing the 'safe' environment for Job s to run in, and for performing all of
the work of executing the Job, catching ANY thrown exceptions, updating the Trigger with the Job's completion code, etc.

A JobRunShell instance is created by a JobRunShellFactory on behalf of the QuartzSchedulerThread which then runs the
shell in a thread from the configured ThreadPool when the scheduler determines that a Job has been triggered.

JobRunShell 用来为Job 提供安全的运行环境的,执行Job 中所有的作业,捕获运行中的异常,在任务执行完毕的时候更新Trigger 状态,等等

JobRunShell 实例是用JobRunShellFactory 为QuartzSchedulerThread 创建的,在调度器决定一个Job 被触发的时候,它从线程池中取出一个线程来执行任务。

线程模型总结

SimpleThreadPool:包工头,管理所有WorkerThread

WorkerThread:工人,把Job 包装成JobRunShell,执行

QuartSchedulerThread:项目经理,获取即将触发的Trigger,从包工头出拿到worker,执行Trigger 绑定的任务

绑定JobDetail 和Trigger

// 存储JobDetail 和Trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知相关的Listener
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);

启动调度器

// 通知监听器
notifySchedulerListenersStarting();
if (initialStart == null) {initialStart = new Date();this.resources.getJobStore().schedulerStarted();startPlugins();
} else {resources.getJobStore().schedulerResumed();
}
// 通知QuartzSchedulerThread 不再等待,开始干活
schedThread.togglePause(false);
// 通知监听器
notifySchedulerListenersStarted();

源码总结

getScheduler 方法创建线程池ThreadPool,创建调度器QuartzScheduler,创建调度线程QuartzSchedulerThread,调度线程初始处于暂停状态。

scheduleJob 将任务添加到JobStore 中。

scheduler.start()方法激活调度器,QuartzSchedulerThread 从timeTrriger 取出待触发的任务, 并包装成TriggerFiredBundle , 然后由JobRunShellFactory 创建TriggerFiredBundle 的执行线程JobRunShell , 调度执行通过线程池SimpleThreadPool 去执行JobRunShell,而JobRunShell 执行的就是任务类的execute方法:job.execute(JobExecutionContext context)。

集群原理

基于数据库,如何实现任务的不重跑不漏跑?

问题1:如果任务执行中的资源是“下一个即将触发的任务”,怎么基于数据库实现这个资源的竞争?

问题2:怎么对数据的行加锁?

QuartzSchedulerThread 第287 行,获取下一个即将触发的Trigger

triggers = qsRsrcs.getJobStore().acquireNextTriggers(

调用JobStoreSupport 的acquireNextTriggers()方法,2793 行

调用JobStoreSupport.executeInNonManagedTXLock()方法,3829 行:

return executeInNonManagedTXLock(lockName,

尝试获取锁,3843 行:

transOwner = getLockHandler().obtainLock(conn, lockName);

下面有回滚和释放锁的语句,即使发生异常,锁同样能释放。

调用DBSemaphore 的obtainLock()方法,103 行

public boolean obtainLock(Connection conn, String lockName)throws LockException {if (!isLockOwner(lockName)) {executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);

调用StdRowLockSemaphore 的executeSQL()方法,78 行。

最终用JDBC 执行SQL,语句内容是expandedSQL 和expandedInsertSQL

ps = conn.prepareStatement(expandedSQL);

问题:expandedSQL 和expandedInsertSQL 是一条什么SQL 语句?似乎我们没有赋值?

在StdRowLockSemaphore 的构造函数中,把定义的两条SQL 传进去

public StdRowLockSemaphore() {super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK);
}
public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " +
SCHED_NAME_SUBST+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " +
COL_LOCK_NAME + ") VALUES ("+ SCHED_NAME_SUBST + ", ?)";

它调用了父类DBSemaphore 的构造函数:

public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {this.tablePrefix = tablePrefix;this.schedName = schedName;setSQL(defaultSQL);setInsertSQL(defaultInsertSQL);
}

在setSQL()和setInsertSQL()中为expandedSQL 和expandedInsertSQL 赋值。

执行的SQL 语句:

select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update

在我们执行官方的建表脚本的时候,QRTZ_LOCKS 表,它会为每个调度器创建两行数据,获取Trigger 和触发Trigger 是两把锁:

任务为什么重复执行

在我们的演示过程中,有多个调度器,任务没有重复执行,也就是默认会加锁,什么情况下不会上锁呢?

JobStoreSupport 的executeInNonManagedTXLock()方法

如果lockName 为空,则不上锁

if (lockName != null) {// If we aren't using db locks, then delay getting DB connection// until after acquiring the lock since it isn't needed.if (getLockHandler().requiresConnection()) {conn = getNonManagedTXConnection();}transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {conn = getNonManagedTXConnection();
}

而上一步JobStoreSupport 的acquireNextTriggers()方法,

1 ) 如果acquireTriggersWithinLock=true 或者batchTriggerAcquisitionMaxCount>1 时, locaName 赋值为LOCK_TRIGGER_ACCESS,此时获取Trigger 会加锁。

2)否则,如果isAcquireTriggersWithinLock()值是false 并且maxCount=1 的话,lockName 赋值为null,这种情况获取Trigger 下不加锁。

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)throws JobPersistenceException {String lockName;if(isAcquireTriggersWithinLock() || maxCount > 1) {lockName = LOCK_TRIGGER_ACCESS;} else {lockName = null;}

acquireTriggersWithinLock 变量默认是false:

private boolean acquireTriggersWithinLock = false;

maxCount 来自QuartzSchedulerThread:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());

getMaxBatchSize()来自QuartzSchedulerResources,代表Scheduler 一次拉取trigger 的最大数量,默认是1:

private int maxBatchSize = 1;

这个值可以通过参数修改,代表允许调度程序节点一次获取(用于触发)的触发器的最大数量,默认值是1。

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1

根据以上两个默认值,理论上在获取Trigger 的时候不会上锁,但是实际上为什么没有出现频繁的重复执行问题?因为每个调度器的线程持有锁的时间太短了,单机的测试无法体现,而在高并发的情况下,有可能会出现这个问题。

QuartzSchedulerThread 的triggersFired()方法:

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

调用了JobStoreSupport 的triggersFired()方法,接着又调用了一个triggerFiredtriggerFired(Connection conn, OperableTrigger trigger)方法:

如果Trigger 的状态不是ACQUIRED,也就是说被其他的线程fire 了,返回空。但是这种乐观锁的检查在高并发下难免会出现ABA 的问题,比如线程A 拿到的时候还是ACQUIRED 状态,但是刚准备执行的时候已经变成了EXECUTING 状态,这个时候就会出现重复执行的问题。

if (!state.equals(STATE_ACQUIRED)) {return null;
}

总结,如果:

如果设置的数量为1(默认值),并且使用JDBC JobStore(RAMJobStore 不支持分布式, 只有一个调度器实例, 所以不加锁) , 则属性org.quartz.jobStore.acquireTriggersWithinLock 应设置为true。否则不加锁可能会导致任务重复执行。

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
org.quartz.jobStore.acquireTriggersWithinLock=true

任务调度之Quartz2相关推荐

  1. Quartz-2.2.1 任务调度框架在Java项目中的使用实例

    < Quartz-2.2.1 任务调度框架在Java项目中的使用实例 > 本实例是基于Quartz 2.2.1 版本为中心,也是目前最新的Quartz任务调度框架. 目前在 J2EE 项目 ...

  2. 项目ITP(五) spring4.0 整合 Quartz 实现任务调度

    2014-05-16 22:51 by Jeff Li 前言 系列文章:[传送门] 项目需求: 二维码推送到一体机上,给学生签到扫描用. 然后须要的是 上课前20分钟 .幸好在帮带我的学长做 p2p ...

  3. Quartz 实现分布式任务调度

    2019独角兽企业重金招聘Python工程师标准>>> 情景 在开发的时候可能需要用到任务调度,通常我们使用quartz,单独部署一个实例来进行任务调度,然后就会存在单点的风险,qu ...

  4. 一文揭秘定时任务调度框架quartz

    之前写过quartz或者引用过quartz的一些文章,有很多人给我发消息问quartz的相关问题, quartz 报错:java.lang.classNotFoundException quartz源 ...

  5. Spring与Quartz的整合实现定时任务调度

    Spring与Quartz的整合实现定时任务调度 摘自: http://kevin19900306.iteye.com/blog/1397744 最近在研究Spring中的定时任务功能,最好的办法当然 ...

  6. 任务调度之Quartz1

    目标 1.了解任务调度的应用场景和Quartz的基本特性 2.掌握Quartz Java编程和Spring集成的使用 3.掌握Quartz动态调度和集群部署的实现 4.理解Quartz原理与线程模型 ...

  7. .net core 实现基于 cron 表达式的任务调度

    .net core 实现基于 cron 表达式的任务调度 Intro 上次我们实现了一个简单的基于 Timer 的定时任务,详细信息可以看这篇文章 . 但是使用过程中慢慢发现这种方式可能并不太合适,有 ...

  8. Spring的两种任务调度Scheduled和Async

    Spring提供了两种后台任务的方法,分别是: 调度任务,@Schedule 异步任务,@Async 当然,使用这两个是有条件的,需要在spring应用的上下文中声明 <task:annotat ...

  9. Quartz_简单编程式任务调度使用(SimpleTrigger)

    最近在工作中,要做定时任务的更能,最开始的时候,想到的是 JavaSE 中,自带 Timer 及 TimerTask 联合使用,完成定时任务.最后发现,随着业务的复杂,JDK 中的 Timer 和 T ...

最新文章

  1. linux signal函数用法,linux信号机制之sigaction构造体浅析,signal 函数,信号捕捉.
  2. 通过System.Management获取操作系统信息
  3. 2021年春季学期-信号与系统-第十三次作业参考答案-第二小题
  4. IT人 不要一辈子靠技术生存(转)
  5. java之父求职_Java求职实战之继承和多态
  6. python3的样子_python3与python2不一样的地方
  7. 用友财务软件主要数据表字段含义
  8. linux mc服务器 mod_如何在linux搭建MC服务器
  9. img 固定在父容器底部_容器苗容器怎么选?六种容器特点详解!
  10. 从零开始实现数据结构(一) 动态数组
  11. 《天天数学》连载45:二月十四日
  12. 野外帐篷露营避难有感
  13. GitHub简单教程
  14. 全卷机神经网络图像分割(U-net)-keras实现
  15. 基于java的房屋出租管理系统
  16. Linux系统Centos7安装RabbitMQ使用压缩包配置环境变量RabbitMQ 3.8.16.tar.xz Erlang 24
  17. 生活是苦难的,我又划着我的断桨出发了
  18. hexo添加文章更新时间
  19. java 获取est时间_将日期字符串(EST)转换为Java日期(UTC)
  20. 【中土世界】宏大地名汇总

热门文章

  1. NOVO Nordisk IT SOP List
  2. BCB中的RTTI机制
  3. JDK自带VM分析工具jps,jstat,jmap,jconsole
  4. 【Spring学习】IOC容器
  5. 【转】别人整理的DP大全
  6. MVC把表格导出到Excel
  7. 代码生成工具之Winform查询列表界面生成
  8. Windows 8 Release Preview 安装秘技两则
  9. iOS再现安全漏洞 “1970变砖”问题仍未解决
  10. 构建高可用服务器之 Keepalive参数详解