• 说明
  • 功能列表
    • 任务分片
    • 多任务类型
    • 云原生
    • 容错性
    • 任务聚合
    • 易用性
  • 构建工具
  • 项目结构如下
  • 引入依赖
  • SimpleJob 简单作业
  • DataFlowJob 数据流作业
  • 测试以上两种作业
  • 运行结果
  • 创建elasticxml配置文件
  • 配置datasource
  • 创建applicationContextxml文件
  • 配置webxml
  • 运作结果

说明

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。

功能列表

1. 任务分片

  • 将整体任务拆解为多个子任务
  • 可通过服务器的增减弹性伸缩任务处理能力
  • 分布式协调,任务服务器上下线的全自动发现与处理

2. 多任务类型

  • 基于时间驱动的任务
  • 基于数据驱动的任务(TBD)
  • 同时支持常驻任务和瞬时任务
  • 多语言任务支持

3. 云原生

  • 完美结合Mesos或Kubernetes等调度平台
  • 任务不依赖于IP、磁盘、数据等有状态组件
  • 合理的资源调度,基于Netflix的Fenzo进行资源分配

4. 容错性

  • 支持定时自我故障检测与自动修复
  • 分布式任务分片唯一性保证
  • 支持失效转移和错过任务重触发

5. 任务聚合

  • 相同任务聚合至相同的执行器统一处理
  • 节省系统资源与初始化开销
  • 动态调配追加资源至新分配的任务

6. 易用性

  • 完善的运维平台
  • 提供任务执行历史数据追踪能力
  • 注册中心数据一键dump用于备份与调试问题

相关概念可以访问官方网站进行了解:http://elasticjob.io/index_zh.html

接下来我们就开始实现一个小例子

构建工具

gradle

项目结构如下

引入依赖

在build.gradle文件中

//elastic-job[group: 'com.dangdang', name: 'elastic-job-lite-core', version: '2.1.5'],[group: 'com.dangdang', name: 'elastic-job-lite-spring', version: '2.1.5']

SimpleJob 简单作业

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;public class MyElasticSimpleJob implements SimpleJob{@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: System.out.println("do something by sharding item 0"); break;case 1: System.out.println("do something by sharding item 1"); break;case 2: System.out.println("do something by sharding item 2"); break;// case n: ...}}}

DataFlowJob 数据流作业

import java.util.ArrayList;
import java.util.List;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;public class MyElasticDataflowJob implements DataflowJob<String>{@Overridepublic List<String> fetchData(ShardingContext context) {switch (context.getShardingItem()) {case 0: // get data from database by sharding item 0List<String> data1 = new ArrayList<>();data1.add("get data from database by sharding item 0");return data1;case 1: // get data from database by sharding item 1List<String> data2 = new ArrayList<>();data2.add("get data from database by sharding item 1");return data2;case 2: // get data from database by sharding item 2List<String> data3 = new ArrayList<>();data3.add("get data from database by sharding item 2");return data3;// case n: ...}return null;}@Overridepublic void processData(ShardingContext shardingContext, List<String> data) {int count=0;// process data// ...for (String string : data) {count++;System.out.println(string);if (count>10) {return;}}}}

测试以上两种作业

import java.net.InetAddress;import java.net.UnknownHostException;import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.job.task.MyElasticDataflowJob;
import com.job.task.MyElasticSimpleJob;public class JobDemo {public static void main(String[] args) throws UnknownHostException {System.out.println("Start...");System.out.println(InetAddress.getLocalHost());new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();new JobScheduler(createRegistryCenter(), createDataflowJobConfiguration()).init();}private static CoordinatorRegistryCenter createRegistryCenter() {CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181", "new-elastic-job-demo"));regCenter.init();return regCenter;}private static LiteJobConfiguration createSimpleJobConfiguration() {// 定义作业核心配置JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("SimpleJobDemo", "0/15 * * * * ?", 10).build();// 定义SIMPLE类型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticSimpleJob.class.getCanonicalName());// 定义Lite作业根配置JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();return (LiteJobConfiguration) simpleJobRootConfig;}private static LiteJobConfiguration createDataflowJobConfiguration() {// 定义作业核心配置JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("DataflowJob", "0/30 * * * * ?", 10).build();// 定义DATAFLOW类型配置DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyElasticDataflowJob.class.getCanonicalName(), true);// 定义Lite作业根配置JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();return (LiteJobConfiguration) dataflowJobRootConfig;}
}

运行结果


现在我们通过配置文件的方式来实现两种类型的作业

创建elastic.xml配置文件

将elastic-job通过配置文件进行参数设置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "><!-- 配置作业注册中心; baseSleepTimeMilliseconds:等待重试的间隔时间的初始值单位:毫秒 ; maxSleepTimeMilliseconds:等待重试的间隔时间的最大值单位:毫秒;maxRetries:最大重试次数 --><reg:zookeeper id="regCenter" server-lists="192.168.6.175:12181"namespace="elastic-job" base-sleep-time-milliseconds="1000"max-sleep-time-milliseconds="3000" max-retries="3" /><!-- 配置简单作业 --><job:simple id="JobSimpleJob" class="com.job.task.MyElasticSimpleJob"registry-center-ref="regCenter" cron="0/30 * * * * ?"sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /><!-- 配置数据流作业, job-parameter定义的为分页参数 sharding-total-count 作业分片总数sharding-item-parameters分片序列号和参数用等号分隔,多个键值对用逗号分隔 ,分片序列号从0开始,不可大于或等于作业分片总数job-parameter 作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业例:每次获取的数据量、作业实例从数据库读取的主键等job-sharding-strategy-class 作业分片策略实现类全路径 默认使用平均分配策略streaming-process 是否流式处理数据reconcile-interval-minutes 修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复event-trace-rdb-data-source 作业事件追踪的数据源Bean引用--><job:dataflow id="JobDataflow" class="com.job.task.MqElasticDataflowJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=a,1=b,2=c" job-sharding-strategy-class="com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy" job-parameter="100" streaming-process="true" reconcile-interval-minutes="10" overwrite="true" event-trace-rdb-data-source="dataSource"/> </beans>

配置datasource

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"xmlns:tx="http://www.springframework.org/schema/tx"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"><bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"destroy-method="close"><property name="driverClass" value="com.mysql.jdbc.Driver" /><property name="jdbcUrl" value="jdbc:mysql://127.0.0.1:3306/for_test?useUnicode=yes&amp;characterEncoding=UTF-8" /><property name="user" value="admin" /><property name="password" value="super" /><property name="minPoolSize" value="3" /><property name="maxPoolSize" value="20" /><property name="acquireIncrement" value="1" /><property name="testConnectionOnCheckin" value="true" /><property name="maxIdleTimeExcessConnections" value="240" /><property name="idleConnectionTestPeriod" value="300" /></bean></beans>

创建applicationContext.xml文件

将elastic-job与Spring整合

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.2.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd "><task:scheduler id="taskScheduler" pool-size="10" /><task:executor id="taskExecutor" /><task:annotation-driven executor="taskExecutor" scheduler="taskScheduler" /><import resource="elastic.xml" /><import resource="mysql.xml"/>
</beans>

配置web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"><display-name>elastic-job</display-name><!-- 用来设定web应用的环境参数(context) --><context-param><param-name>contextConfigLocation</param-name><param-value>classpath:applicationContext.xml</param-value></context-param><!-- listener元素用来定义Listener接口,对事件监听程序 --><listener><listener-class>org.springframework.web.context.ContextLoaderListener</listener-class></listener></web-app>

运作结果

代码下载:https://coding.net/u/liaiyomia/p/elasticJobDemo/git

elastic-job入门实例相关推荐

  1. Elastic search入门到集群实战操作详解(原生API操作、springboot整合操作)-step1

    Elastic search入门到集群实战操作详解(原生API操作.springboot整合操作)-step2 https://blog.csdn.net/qq_45441466/article/de ...

  2. Asp.Net MVC2.0 Url 路由入门---实例篇

    本篇主要讲述Routing组件的作用,以及举几个实例来学习Asp.Net MVC2.0 Url路由技术. 接着上一篇开始讲,我们在Global.asax中注册一条路由后,我们的请求是怎么转到相应的Vi ...

  3. SpringMVC 框架系列之初识与入门实例

    微信公众号:compassblog 欢迎关注.转发,互相学习,共同进步! 有任何问题,请后台留言联系! 1.SpringMVC 概述 (1).什么是 MVC:Model-View-Control Co ...

  4. 《HFSS电磁仿真设计从入门到精通》一第2章 入门实例——T形波导的内场分析和优化设计...

    本节书摘来自异步社区<HFSS电磁仿真设计从入门到精通>一书中的第2章,作者 易迪拓培训 , 李明洋 , 刘敏,更多章节内容可以访问云栖社区"异步社区"公众号查看 第2 ...

  5. python爬虫实例-10个python爬虫入门实例

    昨天带伙伴萌学习python爬虫,准备了几个简单的入门实例 涉及主要知识点: web是如何交互的 requests库的get.post函数的应用 response对象的相关函数,属性 python文件 ...

  6. python爬虫程序实例-10个python爬虫入门实例

    作者:h3zh1 来源:cnblogs.com/h3zh1/p/12548946.html 今天为大家准备了几个简单的python爬虫入门实例,分享给大家. 涉及主要知识点:web是如何交互的 req ...

  7. ADO.NET Entity Data Model入门实例

    ADO.NET Entity Data Model入门实例 快速开发一个网站,最繁琐的地方可能要算数据库访问和ORM了,还好.net中这个很强大,可以省去好多体力劳动. 这里就给个快速使用ADO.NE ...

  8. Java Socket入门实例

    基于测试驱动的Socket入门实例(代码的具体功能可以看我的程序中的注释,不理解的可以短信我) 先看Server的代码: package socketStudy; import java.io.Buf ...

  9. linux Shell(脚本)编程入门实例讲解详解

    linux Shell(脚本)编程入门实例讲解详解 为什么要进行shell编程 在Linux系统中,虽然有各种各样的图形化接口工具,但是sell仍然是一个非常灵活的工具.Shell不仅仅是命令的收集, ...

  10. Activiti工作流从入门到入土:入门实例

    一.前言 在上一节中我们对activiti进行了基本的介绍activiti进行了基本的介绍,同时介绍了基本的概念. 这一节,我将用一个入门程序,介绍如何使用activiti. 二.环境准备 2.1.编 ...

最新文章

  1. 强烈推荐7个让人惊艳的宝藏实用网站,太好用了
  2. CoFun 1612 单词分组(容斥)
  3. python csv 模块的使用
  4. QT QNetworkInterface::allAddresses();获取了很多无效的地址_Qt编写地图综合应用16-省市轮廓图下载...
  5. java程序中执行maven_java – 将一个enviornment变量传递给Maven中的已执行进程
  6. 转(ASP.NET页面缓存)
  7. mysql与oracle存储过程_MySQL与Oracle差异比较之五存储过程Function
  8. 远程桌面连接--“发生身份验证错误。要求的函数不受支持
  9. Eclipse启动运行速度调优
  10. java中连接字符串_Java中几种方式连接字符串的方法
  11. 爬虫选用PPTP协议代理ip的必要性
  12. phpexcel导出excel无法打开,提示文件格式或文件名无效,文件损毁,解决办法
  13. 一堂难忘的计算机课作文,难忘的信息课作文
  14. 利用Python批量把flv文件转换成mp4文件
  15. 一份来自28岁老程序员的自白
  16. Android辅助功能之自动安装apk
  17. WebBrowser控件的多页面浏览(Tabbed Browsing)开发接口
  18. keep-alive 的作用及使用场景
  19. Js判断是否为数字,是否为空,是否为整形,是否为浮点型,是否为对象
  20. .NET框架和发展历史介绍

热门文章

  1. mac m1 无法连接公司内网问题
  2. mybatis 连带操作(注解方式)(两张表关联,一张表插入一条新数据,另外一张表也跟着插入一条新数据)
  3. 中国Azure新数据中心(区域)正式商用
  4. Pspice模型的使用
  5. 【PyTorch】3 AI诗人RNN实战(LSTM)——完成诗歌剩余部分、生成藏头诗
  6. 2021英伟达暑期实习面经(芯片设计)
  7. linux 中输入bash,Linux上Bash Shell编程
  8. 谷歌自研芯片Tensor重磅来袭:“机器学习的里程碑”
  9. 机器学习 --- 朴素贝叶斯分类器 python
  10. fluent并行 linux_Fluent17.2在基于Linux下PC集群的并行计算.pdf