ElasticJob

ElasticJob 简介

当当网开源的分布式定时任务框架,后被apache基金会支持;

官方文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/

问:ElasticJob是什么?

答:定时任务框架;

优势:

  • 支持分布式部署;不同节点上执行的是不一样的任务(代码是同一套);对于一个大任务,可以用分片策略,让他在多节点上执行;

  • 能够保证高可用;

  • 利用zk实现分布式环境管理;

  • 水平扩展(核心)

例如:
定义了10个分片(对应的片名是0-9),假设我们的定时任务是每1分钟执行一次,定时方法是execute。
当我们只有一台服务器的时候,那么每1分钟会调用十次execute(每次调用的时候分片名(0-9)都不一样)。
当我们有两台服务器的时候,那么每1分钟A、B服务器各自调用五次execute(每次调用的时候分片名(A 0-4,B 5-9)
当有三台服务器的时候A(3个),B(3个),C(4个),这样水平扩展就很容易了。

快速入门(Demo)

想实际操作,可以copy下面代码练手

1.建库建表

-- 建库
CREATE DATABASE `elastic_job_demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';-- 建表
DROP TABLE IF EXISTS `t_file`;
CREATE TABLE `t_file`  (`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`backedUp` tinyint(1) NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2.pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example.elasticjob</groupId><artifactId>elasticjobdemo</artifactId><version>0.0.1-SNAPSHOT</version><name>elasticjobdemo</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--elasticJob--><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-core</artifactId><version>3.0.0-RC1</version></dependency><!--springboot整合elasticJob(官方文档中没有)--><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.0-RC1</version></dependency><!--elasticJob- 对接邮件、企业微信、钉钉--><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-error-handler-email</artifactId><version>3.0.0-RC1</version></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.15</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.5.RELEASE</version><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

3.核心配置+任务调度

步骤:

1.配置zookeeper调动中心

2.配置ElasticJob核心配置

3.调度定时任务

  • 1.zookeeper配置类(创建zk客户端,且调用init方法)
package com.example.elasticjob.quickStart.config;import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ZookepeerConfig {//zookeeper链接字符串 localhost:2181private  String ZOOKEEPER_CONNECTION_STRING = "localhost:2181" ;//定时任务命名空间private  String JOB_NAMESPACE = "elastic-job-boot-java";//zk的配置及创建注册中心@Bean(initMethod = "init")public CoordinatorRegistryCenter setUpRegistryCenter(){//zk的配置ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, "elastic-job-boot-java");//设置zk超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(1000);//创建注册中心CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);return zookeeperRegistryCenter;}
}
  • 2.配置esjob+任务调度
package com.example.elasticjob.quickStart.config;
import com.example.elasticjob.quickStart.job.FileBackupJobDb;
import com.example.elasticjob.quickStart.service.FileService;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** elasticJob*/
@Configuration
public class ElasticJobConfig {@AutowiredCoordinatorRegistryCenter registryCenter;@AutowiredFileService fileService;@Beanpublic JobConfiguration createJobConfiguration() {// 定义作业核心配置JobConfiguration jobConfig = JobConfiguration.newBuilder("myJob", 3).cron("0/5 * * * * ?").shardingItemParameters("0=text,1=image,2=radio").failover(true).overwrite(true).monitorExecution(true).misfire(true).build();//启动分布式定时任务new ScheduleJobBootstrap(registryCenter, new FileBackupJobDb(fileService), jobConfig).schedule();return jobConfig;}
}

4.其他

  • FileService类
package com.example.elasticjob.quickStart.service;import com.example.elasticjob.quickStart.pojo.FileCustom;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import java.util.List;@Component
public class FileService {@AutowiredJdbcTemplate jdbcTemplate;/*** 获取某文件类型未备份的文件* @param fileType 文件类型* @param count 获取条数* @return*/public List<FileCustom> fetchUnBackupFiles(String fileType, Integer count){String sql="select * from t_file where type = ? and backedUp = 0 limit 0,?";List<FileCustom> files = jdbcTemplate.query(sql, new Object[]{fileType, count}, new BeanPropertyRowMapper(FileCustom.class));return files;}/*** 备份文件* @param files 要备份的文件*/public void backupFiles(List<FileCustom> files){for(FileCustom file:files){String sql="update t_file set backedUp = 1 where id = ?";jdbcTemplate.update(sql,new Object[]{file.getId()});System.out.println(String.format("线程 %d | 已备份文件:%s  文件类型:%s",Thread.currentThread().getId(),file.getName(),file.getType()));}}
}
  • FileCustom实体
package com.example.elasticjob.quickStart.pojo;import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
public class FileCustom {/*** 标识*/private String id;/*** 文件名*/private String name;/*** 文件类型,如text、image、radio、vedio*/private String type;/*** 文件内容*/private String content;/*** 是否已备份*/private Boolean backedUp = false;public FileCustom(String id, String name, String type, String content){this.id = id;this.name = name;this.type = type;this.content = content;}
}
  • 数据源
package com.example.elasticjob.quickStart.config;import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configuration
public class DataSourceConfig {@Value("${spring.datasource.driver}")private String driverClassName;@Value("${spring.datasource.url}")private String url;@Value("${spring.datasource.username}")private String username;@Value("${spring.datasource.password}")private String password;@Bean(name = "myDataSource")public DataSource getMyDataSource() {BasicDataSource result = new BasicDataSource();result.setDriverClassName(driverClassName);result.setUrl(url);result.setUsername(username);result.setPassword(password);return result;}
}
  • yml
server:port: ${port:8081}esjob:zkServerlists: localhost:2181zkNamespace: es-job-cupidstartedTimeoutMilliseconds: 500completedTimeoutMilliseconds: 500spring:datasource:url: jdbc:mysql://localhost:3306/elastic_job_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=trueusername: rootpassword: 1234driver: com.mysql.jdbc.Driver

基于yml的方式配置ElasticJob

如果修改yml文件不生效,可有如下两个办法

无需进行 JobConfiguration配置 + zookeeper配置

  • 改一下zookeeper的名命空间
  • 配置文件job下面添加 overwrite: true

1.导入pom文件

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.2.0.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.2.0.RELEASE</version>
</dependency>
<dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.0-RC1</version>
</dependency>

这个starter里面有数据库相关的连接,我们只是简单测试,不想配置数据源的话,改一下启动类注解即可

@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })

2.编写作业

  • 普通作业
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;@Component
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {System.out.println(context.getShardingTotalCount() + "  " + context.getShardingItem());switch (context.getShardingItem()) {case 0:// do something by sharding item 0break;case 1:// do something by sharding item 1break;case 2:// do something by sharding item 2break;// case n: ...}}
}
  • 数据流作业
import com.xdx97.elasticjob.bean.XdxBean;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;@Component
public class MyDataflowJob implements DataflowJob<XdxBean> {@Overridepublic List<XdxBean> fetchData(ShardingContext shardingContext) {List<XdxBean> foos = new ArrayList<>();double random = Math.random();System.out.println("fetchData------ " + random);if (random > 0.5){XdxBean foo = new XdxBean();foo.setName("aaa");foos.add(foo);}return foos;}@Overridepublic void processData(ShardingContext shardingContext, List<XdxBean> list) {for (XdxBean xdxBean : list) {System.out.println("processData方法开始处理! "+ xdxBean.getNum() + "  "+ "当前分片:" + shardingContext.getShardingParameter() + "   "+ "当前分片项:"  + shardingContext.getShardingItem());}}
}
@Data
@NoArgsConstructor
public class XdxBean {private String name;}

注:Math.random() 产生的数据在0-1之间。

从上面运行的结果,我们可以得出结论,所谓的数据流作业其实也是一个定时任务,只不过当这个定时任务产生数据的时候,就会携带数据去调用processData()方法

3.yml配置文件

server:port: 8085elasticjob:regCenter:#zookeeper 的ip:portserverLists: 127.0.0.1:2181#名命空间,自己定义就好了namespace: my-job4jobs:#你的这个定时任务名称,自定义名称myElasticJob:#定时任务的全路径名elasticJobClass: com.elastic.job.MyElasticJob#定时任务执行的cron表达式cron: 0/5 * * * * ?#分片数量shardingTotalCount: 10

作业配置yml参数

配置前缀:elasticjob.jobs

可配置属性:

属性名 是否必填
elasticJobClass / elasticJobType
cron
timeZone
jobBootstrapBeanName
sharding-total-count
sharding-item-parameters
job-parameter
monitor-execution
failover
misfire
max-time-diff-seconds
reconcile-interval-minutes
job-sharding-strategy-type
job-executor-service-handler-type
job-error-handler-type
job-listener-types
description
props
disabled
overwrite

注册中心yml配置

配置前缀:elasticjob.reg-center

可配置属性:

属性名 是否必填
server-lists
namespace
base-sleep-time-milliseconds
max-sleep-time-milliseconds
max-retries
session-timeout-milliseconds
connection-timeout-milliseconds
digest

ElasticJob-Lite-UI

  1. 下载ElasticJob-Lite-UI

链接:https://shardingsphere.apache.org/elasticjob/current/cn/downloads/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zpafnxpE-1640855259419)(https://gitee.com/li_hewei/img/raw/master/images/1640855010(1)].jpg)

  1. 解压后在bin目录启动 start.bat

  2. 启动后游览器访问(默认端口是8088):http://127.0.0.1:8088/#/login 用户名/密码 root/root

  3. 登录成功后,链接上注册中心,链接成功后便可以进行任务的管理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-r60QcxL8-1640855259420)(https://gitee.com/li_hewei/img/raw/master/images/1640855122(1)].jpg)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-URQAijSD-1640855259420)(https://gitee.com/li_hewei/img/raw/master/images/1640855142(1)].jpg)

链接成功后即可在UI中修改定时任务相关配置。

ElasticJob 3.0 带你快速入门相关推荐

  1. 0基础如何入门软件测试?10分钟从测试9个阶段带你快速入门【建议细品】

    0基础如何快速入门软件测试,本文从测试九个阶段,附带9张各阶段测试路线图带你走进软件测试,废话不多说直接上干货! 一.软件测试视频第一阶段-前置基础知识 1.学习目标: 可掌握的核心能力: 1. 熟悉 ...

  2. 【效率】超详细!手把手带你快速入门 GitHub!

    作者:Peter     编辑:JackTian 来源:公众号「杰哥的IT之旅」 快速入门GitHub GitHub在程序开发领域家喻户晓,现在几乎整个互联网的开发者都将版本管理工具GitHub作为版 ...

  3. 带你快速入门AXI4总线--AXI4-Stream篇(1)----AXI4-Stream总线

    写在前面 随着对XILINX器件使用的深入,发现越来越多的IP都选配了AXI4的接口.这使得只要学会了AXI4总线的使用,基本上就能对XILINX IP的使用做到简单的上手.所以学会AXI4总线,对X ...

  4. 手把手带你快速入门超越GAN的Normalizing Flow

    手把手带你快速入门超越GAN的Normalizing Flow 作者:Aryansh Omray,微软数据科学工程师,Medium技术博主 机器学习领域的一个基本问题就是如何学习复杂数据的表征是机器学 ...

  5. 四篇文章带你快速入门Jetpck(中)之ViewModel,DataBinding

    文章目录 四篇文章带你快速入门Jetpck(中)之ViewModel,DataBinding Jetpack 官方推荐架构 ViewModel 添加依赖 创建ViewModel 初始化ViewMode ...

  6. 带你快速入门AXI4总线--AXI4-Full篇(3)----XILINX AXI4-Full接口IP源码仿真分析(Master接口)

    写在前面 接slave接口篇,本文继续打包一个AXI4-Full-Master接口的IP,学习下源码,再仿真看看波形. 带你快速入门AXI4总线--AXI4-Full篇(2)----XILINX AX ...

  7. 带你快速入门AXI4总线--AXI4-Full篇(1)----AXI4-Full总线

    写在前面 AXI4系列链接:带你快速入门AXI4总线--汇总篇(直达链接) 1.什么是AXI4-Full? AXI 表示 Advanced eXtensible Interface(高级可扩展接口), ...

  8. 全网最详细中英文ChatGPT-GPT-4示例文档-从0到1快速入门AI智能问答应用场景——官网推荐的48种最佳应用场景(附python/node.js/curl命令源代码,小白也能学)

    从0到1快速入门AI智能问答应用场景 Introduce 简介 setting 设置 Prompt 提示 Sample response 回复样本 API request 接口请求 python接口请 ...

  9. 一文带你快速入门【哈希表】

    最近开始学习哈希表,为此特写一遍文章介绍一下哈希表,带大家快速入门哈希表

  10. tensorflow2.0教程- Keras 快速入门

    tensorflow2.0教程-tensorflow.keras 快速入门 Tensorflow 2.0 教程持续更新: https://blog.csdn.net/qq_31456593/artic ...

最新文章

  1. 【大神公开课】旷视研究院院长-孙剑博士:视觉计算前沿进展
  2. PL/SQL 游标
  3. python3菜鸟-菜鸟笔记Python3——数据可视化(一)
  4. Apache+tomcat+mod_jk+centos6.2负载均衡集群配置--转载
  5. php的反射技术,PHP 反射使用
  6. android retrofit入门,Android开发 retrofit入门讲解
  7. 虚拟目录继承根Web.Config的问题解决(转)
  8. 把室友的STM32换成了GD32,会被打吗?
  9. ACL20 | 让笨重的BERT问答匹配模型变快!
  10. poj 2031 BuildingaSpaceStation 最小生成树 Prim、Kruskal
  11. IIS------项目配置到IIS后报500错误
  12. indesign教程,了解图层
  13. Atitit 建立新组织集团模型的框架基本制度与一些原则
  14. 计算机桌面动态壁纸,动态桌面壁纸,详细教您电脑动态桌面壁纸怎么设置
  15. NDS程序开发可行性分析报告
  16. 什么是电影级调色监视器?
  17. LED灯光照明控制协议(系统)- DALI
  18. 2016——大数据版图
  19. 3288 配置声卡芯片
  20. 进程组、session、前台任务、后台任务、守护进程

热门文章

  1. 了解CV和RoboMaster视觉组(五)滤波器、观测器和预测方法:维纳滤波器Wiener Filter,LMS
  2. 将IDM添加到谷歌浏览器
  3. 国际电话号码标准格式
  4. pip或者python安装jpype总是报错----Boilerpipe使用
  5. 【ENVI】FLAASH大气校正工具中比例因子说明
  6. 【Verilog基础】Verilog语法之force和release
  7. Java获取压缩包内文件数_java获取递归获取嵌套压缩包(zip和rar)中的所有文件
  8. 没了美国EDA软件,我们就不能做芯片?
  9. nc语法和nc木马远程控制主机
  10. 应用于手机触摸屏中的电容式触摸芯片