1.概述

转载:分布式调度系统之 Elastic-Job-Lite

本文章会记录在当当网的开源框架 ElasticJob的学习案例

2.任务调度框架 Quartz

在ElasticJob中,底层分封装了Quartz,所以我们先来看下Quartz的相关知识

2.1 cron表达式简介

创建作业任务时间触发器(类似于公交⻋出⻋时间表)

cron表达式由七个位置组成,空格分隔

  • 1、Seconds(秒) 0~59

  • 2、Minutes(分) 0~59

  • 3、Hours(⼩时) 0~23

  • 4、Day of Month(天)1~31,注意有的⽉份不⾜31天

  • 5、Month(⽉) 0~11,或者JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC

  • 6、Day of Week(周) 1~7,1=SUN或者 SUN,MON,TUE,WEB,THU,FRI,SAT

  • 7、Year(年)1970~2099 可选项

示例:

* 0 0 11 * * ? 每天的11点触发执⾏⼀次
* 0 30 10 1 * ? 每⽉1号上午10点半触发执⾏⼀次

2.2 配置pom文件

    <dependencies><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version></dependency></dependencies>

2.3 主体代码

public class QuartzMain {//创建一个调度器public static Scheduler createScheduler() throws SchedulerException {SchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();return scheduler;}//创建一个任务详情public static JobDetail createJob(){JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);jobBuilder.withIdentity("jobName","myJob");JobDetail jobDetail = jobBuilder.build();return jobDetail;}//创建一个触发器public static Trigger createTrigger(){CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("triggerName", "myTrigger").startNow().withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?")).build();return trigger;}//执行主体内容public static void main(String[] args) throws SchedulerException {Scheduler scheduler = QuartzMain.createScheduler();JobDetail job = QuartzMain.createJob();Trigger trigger = QuartzMain.createTrigger();scheduler.scheduleJob(job,trigger);scheduler.start();}
}

演示效果如下,没两秒钟打印输出任务

我是一个定时任务
我是一个定时任务
我是一个定时任务

3.分布式调度框架Elastic-Job

3.1 简介

Elastic-Job 是当当⽹开源的⼀个分布式调度解决⽅案,基于Quartz⼆次开发的,由两个相互独⽴的⼦项⽬Elastic-Job-Lite和Elastic-Job-Cloud组成

Elastic-Job-Lite 轻量级⽆中⼼化解决⽅案,使⽤Jar包的形式提供分布式任务的协调服务
Elastic-Job-Cloud 需要结合Mesos以及Docker在云环境下使⽤。

3.2 主要功能

  1. 【分布式调度协调】 在分布式环境中,任务能够按指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏

  2. 【丰富的调度策略】基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务

  3. 【弹性扩容缩容】 当集群中增加某⼀个实例,它应当也能够被选举并执⾏任务;当集群减少⼀个实例

时,它所执⾏的任务能被转移到别的实例来执⾏。

  1. 【失效转移】某实例在任务执⾏失败后,会被转移到其他实例执⾏

  2. 【错过执⾏作业重触发】 若因某种原因导致作业错过执⾏,⾃动记录错过执⾏的作业,并在上次作业

完成后⾃动触发。

  1. 【⽀持并⾏调度】 ⽀持任务分⽚,任务分⽚是指将⼀个任务分为多个⼩任务项在多个实例同时执⾏。

  2. 【作业分⽚⼀致性】 当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。

3.3 开始使用 Elastic-Job

Elastic-Job依赖于Zookeeper进⾏分布式协调,所以需要安装Zookeeper软件

3.3.1 安装Zookeeper

  1. 先从官方文件下载一个镜像文mirrors.tuna.tsinghua.edu.cn/apache/zook…

  2. 对压缩包进行解压 tar -zxvf zookeeper-3.4.14.tar.gz

  3. 进入conf目录 cp zoo_sample.cfg zoo.cfg

  4. 进入bin 目录,启动zk服务器

./zkServer.sh start   --启动
./zkServer.sh stop    --关闭
./zkServer.sh status    --关闭

启动服务

[root@localhost bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

查看状态

[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone

3.3.2 zookeeper可视化工具

网络下载地址添加链接描述

解压之后进入到build目录

使用java -jar 启动可视化工具

java -jar .\zookeeper-dev-ZooInspector.jar

输入ip和端口号,端口号默认为2181

3.4 数据库建表语句

这里打算做一个同步业务,把resume中的用户 归档到 resume_bak中

-- ----------------------------
-- Table structure for resume
-- ----------------------------
DROP TABLE IF EXISTS `resume`;
CREATE TABLE `resume` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`sex` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`address` varchar(255) DEFAULT NULL,`education` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;-- ----------------------------
-- Table structure for resume_bak
-- ----------------------------
DROP TABLE IF EXISTS `resume_bak`;
CREATE TABLE `resume_bak` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`sex` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`address` varchar(255) DEFAULT NULL,`education` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;

3.5 主体代码

当前的主体代码主要功能是:创建配置,协调配置中心,调度计划,分片等,这里我们仅仅配置了一个分片

public class ElasticJobMain {public static void main(String[] args) {//1、创建一个Zookeeper的配置文件//配置ip地址即可,端口号默认为ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("192.168.56.101", "myjob-name");//2、配置分布式协调注册中心CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);coordinatorRegistryCenter.init();//3、指定作业的调度频率,周期//这里和Quartz不同的地方在于,elastic-job 有分片的概念//分片其实是一个很常见的概念,比如redis有分片,elasticSearch也有分片的概念JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/1 * * * * ?", 1).build();//4、这里封装了频率属性和job任务SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ArchivieJob.class.getName());//5、再次把simple配置 封装成 LiteJob配置LiteJobConfiguration jobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();/*** 这里可以看到,同样是Configuration,在设计的时候,不是一股脑的把所有的信息放一起* 而是分解成不同的 configuration,有处理频率的配置,有赋值job任务的配置,有轻量级配置* 然后他们之间使用组合的形式,进行一层一层的封装 每个类保持独立*///6、创建任务调度器,//需要 协同注册中心//需要 轻量级的job配置JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, jobConfiguration);jobScheduler.init();}
}

配置具体任务

/*** 在实现Job的时候只需实现一个SimpleJob即可*/
public class ArchivieJob implements SimpleJob {public void execute(ShardingContext shardingContext) {int shardingItem = shardingContext.getShardingItem();String jobParameter = shardingContext.getJobParameter();String name = ManagementFactory.getRuntimeMXBean().getName();String machineName = name.split("@")[0];String pid = name.split("@")[1];System.out.println("进程号:" + pid+" 机器名:"+machineName+" shardingItem:"+shardingItem +" jobParameter:"+jobParameter);}
}

3.6 IDEA支持同时启动多个main运行实例

IDEA默认不支持同时启动多个main方法,这里需要配置一下,允许并行运行程序

3.7 观看zookeeper中的节点值

可以看到在可视化工具中有几个节点需要关注一下

instances 此节点说明当前当前只有一台机器连接了


可以看到控制台打印输出的时候,也说明了当前端口号是27204


只要再次运行一个控制台程序,就会在instances中,有增加了一台机器。此时,我们把关注点放在sharding上,可以发现当前任务仅仅只有一个分片-0,而且这个实例就是 27204。

此时我们尝试增加一下分片,然后再看看分片的情况,此时设置为3,并且只跑一个实例

 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/1 * * * * ?", 3).build();

可以看到,分片增加了三个,并且分片中每个实例都是9424

3.8 Leader节点选举机制

刚刚我们观察到的增加分片,启动多个实例,观察客户端工具,背后的实质就是他们的选举机制,原理如下:

  1. 每个Elastic-job作为Zookeeper的客户端,它来操作zookeeper的znode
  2. 多个实例同时去创建 /leader节点
  3. /leader节点只能创建一个,后创建的会失败,创建成功的实例会被选为leader节点,执⾏任务

4 轻量级去中心化

4.1 去中⼼化

  1. 执行节点对等,每套程序都是一样的
  2. 定时调度自触发,不需要调度中心进行调配
  3. 服务自发现(通过注册中心的服务发现)
  4. 主节点非固定

4.2 轻量级

  1. 所有的文件都打包在一个Jar文件中
  2. 仅仅需要依赖zookeeper服务

4.3 架构框图


这个接口包含了主要的功能

/*** 作业内部服务门面服务.* * @author zhangliang*/
public interface JobFacade {/*** 读取作业配置.* * @param fromCache 是否从缓存中读取* @return 作业配置*/JobRootConfiguration loadJobRootConfiguration(boolean fromCache);/*** 检查作业执行环境.* * @throws JobExecutionEnvironmentException 作业执行环境异常*/void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;/*** 如果需要失效转移, 则执行作业失效转移.*/void failoverIfNecessary();/*** 注册作业启动信息.** @param shardingContexts 分片上下文*/void registerJobBegin(ShardingContexts shardingContexts);/*** 注册作业完成信息.** @param shardingContexts 分片上下文*/void registerJobCompleted(ShardingContexts shardingContexts);/*** 获取当前作业服务器的分片上下文.** @return 分片上下文*/ShardingContexts getShardingContexts();/*** 设置任务被错过执行的标记.** @param shardingItems 需要设置错过执行的任务分片项* @return 是否满足misfire条件*/boolean misfireIfRunning(Collection<Integer> shardingItems);/*** 清除任务被错过执行的标记.** @param shardingItems 需要清除错过执行的任务分片项*/void clearMisfire(Collection<Integer> shardingItems);/*** 判断作业是否需要执行错过的任务.* * @param shardingItems 任务分片项集合* @return 作业是否需要执行错过的任务*/boolean isExecuteMisfired(Collection<Integer> shardingItems);/*** 判断作业是否符合继续运行的条件.* * <p>如果作业停止或需要重分片或非流式处理则作业将不会继续运行.</p>* * @return 作业是否符合继续运行的条件*/boolean isEligibleForJobRunning();/**判断是否需要重分片.** @return 是否需要重分片*/boolean isNeedSharding();/*** 作业执行前的执行的方法.** @param shardingContexts 分片上下文*/void beforeJobExecuted(ShardingContexts shardingContexts);/*** 作业执行后的执行的方法.** @param shardingContexts 分片上下文*/void afterJobExecuted(ShardingContexts shardingContexts);/*** 发布执行事件.** @param jobExecutionEvent 作业执行事件*/void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);/*** 发布作业状态追踪事件.** @param taskId 作业Id* @param state 作业执行状态* @param message 作业执行消息*/void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);
}

Elastic job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等

4.4 任务分片

⼀个⼤的⾮常耗时的作业Job,⽐如:⼀次要处理⼀亿的数据,那这⼀亿的数据存储在数据库中,如果
⽤⼀个作业节点处理⼀亿数据要很久,在互联⽹领域是不太能接受的,互联⽹领域更希望机器的增加去
横向扩展处理能⼒。所以,ElasticJob可以把作业分为多个的task(每⼀个task就是⼀个任务分⽚),每
⼀个task交给具体的⼀个机器实例去处理(⼀个机器实例是可以处理多个task的),但是具体每个task
执⾏什么逻辑由我们⾃⼰来指定。

配置使用分片


获取分片的信息

分片策略

系统中,有一个作业分片策略类:JobShardingStrategy,并且有三个子类

public interface JobShardingStrategy {/*** 作业分片.* @param jobInstances 所有参与分片的单元列表* @param jobName 作业名称* @param shardingTotalCount 分片总数* @return 分片结果*/Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}

AverageAllocationJobShardingStrategy 基于平均分配算法的分片策略.

 * 如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器.* 如: * 1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].* 2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].* 3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].

RotateServerByNameJobShardingStrategy 根据作业名的哈希值对服务器列表进行轮转的分片策略.

后续查看原文。。。

【ES】分布式调度系统之 Elastic-Job-Lite相关推荐

  1. 单集群10万节点 走进腾讯云分布式调度系统VStation

    云计算并非无中生有的概念,它将普通的单台PC计算能力通过分布式调度软件连接起来.其最核心的问题是如何把一百台.一千台.一万台机器高效地组织起来,灵活进行任务调度和管理,从而像使用单台机器一样方便地使用 ...

  2. 王者归来:分布式调度解决方案 ElasticJob 重启!

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 你会误认为 ElasticJob 只是作业管控平台么?创 ...

  3. 国产最强分布式调度,它回来了

    导读: 调度(Scheduling)在计算机领域是个庞大概念,CPU 调度.内存调度.进程调度等都可称之为调度.它是指在特定的时机分配合理的资源去处理预先确定的任务,用于在适当的时机触发一个包含业务逻 ...

  4. 承载每天10万级任务的数据调度系统的架构是如何设计的

    消息:分布式工作流任务调度系统Apache DolphinScheduler开源地址:https://github.com/apache/incubator-dolphinscheduler , 欢迎 ...

  5. 专访腾讯云沙开波:从无到有,打造全球领先调度系统

    \\ "以前每个周末都去跑跑步.打打球,现在回家就是接接小孩,随着年龄的增长,很多爱好都丢了." \\\\ 沙开波--腾讯云计算产品总监,从一名基础架构组的程序员,到如今腾讯云计算 ...

  6. 面向大数据与云计算调度挑战的阿里经济体核心调度系统

    编者按 伏羲(Fuxi)是十年前最初创立飞天平台时的三大服务之一(分布式存储 Pangu,分布式计算 MaxCompute,分布式调度 Fuxi),当时的设计初衷是为了解决大规模分布式资源的调度问题( ...

  7. 面向大数据处理应用的广域存算协同调度系统

    点击上方蓝字关注我们 面向大数据处理应用的广域存算协同调度系统 张晨浩1,2, 肖利民1,2, 秦广军3, 宋尧1,2, 蒋世轩1,2, 王继业4 1 软件开发环境国家重点实验室,北京 100191 ...

  8. 分布式任务调度系统xxl-job源码探究(一、客户端)

    前面讲了xxl-job的搭建,现在来粗略的解析下该分布式调度系统的源码,先来客户点代码 客户端源码 客户端开启的时候会向服务中心进行注册,其实现用的是jetty连接,且每隔半分钟会发送一次心跳,来告诉 ...

  9. 赫拉(hera)分布式任务调度系统之项目启动(二)

    文章目录 赫拉 创建表 打包部署 测试 TIPS 加入群聊 赫拉 大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,shell脚本调度.怎么样让大量的ETL任务准确 ...

最新文章

  1. C++11 constexpr使用
  2. python yield与递归
  3. Ubuntu 10.10配置JRE、JDK、Eclipse和Tomcat7.0.5
  4. mysql query结果集_如何解决PHP使用mysql_query查询超大结果集超内存问题
  5. 从近期两篇论文看大规模商品图嵌入
  6. 漫画:什么是分布式锁
  7. 速记 封装案例 银行取款功能 1210
  8. 关于如何获得网站集宿主网站集地址的问题
  9. centos ll 格式化时间_审理文书格式化的实践探索
  10. 海量数据挖掘MMDS week2: 频繁项集挖掘 Apriori算法的改进:基于hash的方法
  11. 拓端tecdat|R语言NLP案例:LDA主题文本挖掘优惠券推荐网站数据
  12. 2018华为网络技术大赛课程-服务器操作系统基础原理自测题答案
  13. adminlte java_AdminLTE Button小结
  14. 广联达只有土建打不开_广联达BIM钢筋算量为什么打开工程是灰屏?工程打不开怎么办?...
  15. en开头的单词_英语四级en-词汇前后缀解析
  16. clickhouse ARRAY JOIN函数
  17. mysql 未找到 WinSxS_window_win7系统如何使用WinSxS工具安全删除WinSxS文件夹垃圾?,WinSxS文件: WinSxS是系统文件Wi - phpStudy...
  18. 射影几何----齐次坐标下的三点共线和非齐次坐标下的三点共线是等价的
  19. 计算机中的同步和异步
  20. 圣诞节,1inch狂撒3亿美金红包,币圈大佬在线炫富,我酸了...

热门文章

  1. 全网沸腾!鸿蒙手机要来了
  2. 小鹏汽车4月交付量5147台 同比增长285%
  3. SpaceX星舰飞船首次试飞成功着陆!但没想到还是爆炸了...
  4. 3月4日见!Redmi Note 10系列屏幕升级:首次采用Super AMOLED屏
  5. 《和平精英》玩跨界,特斯拉主题店超级充电站现身海岛
  6. 华为Mate 40系列或首发屏下摄像头:全球首个量产级别方案
  7. 特斯拉入驻天猫卖车了 将连做8天直播
  8. 安兔兔发布9月份Android性能榜:855霸榜,华为竟垫底
  9. 你以为环幕屏就结束了?真正的小米MIX 4或下月发布:1亿像素相机加持
  10. 腾讯Q2财报看点:游戏营收同比止跌 B端业务成第二大营收来源