2019独角兽企业重金招聘Python工程师标准>>>

Elastic-job-lite 2.1.3 代码详解

框架

Elastic-Job是一个分布式调度解决方案,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite: 轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

Elastic-Job-Cloud:  Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。

基本原理

Elastic-job并无作业调度中心节点,而是基于部署作业Quartz的程序在到达相应时间点时各自触发调度。zookeeper用于作业注册、信息存储、任务执行过程的状态标记等, 主作业实例在选举过程中产生后用于作业分片的计算。

zookeeper上创建节点树,保存任务配置信息;各监听TreeCacheListener托管于"/${jobName}"的TreeCache对象的ListenerContainer中。当zk的节点树变化(add、remove、update...)TreeCache&TreeNode<implements org.apache.zookeeper.Watcher、

org.apache.curator.framework.api.BackgroundCallback>处理watchedEvent的响应,TreeCache调用publishEvent方法异步唤醒所有TreeCacheListener。

同时将当前TreeNode再次绑定为TreeCache的path监听:
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);client.getData().usingWatcher(this).inBackground(this)).forPath(this.path);client.getChildren().usingWatcher(this).inBackground(this)).forPath(this.path);

印证:

zookeeper在create、delete、setData、exists、getData、getACL、getChildren时都能定义AsyncCallback;但只有在 ZooKeeper构造、exists、getData、getChildren 能注册Watcher.

package com.dangdang.ddframe.job.lite.internal.listener;import com.google.common.base.Charsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;/*** 作业注册中心的监听器.* * @author zhangliang*/
public abstract class AbstractJobListener implements TreeCacheListener {@Overridepublic final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {ChildData childData = event.getData();if (null == childData) {return;}String path = childData.getPath();if (path.isEmpty()) {return;}dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));}protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
TreeCache:private void publishEvent(final TreeCacheEvent event) {if (treeState.get() != TreeState.CLOSED) {LOG.debug("publishEvent: {}", event);executorService.submit(() -> {try {callListeners(event);} catch (Exception e) {ThreadUtils.checkInterrupted(e);handleException(e);}});}}private void callListeners(final TreeCacheEvent event){listeners.forEach(new Function<TreeCacheListener, Void>(){@Overridepublic Void apply(TreeCacheListener listener){try{listener.childEvent(client, event);}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}return null;}});}

如何使用

maven

elastic-job-lite使用在 zookeeper-3.4.6.jar基础上进行封装curator框架(2.10.0) 来操作zookeeper节点。

构建项目时,使用curator的版本都应该一致:

<dependencies>     <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.3</version></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.3</version></dependency>
</dependencies>
      <quartz.version>2.2.1</quartz.version><curator.version>2.10.0</curator.version><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-test</artifactId></dependency><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId></dependency>

spring接入

elastic-job-lite-spring包下spring.handlers、spring.schemas文件声明xml中命名空间和对应的标签。RegNamespaceHandler 、JobNamespaceHandler: extends NamespaceHandlerSupport
  job-ref配置优先级大于class属性配置,在JobScheduler的createJobDetail方法中会判定LitJob类属性elasticJob实例的来源。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.dangdang.com/schema/ddframe/reghttp://www.dangdang.com/schema/ddframe/reg/reg.xsdhttp://www.dangdang.com/schema/ddframe/jobhttp://www.dangdang.com/schema/ddframe/job/job.xsd"><!--配置作业注册中心 --><reg:zookeeper id="regCenter" server-lists=" yourhost:2181"namespace="dd-job" base-sleep-time-milliseconds="1000"max-sleep-time-milliseconds="3000" max-retries="3" /><!-- 配置简单作业 --><job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob"registry-center-ref="regCenter" cron="0/10 * * * * ?"sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob"><property name="fooService" ref="xxx.FooService" /></bean><!-- 配置关联Bean作业 --><job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId"registry-center-ref="regCenter" cron="0/10 * * * * ?"sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><!-- 配置数据流作业 --><job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob"registry-center-ref="regCenter" cron="0/10 * * * * ?"sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><!-- 配置脚本作业 --><job:script id="scriptElasticJob" registry-center-ref="regCenter"cron="0/10 * * * * ?" sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" /><!-- 配置带监听的简单作业 --><job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob"registry-center-ref="regCenter" cron="0/10 * * * * ?"sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"><job:listener class="xx.MySimpleJobListener" /><job:distributed-listener class="xx.MyOnceSimpleJobListener"started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" /></job:simple><!-- 配置带作业数据库事件追踪的简单作业 --><job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob"registry-center-ref="regCenter" cron="0/10 * * * * ?"sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"event-trace-rdb-data-source="yourDataSource"></job:simple></beans>
public  class SmsNoticeTask implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {logger.info("任务执行分片信息为:{}", shardingContext);//TODO do something}

node介绍

作业一旦启动成功后不能修改JobName,如果修改名称则视为新的作业实例。

${namespaces}/${JobName} 下持久化config、leaderserversinstances 、sharding主节点。

SchedulerFacade:
/*** 注册作业启动信息.* * @param enabled 作业是否启用*/
public void registerStartUpInfo(final boolean enabled) {listenerManager.startAllListeners();leaderService.electLeader();serverService.persistOnline(enabled);instanceService.persistOnline();shardingService.setReshardingFlag();monitorService.listen();if (!reconcileService.isRunning()) {reconcileService.startAsync();}
}

config

ConfigurationService
持久化节点,保存任务的参数配置。若zk上已经持久化配置,且没有设置overwrite为true,以zk为准。

JobScheduler.init() 
        -> schedulerFacade.updateJobConfiguration(liteJobConfig)
                ->  configService.persist(liteJobConfig)

/*** 持久化分布式作业配置信息.* * @param liteJobConfig*            作业配置*/
public void persist(final LiteJobConfiguration liteJobConfig) {checkConflictJob(liteJobConfig);// 校验JobClass; 校验zk上若存在config节点但数据为null,删除Job整个节点if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT,LiteJobConfigurationGsonFactory.toJson(liteJobConfig));}
}

leader

LeaderService

持久化 electionsharding、failover 子节点。

election

/*** 主节点路径.* */
public final class LeaderNode {/*** 主节点根路径.*/public static final String ROOT = "leader";static final String ELECTION_ROOT = ROOT + "/election";static final String INSTANCE = ELECTION_ROOT + "/instance";static final String LATCH = ELECTION_ROOT + "/latch";private final JobNodePath jobNodePath;..........
}

当作业初始化注册或原主作业实例离线时,触发选主过程。 
 LeaderElectionJobListener、 LeaderAbdicationJobListener

  • latch: 持久化子节点。在选主过程中,因多节点分布式服务创建临时有序子节点来锁限制。

    通过LeaderLatch来分布式并发锁定选主过程。创建的有序临时节点当序号最小时获取到执行权。LeaderLatch.start() : 重置leadership,checkLeadership方法中判定当前自己节点序号是否最小,若是,设置Leadership = true。否则再次注册Watcher和BackgroundCallback来判定;
    LeaderLatch.wait() : 判定 leadership == false则Object.wait()。

    LeaderService.electLeader() -> jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback())

    LeaderService:/*** 选举主节点.*/
    public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed.");
    }@RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {@Overridepublic void execute() {if (!hasLeader()) {jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE,JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());}}
    }-----------------------------------------------------------------------------------------JobNodeStorage:/*** 在主节点执行操作.* * @param latchNode 分布式锁使用的作业节点名称* @param callback 执行操作的回调*/
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {latch.start();latch.await();callback.execute();//CHECKSTYLE:OFF} catch (final Exception ex) {//CHECKSTYLE:ONhandleException(ex);}
    }
    LeaderLatch:
    void reset() throws Exception
    {setLeadership(false);setNode(null);BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( debugResetWaitLatch != null ){debugResetWaitLatch.await();debugResetWaitLatch = null;}if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){setNode(event.getName());if ( state.get() == State.CLOSED ){setNode(null);}else{getChildren();}}else{log.error("getChildren() failed. rc = " + event.getResultCode());}}};client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }public void await() throws InterruptedException, EOFException {synchronized(this) {while(this.state.get() == LeaderLatch.State.STARTED && !this.hasLeadership.get()) {this.wait();}}if(this.state.get() != LeaderLatch.State.STARTED) {throw new EOFException();}
    }
    
  • instance: 临时子节点。当选主完成后生成节点并保存主服务jobInstanceId<作业运行实例Id>。
  • sharding

    ShardingService

    持久化necessary节点,当分片完成后将被删除。
    用于作业启动、分片总数变更、作业服务器变动、或作业运行实例变动情况下设置分片标记。

    ShardingListenerManager:class ShardingTotalCountChangedJobListener extends AbstractJobListener {@Overrideprotected void dataChanged(final String path, final Type eventType, final String data) {if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {shardingService.setReshardingFlag();JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);}}}
    }class ListenServersChangedJobListener extends AbstractJobListener {@Overrideprotected void dataChanged(final String path, final Type eventType, final String data) {if (!JobRegistry.getInstance().isShutdown(jobName)&& (isInstanceChange(eventType, path) || isServerChange(path))) {shardingService.setReshardingFlag();}}

failover

  • FailoverService
  • FailoverListenerManager
    • FailoverSettingsChangedJobListener :
       设置failover=false时,移除所有分片sharding/${itemIndex}/failover节点。
    • JobCrashedJobListener:
        判定:failover == ture
                  && 事件源为instances节点下作业实例子节点删除<作业实例有离线>
                  &&  离线的jobInstanceId不为本机  
      则获取失效jobInstanceId处理的含sharding/${itemIndex}/failover标记的分片;若为空,则获取该失效jobInstanceId的所有正常分片 。将这些分片设置标记leader/failover/items/${itemIndex}。
      以leaderLatch的方式执行FailoverLeaderExecutionCallback (集群每次只能当前一个处理完成才能处理下一个,多个服务实例的处理线程会分布式锁竞争等待)
FailoverListenerManagerprivate boolean isFailoverEnabled() {LiteJobConfiguration jobConfig = configService.load(true);return null != jobConfig && jobConfig.isFailover();
}class JobCrashedJobListener extends AbstractJobListener {@Overrideprotected void dataChanged(final String path, final Type eventType, final String data) {if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {return;}List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);if (!failoverItems.isEmpty()) {for (int each : failoverItems) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}} else {for (int each : shardingService.getShardingItems(jobInstanceId)) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}}}}
}

servers

ServerService

当作业服务注册时,生成  服务器IP 持久化节点。所以按IP进行管理作业服务器。

instances

InstanceService

当作业服务注册时,生成临时作业运行实例Id 临时节点。该节点名称规则:

eg: 192.168.42.1@-@6260

package com.dangdang.ddframe.job.lite.api.strategy;import com.dangdang.ddframe.job.util.env.IpUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;import java.lang.management.ManagementFactory;/*** 作业运行实例.* * @author zhangliang*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {private static final String DELIMITER = "@-@";/*** 作业实例主键.*/private final String jobInstanceId;public JobInstance() {jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];}/*** 获取作业服务器IP地址.* * @return 作业服务器IP地址*/public String getIp() {return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));}
}

sharding

  • ShardingService
  • ShardingListenerManager
    • ShardingTotalCountChangedJobListener
    • ListenServersChangedJobListener

服务初次注册、或服务实例发生变更、分片总数变更时促发分片。分片将在下次作业触发时执行,只有主节点可以分片,分片时的从节点都将阻塞。

  1. 判定有可用的作业实例服务,且有分片标记。
  2. 判定当前是否为有主服务,没有则触发选举,并等待选主完成。
  3. 判定自己是否主服务,如果不是,则等待直到主服务分片完成。
  4. 判定是否还有在执行过程中的分片,若有,则等待完成。
  5. 创建 leader/sharding/processing  临时节点。
  6. 清空原有的分片节点,并按现有的节点创建各sharding/${itemIndex} 持久化节点。
  7. 根据指定的策略进行分片,在一个事务中创建sharding/${itemIndex}/instance 节点 并填充JobInstanceId;并删除 leader/sharding/necessary  和 leader/sharding/processing  节点。
ShardingService:/*** 如果需要分片且当前节点为主节点, 则作业分片.* * <p>* 如果当前无可用节点则不分片.* </p>*/
public void shardingIfNecessary() {List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();if (!isNeedSharding() || availableJobInstances.isEmpty()) {return;}if (!leaderService.isLeaderUntilBlock()) {blockUntilShardingCompleted();return;}waitingOtherJobCompleted();LiteJobConfiguration liteJobConfig = configService.load(false);int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();log.debug("Job '{}' sharding begin.", jobName);jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");resetShardingInfo(shardingTotalCount);JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));log.debug("Job '{}' sharding complete.", jobName);
}

转载于:https://my.oschina.net/u/3434392/blog/881731

Elastic-job 2.1.3 概述相关推荐

  1. 大牛总结 Elastic Search 概述(一)

    中文手册:点击打开链接 版权声明:本文为博主原创文章,未经博主允许不得转载.转载请务必加上原作者:铭毅天下,原文地址:blog.csdn.net/laoyang360 https://blog.csd ...

  2. 弹性光网络(Elastic Optical Networks)概述

    弹性光网络(Elastic Optical Networks) 1.光网络VS弹性光网络 光网络(Optical Network): 一般指使用光纤作为主要传输介质的广域网.城域网或者新建的大范围的局 ...

  3. 梯度下降优化算法概述

    本文原文是 An overview of gradient descent optimization algorithms,同时作者也在 arXiv 上发了一篇同样内容的 论文. 本文结合了两者来翻译 ...

  4. IaaS、PaaS 和 SaaS:云服务模型概述

    IaaS.PaaS 和 SaaS:云服务模型概述 为您的组织选择合适的云服务模型,可以帮助您充分利用预算和 IT 资源. 基础设施即服务 (IaaS).平台即服务 (PaaS) 以及软件即服务 (Sa ...

  5. 《实用机器学习》——1.4 本书概述

    本节书摘来异步社区<实用机器学习>一书中的第1章,第1.4节,作者:孙亮,黄倩,更多章节内容可以访问云栖社区"异步社区"公众号查看. 1.4 本书概述 本书主要从解决实 ...

  6. es head插件安装_ES笔记概述与安装

    来源:狂神说java https://www.bilibili.com/medialist/play/ml711044860 概述 ElasticSearch高可扩展性的分布式的实时的全文搜索和分析引 ...

  7. Elastic 使用Heartbeat监测服务运行状态

    概述 Heartbeat是一个轻量级守护进程,通过在远程服务器上安装,以定期检查服务的状态并确定它们是否可用. 在配置Heartbeat时,指定监视器来标识要检查的主机名,每个监视器都根据指定的计划运 ...

  8. 《Abaqus GUI程序开发指南(Python语言)》——第一章 概述1.1 简单插件实例——创建带孔板有限元模型...

    本节书摘来自异步社区<Abaqus GUI程序开发指南(Python语言)>一书中的第1章,第1.1节,作者: 贾利勇 , 富琛阳子 , 贺高 , 周正光 更多章节内容可以访问云栖社区&q ...

  9. elasticsearch index doc过程概述

    文章目录 概述 1.es中的基础概念 2.es中的索引过程 2.1 一次index请求的大体流程 2.2 为什么是near real time 2.3 为什么要有translog 2.3.1 tran ...

  10. 浅谈Time Elastic Band

    作者:WGQ 前言 在自主移动机器人路径规划的学习与开发过程中,我接触到Time Elastic Band算法,并将该算法应用于实际机器人,用于机器人的局部路径规划.在此期间,我也阅读了部分论文.官方 ...

最新文章

  1. python怎么做自动化测试仪器经销商_Python自动化测试踩坑记录(企业中如何实施自动化测试)...
  2. 前端学习(3052):vue+element今日头条管理-自定义表格列列表
  3. 洛谷 P1067 多项式输出
  4. canvas 绘制直线 并选中_javascript自学记录:canvas绘图
  5. serv u服务器显示图片,图解经典FTP服务器工具 SERV-U最安全的设置【防止被入侵】...
  6. wxpython grid设置字体颜色_Ext grid改变行背景颜色 和改变行字体颜色
  7. [RS] 地理空间数据云 使用ftp批量下载影像(以批量下载Landsat8数据为例)
  8. Kafka之sync、async以及oneway
  9. IDEA使用教程(二) 快捷键
  10. 静下心来学习MVC之基本概念
  11. 原来微信“对方正在输入”在这种情况下才显示
  12. 个人如何搭建云手机出租?ARM服务器搭建机房教程
  13. 全国计算机等级考试二级C知识点
  14. python儿童编程教育_2019儿童编程语言大全
  15. 计算机操作熟练说明,Mac新手必看教程,教你快速熟练mac电脑操作
  16. 非对称密钥PKCS#1和PKCS#8格式互相转换(Java)
  17. 彻底解决WebView_flutter自带的边框无法去除问题
  18. 因果卷积,膨胀卷积,混合膨胀卷积
  19. 十条最佳上云法则,助你安全无痛上云!
  20. 花生哥的伤感爱情日志:伤害我,你会心痛吗?

热门文章

  1. linux系统恢复上一次,如何将您的Ubuntu Linux系统恢复到其上一个状态
  2. php项目导入其他包,将一个外部项目导入Thinkphp环境中
  3. oracle查看视图数据,查看oracle 10g 视图-数据库专栏,ORACLE
  4. java 泛型 t extends_Java泛型的定义以及对于? extends T和? super T
  5. r语言岭回归参数选择_数据分析中常见的七种回归分析以及R语言实现(三)---岭回归...
  6. quartz的job类无法保留本身通过spring注入的属性问题
  7. Linux常见的发行版SUSE、Ubuntu、RedHat、CentOS、Fedora的联系和区别
  8. 计算机英语讲课笔记06
  9. 【BZOJ4353】Play with tree,树链剖分线段树
  10. 【BZOJ1013】球形空间产生器,第一次的高斯消元