Java SPI机制

在上一篇博客中介绍了ElasticJob的作业分片策略:

  • ElasticJob‐Lite:作业分片策略介绍与源码分析

其中提到了ElasticJob是通过Java提供的SPI机制(ServiceLoader类)加载所有作业分片策略。

ServiceLoader类就是Java提供的SPISPIService Provider Interface)是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,不同厂商可以针对同一接口做出不同的实现,比如java.sql.Driver接口,MySQLPostgreSQL都提供了对应的实现给用户使用,而JavaSPI机制可以为某个接口寻找服务实现。JavaSPI机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

ServiceLoader类正常工作的唯一要求是服务提供类必须具有无参构造函数,以便它们可以在加载期间实例化。通过在资源目录META-INF/services中放置服务提供者配置文件来标识服务提供者,文件名是服务类型的完全限定名(比如ElasticJobListener类的完全限定名),该文件包含具体的服务提供者类的完全限定名列表(ElasticJobListener实现类的完全限定名列表),每行一个,每个名称周围的空格和制表符以及空行都将被忽略,该文件必须以UTF-8编码。

自定义作业分片策略

所有可用的作业分片策略在JobShardingStrategyFactory类的静态块中被加载(通过ElasticJobServiceLoader类,该类是ElasticJob基于Java SPI机制实现的特定于作业的服务加载器)。

    static {ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);}

加载的类型是JobShardingStrategy.class,因此自定义的作业分片策略需要实现该接口。

自定义作业分片策略ShuffleJobShardingStrategy类:

package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.*;public class ShuffleJobShardingStrategy implements JobShardingStrategy {@Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {if (jobInstances.isEmpty()) {return Collections.emptyMap();}// 先将作业分片项装入容器List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);for (int i = 0; i < shardingTotalCount; i++) {shuffleShardingList.add(i);}// 将容器中的作业分片项顺序打乱(使用容器的shuffle方法)Collections.shuffle(shuffleShardingList);// 模仿AverageAllocationJobShardingStrategy作业分片策略进行分配Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);return result;}private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,final int shardingTotalCount,final List<Integer> shuffleShardingList) {Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);// 每个作业服务器最少应该分配的作业分片项数int itemCountPerSharding = shardingTotalCount / shardingUnits.size();int count = 0;for (JobInstance each : shardingUnits) {// 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)// itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {// 给作业分片项列表添加容器中的第i个作业分片项shardingItems.add(shuffleShardingList.get(i));}// 将作业服务器与它执行的作业分片项列表进行关联result.put(each, shardingItems);count++;}return result;}private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,final Map<JobInstance, List<Integer>> shardingResults,final List<Integer> shuffleShardingList) {// 无法平均分配的分片项数int aliquant = shardingTotalCount % shardingUnits.size();// 已分配的无法平均分配的分片项数int count = 0;for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {// 是否还有无法平均分配的分片项if (count < aliquant) {// 分配给序号较小的作业服务器entry.getValue().add(shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count));}// 已分配数更新count++;}}// 作业分片策略的标识符@Overridepublic String getType() {return "Shuffle";}
}

博主自定义的ShuffleJobShardingStrategy作业分片策略是模仿AverageAllocationJobShardingStrategy作业分片策略(默认的作业分片策略),只是先将作业分片项装入容器,然后将容器中的作业分片项顺序打乱(使用容器的shuffle方法),之后再基于该作业分片项容器使用AverageAllocationJobShardingStrategy作业分片策略给作业服务器分配该容器中的作业分片项,如果不了解AverageAllocationJobShardingStrategy作业分片策略,可以去看看最上面列出的博客。

添加服务实现

resourcesMETA-INF/services中放置服务提供者配置文件来标识服务提供者,如下图所示:

测试

作业定义(Simple作业):

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import java.text.SimpleDateFormat;
import java.util.Date;public class MySimpleJob implements SimpleJob {private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {String job = shardingContext.getShardingParameter();if(job == null || job.trim().equals("")) {System.out.println("请指定帮[Kaven]执行的任务名称!");throw new RuntimeException();}System.out.printf("%s 执行任务%d - [%s]!\n", formatter.format(new Date()),shardingContext.getShardingItem(), job);}
}

启动类:

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;public class Application {public static void main(String[] args) {new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();}// 注册中心private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}// 作业配置private static JobConfiguration createJobConfiguration() {String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=写博客,6=看源码";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs)// 使用自定义的作业分片策略.jobShardingStrategyType("Shuffle").overwrite(true).build();}
}

启动三个作业服务器,输出如下图所示:



输出符合预期,因为自定义作业分片策略是模仿AverageAllocationJobShardingStrategy作业分片策略,但自定义作业分片策略中将作业的分片项顺序打乱了,因此给每个作业服务器分配的作业分片项可能不是连续的。

修改作业配置(使用默认的作业分片策略):

    private static JobConfiguration createJobConfiguration() {String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=写博客,6=看源码";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs)
//                .jobShardingStrategyType("Shuffle").overwrite(true).build();}

启动三个作业服务器,输出如下图所示:



输出符合AverageAllocationJobShardingStrategy作业分片策略,[0,1,6][2,3][4,5]很显然是有序的,而博主自定义的作业分片策略是乱序的。

ElasticJob如何自定义作业分片策略就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

ElasticJob‐Lite:自定义作业分片策略相关推荐

  1. ElasticJob Lite Console 处于分片待调整状态,无法触发解决方案

    将对应job的触发时间改成最近的,比如当前时间17:49:00,那就把Cron表达式改成下一分钟:0 50 17 * * ? ,当自动触发过一次后,状态就会变成 因为要触发过,才会让你操作. 记得,改 ...

  2. sharding-jdbc分库分表的 4种分片策略

    如果我一部分表做了分库分表,另一部分未做分库分表的表怎么处理?怎么才能正常访问? 这是一个比较典型的问题,我们知道分库分表是针对某些数据量持续大幅增长的表,比如用户表.订单表等,而不是一刀切将全部表都 ...

  3. sharding-jdbc 分库分表的 4种分片策略

    往下开展前先做个答疑,前两天有个小伙伴私下问了个问题说: 如果我一部分表做了分库分表,另一部分未做分库分表的表怎么处理?怎么才能正常访问? 这是一个比较典型的问题,我们知道分库分表是针对某些数据量持续 ...

  4. sharding-jdbc 分库分表的 4种分片策略,还蛮简单的

    上文<快速入门分库分表中间件 Sharding-JDBC (必修课)>中介绍了 sharding-jdbc 的基础概念,还搭建了一个简单的数据分片案例,但实际开发场景中要远比这复杂的多,我 ...

  5. ElasticJob‐Lite:HTTP作业

    ElasticJob的作业分类基于class和type两种类型.基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑:基于type的作业则无需编码,只需要提供相应配置即可.基于class的 ...

  6. ElasticJob‐Lite:Script作业

    ElasticJob的作业分类基于class和type两种类型.基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑:基于type的作业则无需编码,只需要提供相应配置即可.基于class的 ...

  7. elasticjob 分片策略

    Elasticjob的分片策略实现了三种. AverageAllocationJobShardingStrategy是最基础的分片策略. private Map<JobInstance, Lis ...

  8. ShardingSphere(八) 分库分表的多种分片策略

    在之前文章<ShardingSphere(二) 水平分表配置搭建,实现分表写入读取>中,我们介绍了数据库的水平分表配置,在文章中只介绍了最简单的行表达式分表配置方式,但往往在实际中我们的业 ...

  9. Sharding JDBC(四) 分片策略一:标准分片策略StandardShardingStrategy

    目录 一.标准分片策略StandardShardingStrategy 二.StandardShardingStrategy配置实现 分库分表最核心的两点SQL 路由  . SQL 改写 applic ...

  10. MyCat分片规则(全局表,ER分片表,多对多关联,主键分片VS非主键分片),MyCat常用的分片规则(15中分片规则),自定义MyCat分片规则,其它术语

    1 MyCat分片规则 数据切分中重要的几条原则,其中有几条数据冗余,表分组(Table Group). 1.1全局表 如果你的业务中有些数据类似于数据字典,比如配置文件的配置,常用业务的配置或数据量 ...

最新文章

  1. 知乎 高级操作系统_知乎高赞:Linux!为何他一人就写出这么强的系统,中国却做不出来?...
  2. 使tomcat和lighttpd使用service启停
  3. Linux|UNIX下LAMP环境的搭建及常见问题[连载4]
  4. android 之自定义Adapter的用法
  5. 怎样对ZBrush中的材料进行渲染和着色
  6. 讲述华为发布鸿蒙系统,华为鸿蒙系统正式版首批升级名单公布:这8款机型用户有福了!...
  7. linux备份用户权限
  8. 各型号交换机端口镜像配置方法和命令
  9. Javascript模块化编程:require.js的用法
  10. Xgboost中特征重要性计算方法详解
  11. 短信轰渣器搭建源码下载
  12. 软件生命周期和开发模型
  13. python中如何打开csv文件_Python如何读取csv文件
  14. 腾讯再次推出“王卡宽带”,大家觉得怎么样?
  15. 利用C语言绘制操作系统图像界面
  16. 7-2 养兔子 (20 分)
  17. 华为路由器hilink怎么用_HUAWEI HiLink 两个华为路由器无线中继实测效果【图解】...
  18. 渗透测试之攻击Windows认证
  19. 使用bat 批处理 创建自己的自启动软件
  20. 下载原版百度文库资料

热门文章

  1. 如何开发Android安卓APP读写NFC Ntag
  2. DOS MASM 安装
  3. 超级终端连接华为交换机_笔者为你win8系统使用超级终端连接华为交换机的图文方法...
  4. vue-cli搭建项目引入jquery和jquery-weui步骤详解
  5. 小程序推广引导下载app的解决办法
  6. python爬虫项目报告需求分析_网络爬虫需求分析报告
  7. PreferenceActivity(首选项设置页)
  8. 《深入浅出MFC》下载
  9. 只需8招,搞定Pandas数据筛选与查询
  10. 5行python代码帮你搞定百度文库复制问题