折腾了一周的 Java Quartz 集群任务调度,很遗憾没能搞定,网上的相关文章也少得可怜,在多节点(多进程)环境下 Quartz 似乎无法动态增减任务,恼火。无奈之下自己撸了一个简单的任务调度器,结果只花了不到 2天时间,而且感觉非常简单好用,代码量也不多,扩展性很好。

实现一个分布式的任务调度器有几个关键的考虑点单次任务和循环任务好做,难的是 cron 表达式的解析和时间计算怎么做?

多进程同一时间如何保证一个任务的互斥性?

如何动态变更增加和减少任务?

代码实例

在深入讲解实现方法之前,我们先来看看这个调度器是如何使用的class Demo{

public static void main(String[] args){

var redis = new RedisStore();

// sample 为任务分组名称

var store = new RedisTaskStore(redis, "sample");

// 5s 为任务锁寿命

var scheduler = new DistributedScheduler(store, 5);

// 注册一个单次任务

scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {

System.out.println("once1");

}));

// 注册一个循环任务

scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {

System.out.println("period2");

}));

// 注册一个 CRON 任务

scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {

System.out.println("cron3");

}));

// 设置全局版本号

scheduler.version(1);

// 注册监听器

scheduler.listener(ctx -> {

System.out.println(ctx.task().name() + " is complete");

});

// 启动调度器

scheduler.start();

}

}

当代码升级任务需要增加减少时(或者变更调度时间),只需要递增全局版本号,现有的进程中的任务会自动被重新调度,那些没有被注册的任务(任务减少)会自动清除。新增的任务(新任务)在老代码的进程里是不会被调度的(没有新任务的代码无法调度),被清除的任务(老任务)在老代码的进程里会被取消调度。

比如我们要取消 period2 任务,增加 period4 任务class Demo{

public static void main(String[] args){

var redis = new RedisStore();

// sample 为任务分组名称

var store = new RedisTaskStore(redis, "sample");

// 5s 为任务锁寿命

var scheduler = new DistributedScheduler(store, 5);

// 注册一个单次任务

scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {

System.out.println("once1");

}));

// 注册一个 CRON 任务

scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {

System.out.println("cron3");

}));

// 注册一个循环任务

scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {

System.out.println("period4");

}));

// 递增全局版本号

scheduler.version(2);

// 注册监听器

scheduler.listener(ctx -> {

System.out.println(ctx.task().name() + " is complete");

});

// 启动调度器

scheduler.start();

}

}

cron4j

it.sauronsoftware.cron4j

cron4j

2.2.5

这个开源的 library 包含了基础的 cron 表达式解析功能,它还提供了任务的调度功能,不过这里并不需要使用它的调度器。我只会用到它的表达式解析功能,以及一个简单的方法用来判断当前的时间是否匹配表达式(是否该运行任务了)。

我们对 cron 的时间精度要求很低,1 分钟判断一次当前的时间是否到了该运行任务的时候就可以了。class SchedulingPattern{

// 表达式是否有效

boolean validate(String cronExpr);

// 是否应该运行任务了(一分钟判断一次)

boolean match(long nowTs);

}

任务的互斥性

因为是分布式任务调度器,多进程环境下要控制同一个任务在调度的时间点只能有一个进程运行。使用 Redis 分布式锁很容易就可以搞定。锁需要保持一定的时间(比如默认 5s)。

所有的进程都会在同一时间调度这个任务,但是只有一个进程可以抢到锁。因为分布式环境下时间的不一致性,不同机器上的进程会有较小的时间差异窗口,锁必须保持一个窗口时间,这里我默认设置为 5s(可定制),这就要求不同机器的时间差不能超过 5s,超出了这个值就会出现重复调度。public boolean grabTask(String name){

var holder = new Holder();

redis.execute(jedis -> {

var lockKey = keyFor("task_lock", name);

var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));

holder.value(ok != null);

});

return holder.value();

}

全局版本号

我们给任务列表附上一个全局的版本号,当业务上需要增加或者减少调度任务时,通过变更版本号来触发进程的任务重加载。这个重加载的过程包含轮询全局版本号(Redis 的一个key),如果发现版本号变动,立即重新加载任务列表配置并重新调度所有的任务。private void scheduleReload() {

// 1s 对比一次

this.scheduler.scheduleWithFixedDelay(() -> {

try {

if (this.reloadIfChanged()) {

this.rescheduleTasks();

}

} catch (Exception e) {

LOG.error("reloading tasks error", e);

}

}, 0, 1, TimeUnit.SECONDS);

}

重新调度任务先要取消当前所有正在调度的任务,然后调度刚刚加载的所有任务。private void rescheduleTasks(){

this.cancelAllTasks();

this.scheduleTasks();

}

private void cancelAllTasks(){

this.futures.forEach((name, future) -> {

LOG.warn("cancelling task {}", name);

future.cancel(false);

});

this.futures.clear();

}

因为需要将任务持久化,所以设计了一套任务的序列化格式,这个也很简单,使用文本符号分割任务配置属性就行。// 一次性任务(startTime)

ONCE@2019-04-29T15:26:29.946+0800

// 循环任务,(startTime,endTime,period),这里任务的结束时间是天荒地老

PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5

// cron 任务,一分钟一次

CRON@*/1 * * * *

$ redis-cli

127.0.0.1:6379> hgetall sample_triggers

1) "task3"

2) "CRON@*/1 * * * *"

3) "task2"

4) "PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"

5) "task1"

6) "ONCE@2019-04-29T15:26:29.946+0800"

7) "task4"

8) "PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"

线程池

时间调度会有一个单独的线程(单线程线程池),任务的运行由另外一个线程池来完成(数量可定制)。class DistributedScheduler{

private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

private ExecutorService executor = Executors.newFixedThreadPool(threads);

}

之所以要将线程池分开,是为了避免任务的执行(IO)影响了时间的精确调度。

支持无互斥任务

互斥任务要求任务的单进程运行,无互斥任务就是没有加分布式锁的任务,可以多进程同时运行。默认需要互斥。class Task{

/**

* 是否需要考虑多进程互斥(true表示不互斥,多进程能同时跑)

*/

private boolean concurrent;

private String name;

private Runnable runner;

...

public static Task of(String name, Runnable runner){

return new Task(name, false, runner);

}

public static Task concurrent(String name, Runnable runner){

return new Task(name, true, runner);

}

}

增加回调接口

考虑到调度器的使用者可能需要对任务运行状态进行监控,这里增加了一个简单的回调接口,目前功能比较简单。能汇报运行结果(成功还是异常)和运行的耗时class TaskContext{

private Task task;

private long cost;  // 运行时间

private boolean ok;

private Throwable e;

}

interface ISchedulerListener{

public void onComplete(TaskContext ctx);

}

支持存储扩展

目前只实现了 Redis 和 Memory 形式的任务存储,扩展到 zk、etcd、关系数据库也是可行的,实现下面的接口即可。interface ITaskStore{

public long getRemoteVersion();

public Map getAllTriggers();

public void saveAllTriggers(long version, Map triggers);

public boolean grabTask(String name);

}

代码地址

https://github.com/pyloque/taskin

java 分布式任务_一个简单的基于 Redis 的分布式任务调度器 —— Java 语言实现...相关推荐

  1. java超市管理系统_一个简单的基于控制台的超市管理系统(java)

    一个小菜鸟的成长之路: 废话不多说,直接上干货 首先是java环境:jdk1.10 jdbc驱动:mysql-connector-java-5.1.46.jar 其次是开发软件IntelliJ IDE ...

  2. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)

    背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...

  3. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列

    一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...

  4. java 序列化 例子_一个简单的Java序列化的例子

    简单来说序列化就是一种用来处理对象流的机制,所谓对象流也就是将对象的内容进行流化,流的概念这里不用多说(就是I/O),我们可以对流化后的对象进行读写操作,也可将流化后的对象传输于网络之间(注:要想将对 ...

  5. java年龄计算_一个简单的java年龄计算器

    制作一个如下图年龄计算器 根据题目,我做了一个由Calendar类以及年月日各相减得到的年龄,当然正确的方法不止一个,以下为我的源代码和结果截图: package com.Date; import j ...

  6. python写tcp通信程序_一个简单的基于TCP通信的服务器端与客户端程序

    一,概述 1,客户端: 创建客户端套接字对象 和服务端套接字建立连接 发送数据 接收数据 关闭客户端套接字 2,服务器端: 创建服务端端套接字对象 绑定端口号 设置监听 等待接受客户端的连接请求 接收 ...

  7. java实现购买_一个简单的实现购买商品功能的Java小程序

    public class Purchase { public static void main(String[] args) { System.out.println(" * * * * * ...

  8. java瀑布流_一个简单的瀑布流实现。

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,博客地址为http://www.cnblogs.com/jasonnode/ . 瀑布流简介 主 ...

  9. 基于Redis实现分布式锁之前,这些坑你一定得知道

    开头 基于Redis的分布式锁对大家来说并不陌生,可是你的分布式锁有失败的时候吗?在失败的时候可曾怀疑过你在用的分布式锁真的靠谱吗?以下是结合自己的踩坑经验总结的一些经验之谈. 你真的需要分布式锁吗? ...

最新文章

  1. 转:Swing中的线程探究
  2. 初学者怎样看懂python代码_新手入门必看,最常用的Python代码片段
  3. Ubuntu 18.04 安装 redis入门使用
  4. 重温Android四大组件(一)—Activity的生命周期
  5. MySQL日期和时间类型
  6. 2008年4月10日:超负荷的一天
  7. 一个好端端的团队是如何被管理者搞垮的?
  8. php环境模拟stphp_一个模拟浏览器请求的php类,模拟请求ua设置
  9. Unity 和腾讯游戏成立联合创新实验室:从技术创新探索游戏产品新模式和概念
  10. 5G 是安全漏洞的“救世主”吗?
  11. 关于解决“用系统U盘安装win7却提示‘缺少所需的CD/DVD驱动器设备驱动程序’”的问题
  12. 《网络攻防第六周作业》
  13. 3.字符串(string)
  14. php larvel4.2,Laravel 4.2参考手册 pdf
  15. 时间序列的平稳性检验方法汇总
  16. tar 打包解压参数详解
  17. 安装Win7时删除系统保留的100M隐藏分区
  18. unity 使物体跟随路径点自动移动位置 插值旋转
  19. 嵌入式系统开发环境概述
  20. 支付宝网站支付在微信浏览器中跳转问题(亲测,附源码)

热门文章

  1. linux df 目录大小,Linux命令du df查看文件和文件夹大小
  2. python所有函数用法_Python函数使用方法(高级用法)
  3. 神经网络迭代次数的简并和不可约谱项
  4. gamma函数stiriling公式_SVM参数: C和gamma
  5. 【控制】《多智能体系统一致性协同演化控制理论与技术》纪良浩老师-第10章-二阶离散时间时延多智能体系统加权一致性
  6. 1.10 梯度消失与梯度爆炸-深度学习第二课《改善深层神经网络》-Stanford吴恩达教授
  7. TCL with SNPS - get_object_namesizeof_collectionstring
  8. 示波器到底选择多大的带宽合适
  9. linux svn强制注释,svn强制提交时添加注释
  10. linux下配置jdk+tomcat