每个作业都可以配置一个任务监听器,确切的说是只能配置一个本地监听器和一个分布式监听器。Elastic-job有三种作业类型,但是它们的通用配置都是一样的,所以本文在介绍作业的监听器配置时将仅以简单作业的配置为例。

本地监听器

本地监听器只在节点执行自己分片的时候调度,每个分片任务调度的时候本地监听器都会执行。本地监听器由ElasticJobListener接口定义,其定义如下:

/**
 * 弹性化分布式作业监听器接口.
 *
 * @author zhangliang
 */
public interface ElasticJobListener {/**
     * 作业执行前的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */void beforeJobExecuted(final ShardingContexts shardingContexts);/**
     * 作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */void afterJobExecuted(final ShardingContexts shardingContexts);
}

该接口的接口方法的注释上已经说明了对应的接口方法的调用时机,详情也可以参考com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute()方法。简单示例如下:

public class MyElasticJobListener implements ElasticJobListener {private static final Logger LOGGER = Logger.getLogger(MyElasticJobListener.class);@Overridepublic void beforeJobExecuted(ShardingContexts shardingContexts) {LOGGER.info(String.format("开始调度任务[%s]", shardingContexts.getJobName()));}@Overridepublic void afterJobExecuted(ShardingContexts shardingContexts) {LOGGER.info(String.format("任务[%s]调度完成", shardingContexts.getJobName()));}}

本地监听器的配置由<job:listener/>节点配置,如下示例中就通过<job:listener/>给简单作业myElasticJob定义了一个本地监听器。

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"registry-center-ref="regCenter" cron="0/30 * * * * ?"sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"failover="true" overwrite="true" ><job:listener class="com.elim.learn.elastic.job.listener.MyElasticJobListener" />
</job:simple>

分布式监听器

本地监听器在作业执行本地的分片任务时会执行,如上面的示例,我们的作业被分成了6片,则监听器任务会执行6次。而分布式监听器会在总的任务开始执行时执行一次,在总的任务结束执行时执行一次。分布式监听器也是在普通监听器的基础上实现的,由AbstractDistributeOnceElasticJobListener抽象类封装的,其实现了ElasticJobListener接口。要实现自己的监听器只需要继承AbstractDistributeOnceElasticJobListener抽象类,实现其中的抽象方法即可。AbstractDistributeOnceElasticJobListener抽象类的定义如下:

/**
 * 在分布式作业中只执行一次的监听器.
 *
 * @author zhangliang
 */
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {private final long startedTimeoutMilliseconds;private final Object startedWait = new Object();private final long completedTimeoutMilliseconds;private final Object completedWait = new Object();@Setterprivate GuaranteeService guaranteeService;private TimeService timeService = new TimeService();public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {if (startedTimeoutMilliseconds <= 0L) {this.startedTimeoutMilliseconds = Long.MAX_VALUE;} else {this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;}if (completedTimeoutMilliseconds <= 0L) {this.completedTimeoutMilliseconds = Long.MAX_VALUE; } else {this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;}}@Overridepublic final void beforeJobExecuted(final ShardingContexts shardingContexts) {guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());if (guaranteeService.isAllStarted()) {doBeforeJobExecutedAtLastStarted(shardingContexts);guaranteeService.clearAllStartedInfo();return;}long before = timeService.getCurrentMillis();try {synchronized (startedWait) {startedWait.wait(startedTimeoutMilliseconds);}} catch (final InterruptedException ex) {Thread.interrupted();}if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {guaranteeService.clearAllStartedInfo();handleTimeout(startedTimeoutMilliseconds);}}@Overridepublic final void afterJobExecuted(final ShardingContexts shardingContexts) {guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet());if (guaranteeService.isAllCompleted()) {doAfterJobExecutedAtLastCompleted(shardingContexts);guaranteeService.clearAllCompletedInfo();return;}long before = timeService.getCurrentMillis();try {synchronized (completedWait) {completedWait.wait(completedTimeoutMilliseconds);}} catch (final InterruptedException ex) {Thread.interrupted();}if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) {guaranteeService.clearAllCompletedInfo();handleTimeout(completedTimeoutMilliseconds);}}private void handleTimeout(final long timeoutMilliseconds) {throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds);}/**
     * 分布式环境中最后一个作业执行前的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);/**
     * 分布式环境中最后一个作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);/**
     * 通知任务开始.
     */public void notifyWaitingTaskStart() {synchronized (startedWait) {startedWait.notifyAll();}}/**
     * 通知任务结束.
     */public void notifyWaitingTaskComplete() {synchronized (completedWait) {completedWait.notifyAll();}}
}

以下是一个使用分布式监听器的示例:

public class MyDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {private static final Logger logger = Logger.getLogger(MyDistributeOnceElasticJobListener.class);/**
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */public MyDistributeOnceElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);}@Overridepublic void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {logger.info("分布式监听器开始……");}@Overridepublic void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {logger.info("分布式监听器结束……");}}

分布式监听器用到了锁的等待和通知,startedTimeoutMilliseconds和completedTimeoutMilliseconds分别用来指定作业开始前和完成后的对应的锁等待最大超时时间。分布式监听器由<job:distributed-listener/>,以下是一个使用分布式监听器的示例:

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"registry-center-ref="regCenter" cron="0/30 * * * * ?"sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"failover="true" overwrite="true" ><job:distributed-listener class="com.elim.learn.elastic.job.listener.MyDistributeOnceElasticJobListener" started-timeout-milliseconds="100" completed-timeout-milliseconds="100"/>
</job:simple>

(本文由Elim写于2017年10月2日)

elastic-job之监听器相关推荐

  1. AWS elastic load balancer里的监听器certificate设置

    要获取更多Jerry的原创文章,请关注公众号"汪子熙":

  2. Elastic Search Java API(文档操作API、Query DSL查询API)、es搜索引擎实战demo

    elastic search实战小demo:https://github.com/simonsfan/springboot-quartz-demo,分支:feature_es 之前在 Elastic ...

  3. Elastic Job 入门教程(三)— 作业监听

    接上一篇:Elastic Job 入门教程(二)- Spring Boot框架下是实现Elastic Job 脚本作业(Script Job),本章我们讨论作业Job的监听. 定义监听器 @Compo ...

  4. Elastic实战:canal自定义客户端,实现mysql多表同步到es

    0. 引言 我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步. 通过canal1.1.5实现mysql8.0数据增量/全量同步到 ...

  5. 2021年大数据ELK(一):集中式日志协议栈Elastic Stack简介

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.简介 二.ELK 协议栈介绍及体系结构 三.集中式日志协议栈 ...

  6. jemeter监听器的使用

    打开jemeter,新建线程组,添加http请求,在请求下添加监听器: 一.添加一个jp@gc - PerfMon Metrics Collector监听器: 服务器性能监测控件,包括CPU,memo ...

  7. 【Elastic Stack(一)】Elastic Stack简介

    如果你没有听说过Elastic Stack,那你一定听说过ELK.实际上ELK是三款软件的简称,分别是Elasticsearch.Logstash.Kibana组成,在发展的过程中,又有新成员Beat ...

  8. Servlet,过滤器,监听器,拦截器的区别

    由于最近两个月工作比较清闲,个人也比较"上进",利用工作空余时间,也继续学习了一下,某天突然想起struts2和struts1的区别的时 候,发现为什么struts1要用servl ...

  9. 安卓开发|自定义监听器的三个步骤

    首先,要有一个实现View.OnClickListener接口的类 这个类要重写onClick(View v)这个方法,里面加入想要触发的事件 将监听器绑定在要监听的组件上,例如 holder.ver ...

最新文章

  1. python里的tplt什么意思 Python的format格式化输出
  2. pandas describe函数_PANDAS: 新手教程 一
  3. 网工面对HR应该提前做好哪些准备?
  4. 互联网晚报 | 11月13日 星期六 | 腾讯、抖音商谈对等开放;荣耀电商平台粉丝破亿;碧桂园杨惠妍第九次成为中国女首富...
  5. map和foreach的区别和应用场景_说说session和cookie区别与主要应用场景,localStorage的特点...
  6. 【图论】二分图学习笔记
  7. yandex alice语音开发
  8. 爱看小说网源码全站带数据打包ThinkPHP内核小说网站源码
  9. 基于OCR的身份证要素提取-2019
  10. 搭建微商城平台,新商云提供源码部署只需3分钟
  11. 多线程m3u8下载器 v1.0
  12. HMC5883L指南针罗盘模块连接arduino使用的注意事项
  13. 硬件电路设计之按键消抖(利用瞬态抑制二极管TVS加低通滤波器)
  14. 前端开发技术-promise是什么?
  15. mysql error unpacking_error: unpacking of archive failed on file错误的解决
  16. Hutool XML 转JSON 后 parseArray踩坑
  17. 计算机无法启用媒体流,win10电脑媒体流启动不了怎么办_win10电脑媒体流无法启动解决方法-win7之家...
  18. jzoj4271. 【NOIP2015模拟10.27】魔法阵
  19. 【论文阅读】社交网络传播最大化问题-03
  20. 记录———Android开发常用依赖库

热门文章

  1. 维谛技术(Vertiv):场景驱动的边缘计算
  2. 国足淘“泰”胜不足喜 战伊朗不能全靠里皮
  3. Python: SQLAlchemy 打印 SQL语句
  4. CSDN创始人蒋涛:“重应用轻生态”的AI开源模式非长久之计
  5. 360搜索引擎数据抓取
  6. JS 选择性修改input标签的属性
  7. 如何拍背景虚化的照片_3大方法,教你拍出虚化背景的拍摄技巧效果
  8. nslookup默认服务器修改,Windows Server 2008 R2 域控服务器运行nslookup命令默认服务器显示 UnKnown...
  9. 【KEIL5】是时候给你的Keil换个好看的皮肤了(MDK)
  10. JAVA中方法覆盖,看这篇就够了