分布式作业调度框架——Elastic-Job

1.概述

Elastic-Job是当当开源的分布式弹性作业框架。Elastic-Job分为lite和cloud两个相对独立的版本,lite版为轻量级去中心化的版本,cloud版则是基于Mesos + Docker方案提供了资源治理、应用分发和服务隔离的功能。我们项目使用的是lite版的Elastic-Job,因此本文主要围绕lite版本进行介绍。

elastic-job-lite主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。

1.1 分片

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:有一个遍历数据库某张表的作业,现有2台服务器。
为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。
作业遍历数据的逻辑可以为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。
如果分成10片,则服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9。
作业遍历数据的逻辑可以为:服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

1.2 分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。以上面例子分成10片为例,框架只负责决定服务器分配到哪些分片项,由作业分配策略决定,但是每个分片处理哪一部分数据,比如第一个分片处理id以0-4结尾的数据,是由开发者去决定和处理的。

1.3 去中心化

elastic-job-lite是去中心化设计,作业调度中心节点,各个作业节点是自治的,作业框架的程序在到达相应时间点时各自触发调度,缺点是可能会存在各个作业服务器的时间不一致的问题。

2.主要功能

  • 定时任务: 基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。
  • 作业注册中心: 基于Zookeeper和其客户端Curator实现的全局作业注册控制中心。用于注册,控制和协调分布式作业执行。
  • 作业分片: 将一个任务分片成为多个小任务项在多服务器上同时执行。
  • 弹性扩容缩容: 运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行。
  • 支持多种作业执行模式: 支持OneOff,Perpetual和SequencePerpetual三种作业模式。
  • 失效转移: 运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器空闲,抓取未完成的孤儿分片项执行。
  • 运行时状态收集: 监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间。
  • **作业停止,恢复和禁用:**用于操作作业启停,并可以禁止某作业运行(上线时常用)。
  • **被错过执行的作业重触发:**自动记录错过执行的作业,并在上次作业完成后自动触发。可参考Quartz的misfire。
  • **多线程快速处理数据:**使用多线程处理抓取到的数据,提升吞吐量。
  • **幂等性:**重复作业任务项判定,不重复执行已运行的作业任务项。由于开启幂等性需要监听作业运行状态,对瞬时反复运行的作业对性能有较大影响。
  • **容错处理:**作业服务器与Zookeeper服务器通信失败则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器仍在执行任务,导致重复执行。
  • **Spring****支持:**支持spring容器,自定义命名空间,支持占位符。
  • **运维平台:**提供运维界面,可以管理作业和注册中心。

3.快速开发

官方的版本可以通过两种方式进行作业开发:

  1. 通过代码动态加载;
  2. 通过spring的xml文件配置加载:

具体可以参看官方文档——快速入门、官方文档——开发指南。

由于我们的项目采用的是springboot框架,因此个人在官方代码的基础上做了一层封装,增加通过注解配置任务、sleuth组件支持,具体见:elastic-job-spring-boot-starter。

3.1 作业开发

Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

a. Simple类型作业

意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: // do something by sharding item 0break;case 1: // do something by sharding item 1break;case 2: // do something by sharding item 2break;// case n: ...}}
}

b. Dataflow类型作业

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

public class MyElasticJob implements DataflowJob<Foo> {@Overridepublic List<Foo> fetchData(ShardingContext context) {switch (context.getShardingItem()) {case 0: List<Foo> data = // get data from database by sharding item 0return data;case 1: List<Foo> data = // get data from database by sharding item 1return data;case 2: List<Foo> data = // get data from database by sharding item 2return data;// case n: ...}}@Overridepublic void processData(ShardingContext shardingContext, List<Foo> data) {// process data// ...}
}

流式处理

可通过DataflowJobConfiguration配置是否流式处理。

流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

c. Script类型作业

Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

#!/bin/bash
echo sharding execution context is $*

作业运行时输出

sharding execution context is {“jobName”:“scriptElasticDemoJob”,“shardingTotalCount”:10,“jobParameter”:“”,“shardingItem”:0,“shardingParameter”:“A”}

3.2 作业配置

Elastic-Job配置分为3个层级,分别是Core, Type和Root。每个层级使用相似于装饰者模式的方式装配。

Core对应JobCoreConfiguration,用于提供作业核心配置信息,如:作业名称、分片总数、CRON表达式等。

Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE, DATAFLOW和SCRIPT类型作业,提供3种作业需要的不同配置,如:DATAFLOW类型是否流式处理或SCRIPT类型的命令行等。

Root对应JobRootConfiguration,有2个子类分别对应Lite和Cloud部署类型,提供不同部署类型所需的配置,如:Lite类型的是否需要覆盖本地配置或Cloud占用CPU或Memory数量等。

a. 使用Java代码配置

通用作业配置

    // 定义作业核心配置JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();// 定义SIMPLE类型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());// 定义Lite作业根配置JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();// 定义作业核心配置JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build();// 定义DATAFLOW类型配置DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true);// 定义Lite作业根配置JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();// 定义作业核心配置配置JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build();// 定义SCRIPT类型配置ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh");// 定义Lite作业根配置JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();

b. Spring命名空间配置

与Spring容器配合使用作业,可将作业Bean配置为Spring Bean,并在作业中通过依赖注入使用Spring容器管理的数据源等对象。可用placeholder占位符从属性文件中取值。Lite可考虑使用Spring命名空间方式简化配置。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "><!--配置作业注册中心 --><reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /><!-- 配置简单作业--><job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob"><property name="fooService" ref="xxx.FooService"/></bean><!-- 配置关联Bean作业--><job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><!-- 配置数据流作业--><job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><!-- 配置脚本作业--><job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" /><!-- 配置带监听的简单作业--><job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"><job:listener class="xx.MySimpleJobListener"/><job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" /></job:simple><!-- 配置带作业数据库事件追踪的简单作业--><job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" event-trace-rdb-data-source="yourDataSource"></job:simple>
</beans>

配置项详细说明请参见配置手册

3.3 作业启动

a. Java启动方式

public class JobDemo {public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}private static CoordinatorRegistryCenter createRegistryCenter() {CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));regCenter.init();return regCenter;}private static LiteJobConfiguration createJobConfiguration() {// 创建作业配置...}
}

b. Spring启动方式

将配置Spring命名空间的xml通过Spring启动,作业将自动加载。

4.作业流程

作业启动流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FsenYazD-1586316152446)(file:///C:/Users/zuhiz/AppData/Local/Packages/oice_16_974fa576_32c1d314_209e/AC/Temp/msohtmlclip1/01/clip_image001.jpg)]

作业执行流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uIDKppXc-1586316152447)(file:///C:/Users/zuhiz/AppData/Local/Packages/oice_16_974fa576_32c1d314_209e/AC/Temp/msohtmlclip1/01/clip_image003.jpg)]

  1. 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
  2. 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
  3. 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
  4. 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
  5. 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
  6. 每次分片都会按服务器IP排序,保证分片结果不会产生较大波动(默认分片策略)。
  7. 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。

5.其它

5.1 运维平台

支持功能:

· 登录安全控制

· 注册中心、事件追踪数据源管理

· 快捷修改作业设置

· 作业和服务器维度状态查看

· 操作作业禁用\启用、停止和删除等生命周期

· 事件追踪查询

具体见elastic-job-lite-console。

5.2 事件追踪

Elastic-Job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job目前提供了基于关系型数据库两种事件订阅方式记录事件。

在配置作业关联数据源后,elastic-job会自动在数据库里创建JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引。

JOB_EXECUTION_LOG:记录每次作业的执行历史。分为两个步骤:

\1. 作业开始执行时向数据库插入数据,除failure_cause和complete_time外的其他字段均不为空。

\2. 作业完成执行时向数据库更新数据,更新is_success, complete_time和failure_cause(如果作业执行失败)。

JOB_STATUS_TRACE_LOG:记录作业状态变更痕迹表。可通过每次作业运行的task_id查询作业状态变化的生命周期和运行轨迹。

——官方版本并不支持oracle,elastic-job-spring-boot-starter中已支持。

5.3 作业监听器

可通过配置多个任务监听器,在任务执行前和执行后执行监听的方法。监听器分为每台作业节点均执行和分布式场景中仅单一节点执行2种。

  • 每台作业节点均执行的监听:若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器。

· 分布式场景中仅单一节点执行的监听:若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。

5.4 ……(见官方文档)

6.参考资料

Elastic-job 介绍与使用

Elastic-job使用及原理

Elastic-Job官方文档

一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。

5.4 ……(见官方文档)

6.参考资料

Elastic-job 介绍与使用

Elastic-job使用及原理

Elastic-Job官方文档

分布式作业调度框架——Elastic-Job相关推荐

  1. 分布式定时任务框架Elastic-Job的使用

    为什么80%的码农都做不了架构师?>>>    一.前言 Elastic-Job是一个优秀的分布式作业调度框架. Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项 ...

  2. 分布式任务调度框架和微服务的区别

    一.前言 分布式大行其下的时代,让大家彻底的抛弃了传统陈旧的技术框架.几乎每一个技术人都知道和掌握了微服务架构,微服务自然有它的美,但是所以技术框架都必须服务于业务,结合自身业务选取甚至自研适合自身的 ...

  3. 快手八卦!突破TensorFlow、PyTorch并行瓶颈的开源分布式训练框架来了!

    来源:AI前线本文约5200字,建议阅读8分钟 本文介绍了专门针对分布式场景设计了特定的优化算法同比,性能较同类提升60%. 近日,快手和苏黎世理工宣布开源分布式训练框架 Bagua(八卦),相比于 ...

  4. 6_分布式训练框架Horovod使用(20190111)

    分布式训练框架Horovod使用 文章目录 一.Horovod简介 二.Horovod框架的安装 Install 1.安装OpenMPI 2.安装Horovod 三.Horovod框架的使用 1.在项 ...

  5. 分布式定时任务框架选型,写得太好了!

    点击关注公众号,实用技术文章及时了解 为什么我们需要定时任务 我们先思考下面几个业务场景的解决方案: 支付系统每天凌晨1点跑批,进行一天清算,每月1号进行上个月清算 电商整点抢购,商品价格8点整开始优 ...

  6. 唯品会开源分布式作业调度平台Saturn

    目录 前言 背景 一.简介 二.特性 三.快速开始 一键启动 Docker启动 前言 Saturn英文意思是:土星,太阳系中的第二大行星.比地球高一个层级,地球是八大行星之一.而今天我们介绍的Satu ...

  7. 再见 xxl-job!更强大的新一代分布式任务调度框架来了!

    大家好,我是老赵 概述 PowerJob是新一代分布式任务调度与计算框架,支持CRON.API.固定频率.固定延迟等调度策略,提供工作流来编排任务解决依赖关系,能让您轻松完成作业的调度与繁杂任务的分布 ...

  8. 再见 xxl-job!更强大的新一代分布式任务调度框架来了

    因公众号更改推送规则,请点"在看"并加"星标"第一时间获取精彩技术分享 点击关注#互联网架构师公众号,领取架构师全套资料 都在这里 0.2T架构师学习资料干货分 ...

  9. 只会用 xxl-job?更强大的新一代分布式任务调度框架来了!

    点击关注公众号,实用技术文章及时了解 概述 PowerJob是新一代分布式任务调度与计算框架,支持CRON.API.固定频率.固定延迟等调度策略,提供工作流来编排任务解决依赖关系,能让您轻松完成作业的 ...

最新文章

  1. Zabbix 3.0 基础介绍 [一]
  2. cJsonFiles数据结构
  3. 鸟哥的Linux私房菜(服务器)- 第二十一章、文件服务器之三: FTP 服务器
  4. oracle中pdb,Oracle12c数据库创建pdb的3种方法
  5. RabbitMQ入门(三)-Publish/Subscribe(发布/订阅)
  6. ffplay分析 (音视频同步:主时钟为音频)
  7. 对比linux终端模式和图形模式,Linux知识-2. Linux初学(CnetOS Linux7)之切换命令模式和图形模式...
  8. html怎么在图片上添加文字_Image J基础操作:给图片添加文字和标注
  9. Java 线程实例一(查看线程是否存活、获取当前线程名称、状态监测、线程优先级设置、死锁及解决方法、获取线程id、线程挂起)
  10. redis启动.停止.重启
  11. WINDOWS假冒KERBEROS令牌***域
  12. java jdk 加密_jdk实现常见的加密算法
  13. todo: 网口驱动
  14. linux打包压缩命令
  15. qq视频转码失败怎么办_迅捷视频转换器转换失败的解决方法
  16. 数据结构试卷及答案(七)
  17. c语言分数乘法,武汉小学数学六年级
  18. 【渝粤题库】陕西师范大学165203 社会保险学 作业(专升本)
  19. uni-app -- 小程序分享遇到的问题
  20. 字节终面:说说Kakfa副本状态机的实现原理?

热门文章

  1. 海康摄像头PS流格式解析(RTP/PS/H264)
  2. windows 7的瘦身版
  3. 51单片机的PID水温控制器设计
  4. SQL将多列的值合并为一列
  5. 历年国二c语言真题百度网盘 下载,历年国家计算机二级C语言上机国二真题整理100套...
  6. 火车头抓取豆瓣影评案例
  7. 调用Minitab软件应用于工业数据分析
  8. Sublime Merge——一款超好用的Git可视化工具
  9. 猫猫新开通了新浪微博,欢迎小伙伴们来关注哟
  10. 搞懂Pandas数据合并,这一片就够了