目录

前言

一、代码具体实现

1、pom文件配置

2、application.yaml文件

3、Service实现类

4、SpringBatch配置类

5、Processor设置

6、封装实体Bean

7、启动类上要加上注解

总结


前言

最近赶一个紧急需求,需求内容如下:

PC网页触发一条设备升级记录(下图),后台要定时批量设备更新。这里定时要用到Quartz,批量数据处理要用到SpringBatch,二者结合,可以完成该需求。

由于之前,没有用过SpringBatch,于是上网查了下资料,发现可参考的不是很多,于是只能去慢慢的翻看官方文档。

https://docs.spring.io/spring-batch/4.1.x/reference/html

遇到不少问题,就记录一下吧。

一、代码具体实现

1、pom文件配置

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency>
</dependencies>

2、application.yaml文件

spring:datasource:username: thinklinkpassword: thinklinkurl: jdbc:postgresql://172.16.205.54:5432/thinklinkdriver-class-name: org.postgresql.Driverbatch:job:enabled: false
server:port: 8073#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/# 每次批量处理的数据量,默认为5000
batch-size: 5000

3、Service实现类

触发批处理任务的入口,执行一个job。

@Service("batchService")
public class BatchServiceImpl implements BatchService {// 框架自动注入@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate Job updateDeviceJob;/*** 根据 taskId 创建一个Job* @param taskId* @throws Exception*/@Overridepublic void createBatchJob(String taskId) throws Exception {JobParameters jobParameters = new JobParametersBuilder().addString("taskId", taskId).addString("uuid", UUID.randomUUID().toString().replace("-","")).toJobParameters();// 传入一个Job任务和任务需要的参数jobLauncher.run(updateDeviceJob, jobParameters);}
}

4、SpringBatch配置类

注意:此部分最重要

@Configuration
public class BatchConfiguration {private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);@Value("${batch-size:5000}")private int batchSize;// 框架自动注入@Autowiredpublic JobBuilderFactory jobBuilderFactory;// 框架自动注入@Autowiredpublic StepBuilderFactory stepBuilderFactory;// 数据过滤器,对从数据库读出来的数据,注意进行操作@Autowiredpublic TaskItemProcessor taskItemProcessor;// 接收job参数public Map<String, JobParameter> parameters;public Object taskId;@Autowiredprivate JdbcTemplate jdbcTemplate;// 读取数据库操作@Bean@StepScopepublic JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {String querySql = " SELECT " +" e. ID AS taskId, " +" e.user_id AS userId, " +" e.timing_startup AS startTime, " +" u.device_id AS deviceId, " +" d.app_name AS appName, " +" d.compose_file AS composeFile, " +" e.failure_retry AS failureRetry, " +" e.tetry_times AS retryTimes, " +" e.device_managered AS deviceManagered " +" FROM " +" eiot_upgrade_task e " +" LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +" LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +" WHERE " +" ( " +" u.device_upgrade_status = 0 " +" OR u.device_upgrade_status = 2" +" )" +" AND e.tetry_times > u.retry_times " +" AND e. ID = ?";return new JdbcCursorItemReaderBuilder<DispatchRequest>().name("itemReader").sql(querySql).dataSource(dataSource).queryArguments(new Object[]{parameters.get("taskId").getValue()}).rowMapper(new DispatchRequest.DispatchRequestRowMapper()).build();}// 将结果写回数据库@Bean@StepScopepublic ItemWriter<ProcessResult> itemWriter() {return new ItemWriter<ProcessResult>() {private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);Integer retryTimes = jdbcTemplate.queryForObject("select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class);retryTimes += 1;int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +"where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());if (updateCount <= 0) {log.warn("no task updated");} else {log.info("count of {} task updated", updateCount);}// 最后一次重试if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());return 1;} else {return 0;}}@Override@Transactionalpublic void write(List<? extends ProcessResult> list) throws Exception {Map taskMap = jdbcTemplate.queryForMap("select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的);int deviceManagered = (int)taskMap.get("device_managered");Integer deviceCount = (Integer) taskMap.get("device_count");if (deviceCount == null) {log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());}int taskStatus = (int)taskMap.get("task_status");for (ProcessResult result: list) {deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());}if (deviceCount != null && deviceManagered == deviceCount) {taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成}jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +"where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());}};}/*** 定义一个下发更新的 job* @return*/@Beanpublic Job updateDeviceJob(Step updateDeviceStep) {return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", "")).listener(new JobListener()) // 设置Job的监听器.flow(updateDeviceStep)// 执行下发更新的Step.end().build();}/*** 定义一个下发更新的 step* @return*/@Beanpublic Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", "")).<DispatchRequest, ProcessResult> chunk(batchSize).reader(itemReader) //根据taskId从数据库读取更新设备信息.processor(taskItemProcessor) // 每条更新信息,执行下发更新接口.writer(itemWriter).build();}// job 监听器public class JobListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {log.info(jobExecution.getJobInstance().getJobName() + " before... ");parameters = jobExecution.getJobParameters().getParameters();taskId = parameters.get("taskId").getValue();log.info("job param taskId : " + parameters.get("taskId"));}@Overridepublic void afterJob(JobExecution jobExecution) {log.info(jobExecution.getJobInstance().getJobName() + " after... ");// 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行jobString sql = " SELECT " +" count(*) " +" FROM " +" eiot_upgrade_device d " +" LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +" WHERE " +" u. ID = ? " +" AND d.retry_times < u.tetry_times " +" AND ( " +" d.device_upgrade_status = 0 " +" OR d.device_upgrade_status = 2 " +" ) ";// 获取更新失败的设备个数Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);log.info("update device failure count : " + count);// 下面是使用Quartz触发定时任务// 获取任务时间,单位秒
// String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);// 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒Integer millSecond = 10;if(count != null && count > 0){String jobName = "UpgradeTask_" + taskId;String reTaskId = taskId.toString();Map<String,Object> params = new HashMap<>();params.put("jobName",jobName);params.put("taskId",reTaskId);if (QuartzManager.checkNameNotExist(jobName)){QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);}}}}
}

5、Processor设置

处理每条数据,可以在此对数据进行过滤操作。

@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {public static final int STATUS_DISPATCH_FAILED = 2;public static final int STATUS_DISPATCH_SUCC = 1;private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);@Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")private String dispatchUrl;@AutowiredJdbcTemplate jdbcTemplate;/*** 在这里,执行 下发更新指令 的操作* @param dispatchRequest* @return* @throws Exception*/@Overridepublic ProcessResult process(final DispatchRequest dispatchRequest) {// 调用接口,下发指令String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();log.info("request url:" + url);RestTemplate restTemplate = new RestTemplate();HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON_UTF8);MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();JSONObject jsonOuter = new JSONObject();JSONObject jsonInner = new JSONObject();try {jsonInner.put("jobId",dispatchRequest.getTaskId());jsonInner.put("name",dispatchRequest.getName());jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));jsonInner.put("timestamp",dispatchRequest.getTimestamp());jsonOuter.put("method","updateApp");jsonOuter.put("params",jsonInner);} catch (JSONException e) {log.info("JSON convert Exception :" + e);}catch (IOException e) {log.info("Base64Util bytesToBase64Str :" + e);}log.info("request body json :" + jsonOuter);HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);int status;try {ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);log.info("response :" + response);if (response.getStatusCode() == HttpStatus.OK) {status = STATUS_DISPATCH_SUCC;} else {status = STATUS_DISPATCH_FAILED;}}catch (Exception e){status = STATUS_DISPATCH_FAILED;}return new ProcessResult(dispatchRequest, status);}
}

6、封装实体Bean

封装数据库返回数据的实体Bean,注意静态内部类

public class DispatchRequest {private String taskId;private String deviceId;private String userId;private String name;private byte[] composeFile;private String policy;private String timestamp;private String md5;private int failureRetry;private int retryTimes;private int deviceManagered;// 省略构造函数,setter/getter/tostring方法//......public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {@Overridepublic DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {DispatchRequest dispatchRequest = new DispatchRequest();dispatchRequest.setTaskId(resultSet.getString("taskId"));dispatchRequest.setUserId(resultSet.getString("userId"));dispatchRequest.setPolicy(resultSet.getString("startTime"));dispatchRequest.setDeviceId(resultSet.getString("deviceId"));dispatchRequest.setName(resultSet.getString("appName"));dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));return dispatchRequest;}}
}

7、启动类上要加上注解

@SpringBootApplication
@EnableBatchProcessing
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

总结

其实SpringBatch并没有想象中那么好用,当从数据库中每次取5000条数据后,进入processor中是逐条处理的,这个时候不能不行操作,等5000条数据处理完之后,再一次性执行ItemWriter方法。

在使用的过程中,最坑的地方是ItemReader和ItemWriter这两个地方,如何执行自定义的Sql,参考文中代码就行。

至于Quartz定时功能,很简单,只要定时创建SpringBatch里面的Job,让这个job启动就好了。

只有当你开始,你才会到达你的理想和目的地,只有当你努力,
你才会获得辉煌的成功,只有当你播种,你才会有所收获。只有追求,
才能尝到成功的味道,坚持在昨天叫立足,坚持在今天叫进取,坚持在明天叫成功。欢迎所有小伙伴们点赞+收藏!!!

如何优雅的整合定时批量任务(荣耀典藏版)相关推荐

  1. SpringBoot + SpringBatch + Quartz整合定时批量任务

    点击关注公众号,实用技术文章及时了解 来源:blog.csdn.net/zxd1435513775/article/ details/99677223 一.引言 最近一周,被借调到其他部门,赶一个紧急 ...

  2. 搭建大型分布式服务(二十二)SpringBoot 如何优雅地整合多个kafka数据源?

    系列文章目录 文章目录 系列文章目录 前言 一.本文要点 二.开发环境 三.创建项目 四.修改项目 五.测试一下 六.小结 前言 在日常开发当中,经常会遇到需要消费的topic不在同一个kafka集群 ...

  3. redis缓存队列+MySQL +php任务脚本定时批量入库

    原文地址:http://blog.jobbole.com/99567/ 需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把 ...

  4. SpringBoot整合SpringEmail 批量发送邮件

    SpringBoot整合SpringEmail 批量发送邮件 前言:公司目前有个业务就是向订阅了客户发送邮件,所以我把这块的内容记录下来 1.引入依赖 <!-- email--><d ...

  5. Spring整合定时任务调度框架Quartz实

    Spring整合定时任务调度框架Quartz实战 定时的任务处理在程序开发中应用的相当普遍,之前一直使用JDK的Timer类库来做任务调度功能不是很方便,因为它不能像cron服务那样可以指定具体年.月 ...

  6. dede 织梦文章整合的批量插入图片

    你是否厌倦了在后台一张一张插入图片?你是否厌倦了把图片放到一个文件夹下命名为1.jpg,2.jpg...然后再手动复制地址到缩略图的地方,今天 CMS站长 莪叆啰带来织梦整合的批量插入图片功能,只需运 ...

  7. 王者荣耀进不去服务器维护中,王者荣耀苹果版更新后进不去 王者荣耀iOS版服务器维护怎么办...

    随着7月4日王者荣耀停机维护公告推出后,不少玩家开始更新游戏,但iOS用户反映,安卓手机用户早早就进入了游戏,并开始练习元歌这款新英雄,而自己更新王者荣耀后,一直服务器正在维护中,这是怎么回事呢? 王 ...

  8. 开创先河!《王者荣耀国际版》成为东南亚运动会正式比赛项目

    [TechWeb]3月24日消息,在今日的UP2019腾讯新文创生态大会上,腾讯互动娱乐自研.综合市场部总经理,腾讯电竞业务负责人侯淼透露,电竞已经成为体育新物种,腾讯电竞旗下<王者荣耀国际版& ...

  9. 花木兰荣耀典藏皮肤特效一览 花木兰九霄神辉值得入手吗

    王者荣耀新赛季,正式开启,而在此次也更新了许多新皮,其中就有花木兰的荣耀典藏皮肤"九霄神辉",那么,这款皮肤的特效如何呢?下面花木兰荣耀典藏皮肤特效吧. 花木兰荣耀典藏皮肤特效一览 ...

最新文章

  1. 【译】在ASP.NET中创建PDF-iTextSharp起步
  2. liunx配置本地yum源和更新aliyun yum源
  3. python开发是做什么的-python开发工程师是做什么的
  4. 从这篇文章可以看出有些错误,由此可以看出,还是看msdn要好的多,这是我的经验
  5. SubSonic数据库操作
  6. Activity管理
  7. 回到网易后开源APM技术选型与实战
  8. C++中void和void*指针的含义 (指针类型的含义)
  9. 初创团队可能不适合应届生小孩
  10. 易车与中国海洋报社达成深度合作 开拓海洋主题汽车内容
  11. 李飞飞等6名华人入选ACM 2018 Fellow,无国内学者入选
  12. Linux 6.4 partprobe出现warning问题
  13. CSP介绍、以及使用CryptoAPI枚举CSP并获取其属性
  14. python实现小游戏-猜年龄
  15. 为什么Uber微服务架构使用多租户?
  16. Android显示九宫图(自定义圆角,仿微信九宫格图)
  17. python 模拟触屏电脑操作_如何在硒中模拟触摸屏?
  18. Vue.js 框架源码与进阶 - Vue.js 源码剖析 - 响应式原理
  19. prim算法c语言,Prim算法(一)之 C语言详解
  20. r语言 python 股票_R语言:抓取股票数据并存入数据库进行分析实例 MySQL

热门文章

  1. python数学公式代码_PythonStudy_‘数学公式计算器’代码
  2. Excel如何提取文本左边的数字
  3. 拆字在线版-扌斥字在纟戋片反
  4. 计算机开机其他用户,Win8.1系统下取消开机显示“其他用户”的方法
  5. LiveMe x TiDB丨单表数据量 39 亿条,简化架构新体验
  6. 程序退出,段错误segment default问题定位
  7. 《极客时间-技术管理实战》读书笔记
  8. Qlik Sense申请试用账号详细步骤(可试用30天)
  9. ERROR: Cannot uninstall ‘llvmlite‘. It is a distutils installed project and thus we cannot accuratel
  10. 什么是云计算和大数据?他们之间的区别是什么?