分布式定时任务—xxl-job学习(三)——调度中心(xxl-job-admin)的启动和任务调度过程源码分析

RabbitsInTheGrass 2020-06-30 10:31:08  813  收藏  原力计划
分类专栏: # xxl-job Java分布式
版权
分布式定时任务—xxl-job学习(三)调度中心(xxl-job-admin)的启动和任务调度过程源码分析
前言
一、调度中心的启动
1.1 分析XxlJobAdminConfig类
1.2 分析XxlJobScheduler.init()
1.2.1 initI18n()
1.2.2 JobRegistryMonitorHelper.getInstance().start()
1.2.3 JobFailMonitorHelper.getInstance().start()
1.2.3.1 分析失败重试中的JobTriggerPoolHelper.trigger()方法
1.2.3.2 分析失败预警中的XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log)方法
1.2.3.2.1 分析JobAlarmer预警类
1.2.3.2.2 分析EmailJobAlarm邮件预警类
1.2.4 JobLosedMonitorHelper.getInstance().start()
1.2.5 JobTriggerPoolHelper.toStart()
1.2.6 JobLogReportHelper.getInstance().start()
1.2.7 JobScheduleHelper.getInstance().start()
1.2.7.1 scheduleThread
1.2.7.2 ringThread
1.3 分析XxlJobTrigger核心类
1.3.1 trigger()方法
1.3.2 processTrigger()方法
1.3.3 runExecutor()方法
二、彩蛋
前言
接上一篇:分布式定时任务—xxl-job学习(二)——执行器的启动过程源码分析
更早:分布式定时任务—xxl-job学习(一):简单demo搭建

在上一篇我们深度分析了下xxl-job执行器组件的启动加载原理,本篇我们来深度挖掘一下调度中心(xxl-job-admin)的启动和调度过程。

xxl.job.version使用的依旧是2.2.1-SNAPSHOT版本

一、调度中心的启动

通过上边xxl-job-admin的代码目录层级我们可以知道它包括:controller层支持在Web页面进行CRUD、核心类core、持久层dao以及业务实现层service。

首先我们先看com.xxl.job.admin.core.conf目录下的XxlJobAdminConfig.java

1.1 分析XxlJobAdminConfig类
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

// ---------------------- XxlJobScheduler ----------------------

private XxlJobScheduler xxlJobScheduler;

@Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

@Override
    public void destroy() throws Exception {
        xxlJobScheduler.destroy();
    }

// ---------------------- XxlJobScheduler ----------------------

// conf
    @Value("${xxl.job.i18n}")
    private String i18n;

@Value("${xxl.job.accessToken}")
    private String accessToken;

@Value("${spring.mail.from}")
    private String emailFrom;

@Value("${xxl.job.triggerpool.fast.max}")
    private int triggerPoolFastMax;

@Value("${xxl.job.triggerpool.slow.max}")
    private int triggerPoolSlowMax;

@Value("${xxl.job.logretentiondays}")
    private int logretentiondays;

// dao, service

@Resource
    private XxlJobLogDao xxlJobLogDao;
    @Resource
    private XxlJobInfoDao xxlJobInfoDao;
    @Resource
    private XxlJobRegistryDao xxlJobRegistryDao;
    @Resource
    private XxlJobGroupDao xxlJobGroupDao;
    @Resource
    private XxlJobLogReportDao xxlJobLogReportDao;
    @Resource
    private JavaMailSender mailSender;
    @Resource
    private DataSource dataSource;
    @Resource
    private JobAlarmer jobAlarmer;

public String getI18n() {
        if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) {
            return "zh_CN";
        }
        return i18n;
    }

public String getAccessToken() {
        return accessToken;
    }

public String getEmailFrom() {
        return emailFrom;
    }

public int getTriggerPoolFastMax() {
        if (triggerPoolFastMax < 200) {
            return 200;
        }
        return triggerPoolFastMax;
    }

public int getTriggerPoolSlowMax() {
        if (triggerPoolSlowMax < 100) {
            return 100;
        }
        return triggerPoolSlowMax;
    }

public int getLogretentiondays() {
        if (logretentiondays < 7) {
            return -1;  // Limit greater than or equal to 7, otherwise close
        }
        return logretentiondays;
    }

public XxlJobLogDao getXxlJobLogDao() {
        return xxlJobLogDao;
    }

public XxlJobInfoDao getXxlJobInfoDao() {
        return xxlJobInfoDao;
    }

public XxlJobRegistryDao getXxlJobRegistryDao() {
        return xxlJobRegistryDao;
    }

public XxlJobGroupDao getXxlJobGroupDao() {
        return xxlJobGroupDao;
    }

public XxlJobLogReportDao getXxlJobLogReportDao() {
        return xxlJobLogReportDao;
    }

public JavaMailSender getMailSender() {
        return mailSender;
    }

public DataSource getDataSource() {
        return dataSource;
    }

public JobAlarmer getJobAlarmer() {
        return jobAlarmer;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
配置了i18n(调度中心国际化配置)、accessToken(调度中心通讯TOKEN)、emailFrom(报警邮箱)、triggerPoolFastMax( 调度线程池最大线程配置)、triggerPoolSlowMax( 调度线程池最大线程配置)、logretentiondays(调度中心日志表数据保存天数)等参数;
注入了DataSource、XxlJobLogDao、XxlJobInfoDao、XxlJobRegistryDao、XxlJobGroupDao、XxlJobLogReportDao、JavaMailSender、JobAlarmer等dao和service接口;
同时因为它实现了InitializingBean, DisposableBean接口,所以在这个对象初始化的时候会调用afterPropertiesSet()方法。
注意:
在afterPropertiesSet()方法中新建了一个XxlJobScheduler对象,并调用了它的init()方法。

1.2 分析XxlJobScheduler.init()
public void init() throws Exception {
    // init i18n
    initI18n();

// admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();

// admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

// admin lose-monitor run
    JobLosedMonitorHelper.getInstance().start();

// admin trigger pool start
    JobTriggerPoolHelper.toStart();

// admin log report start
    JobLogReportHelper.getInstance().start();

// start-schedule
    JobScheduleHelper.getInstance().start();

logger.info(">>>>>>>>> init xxl-job admin success.");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1.2.1 initI18n()
// ---------------------- I18n ----------------------

private void initI18n(){
    for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
        item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
    }
}
1
2
3
4
5
6
7
初始化新增任务页面中的阻塞处理策略相关的国际化信息。如果没有配置国际化语言则默认为中文。

1.2.2 JobRegistryMonitorHelper.getInstance().start()
private Thread registryThread;
private volatile boolean toStop = false;
public void start(){
    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // auto registry group
                    List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                    if (groupList!=null && !groupList.isEmpty()) {

// remove dead address (admin/executor)
                        List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (ids!=null && ids.size()>0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                        }

// fresh online address (admin/executor)
                        HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                        List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (list != null) {
                            for (XxlJobRegistry item: list) {
                                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                    String appname = item.getRegistryKey();
                                    List<String> registryList = appAddressMap.get(appname);
                                    if (registryList == null) {
                                        registryList = new ArrayList<String>();
                                    }

if (!registryList.contains(item.getRegistryValue())) {
                                        registryList.add(item.getRegistryValue());
                                    }
                                    appAddressMap.put(appname, registryList);
                                }
                            }
                        }

// fresh group address
                        for (XxlJobGroup group: groupList) {
                            List<String> registryList = appAddressMap.get(group.getAppname());
                            String addressListStr = null;
                            if (registryList!=null && !registryList.isEmpty()) {
                                Collections.sort(registryList);
                                addressListStr = "";
                                for (String item:registryList) {
                                    addressListStr += item + ",";
                                }
                                addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                            }
                            group.setAddressList(addressListStr);
                            XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
    registryThread.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
分析:
该类主要是创建一个守护线程管理注册地址(每30秒执行一次)

从xxl_job_group表查询所有自动注册的执行器分组groupList;
删除过期注册地址,查询xxl_job_registry表中update_time小于当前时间前90秒的记录并做delete删除;
查询xxl_job_registry表中update_time大于当前时间前90秒的注册记录,并存入HashMap<String, List<String>> appAddressMap中,key为执行器的appname,value为执行器注册地址集合;
循环groupList,从HashMap<String, List<String>> appAddressMap中获取每个分组对应的地址集合,拼接每一个地址,以","分隔,并更新到xxl_job_group中。
1.2.3 JobFailMonitorHelper.getInstance().start()
public void start(){
    monitorThread = new Thread(new Runnable() {

@Override
        public void run() {

// monitor
            while (!toStop) {
                try {

List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                    if (failLogIds!=null && !failLogIds.isEmpty()) {
                        for (long failLogId: failLogIds) {

// lock log
                            int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                            if (lockRet < 1) {
                                continue;
                            }
                            XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                            XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

// 1、fail retry monitor
                            if (log.getExecutorFailRetryCount() > 0) {
                                JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                                log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                            }

// 2、fail alarm monitor
                            int newAlarmStatus = 0;        // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
                            if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
                                boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                newAlarmStatus = alarmResult?2:3;
                            } else {
                                newAlarmStatus = 1;
                            }

XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                        }
                    }

} catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }

try {
                       TimeUnit.SECONDS.sleep(10);
                   } catch (Exception e) {
                       if (!toStop) {
                           logger.error(e.getMessage(), e);
                       }
                   }

}

logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

}
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
    monitorThread.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
分析:

新建一个预警守护线程(每十秒工作一次)
TimeUnit.SECONDS.sleep(10);

从xxl_job_log表查询失败日志记录id集合,每次取1000条;
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);

<select id="findFailJobLogIds" resultType="long" >
        SELECT id FROM `xxl_job_log`
        WHERE !(
            (trigger_code in (0, 200) and handle_code = 0)
            OR
            (handle_code = 200)
        )
        AND `alarm_status` = 0
        ORDER BY id ASC
        LIMIT #{pagesize}
    </select>
1
2
3
4
5
6
7
8
9
10
11
如果failLogIds不为空,则循环failLogIds

锁住这条日志记录(其实是把这条记录的alarm_status字段从0变成了-1);

根据日志记录id从数据库加载日志记录对象XxlJobLog,根据日志记录里的jobId加载任务对象XxlJobInfo;

校验是否重试:如果任务设置了失败重试次数,那么进行失败重试。

将任务信息、失败次数-1加入调度线程队列中进行调度;
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null)
将重试信息更新到本条日志记录中;
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
1
2
3
校验预警状态:
告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
如果任务配置了预警邮箱,调用所有实现JobAlarm接口的预警实现类的doAlarm(XxlJobInfo info, XxlJobLog jobLog)方法得到最终状态,并根据日志id把当前日志记录的状态从-1更新到最终状态。

1.2.3.1 分析失败重试中的JobTriggerPoolHelper.trigger()方法
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
1
2
3
// job timeout count
private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

/**
 * add trigger
 */
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

// choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

// trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

long start = System.currentTimeMillis();

try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

// check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

// incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

}

}
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
分析:

JobTriggerPoolHelper类中有一个ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap变量,保存每个任务的执行次数,key为任务的jobId;
根据传入的jobId,从jobTimeoutCountMap取到对应的值jobTimeoutCount;
如果jobTimeoutCount不为空并且大于10(即该任务在一分钟内调度了10次)则使用慢线程池来执行任务,否则使用默认的快线程池;
调用XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);执行任务;
check timeout-count-map;(==注意:==此处暂时没明白为什么要判断minTim != minTim_now,待补充。。。。)
计算调度任务的花费时间,如果大于500ms,则先调用jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)),如果已经存在对应的key,则调用incrementAndGet()方法加1。
XxlJobTrigger.trigger()方法稍后再详细讲解。

1.2.3.2 分析失败预警中的XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log)方法
1.2.3.2.1 分析JobAlarmer预警类
@Component
public class JobAlarmer implements ApplicationContextAware, InitializingBean {
    private static Logger logger = LoggerFactory.getLogger(JobAlarmer.class);

private ApplicationContext applicationContext;
    private List<JobAlarm> jobAlarmList;

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

@Override
    public void afterPropertiesSet() throws Exception {
        Map<String, JobAlarm> serviceBeanMap = applicationContext.getBeansOfType(JobAlarm.class);
        if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
            jobAlarmList = new ArrayList<JobAlarm>(serviceBeanMap.values());
        }
    }

/**
     * job alarm
     *
     * @param info
     * @param jobLog
     * @return
     */
    public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {

boolean result = false;
        if (jobAlarmList!=null && jobAlarmList.size()>0) {
            result = true;  // success means all-success
            for (JobAlarm alarm: jobAlarmList) {
                boolean resultItem = false;
                try {
                    resultItem = alarm.doAlarm(info, jobLog);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                if (!resultItem) {
                    result = false;
                }
            }
        }

return result;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
分析:

这个类ApplicationContextAware, InitializingBean接口,重写了setApplicationContext()方法和afterPropertiesSet()方法,那么这个对象在初始化的时候会从Spring容器对象ApplicationContext中取到指定的类的list。在这里就是所有的JobAlarm,于是我们拿到了jobAlarmList。
alarm(info, log)方法其实是循环jobAlarmList,执行每个JobAlarm类的doAlarm方法。
1.2.3.2.2 分析EmailJobAlarm邮件预警类
@Component
public class EmailJobAlarm implements JobAlarm {
    private static Logger logger = LoggerFactory.getLogger(EmailJobAlarm.class);

/**
     * fail alarm
     *
     * @param jobLog
     */
    public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog){
        boolean alarmResult = true;

// send monitor email
        if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {

// alarmContent
            String alarmContent = "Alarm Job LogId=" + jobLog.getId();
            if (jobLog.getTriggerCode() != ReturnT.SUCCESS_CODE) {
                alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg();
            }
            if (jobLog.getHandleCode()>0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) {
                alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg();
            }

// email info
            XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(Integer.valueOf(info.getJobGroup()));
            String personal = I18nUtil.getString("admin_name_full");
            String title = I18nUtil.getString("jobconf_monitor");
            String content = MessageFormat.format(loadEmailJobAlarmTemplate(),
                    group!=null?group.getTitle():"null",
                    info.getId(),
                    info.getJobDesc(),
                    alarmContent);

Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
            for (String email: emailSet) {

// make mail
                try {
                    MimeMessage mimeMessage = XxlJobAdminConfig.getAdminConfig().getMailSender().createMimeMessage();

MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true);
                    helper.setFrom(XxlJobAdminConfig.getAdminConfig().getEmailFrom(), personal);
                    helper.setTo(email);
                    helper.setSubject(title);
                    helper.setText(content, true);

XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-job, job fail alarm email send error, JobLogId:{}", jobLog.getId(), e);

alarmResult = false;
                }

}
        }

return alarmResult;
    }

/**
     * load email job alarm template
     *
     * @return
     */
    private static final String loadEmailJobAlarmTemplate(){
        String mailBodyTemplate = "<h5>" + I18nUtil.getString("jobconf_monitor_detail") + ":</span>" +
                "<table border=\"1\" cellpadding=\"3\" style=\"border-collapse:collapse; width:80%;\" >\n" +
                "   <thead style=\"font-weight: bold;color: #ffffff;background-color: #ff8c00;\" >" +
                "      <tr>\n" +
                "         <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobgroup") +"</td>\n" +
                "         <td width=\"10%\" >"+ I18nUtil.getString("jobinfo_field_id") +"</td>\n" +
                "         <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobdesc") +"</td>\n" +
                "         <td width=\"10%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_title") +"</td>\n" +
                "         <td width=\"40%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_content") +"</td>\n" +
                "      </tr>\n" +
                "   </thead>\n" +
                "   <tbody>\n" +
                "      <tr>\n" +
                "         <td>{0}</td>\n" +
                "         <td>{1}</td>\n" +
                "         <td>{2}</td>\n" +
                "         <td>"+ I18nUtil.getString("jobconf_monitor_alarm_type") +"</td>\n" +
                "         <td>{3}</td>\n" +
                "      </tr>\n" +
                "   </tbody>\n" +
                "</table>";

return mailBodyTemplate;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
分析:

实现了JobAlarm预警接口
如果配置了报警邮件地址(多个地址使用","分隔),则发送预警邮件。
发送邮件这块实际上是使用了spring-boot-starter-mail,
其他发送邮件的各种方法可以参考本人的
SpringBoot学习历程(十一):SpringBoot2.X集成mail发送邮件

如果要实现自己业务的其他预警方法,只需要实现JobAlarm接口,重写doAlarm方法即可。(比如钉钉等)

1.2.4 JobLosedMonitorHelper.getInstance().start()
public class JobLosedMonitorHelper {
    private static Logger logger = LoggerFactory.getLogger(JobLosedMonitorHelper.class);
    
    private static JobLosedMonitorHelper instance = new JobLosedMonitorHelper();
    public static JobLosedMonitorHelper getInstance(){
        return instance;
    }

// ---------------------- monitor ----------------------

private Thread monitorThread;
    private volatile boolean toStop = false;
    public void start(){
        monitorThread = new Thread(new Runnable() {

@Override
            public void run() {

// monitor
                while (!toStop) {
                    try {
                        // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                        Date losedTime = DateUtil.addMinutes(new Date(), -10);
                        List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

if (losedJobIds!=null && losedJobIds.size()>0) {
                            for (Long logId: losedJobIds) {

XxlJobLog jobLog = new XxlJobLog();
                                jobLog.setId(logId);

jobLog.setHandleTime(new Date());
                                jobLog.setHandleCode(ReturnT.FAIL_CODE);
                                jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
                            }

}
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                        }
                    }

try {
                        TimeUnit.SECONDS.sleep(60);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

}

logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

}
        });
        monitorThread.setDaemon(true);
        monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
        monitorThread.start();
    }

public void toStop(){
        toStop = true;
        // interrupt and wait
        monitorThread.interrupt();
        try {
            monitorThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
分析:
该类主要是任务结果丢失处理

Date losedTime = DateUtil.addMinutes(new Date(), -10)取到十分钟前的时间;
从xxl_job_log表查询调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线的记录id集合;
    <select id="findLostJobIds" resultType="long" >
        SELECT t.id
        FROM xxl_job_log AS t
        WHERE t.trigger_code = 200
            and t.handle_code = 0
            and t.trigger_time <![CDATA[ <= ]]> #{losedTime}
            and t.executor_address not in (
                SELECT t2.registry_value
                FROM xxl_job_registry AS t2
            )
    </select>
1
2
3
4
5
6
7
8
9
10
11
如果id集合不为空,则循环更新xxl_job_log表对应记录,将本地调度主动标记失败。
1.2.5 JobTriggerPoolHelper.toStart()
// ---------------------- helper ----------------------

private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

public static void toStart() {
    helper.start();
}
public static void toStop() {
    helper.stop();
}
1
2
3
4
5
6
7
8
9
10
// ---------------------- trigger pool ----------------------

// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                }
            });

slowTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                }
            });
}

public void stop() {
    //triggerPool.shutdown();
    fastTriggerPool.shutdownNow();
    slowTriggerPool.shutdownNow();
    logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
分析:
初始化快慢两个线程池:fastTriggerPool和slowTriggerPool。当任务执行的时候默认使用快线程池fastTriggerPool;当任务执行失败并且重试次数达到阈值的时候就会使用慢线程池slowTriggerPool来执行任务

1.2.6 JobLogReportHelper.getInstance().start()
public class JobLogReportHelper {
    private static Logger logger = LoggerFactory.getLogger(JobLogReportHelper.class);

private static JobLogReportHelper instance = new JobLogReportHelper();
    public static JobLogReportHelper getInstance(){
        return instance;
    }

private Thread logrThread;
    private volatile boolean toStop = false;
    public void start(){
        logrThread = new Thread(new Runnable() {

@Override
            public void run() {

// last clean log time
                long lastCleanLogTime = 0;

while (!toStop) {

// 1、log-report refresh: refresh log report in 3 days
                    try {

for (int i = 0; i < 3; i++) {

// today
                            Calendar itemDay = Calendar.getInstance();
                            itemDay.add(Calendar.DAY_OF_MONTH, -i);
                            itemDay.set(Calendar.HOUR_OF_DAY, 0);
                            itemDay.set(Calendar.MINUTE, 0);
                            itemDay.set(Calendar.SECOND, 0);
                            itemDay.set(Calendar.MILLISECOND, 0);

Date todayFrom = itemDay.getTime();

itemDay.set(Calendar.HOUR_OF_DAY, 23);
                            itemDay.set(Calendar.MINUTE, 59);
                            itemDay.set(Calendar.SECOND, 59);
                            itemDay.set(Calendar.MILLISECOND, 999);

Date todayTo = itemDay.getTime();

// refresh log-report every minute
                            XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                            xxlJobLogReport.setTriggerDay(todayFrom);
                            xxlJobLogReport.setRunningCount(0);
                            xxlJobLogReport.setSucCount(0);
                            xxlJobLogReport.setFailCount(0);

Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                            if (triggerCountMap!=null && triggerCountMap.size()>0) {
                                int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                                int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                                int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                                int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                                xxlJobLogReport.setSucCount(triggerDayCountSuc);
                                xxlJobLogReport.setFailCount(triggerDayCountFail);
                            }

// do refresh
                            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                            if (ret < 1) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                            }
                        }

} catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                        }
                    }

// 2、log-clean: switch open & once each day
                    if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                            && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

// expire-time
                        Calendar expiredDay = Calendar.getInstance();
                        expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                        expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                        expiredDay.set(Calendar.MINUTE, 0);
                        expiredDay.set(Calendar.SECOND, 0);
                        expiredDay.set(Calendar.MILLISECOND, 0);
                        Date clearBeforeTime = expiredDay.getTime();

// clean expired log
                        List<Long> logIds = null;
                        do {
                            logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                            if (logIds!=null && logIds.size()>0) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                            }
                        } while (logIds!=null && logIds.size()>0);

// update clean time
                        lastCleanLogTime = System.currentTimeMillis();
                    }

try {
                        TimeUnit.MINUTES.sleep(1);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

}

logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

}
        });
        logrThread.setDaemon(true);
        logrThread.setName("xxl-job, admin JobLogReportHelper");
        logrThread.start();
    }

public void toStop(){
        toStop = true;
        // interrupt and wait
        logrThread.interrupt();
        try {
            logrThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
分析: 创建一个守护线程(每一分钟工作一次)

刷新前三天的日志相关报表数据:
取到前N天的开始时间和结束时间;
从xxl_job_log表统计相关指标;
    <select id="findLogReport" resultType="java.util.Map" >
        SELECT
            COUNT(handle_code) triggerDayCount,
            SUM(CASE WHEN (trigger_code in (0, 200) and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,
            SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as triggerDayCountSuc
        FROM xxl_job_log
        WHERE trigger_time BETWEEN #{from} and #{to}
    </select>
1
2
3
4
5
6
7
8
保存结果到xxl_job_log_report表,有则更新,没有则新增
清理N天前的日志记录(N就是我们配置的xxl.job.logretentiondays参数,必须大于7,否则无效)
删除xxl_job_log表trigger_time是N天前的数据。
1.2.7 JobScheduleHelper.getInstance().start()
public class JobScheduleHelper {
    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance(){
        return instance;
    }

public static final long PRE_READ_MS = 5000;    // pre read

private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

public void start(){

// schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

while (!scheduleThreadToStop) {

// Scan Job
                    long start = System.currentTimeMillis();

Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

boolean preReadSuc = true;
                    try {

conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

// tx start

// 1、pre read
                        long nowTime = System.currentTimeMillis();
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {

// time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

// fresh next
                                    refreshNextValidTime(jobInfo, new Date());

} else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

// 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

// 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

// next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

// 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

// 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

}

} else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

// 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

// 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

}

}

// 3、update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

} else {
                            preReadSuc = false;
                        }

// tx stop

} catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

// commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

// close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;

// Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

}

logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

// ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

// align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

while (!ringThreadToStop) {

try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

// ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }

// next second, align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {
        Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime);
        if (nextValidTime != null) {
            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
            jobInfo.setTriggerNextTime(nextValidTime.getTime());
        } else {
            jobInfo.setTriggerStatus(0);
            jobInfo.setTriggerLastTime(0);
            jobInfo.setTriggerNextTime(0);
        }
    }

private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    }

public void toStop(){

// 1、stop schedule
        scheduleThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);  // wait
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (scheduleThread.getState() != Thread.State.TERMINATED){
            // interrupt and wait
            scheduleThread.interrupt();
            try {
                scheduleThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

// if has ring data
        boolean hasRingData = false;
        if (!ringData.isEmpty()) {
            for (int second : ringData.keySet()) {
                List<Integer> tmpData = ringData.get(second);
                if (tmpData!=null && tmpData.size()>0) {
                    hasRingData = true;
                    break;
                }
            }
        }
        if (hasRingData) {
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

// stop ring (wait job-in-memory stop)
        ringThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (ringThread.getState() != Thread.State.TERMINATED){
            // interrupt and wait
            ringThread.interrupt();
            try {
                ringThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
分析:
创建了两个守护线程scheduleThread和ringThread

1.2.7.1 scheduleThread
根据treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)计算预读取任务数;
执行for update语句锁定任务的获取资格;
从xxl_job_info表查询未来5秒内即将要执行的任务集合;
    <select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
        SELECT <include refid="Base_Column_List" />
        FROM xxl_job_info AS t
        WHERE t.trigger_status = 1
            and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
        ORDER BY id ASC
        LIMIT #{pagesize}
    </select>
1
2
3
4
5
6
7
8
如果当前任务的触发时间已经超时5秒以上,跳过本次执行,计算下一次的执行时间
如果当前时间大于任务的下次触发时间,则调用JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);执行调度;计算下一次的执行时间;如果任务正在运行中并且下一次执行时间在5秒内,则进行此任务数据的缓存,处理逻辑就是第6步;
其他情况下则存入Map<Integer, List<Integer>> ringData 中,key为下次执行时间数,value为待执行任务id集合;
更新任务的触发调度信息;
1.2.7.2 ringThread
避免处理耗时太长,跨过刻度,向前校验一个刻度;
从Map<Integer, List<Integer>> ringData中取两个刻度(当前秒和上一秒)的任务id集合;
循环第二步的任务id集合,调用JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);触发调度。
1.3 分析XxlJobTrigger核心类
1.3.1 trigger()方法
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

// load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

// cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

// sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
分析:

根据jobId从数据库xxl_job_info表中加载任务信息XxlJobInfo;
校验方法入参executorParam,不为空则覆盖任务信息的executorParam;
校验方法入参failRetryCount,大于等于0则覆盖;
从数据库xxl_job_group表加载执行器信息,校验方法入参addressList,不为空则覆盖执行器的地址类型为手动注入1,地址列表为入参addressList;
校验方法入参executorShardingParam,
如果任务的路由策略是分片广播且执行器的注册地址列表不为空,需要根据分片参数调用本类中的processTrigger()方法。
1.3.2 processTrigger()方法
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

// param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

// 1、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

// 2、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

// 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

// 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

// 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

// 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
分析:

处理方法入参:阻塞处理策略、路由策略、分片参数;
保存一条日志记录到xxl_job_log表;
初始化任务调度参数(包括执行器参数、日志记录id、任务信息和分片参数);
初始化调度地址(如果路由策略是分片广播,则从执行器地址列表中获取分片参数i对应的地址即可;如果是其他路由策略则调用对应ExecutorRouter的route方法获取最终执行器地址);
调用runExecutor(triggerParam, address)方法执行远程任务调度;
组装任务触发调度信息;
将触发调度信息更新到对应日志记录中。
1.3.3 runExecutor()方法
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

runResult.setMsg(runResultSB.toString());
    return runResult;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
分析:

根据执行器地址获取对应的ExecutorBizClient(所有的执行器客户端会保存在ConcurrentMap<String, ExecutorBiz> executorBizRepository中);
调用ExecutorBizClient.run(triggerParam)方法;
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
1
2
3
4
使用XxlJobRemotingUtil工具类发送了一个post请求
处理返回的结果信息;
二、彩蛋
本篇跟进了一下调度中心(xxl-job-admin)的启动过程和任务调度触发原理,下一篇我们继续跟进下调度中心的web页面端api调用源码分析。
————————————————
版权声明:本文为CSDN博主「RabbitsInTheGrass」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/RabbitInTheGrass/article/details/106996040

分布式定时任务—xxl-job学习(三)——调度中心(xxl-job-admin)的启动和任务调度过程源码分析相关推荐

  1. nimble源码学习——广播流程源码分析1

    广播流程源码分析1 在controller层有多种状态:广播.空闲.连接等等,这次分析的是广播这个状态或者叫 做角色.在前面controller层循环的分析中,可以明确controller层也有eve ...

  2. XxlJob(一) 分布式定时任务XxlJob用法及核心调度源码详解

    目录 一.XxlJob 的Executor 1. 使用Spring框架注入 2. 不使用框架注入 3. 使用jar包的形式集成executor 二.XxlJob的核心工作原理 1. 注册JobHand ...

  3. 快速学习-XXL-JOB调度中心/执行器 RESTful API

    六.调度中心/执行器 RESTful API XXL-JOB 目标是一种跨平台.跨语言的任务调度规范和协议. 针对Java应用,可以直接通过官方提供的调度中心与执行器,方便快速的接入和使用调度中心,可 ...

  4. FPGA学习之路—接口(3)—SPI详解及Verilog源码分析

    FPGA学习之路--SPI详解及Verilog源码分析 概述 SPI = Serial Peripheral Interface,是串行外围设备接口,是一种高速,全双工,同步的通信总线. 优点 支持全 ...

  5. FPGA学习之路—接口(2)—I2C协议详解+Verilog源码分析

    FPGA学习之路--I2C协议详解+Verilog源码分析 定义 I2C Bus(Inter-Integrated Circuit Bus) 最早是由Philips半导体(现被NXP收购)开发的两线时 ...

  6. (三)Druid数据库连接池如何回收线程源码分析和高级参数优化

    (1)销毁连接线程核心逻辑: 首先需要先知道几个关键的对象代表的含义: DruidConnectionHolder[] evictConnections对象中的connection都是准备关闭的,放进 ...

  7. 嵌入式linux开发uboot移植(三)——uboot启动过程源码分析

    一.uboot启动流程简介 与大多数BootLoader一样,uboot的启动过程分为BL1和BL2两个阶段.BL1阶段通常是开发板的配置等设备初始化代码,需要依赖依赖于SoC体系结构,通常用汇编语言 ...

  8. [源码学习][知了开发]WebMagic-总体流程源码分析

    写在前面 前一段时间开发[知了]用到了很多技术(可以看我前面的博文http://blog.csdn.net/wsrspirit/article/details/51751568),这段时间抽空把这些整 ...

  9. Ceph学习——Librbd块存储库与RBD读写流程源码分析

    Librbd是Ceph提供块存储的库,它实现了RBD接口,基于LIbrados实现了对RBD的基本操作.Librbd对于元数据的相关操作是通过cls_rbd实现的.cls_rbd是Cls的一个扩展模块 ...

最新文章

  1. 解决oracle11g安装导致数据库无法自动搜集统计信息-转
  2. python windows编程_在Windows下配置Python编程学习环境
  3. 【博士论文】集群系统中的网络流调度
  4. 自动定位失败_端到端定位5G SA接入问题
  5. 用wubi在一个独立分区硬件装ubuntu12.04产生的无法识别U盘的解决办法
  6. dat14-memcached
  7. 实现一个Ajax模式的文件上传功能有多难?
  8. docker -v 文件夹下没有数据_微服务就是Dubbo?并没有那么简单!微服务架构+Docker+k8s了解下...
  9. as3 primitives
  10. APP支付报错ALI40247处理方案!
  11. android根据ip获取查询省份,通过IP地址获取省份城市位置信息
  12. Linux检查服务器cpu状态脚本,Linux服务器硬件运行状态及故障邮件提醒的监控脚本分享...
  13. 聊聊docker的使用心得
  14. 思科模拟器GNS3将路由器变成交换机的方法
  15. 微信小程序 短信验证码
  16. java评论功能怎么实现_评论功能的简单实现
  17. Mybatis源码-cursor(游标)
  18. InZiv 为 MicroLED 显示检测技术筹集 1000 万美元
  19. 危与机并存 保险业如何走好线上线下业务并举转型之路?
  20. python字典操作 EasyDict()作用

热门文章

  1. 数据结构与算法——31. 图的应用:拓扑排序、强连通分支、最短路径问题、最小生成树
  2. java项目-宿舍管理系统
  3. ie浏览器能显示ftp协议的图片_浏览器向服务器请求一张图片,到底发生了什么?...
  4. 审批工作流系列教程 前言
  5. 大数据如何在前端流畅展示
  6. 【撷英采华】2020年6月-7月教学周期学员考试
  7. python做一副54扑克牌发牌_python模拟实现分发扑克牌
  8. 配置kubeconfig_多Kubernetes集群如何切换?基于Kubectl客户端配置的实操分享
  9. cmmi实践访谈测试ppt_计划并实施CMMI_实践篇精选.ppt
  10. Python函数的变量域