需求

  • 服务使用集群部署(多Pod)
  • 基础服务提供调度任务注册,删除,查看的功能
  • 尽可能减少客户端的使用成本
  • 开发工作量尽可能少,成本尽可能小

基于以上的需求,设计如下,调度中心非独立部署,集成在base服务中。客户端目前属于同一个项目,直接使用公共模块的代码,非sdk使用。

客户端接入调度中心,只需2步。

  1. 使用公共模块的服务,调度任务注册
  2. 实现公共模块的job接口,注册中心会按照客户端提供信息,触发任务
  3. 服务端与客户端的交互使用http通讯,由k8s提供的域名调用,路由逻辑由k8s ingress 提供(默认多pod,循环调用)
  4. 服务端不保证客户端执行结束,只保证调度任务正确触发,客户端任务为异步执行

实现demo如下

  • springboot,依赖
    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

yaml 配置

server:port: 8082# 应用名称
spring:application:name: quartz-demoquartz:job-store-type: jdbcjdbc:# 是否自动初始化quartz的表结构initialize-schema: never#相关属性配置properties:org:quartz:jobStore:# 使用的数据源名称dataSource: quartzDataSource# 设置为“true”以打开群集功能,多个quartz实力必须打开isClustered: trueclass: org.quartz.impl.jdbcjobstore.JobStoreTXtablePrefix: QRTZ_# 标准jdbc数据库代理driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegatedatasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=falseusername: rootpassword: 123456

客户端主要使用以下的代码

job 接口

package com.lq.quartzdemo.controller;/*** @author seven* @version 1.0* @description 公共job* @date 2022/9/11 14:33*/
public interface CommonJob {/*** JOB 的名字** @return*/String jobName();/*** 执行job** @param param* @return*/Object exec(Object param);
}

job 工厂

package com.lq.quartzdemo.controller;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jdk.internal.org.objectweb.asm.tree.TryCatchBlockNode;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;/*** @author seven* @version 1.0* @description job factory* @date 2022/9/11 14:37*/
@Component
@Log4j2
public class CommonJobFactory implements ApplicationContextAware {// 任务映射private static Map<String, CommonJob> jobNameMapping = new HashMap<>();// 线程池private static ExecutorService threadPool;@Value("${schedule.core_pool_size:5}")private Integer CORE_POOL_SIZE;@Value("${schedule.max_pool_size:20}")private Integer MAX_POOL_SIZE;@Value("${schedule.queue_size:10000}")private Integer QUEUE_SIZE;public static String run(String jobName, Object param) {CompletableFuture.supplyAsync(() -> {final CommonJob commonJob = jobNameMapping.get(jobName);if (null == commonJob) {throw new RuntimeException("job not exist,please check jobName");}return commonJob.exec(param);}, threadPool).exceptionally(e -> {log.error("job exec fail,jobName={},param={},e={}", jobName, param == null ? "" : param.toString(), e);return null;});return "ack ok";}public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.initJobNameMapping(applicationContext);this.initThreadPool();}public void initThreadPool() {log.info("start CommonJobFactory.initThreadPool...");ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("batch-save-nodes-%d").build();threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE), threadFactory, new ThreadPoolExecutor.AbortPolicy());log.info("end CommonJobFactory.initThreadPool...core_size={},max_size={},queue_size={}",CORE_POOL_SIZE, MAX_POOL_SIZE, QUEUE_SIZE);}public void initJobNameMapping(ApplicationContext applicationContext) {log.info("start CommonJobFactory.initJobNameMapping...");final Collection<CommonJob> values = applicationContext.getBeansOfType(CommonJob.class).values();for (CommonJob commonJob : values) {if (StringUtils.isEmpty(commonJob.jobName())) {log.error("job name is not null");throw new JobNameException("");}if (jobNameMapping.containsKey(commonJob.jobName())) {log.error("job name is repeat,{} and {} has the same job name [{}]",commonJob.getClass().getName(), jobNameMapping.get(commonJob.jobName()).getClass().getName(), commonJob.jobName());throw new JobNameException("");}jobNameMapping.put(commonJob.jobName(), commonJob);}log.info("end CommonJobFactory.initJobNameMapping...job_size={},jobName={}", jobNameMapping.size(), jobNameMapping.keySet());}
}

调度执行controller

package com.lq.quartzdemo.controller;import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author seven* @version 1.0* @description 公共执行逻辑模块* @date 2022/9/11 14:16*/
@RestController
@RequestMapping("common-task")
public class CommonScheController {/*** 任务执行* @param taskName* @param param*/@PostMapping("{taskName}")public String taskExec(@PathVariable("taskName") String taskName, Object param) {return CommonJobFactory.run(taskName,param);}
}

任务提交、修改、删除

package com.lq.quartzdemo.task;import lombok.Data;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;import java.util.Date;/*** @author seven* @version 1.0* @description 定时任务提交器* @date 2022/9/11 14:06*/
@Component
public class ScheTaskCommit {private RestTemplate restTemplate = new RestTemplate();@Datapublic static class ScheTaskDto {// 系统名private String appname;// 任务名private String taskName;// corn 表达式private String cronExpression;// corn 开始时间private Date startTime;// corn 结束时间private Date endTime;// 触发地址 域名private String triggerAddr;// 触发参数private String triggerParam;}public void commit(ScheTaskDto scheTaskDto) {final String s = restTemplate.postForObject("http://localhost:8082/taskReceiver/register", scheTaskDto, String.class);System.out.println(s);}public void remove(String appName, String taskName) {restTemplate.delete("http://localhost:8082/taskReceiver/remove/" + appName + "/" + taskName);}
}

服务端的代码

任务接收

package com.lq.quartzdemo.controller;import com.lq.quartzdemo.task.CommonTask;
import com.lq.quartzdemo.task.QuartzManager;
import com.lq.quartzdemo.task.ScheTaskCommit;
import org.quartz.JobDataMap;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @author seven* @version 1.0* @description base 的调度任务接收controller* @date 2022/9/11 15:57*/
@RestController
@RequestMapping("taskReceiver")
public class BaseTaskReceiver {@Resourceprivate QuartzManager quartzManager;@PutMapping("register")public void registerTask(ScheTaskCommit.ScheTaskDto request){JobDataMap jobDataMap=new JobDataMap();jobDataMap.put("beginTime",request.getStartTime());jobDataMap.put("endTime",request.getEndTime());quartzManager.addJob(request.getAppname(), request.getTaskName(),CommonTask.class,request.getCronExpression(),jobDataMap);}@DeleteMapping("{appName}/{taskName}")public void deleteTask(@PathVariable("appName") String appName,@PathVariable("taskName") String taskName){quartzManager.removeJob(taskName,appName);}@GetMapping("{appName}")public Object tasks(@PathVariable("appName") String appName){return null;}
}

任务管理器

package com.lq.quartzdemo.task;import org.quartz.*;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashSet;
import java.util.Set;import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;/*** @author seven* @version 1.0* @description 任务管理器* @date 2022/9/11 11:03*/
@Component
public class QuartzManager {@Autowiredprivate Scheduler sched;/*** 添加任务** @param groupName 组名* @param jobName   任务名* @param cls       class文件*/@SuppressWarnings("unchecked")public void addJob(String groupName, String jobName, @SuppressWarnings("rawtypes") Class cls, String time,JobDataMap jobDataMap) {try {//任务JobDetail jobDetail = newJob(cls).withIdentity(jobName, groupName).storeDurably().usingJobData(jobDataMap).build();// 触发器TriggerBuilder<Trigger> triggerTriggerBuilder = newTrigger();if (jobDataMap.get("beginTime") != null && jobDataMap.get("endTime") != null) {Date beginTime = (Date) jobDataMap.get("beginTime");Date endTime = (Date) jobDataMap.get("endTime");triggerTriggerBuilder.startAt(beginTime).endAt(endTime);}CronTriggerImpl trigger = (CronTriggerImpl) triggerTriggerBuilder.withIdentity(groupName + jobName, groupName).withSchedule(cronSchedule(time).withMisfireHandlingInstructionDoNothing()).build();Set<Trigger> set = new HashSet();set.add(trigger);sched.scheduleJob(jobDetail, set,true);// 启动if (!sched.isShutdown()) {sched.start();}} catch (SchedulerException e) {e.printStackTrace();}}/*** @param jobName* @param jobGroupName* @Description: 移除一个任务*/public void removeJob(String jobName, String jobGroupName) {try {TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);sched.pauseTrigger(triggerKey);// 停止触发器sched.unscheduleJob(triggerKey);// 移除触发器sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务} catch (Exception e) {throw new RuntimeException(e);}}
}

在数据库中创建quartz提供的表

#
# Quartz seems to work best with the driver mm.mysql-2.0.7-bin.jar
#
# PLEASE consider using mysql with innodb tables to avoid locking issues
#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;CREATE TABLE QRTZ_JOB_DETAILS(SCHED_NAME VARCHAR(120) NOT NULL,JOB_NAME  VARCHAR(200) NOT NULL,JOB_GROUP VARCHAR(200) NOT NULL,DESCRIPTION VARCHAR(250) NULL,JOB_CLASS_NAME   VARCHAR(250) NOT NULL,IS_DURABLE VARCHAR(1) NOT NULL,IS_NONCONCURRENT VARCHAR(1) NOT NULL,IS_UPDATE_DATA VARCHAR(1) NOT NULL,REQUESTS_RECOVERY VARCHAR(1) NOT NULL,JOB_DATA BLOB NULL,PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);CREATE TABLE QRTZ_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,JOB_NAME  VARCHAR(200) NOT NULL,JOB_GROUP VARCHAR(200) NOT NULL,DESCRIPTION VARCHAR(250) NULL,NEXT_FIRE_TIME BIGINT(13) NULL,PREV_FIRE_TIME BIGINT(13) NULL,PRIORITY INTEGER NULL,TRIGGER_STATE VARCHAR(16) NOT NULL,TRIGGER_TYPE VARCHAR(8) NOT NULL,START_TIME BIGINT(13) NOT NULL,END_TIME BIGINT(13) NULL,CALENDAR_NAME VARCHAR(200) NULL,MISFIRE_INSTR SMALLINT(2) NULL,JOB_DATA BLOB NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);CREATE TABLE QRTZ_SIMPLE_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,REPEAT_COUNT BIGINT(7) NOT NULL,REPEAT_INTERVAL BIGINT(12) NOT NULL,TIMES_TRIGGERED BIGINT(10) NOT NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_CRON_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,CRON_EXPRESSION VARCHAR(200) NOT NULL,TIME_ZONE_ID VARCHAR(80),PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_SIMPROP_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,STR_PROP_1 VARCHAR(512) NULL,STR_PROP_2 VARCHAR(512) NULL,STR_PROP_3 VARCHAR(512) NULL,INT_PROP_1 INT NULL,INT_PROP_2 INT NULL,LONG_PROP_1 BIGINT NULL,LONG_PROP_2 BIGINT NULL,DEC_PROP_1 NUMERIC(13,4) NULL,DEC_PROP_2 NUMERIC(13,4) NULL,BOOL_PROP_1 VARCHAR(1) NULL,BOOL_PROP_2 VARCHAR(1) NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_BLOB_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,BLOB_DATA BLOB NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_CALENDARS(SCHED_NAME VARCHAR(120) NOT NULL,CALENDAR_NAME  VARCHAR(200) NOT NULL,CALENDAR BLOB NOT NULL,PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_GROUP  VARCHAR(200) NOT NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_FIRED_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,ENTRY_ID VARCHAR(95) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,INSTANCE_NAME VARCHAR(200) NOT NULL,FIRED_TIME BIGINT(13) NOT NULL,SCHED_TIME BIGINT(13) NOT NULL,PRIORITY INTEGER NOT NULL,STATE VARCHAR(16) NOT NULL,JOB_NAME VARCHAR(200) NULL,JOB_GROUP VARCHAR(200) NULL,IS_NONCONCURRENT VARCHAR(1) NULL,REQUESTS_RECOVERY VARCHAR(1) NULL,PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);CREATE TABLE QRTZ_SCHEDULER_STATE(SCHED_NAME VARCHAR(120) NOT NULL,INSTANCE_NAME VARCHAR(200) NOT NULL,LAST_CHECKIN_TIME BIGINT(13) NOT NULL,CHECKIN_INTERVAL BIGINT(13) NOT NULL,PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);CREATE TABLE QRTZ_LOCKS(SCHED_NAME VARCHAR(120) NOT NULL,LOCK_NAME  VARCHAR(40) NOT NULL,PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);commit;

基于 Quartz 的调度中心相关推荐

  1. 联想基于Apache DolphinScheduler构建统一调度中心的应用实践

    导读 随着业务不断增长以及定时任务类型的多样化,联想内部需要一个统一的调度中心对任务生命周期进行管理.Apache DolphinScheduler 是一个分布式.易扩展的可视化 DAG 工作流任务调 ...

  2. 十七、.net core(.NET 6)搭建基于Quartz组件的定时调度任务

     搭建基于Quartz组件的定时调度任务 先在package包项目下,添加Quartz定时器组件: 新建类库项目Wsk.Core.QuartzNet,并且引用包类库项目.然后新建一个中间调度类,叫Qu ...

  3. 基于quartz实现定时任务管理系统

    基于quartz实现定时任务管理系统 背景 说起定时任务框架,首先想到的是Quartz.这是定时任务的老牌框架了,它的优缺点都很明显.借助PowerJob 的readme文档的内容简单带过一下这部分. ...

  4. 基于 K8S 构建数据中心操作系统

    在 12 月 22 日 ECUG 的下午场 ,七牛云容器计算部技术总监袁晓沛为大家带来了主题为<基于 K8S 的 DCOS 之路>的精彩分享,向大家介绍了七牛容器云目前 K8S 的状况和产 ...

  5. 一行代码完成定时任务调度,基于Quartz的UI可视化操作组件 GZY.Quartz.MUI

    前言 之前发布过第一个版本,有兴趣的可以去看看: NET Core 基于Quartz的UI可视化操作组件 GZY.Quartz.MUI 简介 GitHub开源地址:l2999019/GZY.Quart ...

  6. 基于Quartz.net 的开源任务管理平台

    最近,又重新整理,开发出了一套基于Quartz.net 的任务管理平台.将Quartz.net 的任务调度,管理等功能统一整合,形成了一套比较完整的任务调度平台.主要是:任务调度服务,后台任务管理 等 ...

  7. 控制台基于Quartz.Net组件实现定时任务调度(一)

    前言: 你曾经需要应用执行一个任务吗?比如现在有一个需求,需要每天在零点定时执行一些操作,那应该怎样操作呢? 这个时候,如果你和你的团队是用.NET编程的话,可以考虑使用Quartz.NET调度器.允 ...

  8. 基于JAVA师大家教中心管理系统计算机毕业设计源码+系统+mysql数据库+lw文档+部署

    基于JAVA师大家教中心管理系统计算机毕业设计源码+系统+mysql数据库+lw文档+部署 基于JAVA师大家教中心管理系统计算机毕业设计源码+系统+mysql数据库+lw文档+部署 本源码技术栈: ...

  9. Quartz.NET总结(五)基于Quartz.net 的开源任务管理平台

    前面总结了很多,关于Quartz.net 的文章,介绍了如何使用Quartz.net.不清楚的朋友,可以看我之前的系列文章,http://www.cnblogs.com/zhangweizhong/c ...

最新文章

  1. Delphi(Tuxedo,BDE,ADO)三合一数据集组件HsTxQuery
  2. 1、kubernetes系统基础190622
  3. jodd-StringTemplateParser使用
  4. FeHelper的安装与使用
  5. boost::log模块实现将日志记录初始化到远程 syslog 服务器
  6. 二维三角元有限元方法matlab,有限元C++编程实践.doc
  7. Android中文API(128) —— HandlerThread
  8. 双向循环链表:鸿蒙轻内核中数据的“驿站”
  9. IntelliJ IDEA 2017 MySQL5 绿色版 Spring 4 Mybatis 3 配置步骤详解(二)
  10. python集合排序_numpy排序与集合运算用法示例
  11. testlink界面优化_Testlink 太老了,测试用例管理有没有什么好工具推荐?
  12. python编程代码-几个Python小案例,爱上Python编程!
  13. 倒排索引c语言,Inverted Index(倒排索引)
  14. 知识付费,下半场怎么走(附大会PPT下载)
  15. iMazing 2.11.6 WinMac 中文版 — iOS设备管理工具
  16. 深圳软件测试培训:软件生命周期(SDLC)的六个阶段
  17. matlab能否独立做程序,如何将MATLAB程序编译成独立可执行的程序
  18. php 统计中英混杂word文档字数,wordfrequencycount
  19. Unity API通读 CustomEditor
  20. 七款好用的Linux防火墙

热门文章

  1. project(2)
  2. id门禁卡复制到手机_门禁卡怎么复制到苹果手机?
  3. onedrive php映射,宝塔面板搭建OneDrive目录程序OLAINDEX
  4. XT.COM关于Coinzilla AMA直播回顾
  5. 一图搞懂formula常用符号
  6. 聚焦云上安全,2021首届-西部云安全峰会将在西安召开
  7. Android java synchronized原理
  8. 中文词向量的下载与使用探索 (tensorflow加载词向量)
  9. 深圳美景品牌策划机构:英国知名时尚鞋履品牌IGX签约美景策划
  10. wro4j和maven plugin在编译期间压缩静态资源