elastic-job 新手指南

大多数情况下,定时任务我们一般使用quartz开源框架就能满足应用场景。但如果考虑到健壮性等其它一些因素,就需要自己下点工夫,比如:要避免单点故障,至少得部署2个节点吧,但是部署多个节点,又有其它问题,有些数据在某一个时刻只能处理一次,比如 i = i+1 这些无法保证幂等的操作,run多次跟run一次,完全是不同的效果。

对于上面的问题,我曾经自行设计过一个基于zk分布式锁的解决方案:

1、每类定时job,可以分配一个独立的标识(比如:xxx_job)

2、这类job的实例,部署在多个节点上时,每个节点启动前,向zk申请一个分布式锁(在xxx_job节点下)

3、拿到锁的实例,才允许启动定时任务(通过代码控制quartz的schedule),没拿到锁的,处于standby状态,一直监听锁的变化

4、如果某个节点挂了,分布式锁自动释放,其它节点这时会抢到锁,按上面的逻辑,就会从standby状态,转为激活状态,小三正式上位,继续执行定时job。

这个方案,基本上解决了HA和业务正确性的问题,但是美中不足的地方有2点:

1、无法充分利用机器性能,处于standby的节点,实际上只是一个备胎,平时啥也不干

2、性能不方便扩展,比如:某个job一次要处理上千万的数据,仅1个激活节点,要处理很久

好了,前戏铺垫了这么久,该请主角登场了,elastic-job相当于quartz+zk的加强版,它允许对定时任务分片,可以集群部署(每个job的"分片"会分散到各个节点上),如果某个节点挂了,该节点上的分片,会调度到其它节点上。官网上有比较详细的教程,一般情况下,使用SimpleJob这种就可以了。

使用步骤:

前提:要先添加下面二个jar的依赖

1

2

compile "com.dangdang:elastic-job-lite-core:2.1.5"

compile "com.dangdang:elastic-job-lite-spring:2.1.5" 

1、自己的job要继承自SimpleJob,然后实现void execute(ShardingContext shardingContext)。

1

2

3

4

5

6

7

8

9

public interface SimpleJob extends ElasticJob {

    

    /**

     * 执行作业.

     *

     * @param shardingContext 分片上下文

     */

    void execute(ShardingContext shardingContext);

}

注意这里面有一个shardingContext参数,看下源码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

/**

 * 分片上下文.

 *

 * @author zhangliang

 */

@Getter

@ToString

public final class ShardingContext {

    

    /**

     * 作业名称.

     */

    private final String jobName;

    

    /**

     * 作业任务ID.

     */

    private final String taskId;

    

    /**

     * 分片总数.

     */

    private final int shardingTotalCount;

    

    /**

     * 作业自定义参数.

     * 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.

     */

    private final String jobParameter;

    

    /**

     * 分配于本作业实例的分片项.

     */

    private final int shardingItem;

    

    /**

     * 分配于本作业实例的分片参数.

     */

    private final String shardingParameter;

    

    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {

        jobName = shardingContexts.getJobName();

        taskId = shardingContexts.getTaskId();

        shardingTotalCount = shardingContexts.getShardingTotalCount();

        jobParameter = shardingContexts.getJobParameter();

        this.shardingItem = shardingItem;

        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);

    }

}

这里面有2个很重要的属性:shardingTotalCount 分片总数(比如:2)、shardingItem 当前分片索引(比如:1),前面提到的性能扩容,就可以根据2个参数进行简单的处理,假设在电商系统中,每天晚上有个定时任务,要统计每家店的销量。商家id一般在表设计上是一个自增数字,如果总共2个分片(注:通常也就是部署2个节点),可以把 id为奇数的放到分片0,id为偶数的放到分片1,这样2个机器各跑一半,相对只有1台机器而言,就快多了。

伪代码如下:

1

2

3

4

5

6

7

8

9

10

11

public class TestJob implements SimpleJob {

    @Override

    public void execute(ShardingContext shardingContext) {

        int shardIndx = shardingContext.getShardingItem();

        if (shardIndx == 0) {

            //处理id为奇数的商家

        else {

            //处理id为偶数的商家

        }

    }

}

这个还可以进一步简化,如果使用mysql查询商家列表,mysql中有一个mod函数,直接可以对商家id进行取模运算

1

select from shop where mod(shop_id,2)=0

如果把上面的2、0换成参数,mybatis中类似这样:

1

select from shop where mod(shop_id,#{shardTotal})=#{shardIndex}

这样逻辑就转换到sql中处理了,java代码中把参数传进来就行,连if都可以省掉。

2、接下来看看如何配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"

       xmlns:job="http://www.dangdang.com/schema/ddframe/job"

       xsi:schemaLocation="http://www.springframework.org/schema/beans

       http://www.springframework.org/schema/beans/spring-beans.xsd

       http://www.dangdang.com/schema/ddframe/reg

       http://www.dangdang.com/schema/ddframe/reg/reg.xsd

       http://www.dangdang.com/schema/ddframe/job

       http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job"

                   base-sleep-time-milliseconds="1000"

                   max-sleep-time-milliseconds="3000" max-retries="3"/>

    <job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter"

                cron="${xxxJob_cornExpress}"

                sharding-total-count="2" sharding-item-parameters="0=A,1=B"/>

                ...

</beans>

与常规的spring配置几乎没啥区别,几个要点如下:

a) 因为分片调度是基于zk的,所以要先配置zk注册中心,其中${zk_address}大家可以改成实际的zk地址列表,比如:10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181 

b) 每个job中的corn属性,就是quartz中的cornExpress表达式,然后sharding-total-count即总分片数,而sharding-item-parameters则是指定每个分片中的具体参数

(注:刚才的电商每天晚上算销量,这个case其实只用到了分片索引、分片数,并不需要参数,所以这里随便配置一个类似0=A, 1=B就可以了,如果有些业务场景,希望在知道分片索引的同时,还希望额外传一些参数进来,就可以在这里配置你希望的参数,然后在execute中,也能读到相应的参数)

3、控制台

elastic-job还提供了一个不错的UI控制台,项目源代码git clone到本地,mvn install就能得到一个elastic-job-lite-console-${version}.tar.gz的包,解压,然后运行里面的bin/start.sh 就能跑起来,界面类似如下:

通过这个控制台,可以动态调整每个定时任务的触发时间(即:cornExpress)。详情可参考官网文档-运维平台部分。

4、与spring-cloud/spring-boot的整合

如果是传统的spring项目,按上面的步骤就可以无缝整合了,如果是spring-cloud/spring-boot,则稍微要复杂点。

由于spring-boot倡导零xml配置,所以大部分配置就用代码替代了,先定义一个elasticJob的配置类:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

@Data

@Configuration

public class ElasticJobConfig {

    @Value("${rxQuartz.app.zkAddress}")

    private String zkNodes;

    @Value("${rxQuartz.app.namespace}")

    private String namespace;

    @Bean

    public ZookeeperConfiguration zkConfig() {

        return new ZookeeperConfiguration(zkNodes, namespace);

    }

    @Bean(initMethod = "init")

    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {

        return new ZookeeperRegistryCenter(config);

    }

}

上面这段代码,主要是解决zk注册中心的注入问题,然后各种xxxJob,由于要让spring自动注入,需要打上component注解

1

2

3

4

5

@Component("xxxJob")

public class XXXJob extends AbstractJob {

   

    ...

}

然后在真正要用的地方,把他们组装起来

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

import com.dangdang.ddframe.job.api.simple.SimpleJob;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;

import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;

import com.dangdang.ddframe.job.lite.api.JobScheduler;

import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

 * @author: yangjunming

 */

@Configuration

public class ElasticJobs {

    @Autowired

    @Qualifier("xxxJob")

    public SimpleJob xxxJob;

   

    @Autowired

    private ZookeeperRegistryCenter regCenter;

    @Bean(initMethod = "init")

    public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,

                                               @Value("${xxxJob.billCronExpress}"final String cron,

                                               @Value("${xxxJob.shardingCount}"int shardingCount,

                                               @Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {

        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));

    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {

        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(

                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();

    }

}

大功告成,祝大家周末撸码愉快!

作者:菩提树下的杨过
出处:http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

elastic-job 新手指南相关推荐

  1. 2021年大数据ELK(一):集中式日志协议栈Elastic Stack简介

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.简介 二.ELK 协议栈介绍及体系结构 三.集中式日志协议栈 ...

  2. 【Elastic Stack(一)】Elastic Stack简介

    如果你没有听说过Elastic Stack,那你一定听说过ELK.实际上ELK是三款软件的简称,分别是Elasticsearch.Logstash.Kibana组成,在发展的过程中,又有新成员Beat ...

  3. 无导师学习_如何找到一位导师并加快学习速度:新手指南。

    无导师学习 by Victor Cassone 由Victor Cassone 如何找到一位导师并加快学习速度:新手指南. (How to find a mentor and accelerate y ...

  4. Elastic Job从单点到高可用、同城主备、同城双活

    以下文章来源方志朋的博客,回复"666"获面试宝典 来源:https://jaskey.github.io/blog/2020/05/25/elastic-job-timmer-a ...

  5. Elastic Search 介绍和基本概念

    Elastic Search 特点 Elastic Search 可能是是当下最火的搜索引擎中间件了.为什么这么火呢?主要是因为他有几大绝艺: 快速.无论什么时候,你需要向 ES 查询你的数据,都能够 ...

  6. R语言glmnet交叉验证选择(alpha、lambda)拟合最优elastic回归模型:弹性网络(elasticNet)模型选择最优的alpha值、模型最优的lambda值,最终模型的拟合与评估

    R语言glmnet交叉验证选择(alpha参数和lambda参数)拟合最优elastic回归模型:弹性网络(elasticNet)模型选择最优的alpha值.弹性网络(elasticNet)模型最优的 ...

  7. elastic stack中的Beats是什么?

    elastic stack中的Beats是什么? elastic stack栈涉及到以下⼏个组件 beats:⽤于轻量级⽇志采集,⽀持⽂件采集,系统数据采集,特定中间件数据采集等 logstash:⽤ ...

  8. AWS EBS是 Elastic Block Store 的简写

    Persistent storage volumes for your data using Amazon Elastic Block Store (Amazon EBS), known as Ama ...

  9. 卷积神经网络新手指南之二

    导语:本文将进一步探讨有关卷积神经网络的更多细节. 卷积神经网络新手指南之二 引言 本文将进一步探讨有关卷积神经网络的更多细节,注:以下文章中部分内容较为复杂,为了保证其简明性,部分内容详细解释的研究 ...

  10. 卷积神经网络(CNN)新手指南

    导语:带你了解近年来相当火爆的卷积神经网络(Convolutional Neural Network,CNN) 卷积神经网络(Convolutional Neural Network,CNN)新手指南 ...

最新文章

  1. python模块之imghdr检测图片类型
  2. java字符串的用法_Java字符串的重要方法的使用实例
  3. ubuntu 拷贝文件
  4. 华为搭载鸿蒙系统的手表,华为手表Watch 3即将发布:搭载鸿蒙系统
  5. 32 - II. 从上到下打印二叉树 II
  6. python开发环境anaconda3_Python环境管理(Anaconda3)
  7. 支付宝WAP支付接口开发
  8. leetcode string
  9. 云服务器上搭建hadoop伪分布式环境
  10. matlab生成16进制正弦波表
  11. 破解隔壁wifi的实践——网络攻击,抓取握手包,解包
  12. 树莓派小车C语言循迹,自动循迹小车_单片机/STM32/树莓派/Arduino/开发板创意项目-聚丰项目-电子发烧友网...
  13. 【等保知识】等保测评机构申请条件,所需资料以及流程
  14. AndroidQ文件存储适配
  15. Python彩色图片转成黑白图片
  16. 字符串(字符串的拼接及一些常用方法)
  17. 复赛名单公布!2022隐私计算HACKATHON大赛火热进行中!
  18. 用C语言写学生成绩管理系统
  19. 8位寄存器置位与清零操作
  20. linux超薄笔记本推荐,2016超薄笔记本买什么好

热门文章

  1. Java_一致性哈希算法与Java实现
  2. linux0.11内核完全注释读书笔记
  3. sessionFactory.getCurrentSession()的引出
  4. Java语法总结 - 方法
  5. 微软给中国学生的特权:免费使用微软软件
  6. proc文件系统实现用户空间与内核空间的数据通信
  7. 使用docker环境编译驱动
  8. 为线程命名——prctl
  9. Linux内核多线程(五)
  10. android studio 顶部导航栏_移动端控件(五)-标签导航(Tabs)和分段控件(Segmented Controls)...