任务在调度执行中,由于某种原因未执行完毕,下一次调度任务触发后,在同一个Job实例中,会出现两个线程处理同一个分片上的数据,这样就会造成两个线程可能处理到相同的数据。为了避免同一条数据可能会被多次执行的问题,ElasticJob引入幂等机制,确保同一条数据不会再被多个Job同时处理,也避免同一条数据在同一个Job实例的多个线程处理。再重申一次ElastciJob的分布式是数据的分布式,一个任务在多个Job实例上运行,每个Job实例处理该Job的部分数据(数据分片)。

本文重点分析ElasticJob是如何做到如下两点的。

  1. ElasticJob如何确保在同一个Job实例中多个线程不会处理相同的数据。
  2. ElasticJob如何确保数据不会被多个Job实例处理。
    为了解决上述这种情况,ElasticJob引入任务错过补偿执行(misfire)与幂等机制(monitorExecution)

1、ElasticJob如何确保在同一个Job实例中多个线程不会处理相同的数据。

场景:例如任务调度周期为每5s执行一次,正常每次调度任务处理需要耗时2s,如果在某一段时间由于数据库压力变大,导致原本只需要2s就能处理完成的任务,现在需要16s才能运行,在这个数据处理的过程中,每5s又会触发一次调度(任务处理),如果不加以控制的话,在同一个实例上根据分片条件去查询数据库,查询到的数据有可能相同(部分相同),这样同一条任务数据将被多次运行,如果这个任务时处理转账业务,如果在业务方法不实现幂等,则会引发非常严重的问题,那ElasticJob是否可以避免这个问题呢?

答案是肯定。elasticJob提供了一个配置参数:monitorExecution=true,开启幂等性。

一个任务触发后,将执行任务处理逻辑,其入口:AbstractElasticJobExecutor#misfireIfRunning

if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {  // @1if (shardingContexts.isAllowSendJobEvent()) {  // @2jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet()));}return;
}

代码@1:在一个调度任务触发后如果上一次任务还未执行,则需要设置该分片状态为mirefire,表示错失了一次任务执行。

代码@2:如果该分片被设置为mirefire并开启了事件跟踪,将事件跟踪保存在数据库中。

接下来详细分析JobFacade.misfireIfRunning的实现逻辑:

/*** 如果当前分片项仍在运行则设置任务被错过执行的标记.* * @param items 需要设置错过执行的任务分片项* @return 是否错过本次执行*/public boolean misfireIfHasRunningItems(final Collection<Integer> items) {if (!hasRunningItems(items)) {return false;}setMisfire(items);return true;}

如果存在未完成的分片,则调用setMisfire(items)方法,ElasticJob在开启monitorExecution(true)【幂等机制】机制的情况下,在分片任务开始时会创建${namespace}/jobname/sharding/{item}/running节点,在任务结束后会删除该目录,所以在判断是否有分片正在运行时,只需判断是否存在上述节点即可。如果存在,调用setMisfire方法。

PS:如果ElasticJob为开启幂等(monitorExecution)的情况下,才会创建${namespace}/jobname/sharding
/{item}/running,misfire机制才能生效。

ExecutionService#setMisfire

/*** 设置任务被错过执行的标记.** @param items 需要设置错过执行的任务分片项*/public void setMisfire(final Collection<Integer> items) {for (int each : items) {jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));}}

设置misfire的方法为分配给该实例下的所有分片创建持久节点${namespace}/jobname/shading/{item}/misfire节点,注意,只要分配给该实例的任何一分片未执行完毕,则在该实例下的所有分片都增加misfire节点,然后忽略本次任务触发执行,等待任务结束后再执行。

AbstractElasticJobExecutor#execute

execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}

在任务执行完成后检查是否存在${namespace}/jobname/sharding/{item}/misfire节点,如果存在,则首先清除misfie相关的文件,然后执行任务。

ElasticJob的misfire实现方案总结:

在下一个调度周期到达之后,只要发现这个分片的任何一个分片正在执行,则为该实例分片的所有分片都设置为misfire,等任务执行完毕后,再统一执行下一次任务调度。

2、ElasticJob如何确保数据不会被多个Job实例处理

ElasticJob基于数据分片,不同分片根据分片参数(人为配置),从数据库中查询各自数据(任务数据分片),如果当节点宕机,数据会重新分片,如果任务未执行完成,然后执行分片,数据是否会被不同的任务同时处理呢?

答案是不会,因为当节点宕机后,是否需要重新分片事件监听器会监听到Job实例代表的节点删除,设置重新分片,在任务被调度执行具体处理逻辑之前,需要重新分片,重新分片的前提又是要所有的分片的任务全部执行完毕,这也依赖是否开启幂等控制(monitorExecution),如果开启,ElasticJob能感知正在执行处理逻辑的分片,重新分片需要等待当前所有任务全部运行完毕后才会触发,故不会存在不同节点处理相同数据的问题。

问答:
1、如果一个任务JOB的调度频率为每10s一次,在某个时间,该job执行耗时用了33s(平时只需执行5s),按照正常调度,应该后续会触发3次调度,那该job后执行完,会连续执行3次调度吗?
答案:在33s这次任务执行完成后,如果后面的任务执行在10s内执行完毕的话,只会触发一次,不会补偿3次,因为ElasticJob记录任务错失执行,只是创建了misfire节点,并不会记录错失的此时,因为也没这个必要。

源码分析ElasticJob任务错过机制(misfire)与幂等性相关推荐

  1. activeMQ的源码分析 -TCP通讯机制

    2019独角兽企业重金招聘Python工程师标准>>> activeMQ的源码分析 -TCP通讯机制 博客分类: MQ <IGNORE_JS_OP style="WO ...

  2. caffe源码分析--SyncedMemory 内存管理机制

    caffe源码分析–SyncedMemory 内存管理机制 ​ SyncedMemory 是caffe中用来管理内存分配和CPU.GPU数据及同步的类,只服务于Blob类.SyncedMemory 对 ...

  3. Struts2 源码分析——拦截器的机制

    本章简言 上一章讲到关于action代理类的工作.即是如何去找对应的action配置信息,并执行action类的实例.而这一章笔者将讲到在执行action需要用到的拦截器.为什么要讲拦截器呢?可以这样 ...

  4. JDK源码分析——Java的SPI机制分析与实战

    重点提示:在我博客中的所有的源码分析的实例,我都将会放到github上,感兴趣的朋友可以下载下来调试运行,我相信还是可以有所收获的.我的目的是让所有读到我博客的朋友都可以了解到有价值的东西,学习到ja ...

  5. 源码分析RocketMQ ACL实现机制

    有关RocketMQ ACL的使用请查看上一篇<RocketMQ ACL使用指南>,本文从源码的角度,分析一下RocketMQ ACL的实现原理. 备注:RocketMQ在4.4.0时引入 ...

  6. ceph源码分析--monitor的lease机制

    概述 在monitor节点中,存在着Leader和Peon两种角色.在monitor采用了一种lease机制,保证了副本在一定时间内可读写.同时lease机制也保证了整个集群中的monitor当前都是 ...

  7. Condition源码分析与等待通知机制,linux系统架构与运维实战pdf

    //后继节点 Node nextWaiter; 进一步说明,等待队列是一个单向队列,而在之前说AQS时知道同步队列是一个双向队列.接下来我们用一个demo,通过debug进去看是不是符合我们的猜想: ...

  8. mq补偿机制java代码_RocketMQ源码分析之消息消费机制-消费端消息负载均衡机制与重新分布 - Java 技术驿站-Java 技术驿站...

    1.消息消费需要解决的问题 首先再次重复啰嗦一下RocketMQ消息消费的一些基本元素的关系 主题 ---> 消息队列(MessageQueue) 1 对多 主题 ----> 消息生产者, ...

  9. squid源码分析4—coss存储机制分析

    分类: 1. Coss 文件系统概述 1.1 概述 循环目标存储机制(Cyclic Object Storage Scheme,coss)尝试为squid定制一个新的文件系统.在ufs基础的机制下,主 ...

  10. Spark源码分析之BlockManager通信机制

    BlockManagerMasterEndpoint主要用于向BlockManagerSlaveEndpoint发送消息,主要分析他们都接受哪些消息,接受到消息之后怎么处理? 一BlockManage ...

最新文章

  1. 学JS的心路历程Day28 - PixiJS -基础(二)
  2. 手把手玩转协同编辑(1):AST (Address Space Transformation)地址空间转换算法 基本介绍...
  3. [云炬创业基础笔记] 第四章测试9
  4. 《福布斯》:微软的印度未来
  5. ReScript 与 TypeScript,谁是前端圈的“当红辣子鸡”
  6. java dom4j 写xml文件_Java实现——Dom4j读写XML文件
  7. Intellij IDEA 安装插件 报 ‘plugin xxxx is incompatible‘ 解决方案
  8. mdt抓取镜像后只显示回收站_又涨了!废纸价格贵过废铜烂铁,回收站缩减废旧物品收购规模...
  9. C语言图书信息管理系统
  10. windows server 开机自动启动项
  11. 魔众刮刮卡抽奖系统 v2.0.0 支付抽奖,更好用的刮刮卡系统
  12. 用SETFOS模拟Tadf OLED和超荧光OLED
  13. 无人机云台电机用的是哪种?
  14. ultraos win10启动盘_ultraiso制作u盘启动盘教程图文详解
  15. 笔记——STM32串口USART收发数据。
  16. SVA(立即断言、并发断言、触发判断)-概述
  17. git 分支branch详解
  18. python科学计算试题及答案_高校邦Python科学计算章节答案
  19. php 数字 字母组合,php随机生成数字字母组合的方法
  20. php 红包牛牛,恭喜发财:牛牛红包钩针手机包(有编织说明)

热门文章

  1. 数据结构以及相关排序
  2. 【路由和交换之H3C自导自演】
  3. DDoSCoin:加密货币奖励用户参与 DDoS 攻击
  4. 五种 必须了解的CSS选择器
  5. apache camel file(二)
  6. MEF程序设计指南四:使用MEF声明导出(Exports)与导入(Imports)
  7. 国外 java 源码_将近100多个国外优秀Java程序员的编程源代码JAVA源码下载
  8. Perf -- Linux下的系统性能调优工具
  9. jsSIP-demo(完整源码加注释)
  10. 具有system权限的进程无法访问sdcard