本示例使用Spring+quartz实现定时任务的调度,通过zookeeper+curator实现分布式锁,保证分布式任务串行运行,通过自定义注解实现任务的扫描及注册;

1.添加相关的maven依赖,不包括spring

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.2</version>
</dependency>    

2.定义任务扫描的相关注解:

/*** 调度任务注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Scheduler {}
/***  调度任务是否为安全的并行任务,默认为true,不允许并行*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduleTaskConcurrent {/*** 任务名称* @return*/String  value();
}
/*** 任务表达式*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduleTaskCronExpression {/*** 任务名称* @return*/String value();
}
/*** 任务方法*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduleTaskMethod {/*** 任务名称* @return*/String value();
}
/***  调度任务是否为分布式安全的并行任务,默认为true,不允许并行*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduleTaskRemoteConcurrent {/*** 任务名称* @return*/String value();
}
/*** 任务是否执行*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduleTaskRunning {/*** 任务名称* @return*/String value();
}

3.定义任务对象对应的实体bean:

/*** 任务对象*/
public class TaskBean {private String name;private String cronExpression;private Boolean concurrent = true;private Boolean remoteConcurrent=true;private Boolean running = true;private Object targetObject;private String targetMethod;public TaskBean(String name, String cronExpression, Boolean running, Boolean concurrent, Boolean remoteConcurrent,Object targetObject, String targetMethod) {this.name = name;this.cronExpression = cronExpression;this.concurrent = concurrent;this.remoteConcurrent = remoteConcurrent;this.running = running;this.targetObject = targetObject;this.targetMethod = targetMethod;}public TaskBean(String cronExpression, Object targetObject, String targetMethod) {this(null, cronExpression, null, null, targetObject, targetMethod);}public TaskBean(String name, String cronExpression, Object targetObject, String targetMethod) {this(name, cronExpression, null, null, targetObject, targetMethod);}public TaskBean(String cronExpression, Boolean running, Boolean stateful, Object targetObject, String targetMethod) {this(null, cronExpression, running, stateful, targetObject, targetMethod);}public TaskBean(String name, String cronExpression, Boolean running, Boolean concurrent, Object targetObject, String targetMethod) {this.name = StringUtils.isEmpty(name) ? getDefaultName() : name;this.cronExpression = cronExpression;this.running = running != null ? running : this.running;this.concurrent = concurrent != null ? concurrent : this.concurrent;this.targetObject = targetObject;this.targetMethod = targetMethod;}public TaskBean(String name, String cronExpression, Boolean running, Boolean concurrent,Object targetObject, String targetMethod,Boolean remoteConcurrent) {this.name = StringUtils.isEmpty(name) ? getDefaultName() : name;this.cronExpression = cronExpression;this.running = running != null ? running : this.running;this.concurrent = concurrent != null ? concurrent : this.concurrent;this.targetObject = targetObject;this.targetMethod = targetMethod;this.remoteConcurrent = remoteConcurrent != null ? remoteConcurrent : this.remoteConcurrent;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getCronExpression() {return cronExpression;}public void setCronExpression(String cronExpression) {this.cronExpression = cronExpression;}public Boolean getConcurrent() {return concurrent;}public void setConcurrent(Boolean concurrent) {this.concurrent = concurrent;}public Boolean getRemoteConcurrent() {return remoteConcurrent;}public void setRemoteConcurrent(Boolean remoteConcurrent) {this.remoteConcurrent = remoteConcurrent;}public Boolean getRunning() {return running;}public void setRunning(Boolean running) {this.running = running;}public Object getTargetObject() {return targetObject;}public void setTargetObject(Object targetObject) {this.targetObject = targetObject;}public String getTargetMethod() {return targetMethod;}public void setTargetMethod(String targetMethod) {this.targetMethod = targetMethod;}private String getDefaultName() {return this.targetObject.getClass().getName();}}

4.定义任务初始化工厂类及其实现:

/*** 任务初始化工厂类*/
public interface ScheduleFactory {public void init(List<TaskBean> taskBeanList) throws Exception;
}@Component("quartzSchedulerFactory")
public class QuartzScheduleFactory implements ScheduleFactory{@Autowiredprivate Scheduler quartzScheduler;public void init(List<TaskBean> taskBeanList) throws Exception {if(taskBeanList ==null || taskBeanList.size()<=0 ){return;}for(TaskBean taskBean : taskBeanList){if (taskBean != null) {if (taskBean.getRunning()) {QuartzSchedulerUtils.createScheduleJob(quartzScheduler, taskBean);}}}}}

5.zk实现的分布式锁:

zk配置信息的加载

public class LocalZookeeperPropertiesLoader{private static void putConfig(Properties localProperties, Map.Entry entry) {if (entry.getKey().equals(ZookeeperConfigConstants.ZOOKEEPER_ADDRESS)) {localProperties.put(ZookeeperConfigConstants.ZOOKEEPER_ADDRESS, entry.getValue());}}/*** 从默认配置文件 config.properties 中得到ZooKeeper的配置信息* 其中 ZooKeeper 的地址由 (系统属性 优先于 环境变量 优先于 配置文件)指定* @return 配置信息*/@Overridepublic Properties load() {Properties localProperties = new Properties();try {InputStream resourceAsStream = LocalZookeeperPropertiesLoader.class.getClassLoader().getResourceAsStream("config" + ".properties");localProperties.load(resourceAsStream);} catch (Exception e) {throw new RuntimeException(e);}System.getenv().entrySet().forEach(entry -> putConfig(localProperties, entry));System.getProperties().entrySet().forEach(entry -> putConfig(localProperties, entry));return localProperties;}@Overridepublic void destroy() {}

初始化curatorFrameWork

public class CRMSCuratorFrameworkFactory {private CRMSCuratorFrameworkFactory() {}public static CuratorFramework createCuratorFramework(CuratorProperties configuration){return createCuratorFramework(configuration, null);}public static CuratorFramework createCuratorFramework(CuratorProperties curatorProperties, TreeCacheListener listener, String... paths){CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(curatorProperties.getConnectString()).connectionTimeoutMs(curatorProperties.getConnectionTimeoutMs()).retryPolicy(curatorProperties.getRetryPolicy()).sessionTimeoutMs(curatorProperties.getSessionTimeoutMs()).build();curatorFramework.start();if(paths.length > 0 && listener != null){for(String path :paths){try {if(curatorFramework.checkExists().forPath(path)==null){curatorFramework.create().creatingParentsIfNeeded().forPath(path);}TreeCache watcher =TreeCache.newBuilder(curatorFramework,path).build();watcher.getListenable().addListener(listener);watcher.start();}catch (Exception e){throw new RuntimeException(e);}}}return  curatorFramework;}
}

互斥锁获取后的回调

@FunctionalInterface
public interface MutexLockCallable {void call();
}

分布式锁的实现

//zk实现的分布式互斥锁
public class ZookeeperMetuxLock {private static Logger log = LoggerFactory.getLogger(ZookeeperMetuxLock.class);/*** 分布式互斥锁* @param lockName 锁对应的zk节点的名称* @param time     阻塞时间* @param unit     阻塞时间单位* @param callback  获得锁需要执行的内容*/public static boolean execute(String lockName,long time,TimeUnit unit, MetuxLockCallble callback){boolean acquired = false;CuratorProperties curatorProperties = CuratorPropertiesBuilder.getInstance().build(new LocalZookeeperPropertiesLoader().load());try (CuratorFramework curatorFramework = CRMSCuratorFrameworkFactory.createCuratorFramework(curatorProperties)) {//互斥分布式锁InterProcessMutex mutexLock = new InterProcessMutex(curatorFramework, lockName);if (mutexLock.acquire(time, unit)) {try {callback.call();} finally {mutexLock.release();acquired = true;}}} catch (Exception ex) {log.error("MutexLock lock error! message:" + ex.getMessage(), ex);} finally {return acquired;}}/*** 分布式互斥锁** @param lockName 锁对应的zookeeper节点名字* @param callback 获取锁后执行的内容* @return 是否成功获取锁*/public static boolean execute(String lockName, MetuxLockCallble callback) {return execute(lockName, -1, null, callback);}
}

6.quartz的job的实现:

public class QuartzJob implements Job {private static Logger log = LoggerFactory.getLogger(QuartzJob .class);private static final String MUTEX_PATH_ON_ZOOKEEPER_PREFIX = "/scheduler_lock/";public void execute(JobExecutionContext context) throws JobExecutionException {final TaskFactoryBean taskFactoryBean = (TaskFactoryBean) context.getMergedJobDataMap().get(QuartzSchedulerUtils.SCHEDULEFACTORYBEAN);if (taskFactoryBean.getRemoteConcurrent()) {String lockName = MUTEX_PATH_ON_ZOOKEEPER_PREFIX + taskFactoryBean.getName();ZookeeperMutexLock.execute(lockName, 0, TimeUnit.MICROSECONDS, () -> {QuartzJob .executeTask(taskFactoryBean);});} else {executeTask(taskFactoryBean);}}private static void executeTask(TaskFactoryBean taskFactoryBean) {try {Object targetObject = taskFactoryBean.getTargetObject();String targetMethod = taskFactoryBean.getTargetMethod();Method method = targetObject.getClass().getMethod(targetMethod, new Class[]{});if (method != null) {method.invoke(targetObject, new Object[]{});} else {log.error("task " + taskFactoryBean.getName() + " execute error! message: execute method is null");}} catch (Exception ex) {log.error("task " + taskFactoryBean.getName() + " execute error! message:" + ex.getMessage(), ex);}}

7.创建job的工具类:

public class QuartzSchedulerUtils {public static final String SCHEDULEFACTORYBEAN = "scheduleFactoryBean";public static void createScheduleJob(Scheduler scheduler, TaskBean taskBean)throws SchedulerException {//同步或者异步Class<? extends Job> jobClass =  QuartzJob.class;//构建job信息String  jobName = taskBean.getName();String jobGroupName = jobName;JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName,jobGroupName).build();//放入参数,运行时的方法可以获取jobDetail.getJobDataMap().put(SCHEDULEFACTORYBEAN,taskBean);//表达式调度构造器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(taskBean.getCronExpression());//按新的表达式构造一个新的triggerCronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(jobName,jobGroupName).withSchedule(scheduleBuilder).build();scheduler.scheduleJob(jobDetail,cronTrigger);}
}

8.定义调度管理的入口类:

@Component("schedulerManager")
@Lazy(false)
public class SchedulerManager implements ApplicationContextAware {private static Logger log = LoggerFactory.getLogger(SchedulerManager.class);private List<TaskFactoryBean> taskFactoryBeen = new ArrayList<TaskFactoryBean>();private SchedulerFactory schedulerFactory;@PostConstructpublic void init() throws Exception {schedulerFactory.init(taskFactoryBeen);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {initTaskFactoryBeen(applicationContext);schedulerFactory = (SchedulerFactory) applicationContext.getBean("quartzSchedulerFactory");}private void initTaskFactoryBeen(ApplicationContext applicationContext) {Map<String, Object> taskBeanMap = applicationContext.getBeansWithAnnotation(Scheduler.class);if (taskBeanMap != null && taskBeanMap.size() > 0) {for (String beanName : taskBeanMap.keySet()) {Object taskBean = taskBeanMap.get(beanName);try {List<TaskFactoryBean> currentTaskFactoryBeen = buildTaskFactoryBean(taskBean);if (currentTaskFactoryBeen != null && currentTaskFactoryBeen.size() > 0) {taskFactoryBeen.addAll(currentTaskFactoryBeen);}} catch (Exception ex) {log.error("Initializes the scheduling bean " + beanName + " failure ! message:" + ex.getMessage(), ex);throw new RuntimeException(ex);}}}}private List<TaskFactoryBean> buildTaskFactoryBean(Object taskBean) throws Exception {Object targetBean = SpringTargetBeanUtils.getTarget(taskBean);Map<String, String> cronMap = new HashMap<String, String>();Map<String, Boolean> runningMap = new HashMap<String, Boolean>();Map<String, Boolean> concurrentMap = new HashMap<String, Boolean>();Map<String, Boolean> remoteConcurrentMap = new HashMap<String, Boolean>();Field[] fields = targetBean.getClass().getDeclaredFields();for (Field field : fields) {field.setAccessible(true);if (field.isAnnotationPresent(ScheduleTaskCronExpression.class)) {ScheduleTaskCronExpression scheduleTaskCronExpression = field.getAnnotation(ScheduleTaskCronExpression.class);String taskName = scheduleTaskCronExpression.value();String cronExpression = (String) field.get(targetBean);cronMap.put(taskName, cronExpression);} else if (field.isAnnotationPresent(ScheduleTaskRunning.class)) {ScheduleTaskRunning scheduleTaskRunning = field.getAnnotation(ScheduleTaskRunning.class);String taskName = scheduleTaskRunning.value();Boolean running = (Boolean) field.get(targetBean);runningMap.put(taskName, running);} else if (field.isAnnotationPresent(ScheduleTaskConcurrent.class)) {ScheduleTaskConcurrent scheduleTaskConcurrent = field.getAnnotation(ScheduleTaskConcurrent.class);String taskName = scheduleTaskConcurrent.value();Boolean concurrent = (Boolean) field.get(targetBean);concurrentMap.put(taskName, concurrent);}else if (field.isAnnotationPresent(ScheduleTaskRemoteConcurrent.class)) {ScheduleTaskRemoteConcurrent scheduleTaskRemoteConcurrent = field.getAnnotation(ScheduleTaskRemoteConcurrent.class);String taskName = scheduleTaskRemoteConcurrent.value();Boolean concurrent = (Boolean) field.get(targetBean);remoteConcurrentMap.put(taskName, concurrent);}}List<TaskFactoryBean> currentTaskFactoryBeen = new ArrayList<TaskFactoryBean>();Method[] methods = targetBean.getClass().getDeclaredMethods();for (Method method : methods) {if (method.isAnnotationPresent(ScheduleTaskMethod.class)) {ScheduleTaskMethod scheduleTaskMethod = method.getAnnotation(ScheduleTaskMethod.class);String taskName = scheduleTaskMethod.value();String methodName = method.getName();String cronExpression = cronMap.get(taskName);Boolean running = runningMap.get(taskName);Boolean concurrent = concurrentMap.get(taskName);Boolean remoteConcurrent = remoteConcurrentMap.get(taskName);TaskFactoryBean taskFactoryBean = new TaskFactoryBean(taskName, cronExpression, running, concurrent, taskBean, methodName, remoteConcurrent);currentTaskFactoryBeen.add(taskFactoryBean);}}return currentTaskFactoryBeen;}public List<TaskFactoryBean> getTaskFactoryBeen() {return taskFactoryBeen;}

9.相关spring及zk的配置:

<bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" lazy-init="false"><!-- quartz 延时加载 --><property name="startupDelay" value="1"/>
</bean>

config.properies

#zookeeper addresss
zk_address=ip:2181
#zk properties
zookeeper.config.sessionTimeoutMs=60000
zookeeper.config.connectionTimeoutMs=3000
zookeeper.config.retry.baseSleepTimeMs=100
zookeeper.config.retry.maxRetries=3

10. 创建test类:

@Component
@Scheduler
public class TestTask {private static final String ADDUSERTASK = "TestTask";@ScheduleTaskCronExpression(ADDUSERTASK)private String taskEx = "* * * * * ? *";@ScheduleTaskRunning(ADDUSERTASK)private Boolean taskRunning = true;@ScheduleTaskRemoteConcurrent(ADDUSERTASK)private Boolean remoteCon = true;@ScheduleTaskMethod(ADDUSERTASK)public void task() {System.out.println("this is the task");}
}

Spring集成quartz实现的定时任务调用相关推荐

  1. spring集成quartz报org.springframework.scheduling.quartz.CronTriggerBean异常

    spring集成quartz项目做定时任务,但是启动tomcat报错: ClassNotFoundException: org.springframework.scheduling.quartz.Cr ...

  2. 使用Spring整合Quartz轻松完成定时任务

    一.背景 上次我们介绍了如何使用Spring Task进行完成定时任务的编写,这次我们使用Spring整合Quartz的方式来再一次实现定时任务的开发,以下奉上开发步骤及注意事项等. 二.开发环境及必 ...

  3. 使用Spring Boot + Quartz 实现分布式定时任务平台

    本文将从项目实战出发来介绍分布式定时任务的实现.在某些应用场景下要求任务必须具备高可用性和可扩展性,单台服务器不能满足业务需求,这时就需要使用Quartz实现分布式定时任务. 一.分布式任务应用场景 ...

  4. Quartz学习总结(1)——Spring集成Quartz框架

    一.Quartz简介 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相结合也可以单独使用.Quartz可以用来创建简 ...

  5. spring集成Quartz时区问题造成任务晚执行八小时

    项目中在Spring中集成了Quartz,配置定时任务每日凌晨执行,但是到了八点多才执行,经过排查是时区问题造成的. 一种解决办法是在JVM启动参数中增加 --Duser.timezone=GMT+0 ...

  6. java 定时任务插件_详解Spring整合Quartz实现动态定时任务

    最近项目中需要用到定时任务的功能,虽然spring 也自带了一个轻量级的定时任务实现,但感觉不够灵活,功能也不够强大.在考虑之后,决定整合更为专业的Quartz来实现定时任务功能. 普通定时任务 首先 ...

  7. 使用Spring提供Quartz来实现定时任务

    Spring功能越来越多了,用起来还很舒服方便,Quartz实现的定时任务就是一个. 首先是配置文件: <?xml version="1.0" encoding=" ...

  8. java quartz spring_JavaLib-quartz | 基于Spring Boot Quartz开发的定时任务

    基于Spring Boot Quartz开发的JavaLib-quartz,目的是帮你快速构建定时任务系统,你可以专心编写你的业务逻辑,而不必关注定时任务具体是如何实现的,他的性能如何,有没有异常以及 ...

  9. spring集成quartz框架

    2019独角兽企业重金招聘Python工程师标准>>> 1.Spring对quartz支持 2.实例(按红色部分配置) 1)引入quartz包 <dependency>& ...

  10. 主题:spring集成quartz,出现2次重复调用的问题

    在项目中使用了quartz的任务调度,在本地测试的时候没有问题,只会调用1次部署到服务器上后,发现同一个任务,在相同的时间被调用了2次.本地环境 windowsXP ,spring2.5.1, sun ...

最新文章

  1. 手机mvno怎么设置_微信透明背景壁纸怎么弄 手机设置方法教程分享
  2. python for android-Python-for-Android安装笔记
  3. Oracle 查看表空间的大小及使用情况sql语句
  4. Docker中使用Dockerfile定制化jar启动时:at sun.awt.FontConfiguration.getVersion(FontConfiguration.java:1264)
  5. Variable、Tensor、Numpy的转换
  6. 托福试卷真题_托福反复考,反复不过百,你还不知道是谁的问题吗?
  7. 循序渐进DB2 (第2版)——DBA系统管理、运维与应用案例
  8. python队列来做什么_简单介绍python的双向队列
  9. 明星分手文案火了!为了营销 你们这些商家也是很努力啊...
  10. PCA原理及代码实现
  11. mysql xmlhttp_php_xmlhttp 乱码问题解决方法
  12. OC中iO操作相关方法
  13. java贪吃蛇详细设计,javascript贪吃蛇游戏设计与实现
  14. 七层网络协议模型(ISO模型)
  15. 【JAVA】 new ArrayList<> () {{}} 双花括号 是什么写法?
  16. 关于最新版go-cqhttp无法登录qq
  17. 百度西雅图开设AI实验室 总裁张亚勤称AI是时代变革之能
  18. 【AI名利场·人物】Gowild创始人邱楠:倔强顽强创新着,将AI虚拟生命进行到底...
  19. 基于 FPGA Vivado 的74系列IP封装(附源工程)
  20. 电磁场理论笔记01:场论

热门文章

  1. 计算机苏教版初一教案,文笔精华(苏教版七年级) 教案教学设计
  2. 23. Django进阶:Django发送邮件
  3. Node:连接MySQL报错\lib\protocol\Parser.js:43 Cannot read property ‘query‘ of undefined
  4. Go语言:数组练习—冒泡排序
  5. Java:面向对象编程
  6. FFmpeg学习(3)——视频中音频文件提取
  7. Java中Properties类的操作配置文件
  8. 论文笔记_S2D.37_2015-TPAMI_使用深度卷积神经场从单目图像学习深度
  9. 论文笔记_S2D.17-2018-ECCV-通过卷积空间传播网络(CSPN)的相似性学习进行深度估计
  10. 自动驾驶_基于强化学习的自动驾驶系统