【ES】分布式调度系统之 Elastic-Job-Lite
1.概述
转载:分布式调度系统之 Elastic-Job-Lite
本文章会记录在当当网的开源框架 ElasticJob的学习案例
2.任务调度框架 Quartz
在ElasticJob中,底层分封装了Quartz,所以我们先来看下Quartz的相关知识
2.1 cron表达式简介
创建作业任务时间触发器(类似于公交⻋出⻋时间表)
cron表达式由七个位置组成,空格分隔
1、Seconds(秒) 0~59
2、Minutes(分) 0~59
3、Hours(⼩时) 0~23
4、Day of Month(天)1~31,注意有的⽉份不⾜31天
5、Month(⽉) 0~11,或者
JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC
6、Day of Week(周) 1~7,1=SUN或者 SUN,MON,TUE,WEB,THU,FRI,SAT
7、Year(年)1970~2099 可选项
示例:
* 0 0 11 * * ? 每天的11点触发执⾏⼀次
* 0 30 10 1 * ? 每⽉1号上午10点半触发执⾏⼀次
2.2 配置pom文件
<dependencies><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version></dependency></dependencies>
2.3 主体代码
public class QuartzMain {//创建一个调度器public static Scheduler createScheduler() throws SchedulerException {SchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();return scheduler;}//创建一个任务详情public static JobDetail createJob(){JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class);jobBuilder.withIdentity("jobName","myJob");JobDetail jobDetail = jobBuilder.build();return jobDetail;}//创建一个触发器public static Trigger createTrigger(){CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("triggerName", "myTrigger").startNow().withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?")).build();return trigger;}//执行主体内容public static void main(String[] args) throws SchedulerException {Scheduler scheduler = QuartzMain.createScheduler();JobDetail job = QuartzMain.createJob();Trigger trigger = QuartzMain.createTrigger();scheduler.scheduleJob(job,trigger);scheduler.start();}
}
演示效果如下,没两秒钟打印输出任务
我是一个定时任务
我是一个定时任务
我是一个定时任务
3.分布式调度框架Elastic-Job
3.1 简介
Elastic-Job 是当当⽹开源的⼀个分布式调度解决⽅案,基于Quartz⼆次开发的,由两个相互独⽴的⼦项⽬Elastic-Job-Lite和Elastic-Job-Cloud组成
Elastic-Job-Lite 轻量级⽆中⼼化解决⽅案,使⽤Jar包的形式提供分布式任务的协调服务
Elastic-Job-Cloud 需要结合Mesos以及Docker在云环境下使⽤。
3.2 主要功能
【分布式调度协调】 在分布式环境中,任务能够按指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏
【丰富的调度策略】基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务
【弹性扩容缩容】 当集群中增加某⼀个实例,它应当也能够被选举并执⾏任务;当集群减少⼀个实例
时,它所执⾏的任务能被转移到别的实例来执⾏。
【失效转移】某实例在任务执⾏失败后,会被转移到其他实例执⾏
【错过执⾏作业重触发】 若因某种原因导致作业错过执⾏,⾃动记录错过执⾏的作业,并在上次作业
完成后⾃动触发。
【⽀持并⾏调度】 ⽀持任务分⽚,任务分⽚是指将⼀个任务分为多个⼩任务项在多个实例同时执⾏。
【作业分⽚⼀致性】 当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。
3.3 开始使用 Elastic-Job
Elastic-Job依赖于Zookeeper进⾏分布式协调,所以需要安装Zookeeper软件
3.3.1 安装Zookeeper
先从官方文件下载一个镜像文mirrors.tuna.tsinghua.edu.cn/apache/zook…
对压缩包进行解压 tar -zxvf zookeeper-3.4.14.tar.gz
进入conf目录 cp zoo_sample.cfg zoo.cfg
进入bin 目录,启动zk服务器
./zkServer.sh start --启动
./zkServer.sh stop --关闭
./zkServer.sh status --关闭
启动服务
[root@localhost bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
查看状态
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone
3.3.2 zookeeper可视化工具
网络下载地址添加链接描述
解压之后进入到build目录
使用java -jar 启动可视化工具
java -jar .\zookeeper-dev-ZooInspector.jar
输入ip和端口号,端口号默认为2181
3.4 数据库建表语句
这里打算做一个同步业务,把resume中的用户 归档到 resume_bak中
-- ----------------------------
-- Table structure for resume
-- ----------------------------
DROP TABLE IF EXISTS `resume`;
CREATE TABLE `resume` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`sex` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`address` varchar(255) DEFAULT NULL,`education` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;-- ----------------------------
-- Table structure for resume_bak
-- ----------------------------
DROP TABLE IF EXISTS `resume_bak`;
CREATE TABLE `resume_bak` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`sex` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`address` varchar(255) DEFAULT NULL,`education` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
3.5 主体代码
当前的主体代码主要功能是:创建配置,协调配置中心,调度计划,分片等,这里我们仅仅配置了一个分片
public class ElasticJobMain {public static void main(String[] args) {//1、创建一个Zookeeper的配置文件//配置ip地址即可,端口号默认为ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("192.168.56.101", "myjob-name");//2、配置分布式协调注册中心CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);coordinatorRegistryCenter.init();//3、指定作业的调度频率,周期//这里和Quartz不同的地方在于,elastic-job 有分片的概念//分片其实是一个很常见的概念,比如redis有分片,elasticSearch也有分片的概念JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/1 * * * * ?", 1).build();//4、这里封装了频率属性和job任务SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ArchivieJob.class.getName());//5、再次把simple配置 封装成 LiteJob配置LiteJobConfiguration jobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();/*** 这里可以看到,同样是Configuration,在设计的时候,不是一股脑的把所有的信息放一起* 而是分解成不同的 configuration,有处理频率的配置,有赋值job任务的配置,有轻量级配置* 然后他们之间使用组合的形式,进行一层一层的封装 每个类保持独立*///6、创建任务调度器,//需要 协同注册中心//需要 轻量级的job配置JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, jobConfiguration);jobScheduler.init();}
}
配置具体任务
/*** 在实现Job的时候只需实现一个SimpleJob即可*/
public class ArchivieJob implements SimpleJob {public void execute(ShardingContext shardingContext) {int shardingItem = shardingContext.getShardingItem();String jobParameter = shardingContext.getJobParameter();String name = ManagementFactory.getRuntimeMXBean().getName();String machineName = name.split("@")[0];String pid = name.split("@")[1];System.out.println("进程号:" + pid+" 机器名:"+machineName+" shardingItem:"+shardingItem +" jobParameter:"+jobParameter);}
}
3.6 IDEA支持同时启动多个main运行实例
IDEA默认不支持同时启动多个main方法,这里需要配置一下,允许并行运行程序
3.7 观看zookeeper中的节点值
可以看到在可视化工具中有几个节点需要关注一下
instances 此节点说明当前当前只有一台机器连接了
可以看到控制台打印输出的时候,也说明了当前端口号是27204
只要再次运行一个控制台程序,就会在instances中,有增加了一台机器。此时,我们把关注点放在sharding上,可以发现当前任务仅仅只有一个分片-0,而且这个实例就是 27204。
此时我们尝试增加一下分片,然后再看看分片的情况,此时设置为3,并且只跑一个实例
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/1 * * * * ?", 3).build();
可以看到,分片增加了三个,并且分片中每个实例都是9424
3.8 Leader节点选举机制
刚刚我们观察到的增加分片,启动多个实例,观察客户端工具,背后的实质就是他们的选举机制,原理如下:
- 每个Elastic-job作为Zookeeper的客户端,它来操作zookeeper的znode
- 多个实例同时去创建 /leader节点
- /leader节点只能创建一个,后创建的会失败,创建成功的实例会被选为leader节点,执⾏任务
4 轻量级去中心化
4.1 去中⼼化
- 执行节点对等,每套程序都是一样的
- 定时调度自触发,不需要调度中心进行调配
- 服务自发现(通过注册中心的服务发现)
- 主节点非固定
4.2 轻量级
- 所有的文件都打包在一个Jar文件中
- 仅仅需要依赖zookeeper服务
4.3 架构框图
这个接口包含了主要的功能
/*** 作业内部服务门面服务.* * @author zhangliang*/
public interface JobFacade {/*** 读取作业配置.* * @param fromCache 是否从缓存中读取* @return 作业配置*/JobRootConfiguration loadJobRootConfiguration(boolean fromCache);/*** 检查作业执行环境.* * @throws JobExecutionEnvironmentException 作业执行环境异常*/void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;/*** 如果需要失效转移, 则执行作业失效转移.*/void failoverIfNecessary();/*** 注册作业启动信息.** @param shardingContexts 分片上下文*/void registerJobBegin(ShardingContexts shardingContexts);/*** 注册作业完成信息.** @param shardingContexts 分片上下文*/void registerJobCompleted(ShardingContexts shardingContexts);/*** 获取当前作业服务器的分片上下文.** @return 分片上下文*/ShardingContexts getShardingContexts();/*** 设置任务被错过执行的标记.** @param shardingItems 需要设置错过执行的任务分片项* @return 是否满足misfire条件*/boolean misfireIfRunning(Collection<Integer> shardingItems);/*** 清除任务被错过执行的标记.** @param shardingItems 需要清除错过执行的任务分片项*/void clearMisfire(Collection<Integer> shardingItems);/*** 判断作业是否需要执行错过的任务.* * @param shardingItems 任务分片项集合* @return 作业是否需要执行错过的任务*/boolean isExecuteMisfired(Collection<Integer> shardingItems);/*** 判断作业是否符合继续运行的条件.* * <p>如果作业停止或需要重分片或非流式处理则作业将不会继续运行.</p>* * @return 作业是否符合继续运行的条件*/boolean isEligibleForJobRunning();/**判断是否需要重分片.** @return 是否需要重分片*/boolean isNeedSharding();/*** 作业执行前的执行的方法.** @param shardingContexts 分片上下文*/void beforeJobExecuted(ShardingContexts shardingContexts);/*** 作业执行后的执行的方法.** @param shardingContexts 分片上下文*/void afterJobExecuted(ShardingContexts shardingContexts);/*** 发布执行事件.** @param jobExecutionEvent 作业执行事件*/void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);/*** 发布作业状态追踪事件.** @param taskId 作业Id* @param state 作业执行状态* @param message 作业执行消息*/void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);
}
Elastic job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等
4.4 任务分片
⼀个⼤的⾮常耗时的作业Job,⽐如:⼀次要处理⼀亿的数据,那这⼀亿的数据存储在数据库中,如果
⽤⼀个作业节点处理⼀亿数据要很久,在互联⽹领域是不太能接受的,互联⽹领域更希望机器的增加去
横向扩展处理能⼒。所以,ElasticJob可以把作业分为多个的task(每⼀个task就是⼀个任务分⽚),每
⼀个task交给具体的⼀个机器实例去处理(⼀个机器实例是可以处理多个task的),但是具体每个task
执⾏什么逻辑由我们⾃⼰来指定。
配置使用分片
获取分片的信息
分片策略
系统中,有一个作业分片策略类:JobShardingStrategy,并且有三个子类
public interface JobShardingStrategy {/*** 作业分片.* @param jobInstances 所有参与分片的单元列表* @param jobName 作业名称* @param shardingTotalCount 分片总数* @return 分片结果*/Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}
AverageAllocationJobShardingStrategy 基于平均分配算法的分片策略.
* 如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器.* 如: * 1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].* 2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].* 3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
RotateServerByNameJobShardingStrategy 根据作业名的哈希值对服务器列表进行轮转的分片策略.
后续查看原文。。。
【ES】分布式调度系统之 Elastic-Job-Lite相关推荐
- 单集群10万节点 走进腾讯云分布式调度系统VStation
云计算并非无中生有的概念,它将普通的单台PC计算能力通过分布式调度软件连接起来.其最核心的问题是如何把一百台.一千台.一万台机器高效地组织起来,灵活进行任务调度和管理,从而像使用单台机器一样方便地使用 ...
- 王者归来:分布式调度解决方案 ElasticJob 重启!
点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 你会误认为 ElasticJob 只是作业管控平台么?创 ...
- 国产最强分布式调度,它回来了
导读: 调度(Scheduling)在计算机领域是个庞大概念,CPU 调度.内存调度.进程调度等都可称之为调度.它是指在特定的时机分配合理的资源去处理预先确定的任务,用于在适当的时机触发一个包含业务逻 ...
- 承载每天10万级任务的数据调度系统的架构是如何设计的
消息:分布式工作流任务调度系统Apache DolphinScheduler开源地址:https://github.com/apache/incubator-dolphinscheduler , 欢迎 ...
- 专访腾讯云沙开波:从无到有,打造全球领先调度系统
\\ "以前每个周末都去跑跑步.打打球,现在回家就是接接小孩,随着年龄的增长,很多爱好都丢了." \\\\ 沙开波--腾讯云计算产品总监,从一名基础架构组的程序员,到如今腾讯云计算 ...
- 面向大数据与云计算调度挑战的阿里经济体核心调度系统
编者按 伏羲(Fuxi)是十年前最初创立飞天平台时的三大服务之一(分布式存储 Pangu,分布式计算 MaxCompute,分布式调度 Fuxi),当时的设计初衷是为了解决大规模分布式资源的调度问题( ...
- 面向大数据处理应用的广域存算协同调度系统
点击上方蓝字关注我们 面向大数据处理应用的广域存算协同调度系统 张晨浩1,2, 肖利民1,2, 秦广军3, 宋尧1,2, 蒋世轩1,2, 王继业4 1 软件开发环境国家重点实验室,北京 100191 ...
- 分布式任务调度系统xxl-job源码探究(一、客户端)
前面讲了xxl-job的搭建,现在来粗略的解析下该分布式调度系统的源码,先来客户点代码 客户端源码 客户端开启的时候会向服务中心进行注册,其实现用的是jetty连接,且每隔半分钟会发送一次心跳,来告诉 ...
- 赫拉(hera)分布式任务调度系统之项目启动(二)
文章目录 赫拉 创建表 打包部署 测试 TIPS 加入群聊 赫拉 大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,shell脚本调度.怎么样让大量的ETL任务准确 ...
最新文章
- C++11 constexpr使用
- python yield与递归
- Ubuntu 10.10配置JRE、JDK、Eclipse和Tomcat7.0.5
- mysql query结果集_如何解决PHP使用mysql_query查询超大结果集超内存问题
- 从近期两篇论文看大规模商品图嵌入
- 漫画:什么是分布式锁
- 速记 封装案例 银行取款功能 1210
- 关于如何获得网站集宿主网站集地址的问题
- centos ll 格式化时间_审理文书格式化的实践探索
- 海量数据挖掘MMDS week2: 频繁项集挖掘 Apriori算法的改进:基于hash的方法
- 拓端tecdat|R语言NLP案例:LDA主题文本挖掘优惠券推荐网站数据
- 2018华为网络技术大赛课程-服务器操作系统基础原理自测题答案
- adminlte java_AdminLTE Button小结
- 广联达只有土建打不开_广联达BIM钢筋算量为什么打开工程是灰屏?工程打不开怎么办?...
- en开头的单词_英语四级en-词汇前后缀解析
- clickhouse ARRAY JOIN函数
- mysql 未找到 WinSxS_window_win7系统如何使用WinSxS工具安全删除WinSxS文件夹垃圾?,WinSxS文件:
WinSxS是系统文件Wi - phpStudy...
- 射影几何----齐次坐标下的三点共线和非齐次坐标下的三点共线是等价的
- 计算机中的同步和异步
- 圣诞节,1inch狂撒3亿美金红包,币圈大佬在线炫富,我酸了...
热门文章
- 全网沸腾!鸿蒙手机要来了
- 小鹏汽车4月交付量5147台 同比增长285%
- SpaceX星舰飞船首次试飞成功着陆!但没想到还是爆炸了...
- 3月4日见!Redmi Note 10系列屏幕升级:首次采用Super AMOLED屏
- 《和平精英》玩跨界,特斯拉主题店超级充电站现身海岛
- 华为Mate 40系列或首发屏下摄像头:全球首个量产级别方案
- 特斯拉入驻天猫卖车了 将连做8天直播
- 安兔兔发布9月份Android性能榜:855霸榜,华为竟垫底
- 你以为环幕屏就结束了?真正的小米MIX 4或下月发布:1亿像素相机加持
- 腾讯Q2财报看点:游戏营收同比止跌 B端业务成第二大营收来源