





public void run() {

    int acquiresFailed = 0;

    while (!halted.get()) {

        try {

            // check if we're supposed to pause...

            synchronized (sigLock) {

                while (paused && !halted.get()) {

                    try {

                        // wait until togglePause(false) is called...


                    catch (InterruptedException ignore) {


                    // reset failure counter when paused, so that we don't

                    // wait again after unpausing

                    acquiresFailed = 0;


                if (halted.get()) {




            // wait a bit, if reading from job store is consistently

            // failing (e.g. DB is down or restarting)..

            if (acquiresFailed > 1) {

                try {

                    long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);


                catch (Exception ignore) {



            // 获取工作线程池中可用的线程数量

            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...


                List<OperableTrigger> triggers;

                long now = System.currentTimeMillis();


                try {

                    // idleWaitTime默认是30秒,

                    // maxBatchSize默认为1 , 每次获取一个Trigger来执行,

                    // 最后一个参数,默认为0

                    // 总的来说,这个方法是将最近30秒内最近的一批需要执行的JOB给抓出来,抓取数量为maxBatchSize,同时更新JOB的状态为ACQUIRED

                    // 同时更新JOB的下次执行时间。  这里如果maxBatchSize等于1 的话,则默认不加悲观锁

                    // 该方法后面重点讲。

                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(

                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());


                    acquiresFailed = 0;

                    if (log.isDebugEnabled())

                        log.debug("batch acquisition of " + (triggers == null 0 : triggers.size()) + " triggers");

                catch (JobPersistenceException jpe) {

                    if (acquiresFailed == 0) {


                            "An error occurred while scanning for the next triggers to fire.",



                    // 执行失败

                    if (acquiresFailed < Integer.MAX_VALUE)



                catch (RuntimeException e) {

                    if (acquiresFailed == 0) {

                        getLog().error("quartzSchedulerThreadLoop: RuntimeException "

                                +e.getMessage(), e);


                    if (acquiresFailed < Integer.MAX_VALUE)




                // triggers不为空

                if (triggers != null && !triggers.isEmpty()) {

                    // 获取

                    now = System.currentTimeMillis();

                    // 获取第一个

                    long triggerTime = triggers.get(0).getNextFireTime().getTime();


                    long timeUntilTrigger = triggerTime - now;

                    while(timeUntilTrigger > 2) {

                        synchronized (sigLock) {

                            if (halted.get()) {



                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {

                                try {

                                    // we could have blocked a long while

                                    // on 'synchronize', so we must recompute

                                    now = System.currentTimeMillis();

                                    timeUntilTrigger = triggerTime - now;

                                    if(timeUntilTrigger >= 1)


                                catch (InterruptedException ignore) {





                        // 如果重新获取新的任务的时间,依赖赶不上新任务的触发时间,那么就继续执行当前的任务。否则放弃任务

                        // 比如: 下次任务的触发时间是0.1秒后,但是获取任务的时候就需要0.2秒,那么即使去获取了,也没有意义,索性直接执行当前的任务。

                        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {



                        now = System.currentTimeMillis();

                        timeUntilTrigger = triggerTime - now;


                    // 这里再次做一次非空判断,防止上面那个循环里面,将triggers清空了

                    // this happens if releaseIfScheduleChangedSignificantly decided to release triggers



                    // set triggers to 'executing'

                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                    boolean goAhead = true;

                    synchronized(sigLock) {

                        goAhead = !halted.get();


                    if(goAhead) {

                        try {

                            // 这个地方是获取trigger的详情信息,并且做一系列的状态判断,防止重复执行,是否是串行还是并行执行,在这里面都有处理

                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

                            if(res != null)

                                // 获取结果

                                bndles = res;

                        catch (SchedulerException se) {


                                    "An error occurred while firing triggers '"

                                            + triggers + "'", se);

                            //QTZ-179 : a problem occurred interacting with the triggers from the db

                            //we release them and loop again

                            for (int i = 0; i < triggers.size(); i++) {






                    // 循环trigger的结果信息

                    for (int i = 0; i < bndles.size(); i++) {

                        TriggerFiredResult result =  bndles.get(i);

                        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();

                        Exception exception = result.getException();

                            // 异常处理

                        if (exception instanceof RuntimeException) {

                            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);




                        // it's possible to get 'null' if the triggers was paused,

                        // blocked, or other similar occurrences that prevent it being

                        // fired at this time...  or if the scheduler was shutdown (halted)

                        // 为空的话,则释放状态绑定,将状态从ACQUIRED修改为WAITING

                        if (bndle == null) {




                        JobRunShell shell = null;

                        try {

                            // 构建任务执行的脚本信息

                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);


                        catch (SchedulerException se) {

                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);



                        // 将任务丢到线程池里面去处理,至此,任务触发算是完成了

                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {

                            // this case should never happen, as it is indicative of the

                            // scheduler being shutdown or a bug in the thread pool or

                            // a thread pool being used concurrently - which the docs

                            // say not to do...

                            getLog().error("ThreadPool.runInThread() return false!");

                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);



                    continue// while (!halted)


            else // if(availThreadCount > 0)

                // should never happen, if threadPool.blockForAvailableThreads() follows contract

                continue// while (!halted)


            long now = System.currentTimeMillis();

            long waitTime = now + getRandomizedIdleWaitTime();

            long timeUntilContinue = waitTime - now;

            synchronized(sigLock) {

                try {

                  if(!halted.get()) {

                    // QTZ-336 A job might have been completed in the mean time and we might have

                    // missed the scheduled changed signal by not waiting for the notify() yet

                    // Check that before waiting for too long in case this very job needs to be

                    // scheduled very soon

                    if (!isScheduleChanged()) {




                catch (InterruptedException ignore) {



        catch(RuntimeException re) {

            getLog().error("Runtime error occurred in main trigger firing loop.", re);


    // while (!halted)

    // drop references to scheduler stuff to aid garbage collection...

    qs = null;

    qsRsrcs = null;





public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)

    throws JobPersistenceException {


    String lockName;

    // 从这个地方可以看到maxCount大于1 的时候才会使用悲观锁, isAcquireTriggersWithinLock默认为false

    if(isAcquireTriggersWithinLock() || maxCount > 1) {

        lockName = LOCK_TRIGGER_ACCESS;

    else {

        lockName = null;


    return executeInNonManagedTXLock(lockName,

            new TransactionCallback<List<OperableTrigger>>() {

                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {

                    // 重点看这个方法

                    // executeInNonManagedTXLock 里面最终主要的就是执行这个方法。

                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);



            new TransactionValidator<List<OperableTrigger>>() {

                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {

                    try {

                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());

                        Set<String> fireInstanceIds = new HashSet<String>();

                        for (FiredTriggerRecord ft : acquired) {



                        for (OperableTrigger tr : result) {

                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {

                                return true;



                        return false;

                    catch (SQLException e) {

                        throw new JobPersistenceException("error validating trigger acquisition", e);





protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)

    throws JobPersistenceException {

    if (timeWindow < 0) {

      throw new IllegalArgumentException();



    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();

    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();

    // 最多重试三次

    final int MAX_DO_LOOP_RETRY = 3;

    int currentLoopCount = 0;

    do {

        // 进入do while循环

        currentLoopCount ++;

        try {


            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);


            // No trigger is ready to fire yet.

            if (keys == null || keys.size() == 0)

                return acquiredTriggers;

            // 设置截止时间

            long batchEnd = noLaterThan;

            for(TriggerKey triggerKey: keys) {

                // If our trigger is no longer available, try a new one.

                // 判断 trigger是否存在

                OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);

                if(nextTrigger == null) {

                    continue// next trigger



                // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then

                // put it back into the timeTriggers set and continue to search for next trigger.

                JobKey jobKey = nextTrigger.getJobKey();

                // 判断trigger对应的jobDetail是否存在

                JobDetail job;

                try {

                    job = retrieveJob(conn, jobKey);

                catch (JobPersistenceException jpe) {

                    try {

                        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);

                        getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);

                    catch (SQLException sqle) {

                        getLog().error("Unable to set trigger state to ERROR.", sqle);




                // 是否允许并发执行, JobBean上面含@DisallowConcurrentExecution这个注解的,表示不允许并发执行

                if (job.isConcurrentExectionDisallowed()) {

                    // 进入这里,表示不允许并发执行

                    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {

                        continue// next trigger

                    else {




                // 如果该任务的下次执行时间大于截止时间,那么跳过

                if (nextTrigger.getNextFireTime().getTime() > batchEnd) {



                // We now have a acquired trigger, let's add to return list.

                // If our trigger was no longer in the expected state, try a new one.

                // 更新这个trigger的状态为ACQUIRED ,表示正在准备出发。

                int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);

                if (rowsUpdated <= 0) {

                    continue// next trigger



                // 插入出发记录

                getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                if(acquiredTriggers.isEmpty()) {

                    batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;


                // 加入返回trigger



            // if we didn't end up with any trigger to fire from that first

            // batch, try again for another batch. We allow with a max retry count.

            if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {




            // We are done with the while loop.


        catch (Exception e) {

            throw new JobPersistenceException(

                      "Couldn't acquire next trigger: " + e.getMessage(), e);


    while (true);


    // Return the acquired trigger list

    return acquiredTriggers;


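上面看到的是触发器获取的详细实现:如果每次获取的maxCount大于1(或开启了acquireTriggersWithinLock),就会使用悲观锁(LOCK_TRIGGER_ACCESS),防止任务在集群状态下被重复获取。默认maxCount=1,这也就导致了在默认的集群模式下,如果不做这个配置,获取触发器时不会加悲观锁,只能依靠updateTriggerStateFromOtherState把trigger状态从WAITING条件更新为ACQUIRED(受影响行数为0则跳过)来防止重复获取;在并发状态下,就可能出现任务被重复获取的情况。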




// Excerpt from the scheduler loop: the acquired triggers are handed to the job store's
// triggersFired(...), which processes them under the LOCK_TRIGGER_ACCESS lock.
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {

    // 直接传入锁名,使用悲观锁

    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,

            new TransactionCallback<List<TriggerFiredResult>>() {

                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {

                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                    TriggerFiredResult result;

                    for (OperableTrigger trigger : triggers) {

                        try {

                          // 单个任务来慢慢搞

                          TriggerFiredBundle bundle = triggerFired(conn, trigger);

                          result = new TriggerFiredResult(bundle);

                        catch (JobPersistenceException jpe) {

                            result = new TriggerFiredResult(jpe);

                        catch(RuntimeException re) {

                            result = new TriggerFiredResult(re);




                    return results;



            new TransactionValidator<List<TriggerFiredResult>>() {


                public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {

                    try {

                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());

                        Set<String> executingTriggers = new HashSet<String>();

                        for (FiredTriggerRecord ft : acquired) {

                            if (STATE_EXECUTING.equals(ft.getFireInstanceState())) {




                        for (TriggerFiredResult tr : result) {

                            if (tr.getTriggerFiredBundle() != null && executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId())) {

                                return true;



                        return false;

                    catch (SQLException e) {

                        throw new JobPersistenceException("error validating trigger acquisition", e);





protected TriggerFiredBundle triggerFired(Connection conn,

        OperableTrigger trigger)

    throws JobPersistenceException {

    JobDetail job;

    Calendar cal = null;

    // Make sure trigger wasn't deleted, paused, or completed...

    try // if trigger was deleted, state will be STATE_DELETED

        // 验证trigger的状态,如果不是等于ACQUIRED的,则直接return null

        String state = getDelegate().selectTriggerState(conn,


        if (!state.equals(STATE_ACQUIRED)) {

            return null;


    catch (SQLException e) {

        throw new JobPersistenceException("Couldn't select trigger state: "

                + e.getMessage(), e);


    try {

        // 获取这个trigger的任务详情。

        job = retrieveJob(conn, trigger.getJobKey());

        if (job == null) { return null; }

    catch (JobPersistenceException jpe) {

        try {

            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);

            getDelegate().updateTriggerState(conn, trigger.getKey(),


        catch (SQLException sqle) {

            getLog().error("Unable to set trigger state to ERROR.", sqle);


        throw jpe;


    if (trigger.getCalendarName() != null) {

        // 这里主要是对非集群模式下做一些缓存处理

        cal = retrieveCalendar(conn, trigger.getCalendarName());

        if (cal == null) { return null; }


    try {

        // 更新触发记录的状态为EXECUTING

        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);

    catch (SQLException e) {

        throw new JobPersistenceException("Couldn't insert fired trigger: "

                + e.getMessage(), e);


    Date prevFireTime = trigger.getPreviousFireTime();

    // call triggered - to update the trigger's next-fire-time state...

    // 计算下一次的trigger的执行时间


    String state = STATE_WAITING;

    boolean force = true;


    if (job.isConcurrentExectionDisallowed()) {

        state = STATE_BLOCKED;

        force = false;

        try {

            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),

                    STATE_BLOCKED, STATE_WAITING);

            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),

                    STATE_BLOCKED, STATE_ACQUIRED);

            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),


        catch (SQLException e) {

            throw new JobPersistenceException(

                    "Couldn't update states of blocked triggers: "

                            + e.getMessage(), e);




    if (trigger.getNextFireTime() == null) {

        // 下次执行时间为空,也就是说没有下次了,直接修改trigger的状态为完成

        state = STATE_COMPLETE;

        force = true;


    // 修改trigger的撞他信息

    storeTrigger(conn, trigger, job, true, state, force, false);


    // 返回任务的执行信息

    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()

            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger

            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());













sharedCode源码交流群:欢迎喜欢阅读源码的朋友加群,请添加下面的微信,备注“加群”。


  1. quartz集群模式下qrtz_triggers表trigger_state变ERROR分析

    最近在正式环境新增了一个定时任务,项目启动后,新增的任务总是跑一两次就不跑了,排查发现trigger_state变为ERROR了. 一.Quartz重要表含义 1)qrtz_calendars:以Bl ...

  2. Redis集群模式源码分析

    目录 1 主从复制模式 2 Sentinel(哨兵)模式 3 Cluster模式 4.参考文档 1 主从复制模式 主库负责读写操作,从库负责数据同步,接受来自主库的同步命令.通过分析Redis的客户端 ...

  3. Kafka单机、集群模式安装详解(二)

    本文环境如下: 操作系统:CentOS 6 32位 JDK版本:1.8.0_77 32位 Kafka版本: 2.11) 接上篇 Kafka单机.集群模式安装详解(一) 6. ...

  4. 第十节: 利用SQLServer实现Quartz的持久化和双机热备的集群模式

    背景: 默认情况下,Quartz.Net作业是持久化在内存中的,即 quartz.jobStore.type = "Quartz.Simpl.RAMJobStore, Quartz" ...

  5. 第十节: 利用SQLServer实现Quartz的持久化和双机热备的集群模式 :

    背景: 默认情况下,Quartz.Net作业是持久化在内存中的,即 quartz.jobStore.type = "Quartz.Simpl.RAMJobStore, Quartz" ...

  6. Zookeeper源码分析:集群模式启动概述

    参考资料 <<从PAXOS到ZOOKEEPER分布式一致性原理与实践>> zookeeper-3.0.0 Zookeeper概述 Zookeeper是一个分布式的,开放源码的分 ...

  7. quartz集群调度机制调研及源码分析---转载

    quartz2.2.1集群调度机制调研及源码分析 引言 quartz集群架构 调度器实例化 调度过程 触发器的获取 触发trigger: Job执行过程: 总结: 附: 引言 quratz是目前最为成 ...

  8. zookeeper专题:zookeeper集群模式下,leader选举流程分析

    文章目录 Zookeeper 集群模式一共有三种类型的角色 1. zookeeper启动时leader选举流程 1.1 加载配置文件,设置基本信息 1.2 指定快速选举算法,启动多级队列.线程 1.3 ...

  9. Solr系列二:solr-部署详解(solr两种部署模式介绍、独立服务器模式详解、SolrCloud分布式集群模式详解)...

    一.solr两种部署模式介绍 Standalone Server 独立服务器模式:适用于数据规模不大的场景 SolrCloud  分布式集群模式:适用于数据规模大,高可靠.高可用.高并发的场景 二.独 ...


  1. php限制字符输入,.NET_asp.net(c#)限制用户输入规定的字符和数字的代码,一下是这个代码: 只允许 用 - phpStudy...
  2. ni visa pci_CHINACOAT 2019“推荐品牌”赫普菲乐|PCI可名文化出品
  3. animate inater插件_C4D R20插件下载 旧版插件C4D R20桥接插件INSYDIUMS Plug-In Bridge Cinema 4D R20 免费版 下载-脚本之家...
  4. mac brew 安装
  5. BZOJ1858 [Scoi2010]序列操作 线段树
  6. STM32开发 -- 4G模块开发详解(3)
  7. 【机器学习】情侣、基友、渣男和狗-基于时空关联规则的影子账户挖掘
  8. 【Java18】Mybatis:jdbc解耦,动态代理,日志
  9. webpack打包css文件
  10. 主流虚拟化产品对比列表01
  11. VS2015开发Android,自带模拟器无法调试、加载程序,算是坑吗
  12. ubantu中怎样安装VMware Tools
  13. 你不知道的思维导图能做的事
  14. python模拟购物车流程_用函数模拟简单的购物车(Python)
  15. Python 之pass 语句
  16. head first JavaScript pdf 下载
  17. SQL语句简单增删改查
  18. 比尔·盖茨持有过的中国股票
  19. 达人评测 r7 7730U和R5 7530U选哪个好 锐龙r77730U和R57530U对比
  20. 一辈子交186万五险一金!退休你能拿回多少?算完惊呆……


  1. ngrok实现内网穿透,让家里的笔记本也能做服务器
  2. spring注解是怎么实现的?
  3. BAOCMS怎么样修改默认后台路径admin/login/index.html
  4. Matlab图论工具箱
  5. pymongo学习笔记
  6. SDK接口调用主流程
  7. Javascript实现元素选择器功能
  8. 加拿大约克大学计算机本科学费,加拿大约克大学学费基本情况
  9. 【20210205期AI简报】联发科发布二代5G基带芯片发布、超强镜像优化从1.16GB到22.4MB!...
  10. 如何把图片扫描成word文档?