ElasticJob‐Lite提供作业监听器,用于在任务执行前后触发监听器的相关方法。作业监听器分为每台作业节点均执行的常规监听器分布式场景中仅单一节点执行的分布式监听器(分布式监听器目前有Bug)。在作业依赖(DAG)功能开发完成之后,可能会考虑删除作业监听器功能。而实现自己的常规监听器和分布式监听器,需要通过SPI加入,才能被ElasticJob‐Lite感知到。

源码分析

作业监听器工厂类:

package org.apache.shardingsphere.elasticjob.infra.listener;import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;import java.util.Optional;
import java.util.Properties;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ElasticJobListenerFactory {// 在静态块中注册作业监听器static {ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);}...
}

作业监听器是在作业监听器工厂中进行注册的,作业监听器工厂类在完成类初始化后,就已经通过ElasticJobServiceLoader类注册了所有的作业监听器。ElasticJobServiceLoader类的相关代码如下:

    private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> TYPED_SERVICES = new ConcurrentHashMap<>();private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {if (TYPED_SERVICES.containsKey(typedService)) {return;}// 使用ServiceLoader类加载服务(作业监听器),然后存储于ConcurrentMap中ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));}// 将作业监听器存储于ConcurrentMap中,方便以后使用private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());}

ServiceLoader类就是Java提供的SPISPIService Provider Interface)是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,不同厂商可以针对同一接口做出不同的实现,比如java.sql.Driver接口,MySQLPostgreSQL都提供了对应的实现给用户使用,而JavaSPI机制可以为某个接口寻找服务实现。JavaSPI机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

ServiceLoader类正常工作的唯一要求是服务提供类必须具有无参构造函数,以便它们可以在加载期间实例化。通过在资源目录META-INF/services中放置服务提供者配置文件来标识服务提供者,文件名是服务类型的完全限定名(比如ElasticJobListener类的完全限定名),该文件包含具体的服务提供者类的完全限定名列表(ElasticJobListener实现类的完全限定名列表),每行一个,每个名称周围的空格和制表符以及空行都将被忽略,该文件必须以UTF-8编码。

常规监听器

若作业处理作业节点的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,应尽量使用此类型监听器。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kaven</groupId><artifactId>job</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.1.RELEASE</version></parent><dependencies><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>
</project>

作业定义(@Component注解一定要加上,不然Spring Boot不能感知到这个作业):

package com.kaven.job;import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Author: ITKaven* @Date: 2021/11/20 17:02* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/@Component
public class MySimpleJob implements SimpleJob {private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@SneakyThrows@Overridepublic void execute(ShardingContext shardingContext) {System.out.println(formatter.format(new Date()) + " : " + shardingContext.getShardingParameter() + "进行数据备份...");Thread.sleep(2000);System.out.println(formatter.format(new Date()) + " : " + shardingContext.getShardingParameter() + "完成数据备份。");}
}

常规监听器(实现ElasticJobListener接口):

package com.kaven.job;import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Author: ITKaven* @Date: 2021/12/18 15:46* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class MySimpleJobListener implements ElasticJobListener {private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 无参构造器public MySimpleJobListener(){}@Overridepublic void beforeJobExecuted(ShardingContexts shardingContexts) {System.out.println(formatter.format(new Date()) + " : 准备数据备份环境。");}@Overridepublic void afterJobExecuted(ShardingContexts shardingContexts) {System.out.println(formatter.format(new Date()) + " : 清理数据备份环境。");}@Overridepublic String getType() {return "MySimpleJobListener";}
}

getType方法的返回值是存储该作业监听器的第二个keyConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>>ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>>)。

        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());

启动类:

package com.kaven.job;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author: ITKaven* @Date: 2021/12/16 20:21* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/@SpringBootApplication
public class Server {public static void main(String[] args) {SpringApplication.run(Server.class);}
}

配置文件:

elasticjob:reg-center:server-lists: "192.168.31.173:9000"namespace: "my-job"connection-timeout-milliseconds: 40000max-retries: 5jobs:MySimpleJob:elasticJobClass: com.kaven.job.MySimpleJobshardingTotalCount: 3cron: "30 * * * * ?"description: "该作业有三个分片,每隔一分钟执行一次"overwrite: truejobListenerTypes: MySimpleJobListener     # 根据getType方法的返回值来匹配合适的作业监听器shardingItemParameters: "0=北京,1=上海,2=深圳"server:port: 8080

通过SPI加入自定义的作业监听器。

允许应用并行启动(每次启动都需要修改端口,不然端口会有冲突):

输出如下图所示:



输出结果符合预期。

分布式监听器

若作业处理数据库中的数据,处理完成后只需一个节点执行数据清理任务即可。此类型任务处理复杂,需要同步分布式环境下作业的状态,提供了超时设置来避免作业不同步导致的死锁,应谨慎使用。目前ElasticJob‐Lite的分布式监听器是有Bug的,坑了博主小半天,还以为操作有问题,最终看源码发现有Bug,并且Github上也有人提这个Bug(shardingsphere-elasticjob/issues/487)。

不过博主还是演示一下,然后分析一下导致这个Bug的原因。

分布式监听器(继承AbstractDistributeOnceElasticJobListener抽象类):

package com.kaven.job;import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Author: ITKaven* @Date: 2021/12/18 17:05* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class MyDistributeOnceSimpleJobListener extends AbstractDistributeOnceElasticJobListener {private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static final long startedTimeoutMilliseconds = 10000;private static final long completedTimeoutMilliseconds = 10000;public MyDistributeOnceSimpleJobListener() {super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);}@Overridepublic void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {System.out.println(formatter.format(new Date()) + " : 往数据库中插入备份任务,任务状态为未完成。");}@Overridepublic void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {System.out.println(formatter.format(new Date()) + " : 更新数据库中对应备份任务的状态为完成。");}@Overridepublic String getType() {return "MyDistributeOnceSimpleJobListener";}
}

AbstractDistributeOnceElasticJobListener抽象类实现了ElasticJobListener接口。

修改配置文件:

      jobListenerTypes: MySimpleJobListener,MyDistributeOnceSimpleJobListener

通过SPI加入自定义的作业监听器。

输出如下图所示:

分布式监听器可能在一次作业调度中被多个作业节点执行了。

Bug原因分析

分布式监听器的doBeforeJobExecutedAtLastStarted方法和doAfterJobExecutedAtLastCompleted方法分别在父类(AbstractDistributeOnceElasticJobListener)的beforeJobExecuted方法和afterJobExecuted方法中调用,而调用过程中并没有使用分布式锁,这可能导致分布式监听器的这两个方法在一次作业调度中被多个作业分片节点执行。

ElasticJob‐Lite的作业监听器就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

ElasticJob‐Lite:作业监听器相关推荐

  1. ElasticJob‐Lite:HTTP作业

    ElasticJob的作业分类基于class和type两种类型.基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑:基于type的作业则无需编码,只需要提供相应配置即可.基于class的 ...

  2. ElasticJob‐Lite:Script作业

    ElasticJob的作业分类基于class和type两种类型.基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑:基于type的作业则无需编码,只需要提供相应配置即可.基于class的 ...

  3. ElasticJob Lite Console 处于分片待调整状态,无法触发解决方案

    将对应job的触发时间改成最近的,比如当前时间17:49:00,那就把Cron表达式改成下一分钟:0 50 17 * * ? ,当自动触发过一次后,状态就会变成 因为要触发过,才会让你操作. 记得,改 ...

  4. 分布式作业 Elastic-Job 快速上手指南

    转载自 分布式作业 Elastic-Job 快速上手指南 Elastic-Job支持 JAVA API 和 Spring 配置两种方式配置任务,这里我们使用 JAVA API 的形式来创建一个简单的任 ...

  5. elastic-job之监听器

    每个作业都可以配置一个任务监听器,确切的说是只能配置一个本地监听器和一个分布式监听器.Elastic-job有三种作业类型,但是它们的通用配置都是一样的,所以本文在介绍作业的监听器配置时将仅以简单作业 ...

  6. 任务调度之Elastic-Job

    认识 Elastic-Job 任务调度高级需求 Quartz 的不足: 1. 作业只能通过 DB 抢占随机负载,无法协调 2. 任务不能分片--单个任务数据太多了跑不完,消耗线程,负载不均 3. 作业 ...

  7. 分布式作业调度框架——Elastic-Job

    分布式作业调度框架--Elastic-Job 1.概述 Elastic-Job是当当开源的分布式弹性作业框架.Elastic-Job分为lite和cloud两个相对独立的版本,lite版为轻量级去中心 ...

  8. elastic-job 定时任务集成

    文章目录 一.基本使用 第一步添加依赖: 第二步:增加Zookeeper注册中心的配置 第三步:开启Elastic-Job自动配置 第四步 配置任务 二.扩展功能 1.想在配置文件设置任务参数怎么写? ...

  9. 分布式调度框架 elastic-job 实践详解(超详细)

    虽然 Quartz 也可以通过集群方式来保证服务高可用,但是它也有一个的弊端,那就是服务节点数量的增加,并不能提升任务的执行效率,即不能实现水平扩展! 之所以产生这样的结果,是因为 Quartz 在分 ...

  10. 王者归来:分布式调度解决方案 ElasticJob 重启!

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 你会误认为 ElasticJob 只是作业管控平台么?创 ...

最新文章

  1. C#Redis集合set
  2. 构建现代产业体系 农民丰收节·林裕豪:从玉农业落实“链长制”1-09-27
  3. 推荐一个以动画效果显示github提交记录的黑科技工具:Gource
  4. PHP5各个版本的新功能和新特性总结(转载 http://www.jb51.net/article/48150.htm)
  5. S3C2440对Nand Flash操作和电路原理(基于K9F2G08U0A)
  6. python内置函数map_Python内置函数(34)——map
  7. 897-递增顺序查找树
  8. 鸿蒙灵珠被林铭得到,上古五大灵珠,蕴含三界奥秘,为众生求得一线生机。
  9. 图像处理之理想低通滤波器、巴特沃斯低通滤波器和高斯低通滤波器的matlab实现去噪
  10. NOD32病毒库自动更新代码
  11. U盘容量变小后修复的方法
  12. win10无法连接wifi_更新WIN10 1903遇到旧版高通驱动无法连接WIFI重新安装网卡驱动...
  13. 城市天际线 for Mac城市建造类游戏
  14. OpenGL ES SDK for Android - 3
  15. 华硕ZenFone 7系列旗舰5G智能手机凭借Pixelworks显示技术展现惊人的视觉效果
  16. 《鸟哥的Linux私房菜》chapter9 20180818~20180826
  17. 动态规划题目——背包
  18. Failed to obtain JDBC Connection; nested exception is com.mysql.cj.jdbc.exceptions.CommunicationsEx
  19. win10 屏幕切换鼠标手势桌面边缘快捷切换 ahk
  20. 运动目标检测--三种方法比较

热门文章

  1. 知识总结2:Django常见面试题总结(持续更新)
  2. webstorm 下载并设置jade、less
  3. 【密码学】一文读懂ZUC算法
  4. 手机modem开发之VoLTE信令
  5. html下拉菜单右侧显示,css如何设置下拉菜单?
  6. FPGA之SDRAM控制器设计(一)
  7. 通过串口波特率计算数据传输速率(每秒字节数)
  8. C语言中求大于M10个最大素数,全国计算机二级C语言上机题库—南开100题
  9. 基于cat12和SPM12进行SBM数据分析笔记
  10. unity3D一些教程