通过自定义任务,可以修改任务配置(如:触发时间),停止或启动任务,查看任务日志等功能。

后端项目:custom-schedule-server

前端项目:megrez-ng2

模型(model)

Config.java 任务配置

/*** 任务配置信息* * @author Lucky Yang* @since 1.0.0*/
@Getter
@Setter
@ToString(callSuper = true)
@Entity
@Table(name = "job_config")
public class Config extends AbstractBasePO<String, Integer> {/** 类完整名称 */private String className;/** 方法名称 */private String methodName;/** 方法参数 */private String methodParams;/** cron表达式 */private String cronExpression;/** 备注 */private String remark;/** 启用 */private Boolean enabled;@ToString.Exclude@OneToMany(mappedBy = "config", cascade = { CascadeType.PERSIST, CascadeType.REMOVE })private List<Log> logs = new ArrayList<>();public Config() {}public Config(String className, String methodName, String methodParams, String cronExpression, String remark) {this.className = className;this.methodName = methodName;this.methodParams = methodParams;this.cronExpression = cronExpression;this.remark = remark;this.enabled = true;}@PrePersist@PreUpdatepublic void preAction() {if (getId() == null) {setId(Snowflake.nextIdStr());}if (enabled == null) {enabled = true;}}@Overridepublic boolean equals(Object o) {return super.equals(o);}@Overridepublic int hashCode() {return super.hashCode();}}

Log.java 任务运行日志

/*** 日志* * @author Lucky Yang* @since 1.0.0*/
@Getter
@Setter
@ToString(callSuper = true)
@Entity
@Table(name = "job_log")
@EntityListeners({ AuditingEntityListener.class })
public class Log extends AbstractBasePO<String, Integer> {@CreatedDate@Column(updatable = false)@Temporal(TemporalType.TIMESTAMP)private Date createdDate;@ManyToOne@JoinColumn(name = "configId")private Config config;/** 配置快照 */private String configSnapshot;/** 类型 */@Enumeratedprivate TaskState state;/** 消息 */private String message;/** 耗时:毫秒 */private Long costTime;public Log() {}public Log(Config config) {this.config = config;}@PrePersistpublic void preAction() {if (getId() == null) {setId(Snowflake.nextIdStr());}}@Overridepublic boolean equals(Object o) {return super.equals(o);}@Overridepublic int hashCode() {return super.hashCode();}
}

Monitor.java 任务运行信息

/*** 任务监控信息* * @author Lucky Yang* @since 1.0.0*/
@Data
public class Monitor{/** 启动时间 */private Date startUpTime;/** 最后运行时间 */private Date lastRunTime;/** 运行次数 */private int numOfRuns;/** 失败次数 */private int numOfErrors;/** 任务状态 */private TaskState state;/** 最近的消息 */private String message;/** 最近消耗时间,毫秒 */private long costTime;/** 是否存在计划任务 */private boolean scheduled;public Monitor() {this.scheduled = false;this.state = TaskState.STOPPED;}public Monitor(Monitor m) {this.startUpTime = m.startUpTime;this.lastRunTime = m.lastRunTime;this.numOfRuns = m.numOfRuns;this.numOfErrors = m.numOfErrors;this.state = m.state;this.message = m.message;this.costTime = m.getCostTime();this.scheduled = m.isScheduled();}public Monitor start() {scheduled = true;startUpTime = new Date();lastRunTime = new Date();numOfRuns = 0;numOfErrors = 0;state = TaskState.STARTED;message = state.note;costTime = 0;return this;}public Monitor run() {costTime = calCostTime();lastRunTime = new Date();numOfRuns = numOfRuns + 1;state = TaskState.RUNNING;message = state.note;return this;}public Monitor complete() {costTime = calCostTime();lastRunTime = new Date();state = TaskState.COMPLETE;message = state.note;return this;}public Monitor stop() {scheduled = false;costTime = calCostTime();lastRunTime = new Date();state = TaskState.STOPPED;message = state.note;return this;}public Monitor error(String error) {costTime = calCostTime();lastRunTime = new Date();numOfErrors = numOfErrors + 1;state = TaskState.ERROR;message = error;return this;}private long calCostTime() {return (new Date()).getTime() - lastRunTime.getTime();}}

Task.java 任务信息

/*** 任务* * @author Lucky Yang* @since 1.0.0*/
@Getter
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Task {@EqualsAndHashCode.Includeprivate final Config config;private final Monitor monitor;private Task(Monitor monitor, Config config) {this.monitor = monitor;this.config = config;}public Task(Task task) {this(new Monitor(task.getMonitor()), task.getConfig());}private void validate() {final CommonRuntimeException ex = new CommonRuntimeException("Task config verification failed");boolean isNull = monitor == null && config == null;if (isNull) {throw ex;}boolean hasText = StringHelper.hasText(config.getClassName())&& StringHelper.hasText(config.getMethodName())&& StringHelper.hasText(config.getCronExpression());if (!hasText) {throw ex;}}@Overridepublic String toString() {return "Task [ id = " + config.getId() +", className = " + config.getClassName() +", methodName = " + config.getMethodName() +", cronExpression = " + config.getCronExpression() +", state = " + monitor.getState() +", startUpTime = " + monitor.getStartUpTime() +", lastRunTime = " + monitor.getLastRunTime() +"]";}public static Task of(Config config) {Task rslt = new Task(new Monitor(), config);rslt.validate();return rslt;}
}

服务(service)

TaskService.java 任务服务

/*** 任务服务* * @author Lucky Yang* @since 1.0.0*/
@Component
@RequiredArgsConstructor
public class TaskService {private final LogRepository logRepo;private final ConfigService configeSrc;private final ConfigRepository configRep;@Transactional(rollbackFor = Exception.class)public void run(Task task) {task.getMonitor().run();saveTaskToLog(new Task(task));}@Transactional(rollbackFor = Exception.class)public void start(Task task) {task.getMonitor().start();saveTaskToLog(new Task(task));}@Transactional(rollbackFor = Exception.class)public void complete(Task task) {task.getMonitor().complete();saveTaskToLog(new Task(task));}@Transactional(rollbackFor = Exception.class)public void stop(Task task) {task.getMonitor().stop();saveTaskToLog(new Task(task));}@Transactional(rollbackFor = Exception.class)public void error(Task task, String error) {task.getMonitor().error(error);saveTaskToLog(new Task(task));}/*** 将当前工作任务信息保存到日志* * @param task*/@Transactional(rollbackFor = Exception.class)public void saveTaskToLog(Task task) {Config config = configeSrc.findById(task.getConfig().getId());Log log = new Log();log.setConfig(config);log.setConfigSnapshot(task.getConfig().toString());log.setCostTime(task.getMonitor().getCostTime());log.setState(task.getMonitor().getState());log.setMessage(StringHelper.limitLength(task.getMonitor().getMessage(), 3999));logRepo.save(log);}@Transactional(rollbackFor = Exception.class)public Config removeConfig(String configId) {Config config = configeSrc.findById(configId);if (ScheduledTaskContainer.isStarted(Task.of(config))) {throw new TaskStateException();}configRep.delete(config);ScheduledTaskContainer.remove(Task.of(config));return config;}@Transactional(rollbackFor = Exception.class)public Config createConfig(Config config) {Assert.isNull(config.getId(), "config cannt create");Config rslt = configRep.save(config);// 加入到容器ScheduledTaskContainer.put(Task.of(rslt), ScheduledFutureTaskHolder.emptyHolder());return rslt;}@Transactional(rollbackFor = Exception.class)public Config updateConfig(Config config) {Assert.notNull(config.getId(), "config cannt update");// 是否有任务在运行if (ScheduledTaskContainer.isStarted( Task.of(config))) {throw new TaskStateException();}Config rslt = configRep.save(config);// 加入到容器ScheduledTaskContainer.put(Task.of(rslt), ScheduledFutureTaskHolder.emptyHolder());return rslt;}
}

ScheduledTaskContainer.java计划任务容器

/*** 计划任务容器* * @author Lucky Yang* @since 1.0.0*/
public class ScheduledTaskContainer {private final Map<Task, ScheduledFutureTaskHolder> cache = new ConcurrentHashMap<>();private static final ScheduledTaskContainer INST = new ScheduledTaskContainer();public static Optional<Entry<Task, ScheduledFutureTaskHolder>> getEntry(Config config) {return INST.cache.entrySet().stream().filter(entry -> entry.getKey().getConfig().equals(config)).findFirst();}public static boolean contains(Task task) {return INST.cache.containsKey(task);}public static synchronized void remove(Task task) {if(isStarted(task)){throw new TaskStateException();}INST.cache.remove(task);}public static synchronized void put(Task task, ScheduledFutureTaskHolder holder) {if(isStarted(task)){throw new TaskStateException();}// 需要更新key对象INST.cache.remove(task);INST.cache.put(task, holder);}public static Optional<ScheduledFutureTaskHolder> get(Task task) {return Optional.ofNullable(INST.cache.get(task));}public static boolean isStopped(Task task) {Optional<ScheduledFutureTaskHolder> holder = get(task);return holder.isEmpty() || holder.get().isStopped();}public static boolean isStarted(Task task) {return !isStopped(task);}public static Set<Task> getTasks() {return INST.cache.keySet();}public static void loopInContainer(Consumer<Entry<Task, ScheduledFutureTaskHolder>> consumer) {INST.cache.entrySet().forEach(consumer);}public static int size() {return INST.cache.size();}
}

TaskRegistrar.java 任务注册

/*** 任务注册* * @author Lucky Yang* @since 1.0.0*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskRegistrar implements DisposableBean {private final TaskScheduler taskScheduler;private final TaskService taskService;public synchronized void register(Task task) {if (log.isDebugEnabled()) {log.debug("Task is registering: {}", task);}if (ScheduledTaskContainer.isStarted(task)) {throw new TaskStateException();}try {ScheduledTask scheduleTask = new ScheduledTask(task, taskService);CronTask cronTask = new CronTask(scheduleTask, task.getConfig().getCronExpression());ScheduledFuture<?> future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());ScheduledFutureTaskHolder holder = new ScheduledFutureTaskHolder(scheduleTask, future);ScheduledTaskContainer.put(task, holder);holder.start();} catch (Exception e) {throw new CommonRuntimeException(e);}}public synchronized void unregister(Task task) {if (log.isDebugEnabled()) {log.debug("Task is unregistering: {}", task);}if (ScheduledTaskContainer.isStopped(task)) {throw new TaskStateException();}ScheduledTaskContainer.get(task).ifPresent(ScheduledFutureTaskHolder::stop); }public List<Task> unregisterAll() {List<Task> stoppedTaskList = new ArrayList<>();ScheduledTaskContainer.loopInContainer(entry -> {Task task = entry.getKey();if (ScheduledTaskContainer.isStopped(task)) {return;}unregister(task);stoppedTaskList.add(task);});return stoppedTaskList;}@Overridepublic void destroy() {List<Task> stoppedTaskList = unregisterAll();log.info("Total task is {}, {} tasks stopped successfully ", ScheduledTaskContainer.size(), stoppedTaskList.size());if (log.isDebugEnabled() && !stoppedTaskList.isEmpty()) {log.debug("Stopped task list: {}", stoppedTaskList);}}}

功能截图

运行任务列表:

任务日志

任务管理

[Job服务] - 自定义计划任务服务相关推荐

  1. Centos7#基础服务之计划任务服务

    文章目录 1.一次性计划任务at 2.循环性计划任务cron 3.看门狗查看数据拷贝备份循环计划任务实例 1.一次性计划任务at 作用: 计划任务主要是做一些周期性的任务,目前最主要的用途是定期备份数 ...

  2. centos 开机启动java_Centos 7将java jar包自定义开机启动服务

    Centos 7将java jar包自定义开机启动服务 1. 先上 jar包的启动脚本 vim service.sh #!/bin/bash # 需要变更的参数 # 先查看java绝对路径:which ...

  3. linux自定义开机启动服务和chkconfig使用方法

    文章转载! linux自定义开机启动服务和chkconfig使用方法 1. 服务概述 在linux操作系统下,经常需要创建一些服务,这些服务被做成shell脚本,这些服务需要在系统启动的时候自动启动, ...

  4. 在ASP.NET AJAX中使用应用程序服务和本地化(5):自定义应用程序服务的服务器端实现...

    本文来自<ASP.NET AJAX程序设计 第II卷:客户端Microsoft AJAX Library相关>的第五章<应用程序服务和本地化>. 身份认证与用户个性化等应用程序 ...

  5. Windows7 打开任务计划提示“任务计划程序服务不可用。任务计划程序将尝试重新与其建立连接。”解决办法

    不知什么原因,Windows7系统进入"控制面板"----"管理工具"----"任务计划程序"时,提示"任务计划程序服务不可用.任 ...

  6. 启用Win11原生支持的DoH(DNS over HTTPS)和配置自定义的DoH服务

    更新 22.1.23:发现有的dns配置没有首选的dns加密,好像是用无线的时候就会没有,不影响,看情况2就行 为了方便,直接给出一些模板(国内可用的) netsh dns add encryptio ...

  7. 大数据管理神器:Ambari自定义stack和服务二次开发详细教程

    背景 Ambari 是 Apache Software Foundation 的一个顶级开源项目,是一个集中部署.管理.监控 Hadoop 分布式集群的工具. 部署:自动化部署 Hadoop 软件,能 ...

  8. TASK SCHEDULE(任务计划)服务无法运行 解决方案

    Q:TASK SCHEDULE(任务计划)服务无法运行? A:因为Event Log服务没有打开,所以TASK SCHEDULE服务也打不开 转载于:https://www.cnblogs.com/w ...

  9. 转载-大数据管理神器:Ambari自定义stack和服务二次开发详细教程

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/ZYC88888/article/det ...

最新文章

  1. 计算机出国读博必读!外国小哥创建最全CS奖学金项目清单
  2. 中运用_舞蹈中,如何正确运用呼吸?
  3. FPGA自定义UART传输(包含:matlab数据拆分)
  4. 【Linux】一步一步学Linux——seq命令(221)
  5. 如何遍历维数和各维上限未定的多维数组
  6. 洛谷P4463:calc(dp、拉格朗日插值)
  7. 车纷享:基于阿里云HBase构建车联网平台实践
  8. 把百度网站设为首页_大仙SEO:如何解决网站首页百度收录后被删除?【SEO案例】...
  9. opencv2.4.9中HoughlinesP源码中的疑问解析!
  10. 最新计算机二级c语言程序设计题库,计算机二级C语言编程题库(100题
  11. stm32单片机驱动L298N模块
  12. linux内核学习资料总结
  13. os 存储器的结构层次
  14. ITIL学习笔记——核心流程之:服务台
  15. python exec 返回值_Python exec()用法及代码示例
  16. 使用Python进行并发编程
  17. 是时候更新Android Studio 3.5了!不信,你看~
  18. 维天运通通过港交所上市聆讯:线上GTV减少14亿元,毛利率两连降
  19. CSDN实训 - 通过Java修改游戏存档
  20. 前5名最佳SQL数据库恢复软件

热门文章

  1. 全国计算机考试如何查准考证号
  2. JMeter BeanShell 应用
  3. Node节点禁止调度(平滑维护)方式- cordon,drain,delete
  4. 皕杰报表之小程序代码质量检测
  5. Win7 64位中MinGW和MSYS的安装
  6. 反素数java_【Java自学】 反素数
  7. flux读取不到数据_WebFlux 中form data获取不到参数问题
  8. c++和python哪个好_python和c++哪个好 有什么区别
  9. python的scapy_python scapy网络嗅探
  10. 什么是SSL协议,浅谈SSL协议。