spring batch item writer详解

  • ItemWrite
    • ItemWriter
    • ItemStream
    • 系统写组件
  • 写数据库
    • JdbcBatchItemWriter
    • JpaItemWriter
    • MyBatisItemWriter
  • 组合写
  • Item路由Writer
  • 服务复用
    • ItemWriterAdapter
    • PropertyExtractingDelegatingItemWriter
  • 自定义ItemWriter
    • 不可重启ItemWriter
    • 可重启ItemWriter
  • 拦截器
    • 接口
    • Annotation
    • 执行顺序
    • 拦截器异常
    • Merge

github地址:

https://github.com/a18792721831/studybatch.git

文章列表:

spring batch 入门

spring batch连接数据库

spring batch元数据

spring batch Job详解

spring batch step详解

spring batch ItemReader详解

spring batch itemProcess详解

spring batch itemWriter详解

spring batch 作业流

spring batch 健壮性

spring batch 扩展性

ItemWrite

spring batch通过Tasklet完成具体的任务,chunk类型的tasklet定义了标准的读、处理、写的执行步骤。ItemWriter是实现写的重要组件,spring batch框架提供了丰富的写基础设施来完成各种数据源的写入功能。

spring batch框架默认提供了丰富的Writer实现;如果不能满足需求可以快速方便地实现自定义的数据写入;对于已经存在的持久化服务,框架提供了复用现有服务的能力,避免重复开发。

spring batch框架通常针对大数据量进行处理,同时框架需要讲作业处理的状态实时地持久化到数据库中,如果读取一条记录就进行写操作或者状态数据的提交,会大量消耗系统资源,导致批处理框架性能较差。在面向批处理chunk的操作中,可以通过属性commit-interval设置read多少条记录后进行一次提交。通过设置commit-interval的间隔值,减少提交频次,降低资源使用率。

ItemWriter

ItemWriter是Step中对资源的写处理阶段,spring batch框架已经提供了各种类型的写实现。

所有的写操作需要实现ItemWriter接口

写操作的参数是一个List,所以通常情况下是批量写入。

ItemStream

spring batch框架童年时提供了另外一个重要的接口ItemStream。ItemStream接口定义了写操作与执行上下文ExecutionContext交互的能力。可以将已经写的条数通过该接口存放在执行上下文ExecutionContext中(ExecutionContext中的数据在批处理commit的时候会通过JobRepository持久化到数据库中),这样到Job发生异常重新启动Job的时候,写操作可以跳过已经成功写过的数据,继续从上次出错的地方(可以从执行上下文中获取上次成功写的位置)继续写。

ItemStream接口

open操作根据参数executionContext打开需要读取资源的stream;可以根据持久化在执行上下文executionContext中的数据重新定位需要写入记录的位置。

update操作将需要持久化的数据存放在执行上下文executionContext中

close操作关闭读取的资源

系统写组件

spring batch框架提供的写组件

ItemWriter 说明
FlatFileItemWriter 写Flat类型文件
MultiResourceItemWriter 多文件写组件
StaxEventItemWriter 写XML类型文件
AmqpItemWriter 写AMQP类型消息
ClassifierCompositeItemWriter 根据Classifier路由不同的Item到特定的ItemWriter处理
HibernateItemWriter 基于Hibernate方式写数据库
IbatisBatchItemWriter 基于Ibatis方式写数据库
ItemWriterAdapter ItemWriter适配器,可以复用现有的写服务
JdbcBatchItemWriter 基于JDBC方式写数据库
JmsItemWriter 写JMS队列
JpaItemWriter 基于Jpa方式写数据库
GemfireItemWriter 基于分布式数据库Gemfire的写组件
SpELMappingGemfireItemWriter 基于spring表达式语言写分布式数据库Gemfire的组件
MimeMessageItemWriter 发送邮件的写组件
MongoItemWriter 基于分布式文件存储的数据库MongoDB写组件
Neo4jItemWriter 面向网络的数据库Neo4j的写组件
PropertyExtractingDelegatingItemWriter 属性抽取代理写组件:通过调用给定的spring bean方法执行写入,参数有Item中指定的属性字段作为参数
RepositoryItemWriter 基于spring Data的写组件
SimpleMailMessageItemWriter 发送邮件的写组件
CompositeItemWriter 条目写的组合模式,支持组装多个ItemWriter

写数据库

spring batch框架对于写数据库提供了较好的支持,包括基于JDBC和ORM的写入方式。

JdbcBatchItemWriter

spring batch框架提供了对JDBC谢支持的组件JdbcBatchItemWriter。JdbcBatchItemWriter实现了ItemWriter接口,将Item对象转换为数据库中的记录。

JdbcBatchItemWriter对用户屏蔽了数据库访问的操作细节,且提供了批处理的特性,JdbcBatchItemWriter会批量执行一组SQL语句来提高性能,而不是逐条执行SQL语句,每次批量提交的语句数和chunk中定义的提交间隔是一致的。

JdbcBatchItemWriter关键接口

关键类 说明
DataSource 提供写入数据库的数据源信息
ItemPreparedStatementSetter 为SQL语句中有"?"的参数提供赋值接口
ColuniMapItemPreparedStatementSetter 接口ItemPreparedStatementSetter的实现类,提供基于列的参数设置
ItemSqlParameterSourceProvider 为SQL语句中有命名的参数提供赋值接口
BeanPropertyItemSqlParameterSourceProvider 从给定的Item中根据参数名称获取Item对应的属性值作为参数
NamedParameterJdbcOperations JdbcTemplate操作,提供执行SQL的能力

JdbcBatchItemWriter关键属性

JdbcBatchItemWriter属性 类型 说明
dataSourec DataSource 数据源,通过该属性指定使用的数据库信息
sql String 执行的SQL语句
itemSqlParameterSourceProvider ItemSqlParameterSourceProvider 为SQL语句中有命名的参数提供赋值
itemPreparedStatementSetter ItemPreparedStatementSetter 为SQL语句中有"?"的参数提供赋值
assertUpdates Boolean 当没有修改、删除一条记录时,是否抛出异常。默认抛出

使用JdbcBatchItemWriter至少需要配置dataSource,sql两个属性。dataSourec指定访问的数据源,sql用于指定处查询的SQL语句。

首先我们在数据库中创建表

接着创建JdbcBatchItemWriter的写入组件

然后使用这个写入器,完整代码

@EnableBatchProcessing
@Configuration
public class JdbcBatchItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("jdbc-batch-item-writer-step").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, JdbcBatchItemWriter<People> writer) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("jdbc-batch-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 20 ? null : new People(null, "name : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("process : " + item);return item;}).writer(writer).build();}@Beanpublic JdbcBatchItemWriter<People> writer(DataSource dataSource) {return new JdbcBatchItemWriterBuilder<People>().dataSource(dataSource).sql("insert into people(id,name) values(null,?)").itemPreparedStatementSetter(((item, ps) -> ps.setString(1, item.getName()))).assertUpdates(false).build();}}

执行结果

数据库中查看结果

我们这里使用的是问号,也可以使用变量名

这次少写入点,写入10个

数据库中也有了

JpaItemWriter

对象关系映射(Object Relational Mapping,ORM)是一种为解决面向对象与关系数据库存在的互不匹配现象的技术。简单地说,ORM是通过使用描述对象和数据库之间映射的元数据,将Java程序中的对象自动持久化到关系数据库中。

JPA(Java Persistence API)是Sun官方提出的Java持久化规范。JPA通过注解或者XML描述对象-关系表的映射关系,并将运行期的实体对象持久化到数据库中;它为Java开发人员提供了一种对象/关系映射工具来管理Java应用中的关系数据。spring batch框架对ORM类型的JPA提供了基于写的ItemWriter。

JpaItemWriter实现了ItemWriter接口,核心作用是将Item对象转换为数据库中的记录。

JpaItemWriter关键属性

JpaItemWriter 类型 说明
entityManagerFactory EntityManagerFactory JPA提供的实体管理器的工厂类,用于生成实体管理EntityManager对象

使用JpaItemWriter需要配置属性entityManagerFactory:entityManagerFactory负责创建EntityManager,EntityManager负责完成对实体的增删改查。

首先引入JPA的依赖

接着配置Jpa的EntityManagerFactory

除此之外,还需要配置Jpa事务管理器(Jpa有默认的,事务级别和spring相同,做了适配)

整体代码

@EnableBatchProcessing
@Configuration
public class JpaItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("jpa-item-writer-step").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, JpaItemWriter<People> writer,JpaTransactionManager jpaTransactionManager) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("jpa-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 10 ? null : new People(null, "jpa : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("jpa : " + item);return item;}).writer(writer).transactionManager(jpaTransactionManager).build();}@Beanpublic JpaItemWriter<People> writer(LocalContainerEntityManagerFactoryBean entityManagerFactoryBean) {return new JpaItemWriterBuilder<People>().entityManagerFactory(entityManagerFactoryBean.getObject()).build();}@Beanpublic LocalContainerEntityManagerFactoryBean entityManagerFactoryBean(DataSource dataSource, EntityManagerFactoryBuilder builder) {return builder.dataSource(dataSource).packages(Study8ItemwriterApplication.class).build();}@Beanpublic JpaTransactionManager transactionManager(DataSource dataSource) {final JpaTransactionManager transactionManager = new JpaTransactionManager();transactionManager.setDataSource(dataSource);return transactionManager;}
}

执行结果

数据库查询

MyBatisItemWriter

我们常用的ORM还有MyBatis,相比JPA,MyBatis的SQL可能更加直观(xml方式)。

MyBatisItemWriter需要指定数据源和SQL。在MyBatis中配置的是SqlSessionFactory.

首先引入MyBatis的依赖

接着配置MyBatis的SqlSessionFactory

增加MyBatis的接口

以及XML文件

增加扫描的Mapper注解

设置实体的作用域

创建MyBatis的写入器

assertUpdates默认为true,表示当1条记录都没有被插入时,会抛出异常。设置为false,则忽略这个问题 ,即使没有数据被插入,也不会抛出异常。

同样的,对于MyBatis,一定要保证Mapper先读取

Job的完整代码

@EnableBatchProcessing
@Configuration
public class MyBatisBatchItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job, PeopleDao peopleDao) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("mybatis-batch-item-writer-step").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, MyBatisBatchItemWriter<People> writer) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("mybatis-batch-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 10 ? null : new People(null, "mybatis : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("mybatis : " + item);return item;}).writer(writer).build();}@Beanpublic MyBatisBatchItemWriter<People> writer(SqlSessionFactoryBean sqlSessionFactoryBean) throws Exception {return new MyBatisBatchItemWriterBuilder<People>().sqlSessionFactory(sqlSessionFactoryBean.getObject()).assertUpdates(false).statementId("addPeople").build();}@Beanpublic SqlSessionFactoryBean sqlSessionFactoryBean(DataSource bossDataSource) {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(bossDataSource);factoryBean.setTypeAliasesPackage("com.study.study8itemwriter.domain");return factoryBean;}}

执行结果

数据库记录

组合写

在spring batch框架中对于chunk只能配置一个ItemWriter,但在有些业务场景中需要将一个Item同时写到多个不同的资源文件中,即需要写入到多个ItemWriter中。spring batch框架提供了组合ItemWriter(CompositeItemWriter)的模式满足需求。

但是吧,经过我自己验证,对于同一个数据库,同时使用MyBtais和Jpa,似乎只有Jpa起作用。

我们在上面的例子中进行:

首先是两个Jpa的writer

以及一个MyBatis的Writer

最后全部扔到组合写处理器中

启动

发现数据库中只存入了5条记录,而不是预期的15条

而多数据源,或者不同种类的数据源,是能预期写入的。

如果我们去除Jpa,只有Mybatis

完整代码

@EnableBatchProcessing
@Configuration
public class CompositeItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job, PeopleDao peopleDao) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("compo-batch-item-writer-step").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, MyBatisBatchItemWriter<People> myBatisWriter1, MyBatisBatchItemWriter<People> myBatisWriter2) {AtomicLong atomicLong = new AtomicLong();CompositeItemWriter<People> writer = new CompositeItemWriterBuilder<People>().delegates(Arrays.asList(myBatisWriter1, myBatisWriter2)).ignoreItemStream(true).build();return stepBuilderFactory.get("compo-batch-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(null, "compo : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("compo : " + item);return item;}).writer(writer).build();}@Beanpublic MyBatisBatchItemWriter<People> myBatisWriter1(SqlSessionFactoryBean sqlSessionFactoryBean) throws Exception {return new MyBatisBatchItemWriterBuilder<People>().sqlSessionFactory(sqlSessionFactoryBean.getObject()).assertUpdates(false).statementId("addPeople").build();}@Beanpublic MyBatisBatchItemWriter<People> myBatisWriter2(SqlSessionFactoryBean sqlSessionFactoryBean) throws Exception {return new MyBatisBatchItemWriterBuilder<People>().sqlSessionFactory(sqlSessionFactoryBean.getObject()).assertUpdates(false).statementId("addPeople").build();}@Beanpublic SqlSessionFactoryBean sqlSessionFactoryBean(DataSource bossDataSource) {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(bossDataSource);factoryBean.setTypeAliasesPackage("com.study.study8itemwriter.domain");return factoryBean;}
}

数据还是5条

但是数据库中存入了10条

因为我们配置的是chunk3,所以每3条数据为一组。

Item路由Writer

在组合写中,我们是将1条记录,写到多处。

对于每一条记录都是多处写。相当于1条记录,拷贝了好多份。

还有一些场景:对于全部的数据,我们只希望存储1份,但是根据数据的重要程度,存储在不同的设备上。

比如:奇数记录在A数据库,偶数记录在B数据库。

总体还是1份数据。

spring batch框架提供了支持Item路由写的组件ClassifierCompositeItemWriter。

我们在原有代码的基础上,增加两个方法:

在真正持久化的时候,会标出是哪个方法写入的

接着创建这两个写入器

然后配置

当然还有其他的路由器

完整代码

@EnableBatchProcessing
@Configuration
public class ClassifierCompositeItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job, PeopleDao peopleDao) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("compo-batch-item-writer-step").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, MyBatisBatchItemWriter<People> myBatisWriter1, MyBatisBatchItemWriter<People> myBatisWriter2) {AtomicLong atomicLong = new AtomicLong();ClassifierCompositeItemWriter<People> itemWriter = new ClassifierCompositeItemWriterBuilder<People>().classifier(new BackToBackPatternClassifier<People, ItemWriter<? super People>>(people -> {String string = people.getName();return string.substring(string.lastIndexOf(":") + 1, string.length());}, str -> {Integer integer = Integer.parseInt(str.trim());if (integer % 2 == 0) {return myBatisWriter2;}return myBatisWriter1;})).build();return stepBuilderFactory.get("compo-batch-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(null, "compo : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("compo : " + item);return item;}).writer(itemWriter).build();}@Beanpublic MyBatisBatchItemWriter<People> myBatisWriter1(SqlSessionFactoryBean sqlSessionFactoryBean) throws Exception {return new MyBatisBatchItemWriterBuilder<People>().sqlSessionFactory(sqlSessionFactoryBean.getObject()).assertUpdates(false).statementId("addPeople1").build();}@Beanpublic MyBatisBatchItemWriter<People> myBatisWriter2(SqlSessionFactoryBean sqlSessionFactoryBean) throws Exception {return new MyBatisBatchItemWriterBuilder<People>().sqlSessionFactory(sqlSessionFactoryBean.getObject()).assertUpdates(false).statementId("addPeople2").build();}@Beanpublic SqlSessionFactoryBean sqlSessionFactoryBean(DataSource bossDataSource) {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(bossDataSource);factoryBean.setTypeAliasesPackage("com.study.study8itemwriter.domain");return factoryBean;}}

执行结果

数据库中的记录

可以很明显的看出,奇数使用的是1写入器,偶数使用2写入器。

服务复用

复用现有的企业资产和服务是提高企业应用开发的快捷手段,spring batch框架的写组件提供了复用现有服务的能力,利用spring batch框架提供的ItemWriterAdapter、PropertyExtractingDelegatingItemWriter可以方便地复用业务服务、spring bean、EJB或者其他远程服务。ItemWriterAdapter代理的现有服务需要能够处理Item对象;PropertyExtractingDelegatingItemWriter代理的服务支持更复杂的参数,参数可以根据指定的属性从Item中抽取。

ItemWriterAdapter

ItemWriterAdapter关键属性

ItemWriterAdapter属性 类型 说明
targetObject Object 需要调用的目标服务对象
targetMethod String 需要调用的目标操作名称
arguments Object[] 需要调用的操作参数。默认不需要传入参数,默认情况下,会将每次处理的item作为参数传入

首先创建一个简单服务

接着创建相关的job使用这个Service

创建写入器,并使用

完整代码

@EnableBatchProcessing
@Configuration
public class ItemWriterAdapterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("item-writer-adapter-job").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, ItemWriterAdapter<People> writer) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("item-writer-adapter-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(null, "name : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("process : " + item);return item;}).writer(writer).build();}@Beanpublic ItemWriterAdapter<People> writer(PeopleService peopleService) {ItemWriterAdapter writerAdapter = new ItemWriterAdapter();writerAdapter.setTargetObject(peopleService);writerAdapter.setTargetMethod("print");return writerAdapter;}
}

执行结果

PropertyExtractingDelegatingItemWriter

PropertyExtractingDelegatingItemWriter代理的服务支持更加复杂的参数 ,参数可以根据指定的属性值从item中抽取(ItemWriterAdapter仅支持参数类型为具体的Item对象)。

比如增加如下服务

创建写入器

完整代码

@EnableBatchProcessing
@Configuration
public class PropertyExtractingDelegatingJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addDate("date", new Date()).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("pro-item-writer-adapter-job").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, PropertyExtractingDelegatingItemWriter<People> writer) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("pro-item-writer-adapter-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(null, " " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("process : " + item);return item;}).writer(writer).build();}@Beanpublic PropertyExtractingDelegatingItemWriter<People> writer(PeopleService peopleService) {PropertyExtractingDelegatingItemWriter<People> writer = new PropertyExtractingDelegatingItemWriter<>();writer.setTargetObject(peopleService);writer.setTargetMethod("printName");writer.setFieldsUsedAsTargetMethodArguments(new String[]{"name"});return writer;}}

执行结果

自定义ItemWriter

spring batch框架提供了丰富的ItemWriter组件,当这些默认的系统组件不能满足需求时,我们可以自己实现ItemWriter接口来完成需要的业务操作。自定义实现ItemWriter非常简单,只需要实现接口ItemWriter接口。只实现ItemWriter接口的写入器不支持重启,为了支持可重启的自定义ItemWriter需要新增实现接口ItemStream。

不可重启ItemWriter

接口定义

我们实现一个自己的写入器

然后使用这个写入器

@EnableBatchProcessing
@Configuration
public class MyItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("my-item-writer-job").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("my-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(null, "name : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("process : " + item);return item;}).writer(new MyItemWriter<People>()).build();}}

我们写的写入器,在写包含4这个数据的时候,一定会抛出异常。

在不修改参数的情况下重启

它还是从0开始,从头开始。不能接着上次写入失败的记录或者chunk继续写入。

初次之外,如果写入器比较简单,也可以直接使用lambda表达式写

其结果是相同的

可重启ItemWriter

spring batch框架对job提供了可重启的能力,spring batch框架提供的写组件中,文件写入等没有事务管理的都实现了ItemStream接口。

和ItemReader不一样的是,ItemReader的系统组件基本上都实现了ItemStream接口,而ItemWriter仅有部分的系统组件实现了ItemStream接口。因为通常情况下如果写的资源本身是事务性的,那么单个写入失败,会导致真个事务失败,从而导致本批次写入失败。所以下次写入时,需要从开始重头写入。因此本身具有事务性的写操作不需要事先ItemStream就支持可重启的特性。

如果写操作本身是有状态的,为了支持可重启的特性必须实现ItemStream。

实现我们自己的可重启的写入器。

核心写入逻辑相同,遇到4的时候异常,但是需要用到我们保存的number


然后使用自定义的可重启的写入器

完整代码

@EnableBatchProcessing
@Configuration
public class RestartItemWriterJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("restartv-item-writer-job").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("restart-item-writer-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(null, "name : " + atomicLong.getAndIncrement())).processor((Function<People, People>) item -> {System.out.println("process : " + item);return item;}).writer(new RestartItemWriter<People>()).build();}}

第一次执行异常

第二次执行

直接从上次失败的地方开始写入。核心实现还是我们的read方法如何处理从exectionContext中拿到的记录号。

拦截器

spring batch框架在ItemWriter执行阶段提供了拦截器,使得在ItemWriter执行前后能够加入自定义的业务逻辑。

接口

接口定义

实现自己的写入拦截器

然后使用自己的写入拦截器

运行结果

完整代码

@EnableBatchProcessing
@Configuration
public class MyItemWriterLisJobConf {@Beanpublic String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {jobLauncher.run(job, new JobParametersBuilder().addLong("id", 2L).toJobParameters());return "";}@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("my-item-writer-lis-job").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory) {AtomicLong atomicLong = new AtomicLong();return stepBuilderFactory.get("my-item-writer-lis-step").<People, People>chunk(3).reader(() -> atomicLong.get() > 5 ? null : new People(atomicLong.getAndIncrement(), "lis")).processor((Function<People, People>) item -> {System.out.println(" process : " + item);return item;}).writer(items -> {for (People people : items) {if (people.getId() == 4) {throw new RuntimeException("people id is 4");}System.out.println(" writer : " + people);}}).listener(new MyItemWriterLis()).build();}}

Annotation

除了实现接口,还可以使用注解定义拦截器:

  • @BeanforeWriter
  • @AfterWriter
  • @OnWriterError

比如

使用

结果

执行顺序

配置的多个ItemWriterListener,拦截器之间的执行顺序按照配置的顺序执行。beforeWriter方法和配置顺序完全相同,afterWriter方法和配置顺序完全相反。

onWriteError方法和配置顺序相同

拦截器异常

拦截器异常会导致整个Job异常,所以在执行自定义的拦截器的时候,需要考虑对拦截器发生的异常做处理,避免影响业务。

比如

执行结果

确实是因为我们抛出的异常导致失败

Job失败

Merge

spring batch框架提供了多处配置拦截器执行,可以在chunk配置,也可以在tasklet配置。而且基于step的抽象和继承,可以在子step中控制是否执行父step。

通过在子step中使用super调用父step的监听,就可以实现将父、子step的拦截器全部注册。

如果在子step中没有调用父step中注册拦截器的方法,那么父step中的拦截器就不会注册,也就不会执行。

spring batch item writer详解相关推荐

  1. Spring 集成与分片详解

    1.Spring集成与分片详解 1.1pom依赖 1.2application.properties 定义配置类和任务类中要用到的参数 1.3创建任务 创建任务类,加上@Component注解 1.4 ...

  2. Spring和Redis整合详解

    Spring和Redis整合详解 官方主页 Spring Spring Redis 概述 Redis是一个开源(BSD许可)的内存数据结构存储,用作数据库,缓存和消息代理. 简单来说,它是一个以(ke ...

  3. Spring和Email整合详解

    Spring和Email整合详解 官方主页 Spring Email 概述 Spring Mail API都在org.springframework.mail及其子包org.springframewo ...

  4. Spring Cloud限流详解(附源码)

    在高并发的应用中,限流往往是一个绕不开的话题.本文详细探讨在Spring Cloud中如何实现限流. 在 Zuul 上实现限流是个不错的选择,只需要编写一个过滤器就可以了,关键在于如何实现限流的算法. ...

  5. spring框架 AOP核心详解

    AOP称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等待,Struts2的拦截器设计就是基于AOP的思想,是个比较经典的例子. 一 AOP的基本概念 (1)Asp ...

  6. java处理请求的流程_Java Spring mvc请求处理流程详解

    Spring mvc请求处理流程详解 前言 spring mvc框架相信很多人都很熟悉了,关于这方面的资料也是一搜一大把.但是感觉讲的都不是很细致,让很多初学者都云里雾里的.本人也是这样,之前研究过, ...

  7. batchnomal_pytorch的batch normalize使用详解

    torch.nn.BatchNorm1d() 1.BatchNorm1d(num_features, eps = 1e-05, momentum=0.1, affine=True) 对于2d或3d输入 ...

  8. Spring Cloud限流详解(内含源码)

    为什么80%的码农都做不了架构师?>>>    原文:http://www.itmuch.com/spring-cloud-sum/spring-cloud-ratelimit/ 在 ...

  9. SpringBoot2.1.5(16)--- Spring Boot的日志详解

    SpringBoot2.1.5(16)--- Spring Boot的日志详解 市面上有许多的日志框架,比如 JUL( java.util.logging), JCL( Apache Commons ...

最新文章

  1. dataframe常用操作_【Data Mining】机器学习三剑客之Pandas常用算法总结上
  2. 企业办公自动化系统_OA系统的核心功能有哪些?分析当下OA系统的缺陷以及相关解决方案...
  3. 原生JS、jQuery 遍历方法总结
  4. 卷组删除pv_CentOS下删除一个卷组(VG)
  5. 心得复述知识体系:《强化学习》中的蒙特卡洛方法 Monte Carlo Methods in Reinforcement Learning
  6. Lucas定理(求组合数,例题FZU2020,HDU3944)
  7. Android Theme.AppCompat 和 ThemeOverlay.AppCompat
  8. B2B、B2C、BOS系统都指哪些?
  9. Excel图表配色原理
  10. 微信搜一搜中的智能问答技术
  11. linux shell鸟哥,Linux shell脚本全面学习(三)
  12. python图片截取斜四边形_opencv 截取任意四边形区域的图像
  13. 介绍一个产品的思维导图_产品运营怎么使用思维导图?四个方面阐释思维导图的运用...
  14. linux ll 命令的含义
  15. 【tflearn系列教程】(一)为什么要学tflearn?
  16. 恩墨学院举办OCM联盟活动BDA大数据联盟春季活动
  17. 区域卫生平台用户分析
  18. 贴片电容的X5R X7R是什么意思
  19. Windows64位环境下注册32位达梦odbc驱动
  20. Apple Final Cut Pro X 10.5 视频编辑软件 下载 百度网盘

热门文章

  1. Payroll工资单中英文对照明细
  2. 蓝海彤翔董事长鲁永泉荣获太湖科学城功能片区2022年度表彰
  3. MOT学习 - SORT算法
  4. ajax布林德,丹尼·布林德
  5. dma-buf 由浅入深(二) —— kmap / vmap
  6. 特别总账SGL特别总账科目配置和实操【FBKP/OBXR/OBYR】
  7. 年度总结|回顾 2021,展望 2022
  8. OSChina 周日乱弹 ——阿拉蕾:你为什么那么爱我
  9. 有效准备演讲的几小招
  10. LoadLibrary windows动态库加载