public static void main(String[] args) {// 初始化数据源DataSource dataSource = MovieServiceUtils.getDataSource();// 定义日志数据库事件溯源配置JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(dataSource);new JobScheduler(createRegistryCenter(), createJobConfiguration(), jobEventRdbConfig).init();}

JobScheduler执行过程

    public JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventConfiguration jobEventConfig, ElasticJobListener... elasticJobListeners) {this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);}private JobScheduler(CoordinatorRegistryCenter regCenter, LiteJobConfiguration liteJobConfig, JobEventBus jobEventBus, ElasticJobListener... elasticJobListeners) {JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());this.liteJobConfig = liteJobConfig;this.regCenter = regCenter;List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);this.setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);this.schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);this.jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);}

这里创建的是JobScheduler这个对象,然后下面调用了init方法。

    public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = this.schedulerFacade.updateJobConfiguration(this.liteJobConfig);JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());JobScheduleController jobScheduleController = new JobScheduleController(this.createScheduler(), this.createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);this.schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}

初始化的几个动作:

--初始化设置分片总量,这些参数都是通过配置中心中获取到的,也就是LiteJobConfiguration这个类。

--创建调度器,createScheduler方法。

--创建任务详情,createJobdDetail方法来创建。

--注册任务到ZK上面。

--注册作业启动信息registerStartUpInfo方法。

--进行作业调度scheduleJob方法。

createScheduler方法

这里创建的就是Quartz里面的scheduler。

然后这个方法里面还添加了任务触发监听器。

    private Scheduler createScheduler() {try {StdSchedulerFactory factory = new StdSchedulerFactory();factory.initialize(this.getBaseQuartzProperties());Scheduler result = factory.getScheduler();result.getListenerManager().addTriggerListener(this.schedulerFacade.newJobTriggerListener());return result;} catch (SchedulerException var3) {throw new JobSystemException(var3);}}

factory这个地方调用了getScheduler这个方法,然后这个方法其实调用了的是Quartz中的方法。

然后上面的initialize方法是调用了一个getBaseQuartzProperties方法,在这个方法里面配置了相关的参数。

private Properties getBaseQuartzProperties() {Properties result = new Properties();result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());result.put("org.quartz.threadPool.threadCount", "1");result.put("org.quartz.scheduler.instanceName", this.liteJobConfig.getJobName());result.put("org.quartz.jobStore.misfireThreshold", "1");result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());return result;}

getSchedule方法调用的Quartz中的Scheduler中的方法。

Scheduler在使用之前需要进行实例化,从SchedulerFactory创建它开始,到scheduler调用shutdown结束。

他在实例化之后,可以启动(start)、暂停(standby)、停止(shutdown),然后只有start的scheduler才能被触发(trigger).

public Scheduler getScheduler() throws SchedulerException {if (this.cfg == null) {this.initialize();}SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(this.getSchedulerName());if (sched != null) {if (!sched.isShutdown()) {return sched;}schedRep.remove(this.getSchedulerName());}sched = this.instantiate();return sched;}

CreateJobDetail方法

 private JobDetail createJobDetail(String jobClass) {JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(this.liteJobConfig.getJobName()).build();result.getJobDataMap().put("jobFacade", this.jobFacade);Optional<ElasticJob> elasticJobInstance = this.createElasticJobInstance();if (elasticJobInstance.isPresent()) {result.getJobDataMap().put("elasticJob", elasticJobInstance.get());} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {try {result.getJobDataMap().put("elasticJob", Class.forName(jobClass).newInstance());} catch (ReflectiveOperationException var5) {throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", new Object[]{jobClass});}}return result;}

这个地方首先是创建了一个result实例,这个实例用于封装Job的详细信息的。

前面几个方法的连续调用基本上就是添加参数而已。然后这里是直接build,创建一个JobDetail的实例。

后面调用了一个createElasticJobInstance()方法,这个方法的话是创建一个实例,然后这个实例放在了Gava的Optional里面。

接下来调用这个isPresent方法来判断下是不是实例已经创建了,如果创建了的话,那么就直接将其封装进result的DataMap中去。

直接使用的是elasticjob的key.

public JobDetail build() {JobDetailImpl job = new JobDetailImpl();job.setJobClass(this.jobClass);job.setDescription(this.description);if (this.key == null) {this.key = new JobKey(Key.createUniqueName((String)null), (String)null);}job.setKey(this.key);job.setDurability(this.durability);job.setRequestsRecovery(this.shouldRecover);if (!this.jobDataMap.isEmpty()) {job.setJobDataMap(this.jobDataMap);}return job;}

创建了一个JobDetailImpl这个类的实例,这个实例封装了许多参数信息。而这个JobDetailImpl的类就是Quartz中的一个实现类。

public class JobDetailImpl implements Cloneable, Serializable, JobDetail {private static final long serialVersionUID = -6069784757781506897L;private String name;private String group;private String description;private Class<? extends Job> jobClass;private JobDataMap jobDataMap;private boolean durability;private boolean shouldRecover;private transient JobKey key;

这里面是一个jobDataMap的一个类,这个类实际上是一个实现了Map接口的类一路继承下来的。应该是ElasticJob自己进行了一个封装而已。

然后创建了一个

    public JobScheduleController(Scheduler scheduler, JobDetail jobDetail, String triggerIdentity) {this.scheduler = scheduler;this.jobDetail = jobDetail;this.triggerIdentity = triggerIdentity;}

RegisterJob

JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, this.regCenter);

JobRegistry

public final class JobRegistry {private static volatile JobRegistry instance;private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap();private Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap();private Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap();private Map<String, Boolean> jobRunningMap = new ConcurrentHashMap();private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap();public static JobRegistry getInstance() {if (null == instance) {Class var0 = JobRegistry.class;synchronized(JobRegistry.class) {if (null == instance) {instance = new JobRegistry();}}}return instance;}

单例模式,采用了双重检查的方式创建了对象。

public void registerJob(String jobName, JobScheduleController jobScheduleController, CoordinatorRegistryCenter regCenter) {this.schedulerMap.put(jobName, jobScheduleController);this.regCenterMap.put(jobName, regCenter);regCenter.addCacheData("/" + jobName);}

注册的过程。

schedulerMap和regCenterMap,就是把数据封装到了Map中去。也就是将需要的数据交给第三方暂存一下,用的时候取就是了。这部分应该是本地做了一个缓存。

下面的regCenter是将数据注册到远端的注册中心上去。

RegisterStartUpInfo

    public void registerStartUpInfo(boolean enabled) {this.listenerManager.startAllListeners();//启动所有的监听器this.leaderService.electLeader();// 选举leaderthis.serverService.persistOnline(enabled);// 持久化作业信息到zk,先持久化服务器信息,然后再持久化作业信息。this.instanceService.persistOnline();// 持久化作业信息this.shardingService.setReshardingFlag();// 重新分片this.monitorService.listen();// 初始化监听服务if (!this.reconcileService.isRunning()) {this.reconcileService.startAsync();// 调节分布式作业不一致状态服务异步启动。}}

1.启动所有的监听器

 public void startAllListeners() {this.electionListenerManager.start();// 选举this.shardingListenerManager.start();// 分片this.failoverListenerManager.start();// 失效转移this.monitorExecutionListenerManager.start();// 执行监控监听this.shutdownListenerManager.start();// shutdown监听this.triggerListenerManager.start();// 触发器监听this.rescheduleListenerManager.start();// 重新调度this.guaranteeListenerManager.start(); // 保证分布式任务全部开始和结束状态this.jobNodeStorage.addConnectionStateListener(this.regCenterConnectionStateListener);// 注册中心与任务节点的状态}

2. 选举leader

    public void electLeader() {log.debug("Elect a new leader now.");this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());log.debug("Leader election completed.");}

这个地方主要是用了一个锁。

leaderlatch调用的过程已经进入了Curator的代码了,然后主要的目的是用curator来实现主节点的选举工作。

    public void executeInLeader(String latchNode, LeaderExecutionCallback callback) {try {LeaderLatch latch = new LeaderLatch(this.getClient(), this.jobNodePath.getFullPath(latchNode));Throwable var4 = null;try {latch.start();latch.await();callback.execute();} catch (Throwable var14) {var4 = var14;throw var14;} finally {if (latch != null) {if (var4 != null) {try {latch.close();} catch (Throwable var13) {var4.addSuppressed(var13);}} else {latch.close();}}}} catch (Exception var16) {this.handleException(var16);}}

3. 持久化服务器信息

    public void persistOnline(boolean enabled) {if (!JobRegistry.getInstance().isShutdown(this.jobName)) {this.jobNodeStorage.fillJobNode(this.serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(this.jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());}}

调用了注册中心

public void fillJobNode(String node, Object value) {this.regCenter.persist(this.jobNodePath.getFullPath(node), value.toString());}

调用了ZookeeperRegisterCenter,看创建的节点类型为Persist也就是说服务器的信息是持久节点

    public void persist(String key, String value) {try {if (!this.isExisted(key)) {((ACLBackgroundPathAndBytesable)this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(key, value.getBytes(Charsets.UTF_8));} else {this.update(key, value);}} catch (Exception var4) {RegExceptionHandler.handleException(var4);}}

4. 持久化作业信息

作业的数据是临时节点

public void persistOnline() {this.jobNodeStorage.fillEphemeralJobNode(this.instanceNode.getLocalInstanceNode(), "");}

5.重新分片

    public void setReshardingFlag() {this.jobNodeStorage.createJobNodeIfNeeded("leader/sharding/necessary");}

在JobNodeStorage中调用createJobNodeIfNeeded方法。

  public void createJobNodeIfNeeded(String node) {if (this.isJobRootNodeExisted() && !this.isJobNodeExisted(node)) {this.regCenter.persist(this.jobNodePath.getFullPath(node), "");}}

又是注册中心进行persist。

6. 初始化监听服务

public void listen() {int port = this.configService.load(true).getMonitorPort();if (port >= 0) {try {log.info("Elastic job: Monitor service is running, the port is '{}'", port);this.openSocketForMonitor(port);} catch (IOException var3) {log.error("Elastic job: Monitor service listen failure, error is: ", var3);}}}

然后调用了openSocketForMonitor方法。

private void openSocketForMonitor(int port) throws IOException {this.serverSocket = new ServerSocket(port);(new Thread() {public void run() {while(!MonitorService.this.closed) {try {MonitorService.this.process(MonitorService.this.serverSocket.accept());} catch (IOException var2) {MonitorService.log.error("Elastic job: Monitor service open socket for monitor failure, error is: ", var2);}}}}).start();}

这里直接调用了Socket来处理,然后这个process方法中进行了数据的处理。

这个地方从process来看的话主要还是用于监听dump相关数据用的吧。比较长,只复制一部分。

 private void process(Socket socket) throws IOException {BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));Throwable var3 = null;try {BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));Throwable var5 = null;try {Socket autoCloseSocket = socket;Throwable var7 = null;try {String cmdLine = reader.readLine();if (null != cmdLine && "dump".equalsIgnoreCase(cmdLine)) {List<String> result = new ArrayList();this.dumpDirectly("/" + this.jobName, result);this.outputMessage(writer, Joiner.on("\n").join(SensitiveInfoUtils.filterSensitiveIps(result)) + "\n");}

SchedulerJob调度

    public void scheduleJob(String cron) {try {if (!this.scheduler.checkExists(this.jobDetail.getKey())) {this.scheduler.scheduleJob(this.jobDetail, this.createTrigger(cron));}this.scheduler.start();} catch (SchedulerException var3) {throw new JobSystemException(var3);}}

启动调度器,然后进行相关的调度就行了,这个地方应该是调用了底层封装的Quartz的任务调度程序。

补充

为了理清Quartz与ElasticJob的关系。

ElasticJob主要是自己封装了一些任务的属性啊,方法啊之类的,基本的形式在于其封装了一些自己本身必须的而且Quartz也必须的部分,那么在使用的时候这两者到底怎么进行的切换呢?难道它是用的继承关系吗?

不是继承关系,当然这里面有继承关系实现的部分类。但是我用的lite版本的elasticjob主要还不是用的继承实现的,

下面来看这个类。该类中封装的就是Quartz类中的一些属性>jobDetail和scheduler。

其实这两者直接应该说是聚合关系。

/*** 作业调度控制器.* * @author zhangliang*/
@RequiredArgsConstructor
public final class JobScheduleController {private final Scheduler scheduler;private final JobDetail jobDetail;private final String triggerIdentity;

实际执行的时候是调用了这个JobScheduleController的schedule方法。这里面有一个createTrigger(cron)

   /*** 调度作业.* * @param cron CRON表达式*/public void scheduleJob(final String cron) {try {if (!scheduler.checkExists(jobDetail.getKey())) {scheduler.scheduleJob(jobDetail, createTrigger(cron));}scheduler.start();} catch (final SchedulerException ex) {throw new JobSystemException(ex);}}

涉及到的设计模式

public final class LiteJobConfiguration implements JobRootConfiguration {private final JobTypeConfiguration typeConfig;private final boolean monitorExecution;private final int maxTimeDiffSeconds;private final int monitorPort;private final String jobShardingStrategyClass;private final int reconcileIntervalMinutes;private final boolean disabled;private final boolean overwrite;public String getJobName() {return this.typeConfig.getCoreConfig().getJobName();}public boolean isFailover() {return this.typeConfig.getCoreConfig().isFailover();}public static LiteJobConfiguration.Builder newBuilder(JobTypeConfiguration jobConfig) {return new LiteJobConfiguration.Builder(jobConfig);}public JobTypeConfiguration getTypeConfig() {return this.typeConfig;}public boolean isMonitorExecution() {return this.monitorExecution;}public int getMaxTimeDiffSeconds() {return this.maxTimeDiffSeconds;}public int getMonitorPort() {return this.monitorPort;}public String getJobShardingStrategyClass() {return this.jobShardingStrategyClass;}public int getReconcileIntervalMinutes() {return this.reconcileIntervalMinutes;}public boolean isDisabled() {return this.disabled;}public boolean isOverwrite() {return this.overwrite;}private LiteJobConfiguration(JobTypeConfiguration typeConfig, boolean monitorExecution, int maxTimeDiffSeconds, int monitorPort, String jobShardingStrategyClass, int reconcileIntervalMinutes, boolean disabled, boolean overwrite) {this.typeConfig = typeConfig;this.monitorExecution = monitorExecution;this.maxTimeDiffSeconds = maxTimeDiffSeconds;this.monitorPort = monitorPort;this.jobShardingStrategyClass = jobShardingStrategyClass;this.reconcileIntervalMinutes = reconcileIntervalMinutes;this.disabled = disabled;this.overwrite = overwrite;}public static class Builder {private final JobTypeConfiguration jobConfig;private boolean monitorExecution;private int maxTimeDiffSeconds;private int monitorPort;private String jobShardingStrategyClass;private boolean disabled;private boolean overwrite;private int reconcileIntervalMinutes;public LiteJobConfiguration.Builder monitorExecution(boolean monitorExecution) {this.monitorExecution = monitorExecution;return this;}public LiteJobConfiguration.Builder maxTimeDiffSeconds(int maxTimeDiffSeconds) {this.maxTimeDiffSeconds = maxTimeDiffSeconds;return this;}public LiteJobConfiguration.Builder monitorPort(int monitorPort) {this.monitorPort = monitorPort;return this;}public LiteJobConfiguration.Builder jobShardingStrategyClass(String jobShardingStrategyClass) {if (null != jobShardingStrategyClass) {this.jobShardingStrategyClass = jobShardingStrategyClass;}return this;}public LiteJobConfiguration.Builder reconcileIntervalMinutes(int reconcileIntervalMinutes) {this.reconcileIntervalMinutes = reconcileIntervalMinutes;return this;}public LiteJobConfiguration.Builder disabled(boolean disabled) {this.disabled = disabled;return this;}public LiteJobConfiguration.Builder overwrite(boolean overwrite) {this.overwrite = overwrite;return this;}public final LiteJobConfiguration build() {return new LiteJobConfiguration(this.jobConfig, this.monitorExecution, this.maxTimeDiffSeconds, this.monitorPort, this.jobShardingStrategyClass, this.reconcileIntervalMinutes, this.disabled, this.overwrite);}private Builder(JobTypeConfiguration jobConfig) {this.monitorExecution = true;this.maxTimeDiffSeconds = -1;this.monitorPort = -1;this.jobShardingStrategyClass = "";this.reconcileIntervalMinutes = 10;this.jobConfig = jobConfig;}}
}

ElasticJob源码分析--定时任务执行JobScheduler类分析相关推荐

  1. 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-ti ...

  2. 分析开源项目源码,我们该如何入手分析?(授人以渔)

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~ 1 前言 本文接上篇文章跟大家聊聊我们为什么 ...

  3. 四足机器人|机器狗|仿生机器人|多足机器人|Adams仿真|Simulink仿真|基于CPG的四足机器人Simulink与Adams虚拟样机|源码可直接执行|绝对干货!需要资料及指导的可以联系我!

    四足机器人|机器狗|仿生机器人|多足机器人|基于CPG的四足机器人Simulink与Adams虚拟样机|源码可直接执行|绝对干货!需要资料及指导的可以联系我!QQ:1096474659 基于CPG的四 ...

  4. Python源码剖析[19] —— 执行引擎之一般表达式(2)

    Python源码剖析 --Python执行引擎之一般表达式(2) 本文作者: Robert Chen(search.pythoner@gmail.com ) 3.2     Simple.py 前面我 ...

  5. 【Flink】 Flink 源码之 SQL 执行流程

    1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...

  6. Spring源码系列- Spring Beans - 核心类的基本介绍

    Spring源码系列- Spring Beans - 核心类的基本介绍 读过上一篇文章的读者应该都能对Spring的体系结构有一个大致的了解,在结尾处,我也说过会从spring-beans包开始分析, ...

  7. 部署shiro官方源码时,执行maven命令出错

    转载自  部署shiro官方源码时,执行maven命令出错 部署shiro官方源码时,执行maven命令会报下面错误: [INFO] --------------------------------- ...

  8. 12 哈希表相关类——Live555源码阅读(一)基本组件类

    12 哈希表相关类--Live555源码阅读(一)基本组件类 这是Live555源码阅读的第一部分,包括了时间类,延时队列类,处理程序描述类,哈希表类这四个大类. 本文由乌合之众 lym瞎编,欢迎转载 ...

  9. [整站源码]thinkphp家纺针织床上用品类网站模板+前后端源码

    模板介绍: 本模板自带eyoucms内核,无需再下载eyou系统,原创设计.手工书写DIV+CSS,完美兼容IE7+.Firefox.Chrome.360浏览器等:主流浏览器:结构容易优化:多终端均可 ...

最新文章

  1. Spring整合ActiveMQ完成消息队列MQ编程
  2. php chinese word
  3. php 怎么配置邮件,PHP发邮件的配置_PHP教程
  4. LeetCode(136)——只出现一次的数字(JavaScript)
  5. python科研计价_科研速递 | 花费15年众望所归!NumPy论文终登上Nature!
  6. SQL SERVER 2000日期处理(转)
  7. 获取当前电脑全部网络连接名字
  8. 八皇后--python代码
  9. 成为0.01%!利用TensorFlow.js和深度学习,轻松阅读古草体文字
  10. Java类和对象 详解(一)
  11. linux透明桥,linux透明防墙(网桥模式).doc
  12. 新款 Mac mini(2018) 性能及接口分析
  13. jsp+sql智能道路交通信息管理系统的设计与实现(论文+系统+开题报告+答辩PPT+外文翻译)
  14. LaTeX 相对于 Word 有什么优势?
  15. Linux的10个彩蛋
  16. 解决nrm不能使用问题
  17. WslRegisterDistribution failed with error: 0x80370102 Error: 0x80370102 ???????????????????
  18. java初级工程师面试需要什么_初级Java工程师面试指导
  19. 久违了,AV终结者病毒
  20. 服务器 交换机的维护,华为交换机系统维护与调试命令

热门文章

  1. csp2019真题全解析
  2. 使用计算机形成假象,实验心理学总结
  3. BokehMe: When Neural Rendering Meets Classical Rendering
  4. python-识别验证码
  5. Grafana 9 正式发布,更易用,更酷炫了!
  6. linux环境聊天程序毕业设计,linux环境下的密文聊天系统(论文+程序)
  7. 2021年全球陀螺测斜仪收入大约6百万美元,预计2028年达到7百万美元
  8. BugKu msic方向 赛博朋克
  9. js中replaceAll失效
  10. 【黄啊码】tp5+微信小程序商城开发教程