目录

一、定时任务的基础实现

1. 利用Thread及Sleep实现,通过while循环让其不停运行

2.使用jdk的Timer和TimerTask

3.ScheduledExecutorService

4. Quartz实现

附:Cron表达式

5. Spring Task实现

6. 分布式定时任务Elastic-Job

1.概述

2.调度模型

3.功能

4.适用场景

5.分片策略

6. ElasticJob 原理

7. 失效转移


其次,定时任务大体分两种:指定间隔时间执行 和 指定某个时间执行

实现定时任务的途径有很多,比如你甚至可以自己实现简单的定时任务

一、定时任务的基础实现

1. 利用Thread及Sleep实现,通过while循环让其不停运行

public class ThreadTaskDemo {public static void main(String[] args) {Runnable runable=new Runnable() {@Overridepublic void run() {System.out.println("子线程执行任务,当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};try {System.out.println("主线程启动子线程时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));scheduleThread(5L,3,runable);} catch (InterruptedException e) {e.printStackTrace();}}/*** @param duration 指定什么时间后运行 单位:秒* @param timeInterval 每次运行间隔时间 单位:秒* @param runnable 待运行的Runable对象* @throws InterruptedException*/static void scheduleThread(Long duration,Integer timeInterval,Runnable runnable) throws InterruptedException{/*阻塞等待*/TimeUnit.SECONDS.sleep(duration);final Runnable interiorRun=runnable;final Integer interiorTimeInterval=timeInterval;/*运行*/new Thread(new Runnable() {@Overridepublic void run() {while(true){/*执行方法*/interiorRun.run();try {/*任务执行间隔*/TimeUnit.SECONDS.sleep(interiorTimeInterval);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}
}

2.使用jdk的Timer和TimerTask

使用jdk的Timer和TimerTask,可以实现简单的间隔执行任务,无法实现按日历去调度执行任务

public class TimerTaskDemo {/*** 其中Timer和TimerTask的区别和联系:* * Timer是调度者,可以安排任务执行计划。* * TimerTask是任务。Timer类可以调度TimerTask任务,TimerTask则通过在run()方法里实现具体任务。TimerTask也可停止自身任务。* * 一个Timer可以调度多个TimerTask。* * Timer是单线程的:Timer构造函数调用时会创建了一个新线程,所有TimerTask都是依靠这个新的线程执行。* @param args*/public static void main(String[] args) {TimerTask timerTask = new TimerTask() {@Overridepublic void run() {System.out.println("当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};Timer timer = new Timer();timer.schedule(timerTask,10,1000);}}

3.ScheduledExecutorService

ScheduledExecutorService是并发工具包中的类,是对比前面最理想的定时任务实现方式。

public class ScheduledDemo {public static void main(String[] args) {Runnable runnable1 = new Runnable() {@Overridepublic void run() {System.out.println("runnable1当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};Runnable runnable2 = new Runnable() {@Overridepublic void run() {System.out.println("runnable2当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};//方式一 定义4个线程ScheduledExecutorService service = Executors.newScheduledThreadPool(4);ScheduledFuture<?> scheduledFuture= service.scheduleAtFixedRate(runnable1, 0,2, TimeUnit.SECONDS);//方式二ScheduledExecutorService service2 = Executors.newSingleThreadScheduledExecutor();service2.scheduleAtFixedRate(runnable2, 1, 2, TimeUnit.SECONDS);}}

4. Quartz实现

        <dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version></dependency><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz-jobs</artifactId><version>2.3.2</version></dependency>
public class QuartzDemo {public static void main(String[] args) throws SchedulerException {//创建Scheduler的工厂SchedulerFactory sf = new StdSchedulerFactory();//从工厂中获取调度器实例Scheduler scheduler = sf.getScheduler();/*** 创建JobDetail* withDescription:job的描述* withIdentity:job的name和group*/JobDetail jb = JobBuilder.newJob(QuartzSchedueJob.class).withDescription("this is a job").withIdentity("myJob", "myJobGroup").build();//任务运行的时间,5秒后启动任务long time = System.currentTimeMillis() + 5 * 1000L;Date statTime = new Date(time);//创建Trigger,使用SimpleScheduleBuilder或者CronScheduleBuilderTrigger t = TriggerBuilder.newTrigger().withDescription("this is a trigger").withIdentity("myTrigger", "myTriggerGroup")//.withSchedule(SimpleScheduleBuilder.simpleSchedule())//设置启动时间.startAt(statTime)//每隔3秒执行一次.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ? *")).build();//注册任务和定时器scheduler.scheduleJob(jb, t);//启动 调度器scheduler.start();}}

附:Cron表达式

在上面的demo中出现了"0/3 * * * * ? *",这是cron表达式,表示定时任务执行的时间规则

        cron 表达式是一个字符串,该字符串由 6 个空格分为 7 个域,每一个域代表一个时间含义。 格式: [秒] [分] [时] [日] [月] [周] [年],其中通常定义 “年” 的部分可以省略,实际常用的由前六部分组成。

关于 cron 的各个域的定义如下表格所示:

是否必填 值以及范围 通配符
0-59 , - * /
0-59 , - * /
0-23 , - * /
1-31 , - * ? / L W
1-12 或 JAN-DEC , - * /
1-7 或 SUN-SAT , - * ? / L #
1970-2099 , - * /

这块不作过多描述,有兴趣的可以自行了解,也可以通过在线工具转换:quartz/Cron/Crontab表达式在线生成工具-BeJSON.com

5. Spring Task实现

1)SpringBoot:在Spring boot启动类上添加注解:@EnableScheduling
2)Spring:添加命名空间:xmlns:task="http://www.springframework.org/schema/task"
                   添加约束:http://www.springframework.org/schema/task
                                     http://www.springframework.org/schema/task/spring‐task.xsd

开启任务调度:<task:annotation‐driven></task:annotation‐driven>

定时任务串行执行:

@Component
public class SpringTaskTest {private static final Logger LOGGER = LoggerFactory.getLogger(SpringTaskTest.class);/*** 每隔2秒执行一次*/@Scheduled(cron = "0/2 * * * * *")public void task1() {LOGGER.info("--------------------task1开始--------------------");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}LOGGER.info("--------------------task1结束--------------------");}
}

定时任务并行执行:

@Configuration
//启动类或者此处配置@EnableScheduling
public class TaskConfig implements SchedulingConfigurer, AsyncConfigurer {/*** 线程池线程数量*/private int poolSize = 5;@Beanpublic ThreadPoolTaskScheduler taskScheduler() {//创建定时任务线程池ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();///初始化线程池scheduler.initialize();//线程池容量scheduler.setPoolSize(poolSize);return scheduler;}@Overridepublic Executor getAsyncExecutor() {Executor executor = taskScheduler();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return null;}@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {scheduledTaskRegistrar.setTaskScheduler(taskScheduler());}
}

6. 分布式定时任务Elastic-Job

1.概述

上文中提到的多种定时任务的实现,而本篇的重点在于站在“巨人”肩膀上的ElasticJob分布式调度框架,巨人是指“Quartz”和“Zookeeper”,Elastic-Job最开始只有一个 elastic-job-core 的项目,在 2.X 版本以后主要分为Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目。其中,Elastic-Job-Lite 定位为轻量级无中心化解决方案 , 使 用 jar包的形式提供分布式任务的协调服务。

应用在各自的节点执行任务,通过 ZK 注册中心协调。节点注册、节点选举、任务分片、监听都在 E-Job 的代码中完成,以下是官网架构图:

2.调度模型

ElasticJob Lite是线程级别调度的进程内调度。

1.方便与Spring、Dubbo等Java框架配合使用,自由使用Spring注入Bean;

2.与业务应用部署在一起,生命周期与业务应用保持一致,是典型的嵌入式轻量级架构;

3.适用于资源使用稳定、部署架构简单的普通Java应用;

4.分布式下的每个任务节点均是以自调度的方式适时的调度作业,任务之间只需要一个注册中心(注册中心目前支持Zookeeper和ETCD两种)对分布式场景下任务状态进行协调即可;

5.分布式作业节点通过选举的方式获取主节点,主节点进行分片,完毕后主节点和其他节点并无不同,都以自我调度的反射光hi执行任务

  ElasticJob Cloud调度方式是可以是进程内调度,作业类型属于:常驻任务,也可以是进程级别调度,作业类型属于:瞬时任务。

在ElasticJob Lite全部能力的基础上,还拥有资源分配和任务分发的能力,将作业的开发、打包、分发、调度、治理、分片等一系列生命周期完全托管,是真正的作业云调度系统。

3.功能

1. 弹性调度:让任务通过分片进行水平扩展的任务处理,每台服务器只运行分配给该服务器的分片;

2. 资源分配:由Mesos实现,Mesos负责分配任务声明所需要的资源(内存和CPU),并将分配出去的资源进行隔离

3. 作业治理:分布式场景下高可用、失效转移、错过作业重新执行等行为的治理协调

4. 可视化管理:包含作业增删改查管控端、执行历史记录查询、配置中心管理等

4.适用场景

1.复杂任务,如数据迁移,弹性分片能力大大减少海量数据迁移的时间

2.资源导向任务,占用大量计算资源的报表作业适合采用瞬时作业实现

3.订单拉取之类的,就是我们系统中最常用的那些场景

5.分片策略

  1. 分片项与分片参数

  任务分片,是为了实现把一个任务拆分成多个子任务,在不同的 ejob 示例上执行。例如 100W 条数据,在配置文件中指定分成 10 个子任务(分片项),这 10 个子任务再按照一定的规则分配到 5 个实际运行的服务器上执行。除了直接用分片项 ShardingItem获取分片任务之外,还可以用 item 对应的 parameter 获取任务。

定义几个分片项,一个任务就会有几个线程去运行它。

  注意:分片个数和分片参数要一一对应。通常把分片项设置得比 E-Job 服务器个数大一些,比如 3 台服务器,分成 9 片,这样如果有服务器宕机,分片还可以相对均匀。

 2. 设置分片策略

// 作业分片策略
// 基于平均分配算法的分片策略
String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();

  3.分片方案

  • 对业务主键进行取模,获取余数等于分片项的数据,举例:获取到的 sharding item 是 0,1在 SQL 中加入过滤条件:where mod(id, 4) in (1, 2)。这种方式的缺点:会导致索引失效,查询数据时会全表扫描。解决方案:在查询条件中在增加一个索引条件进行过滤。
  • 在表中增加一个字段,根据分片数生成一个 mod 值。取模的基数要大于机器数。否则在增加机器后,会导致机器空闲。例如取模基数是 2,而服务器有 5 台,那么有三台服务器永远空闲。而取模基数是 10,生成 10 个 shardingItem,可以分配到 5 台服务器。当然,取模基数也可以调整。
  • 如果从业务层面,可以用 ShardingParamter 进行分片。例如 0=RDP, 1=CORE, 2=SIMS, 3=ECIF,List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID ='RDP' limit 0, 100。

  在 Spring Boot 中要 Elastic-Job 要配置的内容太多了,有没有更简单的添加任务的方法呢?比如在类上添加一个注解?这个时候我们就要用到 starter 了。

6. ElasticJob 原理

1. new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); 进入启动流程

/**
* 初始化作业.
*/
public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);// 设置分片数JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());// 构建任务,创建调度器JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());// 在 ZK 上注册任务JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);// 添加任务信息并进行节点选举schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());// 启动调度器jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

2.registerStartUpInfo 方法

/**
* 注册作业启动信息.
*
* @param enabled 作业是否启用
*/
public void registerStartUpInfo(final boolean enabled) {// 启动所有的监听器、监听器用于监听 ZK 节点信息的变化。listenerManager.startAllListeners();// 节点选举leaderService.electLeader();// 服务信息持久化(写到 ZK)serverService.persistOnline(enabled);// 实例信息持久化(写到 ZK)instanceService.persistOnline();// 重新分片shardingService.setReshardingFlag();// 监控信息监听器monitorService.listen();// 自诊断修复,使本地节点与 ZK 数据一致if (!reconcileService.isRunning()) {reconcileService.startAsync();}
}

3. 启动的时候进行主节点选举

/**
* 选举主节点.
*/
public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed.");
}

Latch 是一个分布式锁,选举成功后在 instance 写入服务器信息

4. 启动调度任务则是

/**
* 调度作业.
*
* @param cron CRON表达式
*/
public void scheduleJob(final String cron) {try {if (!scheduler.checkExists(jobDetail.getKey())) {scheduler.scheduleJob(jobDetail, createTrigger(cron));}    //调用 Quartz 一样的类进行启动scheduler.start();} catch (final SchedulerException ex) {throw new JobSystemException(ex);}
}

7. 失效转移

  失效转移,就是在执行任务的过程中发生异常时,这个分片任务可以在其他节点再次执行。

  FailoverListenerManager 监听的是 zk 的 instance 节点删除事件。如果任务配置了 failover 等于 true,其中某个 instance 与 zk 失去联系或被删除,并且失效的节点又不是本身,就会触发失效转移逻辑。Job 的失效转移监听来源于 FailoverListenerManager 中内部类JobCrashedJobListener 的 dataChanged 方法。当节点任务失效时会调用 JobCrashedJobListener 监听器,此监听器会根据实例 id获取所有的分片,然后调用 FailoverService 的 setCrashedFailoverFlag 方法,将每个分片 id 写到/jobName/leader/failover/items 下,例如原来的实例负责 1、2 分片项,那么 items 节点就会写入 1、2,代表这两个分片项需要失效转移。

class JobCrashedJobListener extends AbstractJobListener {@Overrideprotected void dataChanged(final String path, final Type eventType, final String data) {if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {return;}          // 获取到失效的分片集合List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);if (!failoverItems.isEmpty()) {for (int each : failoverItems) {               // 设置失效的分片项标记failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}} else {for (int each : shardingService.getShardingItems(jobInstanceId)) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}}}}}

Elastic-Job | 由浅入深一篇理解分布式定时任务的基本用法及简单原理解析相关推荐

  1. 分布式定时任务—xxl-job学习(三)——调度中心(xxl-job-admin)的启动和任务调度过程源码分析

    分布式定时任务-xxl-job学习(三)--调度中心(xxl-job-admin)的启动和任务调度过程源码分析 RabbitsInTheGrass 2020-06-30 10:31:08  813   ...

  2. oledb 访问接口sqlncli10返回了消息 没有活动事务_这样理解分布式事务你是不是就会懂了?...

    分布式事务主要解决分布式一致性的问题.说到底就是数据的分布式操作导致仅依靠本地事务无法保证原的性.与单机版的事务不同的是,单机是把多个命令打包成一个统一处理,分布式事务是将多个机器上执行的命令打包成一 ...

  3. quartz 分布式_6大分布式定时任务对比

    作者 | sharedCode 来源 | blog.csdn.net/u012394095/article/details/79470904 分布式定时任务简介 把分散的,可靠性差的计划任务纳入统一的 ...

  4. 聊聊分布式定时任务中间件架构及其实现--转

    原文来自微信公众号:聊聊架构 在互联网应用中,各式各样的定时任务存于系统各个角落.我们希望由一个平台统一将这些作业管理起来.通过这个系统,作业的宕机.崩溃等状态就可收入运维同学掌控,直接对接报警系统, ...

  5. 理解分布式一致性:拜占庭容错与PBFT

    理解分布式一致性:拜占庭容错与PBFT 拜占庭问题 拜占庭容错BFT PBFT(Practical Byzantine Fault Tolerance) why 3f+1 ? PBFT 的优点 PBF ...

  6. 理解分布式一致性:Paxos协议之Generalized Paxos Byzantine Paxos

    理解分布式一致性:Paxos协议之Generalized Paxos & Byzantine Paxos Generalized Paxos Byzantine Paxos Byzantine ...

  7. 理解分布式一致性:Paxos协议之Cheap Paxos Fast Paxos

    理解分布式一致性:Paxos协议之Cheap Paxos & Fast Paxos Cheap Paxos Message flow: Cheap Multi-Paxos Fast Paxos ...

  8. 6大分布式定时任务对比

    作者 | sharedCode 来源 | blog.csdn.net/u012394095/article/details/79470904 分布式定时任务简介 把分散的,可靠性差的计划任务纳入统一的 ...

  9. 《深入理解分布式事务》第九章 可靠消息最终一致性分布式事务原理

    <深入理解分布式事务>第九章 可靠消息最终一致性分布式事务原理 文章目录 <深入理解分布式事务>第九章 可靠消息最终一致性分布式事务原理 一.基本原理 二.本地消息表 1.实现 ...

  10. 《深入理解分布式事务》第八章 TCC 分布式事务原理

    <深入理解分布式事务>第八章 TCC 分布式事务原理 文章目录 <深入理解分布式事务>第八章 TCC 分布式事务原理 一.TCC 核心思想 二.TCC 实现原理 1.TCC 核 ...

最新文章

  1. C语言数据类型基本概念
  2. 查看sqlserver版本
  3. 客户想你死系列,哈哈哈设计师不容易啊! | 今日最佳
  4. nmcli 命令的基本使用
  5. mysql 分布式锁_【分布式锁的演化】分布式锁居然还能用MySQL?
  6. Vue之代码自动格式化
  7. 喜大普奔,FL Studio终于出官方中文版了!
  8. javascript中的弹框
  9. mysql 通达信公式_通达信的几个好用指标
  10. HDS设备高级操作_VSP_更换Cache电池手册
  11. C#实现的等额本息法、按月付息到期还本法、一次性还本付息法
  12. 电信催费打错话费单 男子“拖欠”45036亿元话费
  13. wangEditor - 支持word上传的富文本编辑器
  14. Attention-Based Recurrent Neural Network Models for Joint Intent Detection and Slot Filling论文笔记
  15. PHP 生成随机号段的电话号码,PHP手机号正则(多号段)
  16. gateway-使用
  17. 谁说小P只是玩伴-巧用PSP及RSS资源提高英语听力
  18. python爬取电影天堂beautiful_Python爬虫 -- 抓取电影天堂8分以上电影
  19. uniapp获取微信授权登录和手机号一键登录(保姆教程)
  20. 三字经 -王应麟 章太炎

热门文章

  1. PDF文件在线合并如何操作
  2. RS485通信原理图及程序实例详解
  3. 计算机丢失msvcrt.dll,msvcrt.dll修复工具
  4. 快速入门学习数字图像处理(冈萨雷斯第三版)
  5. 海量数据和高并发下的 Redis 业务优化实践
  6. iOS下载历史版本App教程
  7. JSP-tomcat设置编码格式 配置utf-8(以防网页框以及网页显示的时候中文乱码)
  8. java tostring apache,Java如何使用Apache Commons Lang ToStringBuilder类?
  9. 2020年百度之星程序设计大赛-初赛二(Poker、Distance)
  10. 运用加密技术保护Java源代码