ElasticJob源码分析--定时任务执行JobScheduler类分析
public static void main(String[] args) {// 初始化数据源DataSource dataSource = MovieServiceUtils.getDataSource();// 定义日志数据库事件溯源配置JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(dataSource);new JobScheduler(createRegistryCenter(), createJobConfiguration(), jobEventRdbConfig).init();}
JobScheduler执行过程
public JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventConfiguration jobEventConfig, ElasticJobListener... elasticJobListeners) {this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);}private JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventBus jobEventBus, ElasticJobListener... elasticJobListeners) {JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());this.liteJobConfig = liteJobConfig;this.regCenter = regCenter;List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);this.setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);this.schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);this.jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);}
这里创建的是JobScheduler这个对象,然后下面调用了init方法。
public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = this.schedulerFacade.updateJobConfiguration(this.liteJobConfig);JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());JobScheduleController jobScheduleController = new JobScheduleController(this.createScheduler(), this.createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);this.schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}
初始化的几个动作:
--初始化设置分片总量,这些参数都是通过配置中心中获取到的,也就是LiteJobConfiguration这个类。
--创建调度器,createScheduler方法。
--创建任务详情,createJobdDetail方法来创建。
--注册任务到ZK上面。
--注册作业启动信息registerStartUpInfo方法。
--进行作业调度scheduleJob方法。
createScheduler方法
这里创建的就是Quartz里面的scheduler。
然后这个方法里面还添加了任务触发监听器。
private Scheduler createScheduler() {try {StdSchedulerFactory factory = new StdSchedulerFactory();factory.initialize(this.getBaseQuartzProperties());Scheduler result = factory.getScheduler();result.getListenerManager().addTriggerListener(this.schedulerFacade.newJobTriggerListener());return result;} catch (SchedulerException var3) {throw new JobSystemException(var3);}}
factory这个地方调用了getScheduler这个方法,然后这个方法其实调用了的是Quartz中的方法。
然后上面的initialize方法是调用了一个getBaseQuartzProperties方法,在这个方法里面配置了相关的参数。
private Properties getBaseQuartzProperties() {Properties result = new Properties();result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());result.put("org.quartz.threadPool.threadCount", "1");result.put("org.quartz.scheduler.instanceName", this.liteJobConfig.getJobName());result.put("org.quartz.jobStore.misfireThreshold", "1");result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());return result;}
getSchedule方法调用的Quartz中的Scheduler中的方法。
Scheduler在使用之前需要进行实例化,从SchedulerFactory创建它开始,到scheduler调用shutdown结束。
他在实例化之后,可以启动(start)、暂停(standby)、停止(shutdown),然后只有start的scheduler才能被触发(trigger).
public Scheduler getScheduler() throws SchedulerException {if (this.cfg == null) {this.initialize();}SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(this.getSchedulerName());if (sched != null) {if (!sched.isShutdown()) {return sched;}schedRep.remove(this.getSchedulerName());}sched = this.instantiate();return sched;}
CreateJobDetail方法
private JobDetail createJobDetail(String jobClass) {JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(this.liteJobConfig.getJobName()).build();result.getJobDataMap().put("jobFacade", this.jobFacade);Optional<ElasticJob> elasticJobInstance = this.createElasticJobInstance();if (elasticJobInstance.isPresent()) {result.getJobDataMap().put("elasticJob", elasticJobInstance.get());} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {try {result.getJobDataMap().put("elasticJob", Class.forName(jobClass).newInstance());} catch (ReflectiveOperationException var5) {throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", new Object[]{jobClass});}}return result;}
这个地方首先是创建了一个result实例,这个实例用于封装Job的详细信息的。
前面几个方法的连续调用基本上就是添加参数而已。然后这里是直接build,创建一个JobDetail的实例。
后面调用了一个createElasticJobInstance()方法,这个方法的话是创建一个实例,然后这个实例放在了Gava的Optional里面。
接下来调用这个isPresent方法来判断下是不是实例已经创建了,如果创建了的话,那么就直接将其封装进result的DataMap中去。
直接使用的是elasticjob的key.
public JobDetail build() {JobDetailImpl job = new JobDetailImpl();job.setJobClass(this.jobClass);job.setDescription(this.description);if (this.key == null) {this.key = new JobKey(Key.createUniqueName((String)null), (String)null);}job.setKey(this.key);job.setDurability(this.durability);job.setRequestsRecovery(this.shouldRecover);if (!this.jobDataMap.isEmpty()) {job.setJobDataMap(this.jobDataMap);}return job;}
创建了一个JobDetailImpl这个类的实例,这个实例封装了许多参数信息。而这个JobDetailImpl的类就是Quartz中的一个实现类。
public class JobDetailImpl implements Cloneable, Serializable, JobDetail {private static final long serialVersionUID = -6069784757781506897L;private String name;private String group;private String description;private Class<? extends Job> jobClass;private JobDataMap jobDataMap;private boolean durability;private boolean shouldRecover;private transient JobKey key;
这里面是一个jobDataMap的一个类,这个类实际上是一个实现了Map接口的类一路继承下来的。应该是ElasticJob自己进行了一个封装而已。
然后创建了一个
public JobScheduleController(Scheduler scheduler, JobDetail jobDetail, String triggerIdentity) {this.scheduler = scheduler;this.jobDetail = jobDetail;this.triggerIdentity = triggerIdentity;}
RegisterJob
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);
JobRegistry
public final class JobRegistry {private static volatile JobRegistry instance;private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap();private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap();private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap();private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap();private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap();public static JobRegistry getInstance() {if (null == instance) {Class var0 = JobRegistry.class;synchronized(JobRegistry.class) {if (null == instance) {instance = new JobRegistry();}}}return instance;}
单例模式,采用了双重检查的方式创建了对象。
public void registerJob(String jobName, JobScheduleController jobScheduleController, CoordinatorRegistryCenter regCenter) {this.schedulerMap.put(jobName, jobScheduleController);this.regCenterMap.put(jobName, regCenter);regCenter.addCacheData("/" + jobName);}
注册的过程。
schedulerMap和regCenterMap,就是把数据封装到了Map中去。也就是将需要的数据交给第三方暂存一下,用的时候取就是了。这部分应该是本地做了一个缓存。
下面的regCenter是将数据注册到远端的注册中心上去。
RegisterStartUpInfo
public void registerStartUpInfo(boolean enabled) {this.listenerManager.startAllListeners();//启动所有的监听器this.leaderService.electLeader();// 选举leaderthis.serverService.persistOnline(enabled);// 持久化作业信息到zk,先持久化服务器信息,然后再持久化作业信息。this.instanceService.persistOnline();// 持久化作业信息this.shardingService.setReshardingFlag();// 重新分片this.monitorService.listen();// 初始化监听服务if (!this.reconcileService.isRunning()) {this.reconcileService.startAsync();// 调节分布式作业不一致状态服务异步启动。}}
1.启动所有的监听器
public void startAllListeners() {this.electionListenerManager.start();// 选举this.shardingListenerManager.start();// 分片this.failoverListenerManager.start();// 失效转移this.monitorExecutionListenerManager.start();// 执行监控监听this.shutdownListenerManager.start();// shutdown监听this.triggerListenerManager.start();// 触发器监听this.rescheduleListenerManager.start();// 重新调度this.guaranteeListenerManager.start(); // 保证分布式任务全部开始和结束状态this.jobNodeStorage.addConnectionStateListener(this.regCenterConnectionStateListener);// 注册中心与任务节点的状态}
2. 选举leader
public void electLeader() {log.debug("Elect a new leader now.");this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());log.debug("Leader election completed.");}
这个地方主要是用了一个锁。
leaderlatch调用的过程已经进入了Curator的代码了,然后主要的目的是用curator来实现主节点的选举工作。
public void executeInLeader(String latchNode, LeaderExecutionCallback callback) {try {LeaderLatch latch = new LeaderLatch(this.getClient(), this.jobNodePath.getFullPath(latchNode));Throwable var4 = null;try {latch.start();latch.await();callback.execute();} catch (Throwable var14) {var4 = var14;throw var14;} finally {if (latch != null) {if (var4 != null) {try {latch.close();} catch (Throwable var13) {var4.addSuppressed(var13);}} else {latch.close();}}}} catch (Exception var16) {this.handleException(var16);}}
3. 持久化服务器信息
public void persistOnline(boolean enabled) {if (!JobRegistry.getInstance().isShutdown(this.jobName)) {this.jobNodeStorage.fillJobNode(this.serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(this.jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());}}
调用了注册中心
public void fillJobNode(String node, Object value) {this.regCenter.persist(this.jobNodePath.getFullPath(node), value.toString());}
调用了ZookeeperRegisterCenter,看创建的节点类型为Persist也就是说服务器的信息是持久节点
public void persist(String key, String value) {try {if (!this.isExisted(key)) {((ACLBackgroundPathAndBytesable)this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(key, value.getBytes(Charsets.UTF_8));} else {this.update(key, value);}} catch (Exception var4) {RegExceptionHandler.handleException(var4);}}
4. 持久化作业信息
作业的数据是临时节点
public void persistOnline() {this.jobNodeStorage.fillEphemeralJobNode(this.instanceNode.getLocalInstanceNode(), "");}
5.重新分片
public void setReshardingFlag() {this.jobNodeStorage.createJobNodeIfNeeded("leader/sharding/necessary");}
在JobNodeStorage中调用createJobNodeIfNeeded方法。
public void createJobNodeIfNeeded(String node) {if (this.isJobRootNodeExisted() && !this.isJobNodeExisted(node)) {this.regCenter.persist(this.jobNodePath.getFullPath(node), "");}}
又是注册中心进行persist。
6. 初始化监听服务
public void listen() {int port = this.configService.load(true).getMonitorPort();if (port >= 0) {try {log.info("Elastic job: Monitor service is running, the port is '{}'", port);this.openSocketForMonitor(port);} catch (IOException var3) {log.error("Elastic job: Monitor service listen failure, error is: ", var3);}}}
然后调用了openSocketForMonitor方法。
private void openSocketForMonitor(int port) throws IOException {this.serverSocket = new ServerSocket(port);(new Thread() {public void run() {while(!MonitorService.this.closed) {try {MonitorService.this.process(MonitorService.this.serverSocket.accept());} catch (IOException var2) {MonitorService.log.error("Elastic job: Monitor service open socket for monitor failure, error is: ", var2);}}}}).start();}
这里直接调用了Socket来处理,然后这个process方法中进行了数据的处理。
这个地方从process来看的话主要还是用于监听dump相关数据用的吧。比较长,只复制一部分。
private void process(Socket socket) throws IOException {BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));Throwable var3 = null;try {BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));Throwable var5 = null;try {Socket autoCloseSocket = socket;Throwable var7 = null;try {String cmdLine = reader.readLine();if (null != cmdLine && "dump".equalsIgnoreCase(cmdLine)) {List<String> result = new ArrayList();this.dumpDirectly("/" + this.jobName, result);this.outputMessage(writer, Joiner.on("\n").join(SensitiveInfoUtils.filterSensitiveIps(result)) + "\n");}
SchedulerJob调度
public void scheduleJob(String cron) {try {if (!this.scheduler.checkExists(this.jobDetail.getKey())) {this.scheduler.scheduleJob(this.jobDetail, this.createTrigger(cron));}this.scheduler.start();} catch (SchedulerException var3) {throw new JobSystemException(var3);}}
启动调度器,然后进行相关的调度就行了,这个地方应该是调用了底层封装的Quartz的任务调度程序。
补充
为了理清Quartz与ElasticJob的关系。
ElasticJob主要是自己封装了一些任务的属性啊,方法啊之类的,基本的形式在于其封装了一些自己本身必须的而且Quartz也必须的部分,那么在使用的时候这两者到底怎么进行的切换呢?难道它是用的继承关系吗?
不是继承关系,当然这里面有继承关系实现的部分类。但是我用的lite版本的elasticjob主要还不是用的继承实现的,
下面来看这个类。该类中封装的就是Quartz类中的一些属性>jobDetail和scheduler。
其实这两者直接应该说是聚合关系。
/*** 作业调度控制器.* * @author zhangliang*/
@RequiredArgsConstructor
public final class JobScheduleController {private final Scheduler scheduler;private final JobDetail jobDetail;private final String triggerIdentity;
实际执行的时候是调用了这个JobScheduleController的schedule方法。这里面有一个createTrigger(cron)
/*** 调度作业.* * @param cron CRON表达式*/public void scheduleJob(final String cron) {try {if (!scheduler.checkExists(jobDetail.getKey())) {scheduler.scheduleJob(jobDetail, createTrigger(cron));}scheduler.start();} catch (final SchedulerException ex) {throw new JobSystemException(ex);}}
涉及到的设计模式
public final class LiteJobConfiguration implements JobRootConfiguration {private final JobTypeConfiguration typeConfig;private final boolean monitorExecution;private final int maxTimeDiffSeconds;private final int monitorPort;private final String jobShardingStrategyClass;private final int reconcileIntervalMinutes;private final boolean disabled;private final boolean overwrite;public String getJobName() {return this.typeConfig.getCoreConfig().getJobName();}public boolean isFailover() {return this.typeConfig.getCoreConfig().isFailover();}public static LiteJobConfiguration.Builder newBuilder(JobTypeConfiguration jobConfig) {return new LiteJobConfiguration.Builder(jobConfig);}public JobTypeConfiguration getTypeConfig() {return this.typeConfig;}public boolean isMonitorExecution() {return this.monitorExecution;}public int getMaxTimeDiffSeconds() {return this.maxTimeDiffSeconds;}public int getMonitorPort() {return this.monitorPort;}public String getJobShardingStrategyClass() {return this.jobShardingStrategyClass;}public int getReconcileIntervalMinutes() {return this.reconcileIntervalMinutes;}public boolean isDisabled() {return this.disabled;}public boolean isOverwrite() {return this.overwrite;}private LiteJobConfiguration(JobTypeConfiguration typeConfig, boolean monitorExecution, int maxTimeDiffSeconds, int monitorPort, String jobShardingStrategyClass, int reconcileIntervalMinutes, boolean disabled, boolean overwrite) {this.typeConfig = typeConfig;this.monitorExecution = monitorExecution;this.maxTimeDiffSeconds = maxTimeDiffSeconds;this.monitorPort = monitorPort;this.jobShardingStrategyClass = jobShardingStrategyClass;this.reconcileIntervalMinutes = reconcileIntervalMinutes;this.disabled = disabled;this.overwrite = overwrite;}public static class Builder {private final JobTypeConfiguration jobConfig;private boolean monitorExecution;private int maxTimeDiffSeconds;private int monitorPort;private String jobShardingStrategyClass;private boolean disabled;private boolean overwrite;private int reconcileIntervalMinutes;public LiteJobConfiguration.Builder monitorExecution(boolean monitorExecution) {this.monitorExecution = monitorExecution;return this;}public LiteJobConfiguration.Builder maxTimeDiffSeconds(int maxTimeDiffSeconds) {this.maxTimeDiffSeconds = maxTimeDiffSeconds;return this;}public LiteJobConfiguration.Builder monitorPort(int monitorPort) {this.monitorPort = monitorPort;return this;}public LiteJobConfiguration.Builder jobShardingStrategyClass(String jobShardingStrategyClass) {if (null != jobShardingStrategyClass) {this.jobShardingStrategyClass = jobShardingStrategyClass;}return this;}public LiteJobConfiguration.Builder reconcileIntervalMinutes(int reconcileIntervalMinutes) {this.reconcileIntervalMinutes = reconcileIntervalMinutes;return this;}public LiteJobConfiguration.Builder disabled(boolean disabled) {this.disabled = disabled;return this;}public LiteJobConfiguration.Builder overwrite(boolean overwrite) {this.overwrite = overwrite;return this;}public final LiteJobConfiguration build() {return new LiteJobConfiguration(this.jobConfig, this.monitorExecution, this.maxTimeDiffSeconds, this.monitorPort, this.jobShardingStrategyClass, this.reconcileIntervalMinutes, this.disabled, this.overwrite);}private Builder(JobTypeConfiguration jobConfig) {this.monitorExecution = true;this.maxTimeDiffSeconds = -1;this.monitorPort = -1;this.jobShardingStrategyClass = "";this.reconcileIntervalMinutes = 10;this.jobConfig = jobConfig;}}
}
ElasticJob源码分析--定时任务执行JobScheduler类分析相关推荐
- 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-ti ...
- 分析开源项目源码,我们该如何入手分析?(授人以渔)
点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~ 1 前言 本文接上篇文章跟大家聊聊我们为什么 ...
- 四足机器人|机器狗|仿生机器人|多足机器人|Adams仿真|Simulink仿真|基于CPG的四足机器人Simulink与Adams虚拟样机|源码可直接执行|绝对干货!需要资料及指导的可以联系我!
四足机器人|机器狗|仿生机器人|多足机器人|基于CPG的四足机器人Simulink与Adams虚拟样机|源码可直接执行|绝对干货!需要资料及指导的可以联系我!QQ:1096474659 基于CPG的四 ...
- Python源码剖析[19] —— 执行引擎之一般表达式(2)
Python源码剖析 --Python执行引擎之一般表达式(2) 本文作者: Robert Chen(search.pythoner@gmail.com ) 3.2 Simple.py 前面我 ...
- 【Flink】 Flink 源码之 SQL 执行流程
1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...
- Spring源码系列- Spring Beans - 核心类的基本介绍
Spring源码系列- Spring Beans - 核心类的基本介绍 读过上一篇文章的读者应该都能对Spring的体系结构有一个大致的了解,在结尾处,我也说过会从spring-beans包开始分析, ...
- 部署shiro官方源码时,执行maven命令出错
转载自 部署shiro官方源码时,执行maven命令出错 部署shiro官方源码时,执行maven命令会报下面错误: [INFO] --------------------------------- ...
- 12 哈希表相关类——Live555源码阅读(一)基本组件类
12 哈希表相关类--Live555源码阅读(一)基本组件类 这是Live555源码阅读的第一部分,包括了时间类,延时队列类,处理程序描述类,哈希表类这四个大类. 本文由乌合之众 lym瞎编,欢迎转载 ...
- [整站源码]thinkphp家纺针织床上用品类网站模板+前后端源码
模板介绍: 本模板自带eyoucms内核,无需再下载eyou系统,原创设计.手工书写DIV+CSS,完美兼容IE7+.Firefox.Chrome.360浏览器等:主流浏览器:结构容易优化:多终端均可 ...
最新文章
- Spring整合ActiveMQ完成消息队列MQ编程
- php chinese word
- php 怎么配置邮件,PHP发邮件的配置_PHP教程
- LeetCode(136)——只出现一次的数字(JavaScript)
- python科研计价_科研速递 | 花费15年众望所归!NumPy论文终登上Nature!
- SQL SERVER 2000日期处理(转)
- 获取当前电脑全部网络连接名字
- 八皇后--python代码
- 成为0.01%!利用TensorFlow.js和深度学习,轻松阅读古草体文字
- Java类和对象 详解(一)
- linux透明桥,linux透明防墙(网桥模式).doc
- 新款 Mac mini(2018) 性能及接口分析
- jsp+sql智能道路交通信息管理系统的设计与实现(论文+系统+开题报告+答辩PPT+外文翻译)
- LaTeX 相对于 Word 有什么优势?
- Linux的10个彩蛋
- 解决nrm不能使用问题
- WslRegisterDistribution failed with error: 0x80370102 Error: 0x80370102 ???????????????????
- java初级工程师面试需要什么_初级Java工程师面试指导
- 久违了,AV终结者病毒
- 服务器 交换机的维护,华为交换机系统维护与调试命令