tags:springbatch

1.引言

上一篇《便捷的数据读写-spring batch(5)结合beetlSql进行数据读写》中使用Spring Batch及BeetlSql,对数据库读写组件进行数据库同步,实际上是全量同步。全量同步的问题在于每次需要读取整个表数据,如果表数据量大,则资源耗费大,而且不便于对已有数据的更新。因此,在数据同步过程中,更多的使用增量同步,即通过某些条件,区分新数据进行插入,对有变化的数据进行更新,对不存在的数据进行删除等(当然,一般都不会对数据进行物理删除,只做逻辑删除,因此也就变成了数据更新操作)。

增量更新更多情况需要依据上一次更新后的状态(如时间、自增ID,数据位置等),下一次更新以上一次更新的状态为基础,因此,需要把每一次更新后的状态以变量参数的方式保存下来,下一次更新则以此状态数据为动态参数来使用。Spring Batch支持任务运行时的动态参数,结合此特性,可以实现数据的增量同步。

2.开发环境

JDK: jdk1.8

Spring Boot: 2.1.4.RELEASE

Spring Batch:4.1.2.RELEASE

开发IDE: IDEA

构建工具Maven: 3.3.9

日志组件logback:1.2.3

lombok:1.18.6

3.增量同步简述

增量同步,是相对与全量同步,即每次同步,只需要同步源数据库变化的部分,这样提高了数据同步的效率。是当前数据同步的普遍方式。抽取变化的数据,又名CDC,即Change Data Capture变化数据捕获。在《Pentaho Kettle解决方案:使用PDI构建开源ETL解决方案 》一书中,对CDC作了比较详细说明。此处简要做一下说明,当前实现增量同步的方式有4种,分别是基于源数据的CDC,基于触发器的CDC,基于快照的CDC,基于日志的CDC。

3.1 基于源数据的CDC

基于源数据的CDC要求源数据里有相关的属性列,利用这些属性列,可以判断出哪里是增量数据,最常见的属性列有:

时间戳

基于时间来标识数据,至少需要一个时间,最好两个,一个标识创建,一个标识更新时间,所以一般我们设计数据库时都会添加sys_create_time和sys_update_time作为默认字段,并且设计为默认当前时间和更新处理。

自增序列

使用数据库表的自增序列字段(一般是主键),来标识新插入的数据。不过现实中用得比较少。

此方法需要有一个临时表来保存上一次更新时间或,在实践中,一般是在独立的模式下创建此表,保存数据。下一次更新则比较上一次时间或序列。这是用得比较普遍的方式,本文中的增量同步也是使用此方法。

3.2 基于触发器的CDC

在数据库中编写触发器,当前数据库执行INSERT,UPDATE,DELETE等语句时,可以激活数据库中的触发器,然后触发器可以把这些变更的数据保存到中间临时表,然后再从临时表中获取这些数据,同步到目标数据库中。当然,这种方法是入侵性最强的,一般数据库都不允许向数据库里添加触发器(影响性能)。

3.3 基于快照的CDC

此方法就是一次抽取当前全部数据放到缓冲区,作为快照,下一次同步时从源数据读取数据,然后和快照做比较,找出变化的数据。简单来说是就做全表读取与比较,找出变化的数据。做全表扫描,问题就在于性能,所以一般不会使用这种方式。

3.4 基于日志的CDC

最高级和最没有入侵性的方法就是基于日志的方式,数据库会把插入、更新、删除的操作记到日志中,如Mysql会有binlog,增量同步可以读取日志文件,把二进制文件转为可理解的方式,然后再把里面的操作按照顺序重做一遍。但是这种方式只能对同种数据库有效,对于异构的数据库就无法实现了。而且实现起来有一定的难度。

3.5 本示例增量同步方法说明

在本示例中,依然是基于test_user表进行增量同步,表有字段sys_create_time和sys_update_time来标识数据创建和更新时间(当前,若现实情况中只有一个时间,也可以只基于此时间,只是这样就比较难标识此数据是更新还是插入了)。增量同步流程如下:

流程

说明:

每次同步,会先读取临时表,获取上一次同步后数据的时间。

若是第一次同步,则全部同步,若不是,则根据时间作为查询语句的参数。

根据时间读取数据后,把数据插入目标表

更新临时表的数据时间,以便下一次同步。

4.Spring Batch动态参数绑定

根据上面的增量同步流程,关键点在于把数据时间保存到临时表,在数据读取时可以作为比较的条件。而这时间参数是动态的,在任务执行时才传递进去,在Spring Batch中,支持动态参数绑定,只需要使用@StepScope注解即可,结合BeetlSql,很快就可以实现增量同步。本示例是基于上一篇文章的示例来进一步开发的,可以下载源码查看完整示例。

4.1 沿用原来数据库配置和多数据源

源数据库: mytest

目标数据库: my_test1

spring batch数据库: my_spring_batch

同步的数据表:test_user

4.2 创建临时表

使用示例中的sql/initCdcTempTable.sql,在my_spring_batch库中,创建临时表cdc_temp,并插入记录为1的记录,标识是同步test_user表。此处,我们只需要关注last_update_time和current_update_time,前者表示上一次同步完后的数据最后时间,后者表示上一次同步后的系统时间。

4.3 添加/修改dao

4.3.1 添加临时表dao及service类

添加类CdcTempRepository

根据配置,由于cdc_temp是在my_spring_batch,而它的读写是在dao.local包中,因此需要添加dao.local包,然后添加类CdcTempRepository,如下所示:

@Repository

public interface CdcTempRepository extends BaseMapper {

}

添加类CdcTempService,用于cdc_temp表的读取及数据更新

主要包括两个函数,一个是根据ID获取当前的cdc_temp记录,以便获取数据上一次同步的数据最后时间。一个是在同步完成后,更新cdc_temp的数据。如下:

/**

* 根据id获取cdc_temp的记录

* @param id 记录ID

* @return {@link CdcTemp}

*/

public CdcTemp getCurrentCdcTemp(int id){

return cdcTempRepository.getSQLManager().single(CdcTemp.class, id);

}

/**

* 根据参数更新cdcTemp表的数据

* @param cdcTempId cdcTempId

* @param status job状态

* @param lastUpdateTime 最后更新时间

*/

public void updateCdcTempAfterJob(int cdcTempId,BatchStatus status,Date lastUpdateTime){

//获取

CdcTemp cdcTemp = cdcTempRepository.getSQLManager().single(CdcTemp.class, cdcTempId);

cdcTemp.setCurrentUpdateTime(DateUtil.date());

//正常完成则更新数据时间

if( status == BatchStatus.COMPLETED){

cdcTemp.setLastUpdateTime(lastUpdateTime);

}else{

log.info(LogConstants.LOG_TAG+"同步状态异常:"+ status.toString());

}

//设置同步状态

cdcTemp.setStatus(status.name());

cdcTempRepository.updateById(cdcTemp);

}

4.3.2 修改源数据dao

在源数据dao类OriginUserRepository添加函数getOriginIncreUser,此函数对应user.md中的sql语句。

4.3.3 修改目标数据dao

在目标数据dao类TargetUserRepository中添加函数selectMaxUpdateTime,用于查询同步后数据的最后时间。由于此方法的sql简单,可以直接使用@Sql注解,如下所示:

@Sql(value="select max(sys_update_time) from test_user")

Date selectMaxUpdateTime();

4.4 修改user.md中的sql语句。

4.4.1 添加增量读数据sql

在user.md中添加增量读数据的sql语句,如下:

getOriginIncreUser

===

* 查询user数据

select * from test_user

WHERE 1=1

@if(!isEmpty(lastUpdateTime)){

AND (sys_create_time >= #lastUpdateTime# OR sys_update_time >= #lastUpdateTime#)

@}

说明:

@开头是beetl的语法,可以对变量读取和逻辑判断,此处的意思是如果变量lastUpdateTime不为空,则按此条件进行读取。

lastUpdateTime变量由调用时传入(Map)

具体beetl使用语法,可参见官方文档

4.4.2 编写增量插入sql语句

对于Mysql数据库,有insert into ... on duplicate key update ...的用法,即可以根据唯一键(主键或唯一索引),若数据已存在,则更新,不存在,则插入。在user.md文件中,添加以下语句:

insertIncreUser

===

* 插入数据

insert into test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user)

values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#

,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)

ON DUPLICATE KEY UPDATE

id = VALUES(id),

name = VALUES(name),

phone = VALUES(phone),

title = VALUES(title),

email = VALUES(email),

gender = VALUES(gender),

date_of_birth = VALUES(date_of_birth),

sys_create_time = VALUES(sys_create_time),

sys_create_user = VALUES(sys_create_user),

sys_update_time = VALUES(sys_update_time),

sys_update_user = VALUES(sys_update_user)

4.5 编写Spring Batch的组件

Spring Batch文件结构如下:

文件结构

4.5.1 ItemReader

此处与之前的一致,只需要把getOriginUser函数改为getOriginIncreUser即可。

4.5.2 ItemWriter

此处与之前的一致,只需要把sql的ID由user.insertUser改为user.insertIncreUser即可。

4.5.3 添加IncrementJobEndListener

由于数据同步完后,最后一步就是要更新临时表的最后时间数据。如下:

@Slf4j

public class IncrementJobEndListener extends JobExecutionListenerSupport {

@Autowired

private CdcTempService cdcTempService;

@Autowired

private TargetUserRepository targetUserRepository;

@Override

public void afterJob(JobExecution jobExecution) {

BatchStatus status = jobExecution.getStatus();

Date latestDate = targetUserRepository.selectMaxUpdateTime();

cdcTempService.updateCdcTempAfterJob(SyncConstants.CDC_TEMP_ID_USER,status,latestDate);

}

}

说明:

先查询当前数据库中数据最后时间(selectMaxUpdateTime)

更新中间表数据cdc_temp中的last_update_time

4.5.4 添加任务启动时参数初始化

在数据同步的第一步,需要先初始化临时表中的数据最后更新时间,因此在任务启动前,先要进行任务参数设置,以便于把时间参数传到任务中,在任务执行时使用。如下:

public JobParameters initJobParam(){

CdcTemp currentCdcTemp = cdcTempService.getCurrentCdcTemp(getCdcTempId());

//若未初始化,则先查询数据库中对应的最后时间

if(SyncConstants.STR_STATUS_INIT.equals(currentCdcTemp.getStatus())

|| SyncConstants.STR_STATUS_FAILED.equals(currentCdcTemp.getStatus())){

Date maxUpdateTime = selectMaxUpdateTime();

//若没有数据,则按初始时间处理

if(Objects.nonNull(maxUpdateTime)){

currentCdcTemp.setLastUpdateTime(maxUpdateTime);

}

}

return JobUtil.makeJobParameters(currentCdcTemp);

}

4.5.5 组装完整任务

最后,需要一个IncrementBatchConfig配置把读、处理、写、监听组装起来,值得一提的是,在配置读组件时,由于需要使用动态参数,此处需要添加@StepScope注解,同时在参数中使用spEL获取参数内容,如下所示:

@Bean

@StepScope

public ItemReader incrementItemReader(@Value("#{jobParameters['lastUpdateTime']}") String lastUpdateTime) {

IncrementUserItemReader userItemReader = new IncrementUserItemReader();

//设置参数,当前示例可不设置参数

Map params = CollUtil.newHashMap();

params.put(SyncConstants.STR_LAST_UPDATE_TIME,lastUpdateTime);

userItemReader.setParams(params);

return userItemReader;

}

4.5.6 测试

参考上一文章的BeetlsqlJobTest,编写IncrementJobTest测试文件。由于需要测试增量同步,测试流程如下所示:

测试前增量添加数据

测试前,源数据表和目标数据表已经有数据,在源数据表中,执行代码中的sql/user-data-new.sql添加新的用户。注意,由于sys_create_time和sys_update_time定义如下:

`sys_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,

`sys_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

从而达到数据插入时自动生成时间,修改时也自动更新时间。

运行测试

以单元测试运行incrementJob。

查看结果

运行完成后,结果如下:

输出

增量同步后,数据如下:

结果

5.总结

本文先对增量同步做了一个简单的介绍,列出当前一般使用的增量同步方法,然后使用Spring Batch和BeetlSql使用基于时间戳的方式实现增量同步,本示例具有一定的实用性,希望能对做数据同步或相关批处理的开发者有帮助。

springboot实现增量备份_增量同步-spring batch(6)动态参数绑定与增量同步相关推荐

  1. java实现mysql增量备份_企业级MySQL备份原理

    企业级MYSQL备份恢复原理 1. 全量备份 全量数据就是数据库中所有的数据,全量备份就是把数据库中所有的数据进行备份. 例如: 备份所有库: # mysqldump -uroot -poldboy ...

  2. windows mysql增量备份_关于window下mysql数据库增量备份

    mysql增量备份(mysql5.1 之后),linux下mysql增量备份 网上资料很多.这里只说明window下mysql增量备份. 定义: mysql数据库会以二进制的形式,自动把用户对mysq ...

  3. mysql 增量备份_云计算-开源数据库-备份

    关于备份: 备份原因:怕丢,怕被误删. 备份目标:数据的一致性,服务的可用性. 备份技术:物理备份/冷备份 直接复制数据库文件,适用于大型数据库环境,不受存储引擎的限制,但不能恢复到不同的MySQL版 ...

  4. java中batch基础_详解Spring batch 入门学习教程(附源码)

    详解Spring batch 入门学习教程(附源码) 发布时间:2020-09-08 00:28:40 来源:脚本之家 阅读:99 作者:achuo Spring batch 是一个开源的批处理框架. ...

  5. batch spring 重复执行_重复的Spring Batch作业实例

    我有一个小的示例Spring Batch应用程序,该应用程序在首次启动时可以正常运行,但是每当我关闭该应用程序并重新启动jar时,我总是会收到此错误: Caused by: org.springfra ...

  6. java batch是什么_什么是 Spring Batch?

    Spring Batch是一个基于Spring的企业级批处理框架,按照我师父的说法,所有基于Spring的框架都是使用了spring的IoC特性,然后加上自己的一些处理规则.因此,要理解Spring ...

  7. springboot java获取版本号_深入实践Spring Boot 实战篇,大佬整理出的PDF文档

    如何使用Spring Boot 本文章将会详细介绍如何使用Spring Boot.它覆盖了构建系统,自动配置和运行/部署选项等主题.我们也覆盖了一些Spring Boot最佳实践.尽管Spring B ...

  8. java batch基础_详解Spring Batch 轻量级批处理框架实践

    实践内容 从 MariaDB 一张表内读 10 万条记录,经处理后写到 MongoDB . 具体实现 1.新建 Spring Boot 应用,依赖如下: org.springframework.boo ...

  9. url参数 转java对象_如何让Spring MVC接收的参数可以转换为java对象

    展开全部 可以使用@RequestBody注解:@RequestMapping(value = "user/saveUser"", method = RequestMet ...

最新文章

  1. bootstrap-fileinput组件在上传时传递额外参数
  2. ViewPager+Fragment实现TabHost(可复用的类)
  3. 【MATLAB】进阶绘图 ( colormap 颜色图矩阵分析 | 自定义 colormap 颜色图 | 生成 64 x 3 的 colormap 颜色图矩阵 )
  4. 成为一名优秀的程序员基本要素
  5. 社区奖品之原木双面记事板
  6. 解决VS2005“无法启动调试,绑定句柄失效”
  7. 2017.3.30 时态同步 失败总结
  8. 都是远程办公惹的祸!搜狗输入法为错误推送地震预警信息致歉
  9. Java字节码深入解析
  10. 冲刺阶段站立会议每天任务1
  11. 专线维护 07/11
  12. Ubuntu16.04 安装Spyder问题
  13. 今日arXiv精选 | Interspeech/KDD/TACL/ICCV/CIKM
  14. 使用SP Racing F3飞控ROSflight软件包的无人机自主飞行系统
  15. 控制翻页c语言,阅读器多种翻页的设计与实现
  16. 运维学python用不上_数读 | 为什么运维朋友们都需要学Python?
  17. JavaWeb 实验 Servlet用户登录验证
  18. Texas Instruments
  19. 易基因2022年度DNA甲基化研究高分项目文章精选
  20. 在Wireshark中按进程过滤

热门文章

  1. 免备案服务器要怎么选择?
  2. mewnet设备打开错误
  3. 【证书】certbot 工具,自动 letencrypt 通配符证书自动续期(renew)
  4. 部落冲突-家乡防御建筑介绍(城墙、加农炮、箭塔、迫击炮、防空火箭、法师塔、空气炮、特斯拉电磁塔、炸弹塔、X连弩、地狱之塔、天鹰火炮、投石炮)
  5. 计算机上的用户名是哪一个,电脑在哪登陆上网账号密码
  6. 获取某年某月的最后一天
  7. Ubuntu把在效能器范畴起更重要的脚色
  8. 前端找工作真的那么难吗?
  9. 中企动力与港田高分子同行 创建营销型企业云平台
  10. 个性化推荐:电子商务驱动力