ElasticJob的作业分类基于classtype两种类型。基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑;基于type的作业则无需编码,只需要提供相应配置即可。基于class的作业接口的方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount()getShardingItem()等方法分别获取分片总数和运行在本作业服务器的分片序列号等。

ElasticJob目前提供SimpleDataflow这两种基于class的作业类型,并提供ScriptHTTP这两种基于type的作业类型,用户可通过实现SPI接口自行扩展作业类型。

添加依赖(3.0.1是目前最新的Releases版本):

        <dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-core</artifactId><version>3.0.1</version></dependency>

本篇博客介绍如何通过实现SPI接口自行扩展作业类型。

扩展作业类型

想要通过实现SPI接口自行扩展作业类型需要三个步骤(基于class的作业类型),而基于type的作业类型只需要后面两个步骤:

  1. 定义作业逻辑的执行接口(基于type的作业类型不需要此步骤,比如Script作业的作业逻辑由脚本程序执行,而HTTP作业的作业逻辑由请求的服务端执行,因此基于type的作业类型不需要定义作业逻辑的执行接口)。
  2. 实现作业逻辑执行接口的JobItemExecutor
  3. 通过Java SPI的方式声明实现的JobItemExecutor

作业逻辑执行接口

KavenJob接口(继承ElasticJob接口,作业逻辑的执行接口):

package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;public interface KavenJob extends ElasticJob {void work(ShardingContext shardingContext, String jobExecutorName);
}

KavenJob接口的实现类MyKavenJob(实现作业逻辑的执行):

package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.api.ShardingContext;import java.text.SimpleDateFormat;
import java.util.Date;public class MyKavenJob implements KavenJob{private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void work(ShardingContext shardingContext, String jobExecutorName) {String job = shardingContext.getShardingParameter();if(job == null || job.trim().equals("")) {System.out.println("请指定帮[Kaven]执行的任务名称!");throw new RuntimeException();}System.out.printf("%s[%s]:帮[Kaven]执行%s任务!\n", jobExecutorName, formatter.format(new Date()), job);}
}

内置的SimpleDataflow作业的作业逻辑执行接口也是这样定义的:

package org.apache.shardingsphere.elasticjob.simple.job;import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;public interface SimpleJob extends ElasticJob {void execute(ShardingContext var1);
}
package org.apache.shardingsphere.elasticjob.dataflow.job;import java.util.List;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;public interface DataflowJob<T> extends ElasticJob {List<T> fetchData(ShardingContext var1);void processData(ShardingContext var1, List<T> var2);
}

因此想要定义SimpleDataflow作业就只需要分别实现SimpleJobDataflowJob接口即可。

JobItemExecutor

ElasticJob的作业分类基于classtype两种类型,因此JobItemExecutor必须继承或实现ClassedJobItemExecutor或者TypedJobItemExecutor接口。

KavenJobItemExecutor接口(继承了ClassedJobItemExecutor,如果要继承TypedJobItemExecutor接口来扩展type类型作业的自定义JobItemExecutor,也是类似的):

package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;public interface KavenJobItemExecutor extends ClassedJobItemExecutor<KavenJob> {String getJobExecutorName();
}

KavenJobItemExecutor接口的实现类MyKavenJobExecutor

package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;public class MyKavenJobExecutor implements KavenJobItemExecutor {public MyKavenJobExecutor() {}@Overridepublic Class<KavenJob> getElasticJobClass() {return KavenJob.class;}@Overridepublic void process(KavenJob kavenJob, JobConfiguration jobConfiguration, JobFacade jobFacade, ShardingContext shardingContext) {kavenJob.work(shardingContext, getJobExecutorName());}@Overridepublic String getJobExecutorName() {return this.getClass().getName();}
}

很显然MyKavenJob类中的work方法将在MyKavenJobExecutor类的process方法中调用,这是由ElasticJob控制的。SimpleDataflowScriptHTTP作业也是如此,在JobItemExecutor中执行作业的分片(以SimpleScript作业为例):

package org.apache.shardingsphere.elasticjob.simple.executor;import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {public SimpleJobExecutor() {}public void process(SimpleJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {elasticJob.execute(shardingContext);}public Class<SimpleJob> getElasticJobClass() {return SimpleJob.class;}
}
package org.apache.shardingsphere.elasticjob.script.executor;import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.infra.json.GsonFactory;public final class ScriptJobExecutor implements TypedJobItemExecutor {public ScriptJobExecutor() {}public void process(ElasticJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {CommandLine commandLine = CommandLine.parse(this.getScriptCommandLine(jobConfig.getProps()));commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);try {(new DefaultExecutor()).execute(commandLine);} catch (IOException var7) {throw new JobSystemException("Execute script failure.", new Object[]{var7});}}private String getScriptCommandLine(Properties props) {String result = props.getProperty("script.command.line");if (Strings.isNullOrEmpty(result)) {throw new JobConfigurationException("Cannot find script command line, job is not executed.", new Object[0]);} else {return result;}}public String getType() {return "SCRIPT";}
}

声明JobItemExecutor

通过Java SPI的方式声明实现的JobItemExecutor,那为什么需要声明实现的JobItemExecutor?因为ElasticJob需要知道作业对应的JobItemExecutor,以便用它来执行该作业的分片。ElasticJob通过ScheduleJobBootstrap实例来完成定时任务的调度。

Application类(启动类):

package com.kaven.job;import com.kaven.job.my.MyKavenJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;/*** @Author: ITKaven* @Date: 2021/11/20 17:05* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class Application {public static void main(String[] args) {new ScheduleJobBootstrap(createRegistryCenter(), new MyKavenJob(),createJobConfiguration()).schedule();}private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9999", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}private static JobConfiguration createJobConfiguration() {String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=其他";return JobConfiguration.newBuilder("KavenJob", 6).cron("30 * * * * ?").shardingItemParameters(jobs).overwrite(true).failover(true).build();}
}

创建ScheduleJobBootstrap实例,也会创建JobScheduler实例。

    public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {this.jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);}public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {this.jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig);}

而在创建JobScheduler实例时,还会创建ElasticJobExecutor实例。

    public JobScheduler(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {...this.jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, this.jobFacade);...}public JobScheduler(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {...this.jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, this.jobFacade);...}

在创建ElasticJobExecutor实例时,会通过JobItemExecutorFactory类获取作业对应的JobItemExecutor

    public ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {this(elasticJob, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(elasticJob.getClass()));}public ElasticJobExecutor(final String type, final JobConfiguration jobConfig, final JobFacade jobFacade) {this(null, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(type));}

JobItemExecutorFactory类:

package org.apache.shardingsphere.elasticjob.executor.item;import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;/*** Job item executor factory.*/
@SuppressWarnings("rawtypes")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobItemExecutorFactory {private static final Map<Class, ClassedJobItemExecutor> CLASSED_EXECUTORS = new HashMap<>();static {ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));}/*** Get executor.* * @param elasticJobClass elastic job class* @return job item executor*/@SuppressWarnings("unchecked")public static JobItemExecutor getExecutor(final Class<? extends ElasticJob> elasticJobClass) {for (Entry<Class, ClassedJobItemExecutor> entry : CLASSED_EXECUTORS.entrySet()) {if (entry.getKey().isAssignableFrom(elasticJobClass)) {return entry.getValue();}}throw new JobConfigurationException("Can not find executor for elastic job class `%s`", elasticJobClass.getName());}/*** Get executor.** @param elasticJobType elastic job type* @return job item executor*/public static JobItemExecutor getExecutor(final String elasticJobType) {return ElasticJobServiceLoader.getCachedTypedServiceInstance(TypedJobItemExecutor.class, elasticJobType).orElseThrow(() -> new JobConfigurationException("Cannot find executor for elastic job type `%s`", elasticJobType));}
}

ServiceLoader类就是Java提供的SPISPIService Provider Interface)是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,不同厂商可以针对同一接口做出不同的实现,比如java.sql.Driver接口,MySQLPostgreSQL都提供了对应的实现给用户使用,而JavaSPI机制可以为某个接口寻找服务实现。JavaSPI机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

ServiceLoader类正常工作的唯一要求是服务提供类必须具有无参构造函数,以便它们可以在加载期间实例化。通过在资源目录META-INF/services中放置服务提供者配置文件来标识服务提供者,文件名是服务类型的完全限定名,该文件包含具体的服务提供者类的完全限定名列表,每行一个,每个名称周围的空格和制表符以及空行都将被忽略,该文件必须以UTF-8编码。如下图所示:

完成这三个步骤,我们实现的JobItemExecutor就可以被ElasticJob发现,以便将它们用于对应作业分片的执行。

    static {// 加载type作业的JobItemExecutorElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);// 加载class作业的JobItemExecutorServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));}

JobItemExecutorFactory类的静态块中会加载classtype作业的JobItemExecutor,加载class作业的JobItemExecutor时,以each.getElasticJobClass()keyeachvalue。而MyKavenJobExecutor类的getElasticJobClass方法返回KavenJob.class,这样作业和JobItemExecutor就对应起来了。

    @Overridepublic Class<KavenJob> getElasticJobClass() {return KavenJob.class;}

加载type作业的JobItemExecutor是在ElasticJobServiceLoader类中完成的(也是使用ServiceLoader类来加载),以instance.getType()keyinstancevalue

    public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {if (TYPED_SERVICES.containsKey(typedService)) {return;}ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));}private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());}

这也是为什么ScheduleJobBootstrap构造器的elasticJobType参数需要全部大写(比如SCRIPTHTTP)的原因。

new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT",createJobConfiguration()).schedule();

因为这两种作业对应的JobItemExecutor就是使用getType()的返回值作为key进行存储的。

    public String getType() {return "SCRIPT";}public String getType() {return "HTTP";}

这样就通过实现SPI接口自行扩展了作业类型,输出如下图所示:



type类型作业的扩展也是类似的,博主对不同之处也进行了分析,这里就不赘述了。扩展作业类型就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

ElasticJob‐Lite:扩展作业类型相关推荐

  1. ETL作业调度软件TASKCTL自定义扩展作业类型插件安装

    TASKCTL批量自动化调度作业类型扩展插件的安装方法如下几种: 1. 直接覆盖法 直接覆盖法的意思就是将自定义扩展好的插件,通常是一个shell脚本,上传至后台调度核心服务上,然后修改后台任务类型的 ...

  2. ElasticJob‐Lite:HTTP作业

    ElasticJob的作业分类基于class和type两种类型.基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑:基于type的作业则无需编码,只需要提供相应配置即可.基于class的 ...

  3. ElasticJob‐Lite:Script作业

    ElasticJob的作业分类基于class和type两种类型.基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑:基于type的作业则无需编码,只需要提供相应配置即可.基于class的 ...

  4. : error c2062: 意外的类型“int”_Go 命令行解析 flag 包之扩展新类型

    上篇文章 说到,除布尔类型 Flag,flag 支持的还有整型(int.int64.uint.uint64).浮点型(float64).字符串(string)和时长(duration). flag 内 ...

  5. 如何设计一门语言(十二)——设计可扩展的类型

    在思考怎么写这一篇文章的时候,我又想到了以前讨论正交概念的事情.如果一个系统被设计成正交的,他的功能扩展起来也可以很容易的保持质量这是没错的,但是对于每一个单独给他扩展功能的个体来说,这个系统一点都不 ...

  6. SAP 作业类型主数据

    创建KL01 修改KL02 显示KL03 解释一下栏位: 作业类型类别:1(手工输入,手工分配),就是为了说明作业的使用方法. 价格标志:1(根据计划作业自动计算)Price Indicator  有 ...

  7. SAP作业类型应用简介

    简单问题看作业类型应用 问题现象: MFN1|CON1/CON2对工单进行实际作业价格重估时提示KI 760错误消息如" "按实际价格重估在版本0, 财政年度 2000*年中不可能 ...

  8. SAP-CO.创建成本中心,作业类型,内部订单

    1 配置工作中心负责人 2 配置成本控制范围(OKKP) -- -- 3 把公司代码分配给控制范围 (OX19) 4 成本中心创建(KS01) 5 创建次级成本要素(KA06) -- 6 生成作业类型 ...

  9. BOOST使用 proto::extends 扩展终端类型的简单示例

    BOOST使用 proto::extends 扩展终端类型的简单示例 实现功能 C++实现代码 实现功能 BOOST使用 proto::extends 扩展终端类型的简单示例 C++实现代码 #inc ...

  10. SAP License:作业类型作为成本对象

    作业类型可以作为成本对象,此配置在成本控制范围中设置.当设置为成本对象时,可以在FI中对成本中心+作业类型进行记帐,记帐金额仍在成本中心报表中显示.但作业类型不能作为分配或分摊的发送方.接收方,仅能作 ...

最新文章

  1. Python 比特币 教程 之一:创建机器人
  2. python中的any函数_python函数-any()
  3. 一文带你纵览 200+ 大规模机器学习研究
  4. if判断用户名 linux,Shell脚本IF条件判断和判断条件总结
  5. Python一题三解:查找总分等于特定值的同学成绩
  6. 如何找回Oracle所有用户丢失的密码
  7. 虹软2.0 离线人脸识别 Android 开发 Demo
  8. 原生JS 将canvas生成图片
  9. [导入].net中设置系统时间
  10. Tenorshare iCareFone for mac如何修复iPhone手机系统?
  11. qca9563修改art区,将2T2R修改为1T1R
  12. 安装Ubuntu后找不到windows启动项的解决办法
  13. 高考志愿填报|物联网为何成为【热门选手】?
  14. weak和alias
  15. 云图说 | 勒索病毒防治解决方案
  16. C语言数组 :用户输入一个数, 我要用这个数当数组的长度。怎么办呢
  17. Springboot注解@ServletComponentScan和@ComponentScan
  18. Spring+SpringMVC+MyBatis+easyUI整合优化篇(五)MockMvc服务端的单元测试
  19. totolink路由器虚拟服务器,不怕没信号 TOTOLINK-N350R路由器任你用
  20. 40%带宽成本节约!京东云视频云正式支持AV1编码

热门文章

  1. 【图像去噪】基于matlab自适应布谷鸟算法优化维纳滤波器图像去噪【含Matlab源码 1892期】
  2. 数据库实验一、实验二、实验三、实验四
  3. 微信小程序点击图片放大图片
  4. datavideo切换台说明书_SE-700切换台-应用篇
  5. UniApp设置APP图标配置,不自动生成所有图标问题
  6. 使用旧版本MATLAB打开新版本MATLAB创建的.slx文件 提高MATLAB启动速度
  7. 简单计算经纬度表示的距离
  8. OPENCV Linux 显示中文 arm64
  9. 【语音定时播报系统】基于树莓派+百度语音合成打造语音定时播报系统
  10. ArcGis学习资料汇总整理