Spring Batch是一个轻量级的框架,完全面向Spring的批处理框架,用于企业级大量的数据读写处理系统。以POJO和Spring 框架为基础,包括日志记录/跟踪,事务管理、 作业处理统计工作重新启动、跳过、资源管理等功能。

       业务方案:

1、批处理定期提交。

2、并行批处理:并行处理工作。

3、企业消息驱动处理

4、大规模的并行处理

5、手动或是有计划的重启

6、局部处理:跳过记录(如:回滚)

      技术目标:

1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。

2、明确分离批处理的执行环境和应用。

3、提供核心的,共通的接口。

4、提供开箱即用(out of the box)的简单的默认的核心执行接口。

5、提供Spring框架中配置、自定义、和扩展服务。

6、所有存在的核心服务可以很容的被替换和扩展,不影响基础层。

7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。

实现步骤:

1.定义处理对象

2.创建中间转换器 ***ItemProcessor 实现 ItemProcessor<I,O>接口

3.创建工作Job BatchConfiguration 主要处理读数据、处理数据、写数据等操作

4.创建listener job执行监听器

 

 

一般的批处理系统需要处理大量的数据, 内部消化单条记录失败的情况, 还要管理中断,在重启后也不去重复执行已经处理过的部分

 

 Spring Batch单/多处理单元(processors), 以及多个微线程(tasklets)

具体实现过程:

1.引入jar:

<!-- Spring-boot启动项目 --><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.3.1.RELEASE</version></parent><properties><java.version>1.8</java.version></properties><!--Spring batch核心包-><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies>

2.配置application.properties :配置数据源

######mysql\u6570\u636E\u6E90#########
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.password= wbw123456
spring.datasource.url= jdbc:mysql://localhost/mydatabase
spring.datasource.username=root
<a target=_blank href="http://projects.spring.io/spring-batch/">点击打开链接</a>

3.新建实体类

package com.my.gs.batch.processing.domain;public class Person {//IDprivate Integer personId;//姓名private String personName;//年龄private String personAge;//性别private String personSex;public Person(){};public Person( String personName, String personAge,String personSex) {this.personName = personName;this.personAge = personAge;this.personSex = personSex;}public Integer getPersonId() {return personId;}public void setPersonId(Integer personId) {this.personId = personId;}public String getPersonName() {return personName;}public void setPersonName(String personName) {this.personName = personName;}public String getPersonAge() {return personAge;}public void setPersonAge(String personAge) {this.personAge = personAge;}public String getPersonSex() {return personSex;}public void setPersonSex(String personSex) {this.personSex = personSex;}}

4.中间转换器:

package com.my.gs.batch.processing.itemprocessor;import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;import com.my.gs.batch.processing.domain.Person;
/*** 中间转换器* @author wbw**/
public class PersonItemProcessor implements ItemProcessor<Person, Person> {//查询private static final String GET_PRODUCT = "select * from Person where personName = ?";private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic Person process(final Person person) throws Exception {List<Person> personList = jdbcTemplate.query(GET_PRODUCT, new Object[] {person.getPersonName()}, new RowMapper<Person>() {@Overridepublic Person mapRow( ResultSet resultSet, int rowNum ) throws SQLException {Person p = new Person();p.setPersonName(resultSet.getString(1));p.setPersonAge(resultSet.getString(2));p.setPersonSex(resultSet.getString(3));return p;}});if(personList.size() >0){log.info("该数据已录入!!!");}String sex = null;if(person.getPersonSex().equals("0")){sex ="男";}else{sex ="女";}log.info("转换 (性别:"+person.getPersonSex()+") 为 (" + sex + ")");final Person transformedPerson = new Person(person.getPersonName(), person.getPersonAge(),sex);log.info("转换 (" + person + ") 为 (" + transformedPerson + ")");return transformedPerson;}}

5.处理具体工作业务  主要包含三个部分:读数据、处理数据、写数据

package com.my.gs.batch.processing.configuration;import javax.sql.DataSource;import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;import com.my.gs.batch.processing.domain.Person;
import com.my.gs.batch.processing.itemprocessor.PersonItemProcessor;
/*** 处理具体工作业务  主要包含三个部分:读数据、处理数据、写数据* @author wbw**/
@Configuration
@EnableBatchProcessing
public class PersonBatchConfiguration {//插入语句private static final String PERSON_INSERT = "INSERT INTO Person (personName, personAge,personSex) VALUES (:personName, :personAge,:personSex)";public static final String Person_INSERT = "INSERT INTO Person (id, name,description,quantity) VALUES (:id, :name,:description,:quantity)";// tag::readerwriterprocessor[] 1.读数据@Beanpublic ItemReader<Person> reader() {FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();//加载外部文件数据 文件类型:CSVreader.setResource(new ClassPathResource("sample-data.csv"));reader.setLineMapper(new DefaultLineMapper<Person>() {{setLineTokenizer(new DelimitedLineTokenizer() {{setNames(new String[] { "personName","personAge","personSex" });}});setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}});}});return reader;}//2.处理数据@Beanpublic PersonItemProcessor processor() {return new PersonItemProcessor();}//3.写数据@Beanpublic ItemWriter<Person> writer(DataSource dataSource) {JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());writer.setSql(PERSON_INSERT);writer.setDataSource(dataSource);return writer;}// end::readerwriterprocessor[]// tag::jobstep[]@Beanpublic Job importUserJob(JobBuilderFactory jobs, @Qualifier("step1")Step s1, JobExecutionListener listener) {return jobs.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(s1).end().build();}@Beanpublic Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {return stepBuilderFactory.get("step1").<Person, Person> chunk(10).reader(reader).processor(processor).writer(writer).build();}// end::jobstep[]@Beanpublic JdbcTemplate jdbcTemplate(DataSource dataSource) {return new JdbcTemplate(dataSource);}}

6.监听器:用于处理任务执行之后和之前

<pre name="code" class="java">package com.my.gs.batch.processing.listener;import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;import com.my.gs.batch.processing.domain.Person;
/*** Job执行监听器* @author wbw**/
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {private static final String PERSON_SQL = "SELECT personName, personAge,personSex FROM Person";private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);@Autowiredprivate  JdbcTemplate jdbcTemplate;@Autowiredpublic JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {if(this.jdbcTemplate==null){this.jdbcTemplate = jdbcTemplate;}}@Overridepublic void afterJob(JobExecution jobExecution) {if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB 执行完成!");List<Person> results = jdbcTemplate.query(PERSON_SQL, new RowMapper<Person>() {@Overridepublic Person mapRow(ResultSet rs, int row) throws SQLException {return new Person(rs.getString(1), rs.getString(2),rs.getString(3));}});log.info("入库条数---------"+results.size());for (Person person : results) {log.info("新增 <" + person.getPersonName() + "> 成功!!!!!");}}}/* (non-Javadoc)* @see org.springframework.batch.core.listener.JobExecutionListenerSupport#beforeJob(org.springframework.batch.core.JobExecution)*/@Overridepublic void beforeJob(JobExecution jobExecution) {// TODO Auto-generated method stubsuper.beforeJob(jobExecution);}}
7.新建csv文件
<img src="" alt="" />
</pre><pre code_snippet_id="1563739" snippet_file_name="blog_20160124_10_8470765" name="code" class="java">8.启动执行
<pre name="code" class="java">package com.my.batch;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);}
}

更多相关资源:http://projects.spring.io/spring-batch/

Spring Boot 集成 批处理框架Spring batch相关推荐

  1. Spring Boot集成Quartz注入Spring管理的类

    摘要: 在Spring Boot中使用Quartz时,在JOB中一般需要引用Spring管理的Bean,通过定义Job Factory实现自动注入. Spring有自己的Schedule定时任务,在S ...

  2. spring Boot 2 运维篇(spring boot程序的打包与部署,多环境开发配置文件的配置,spring boot集成日志框架)

    目录 1.SpringBoot程序的打包与运行 程序打包 程序运行 SpringBoot程序打包失败处理 命令行启动常见问题及解决方案 SpringBoot项目快速启动(Linux版) 2.配置高级 ...

  3. spring boot—集成log4j2日志框架

    文章目录 市场上的日志框架 spring boot日志框架关系 移除默认日志框架 切换为log4j2日志框架 市场上的日志框架   1)日志门面最常用的是slf4j   2)日志实现最常用的是logb ...

  4. Spring Boot集成Jasypt安全框架

    Jasypt安全框架提供了Spring的集成,主要是实现 PlaceholderConfigurerSupport类或者其子类. 在Sring 3.1之后,则推荐使用PropertySourcesPl ...

  5. 批处理框架 Spring Batch,数据迁移量过大如何保证内存?

    点击关注公众号,实用技术文章及时了解 来源:blog.csdn.net/topdeveloperr/ article/details/88843186 概述 本篇博客是记录使用spring batch ...

  6. spring boot 集成框架事例

    因为很多人再问要一个搭好的框架实例,这里我补上, 首先,需要安装一个maven,因为项目是基于maven 来创建的. 其次,项目是一个基本的框架,不涉及任何的项目逻辑,代码是本人自己写的绝对原创. 最 ...

  7. 蚂蚁金服开源增强版 Spring Boot 的研发框架!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 来源:gitee.com/sofastack/sofa-b ...

  8. 蚂蚁金服开源增强版Spring Boot 的研发框架!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! SOFABoot 是蚂蚁金服开源的基于 Spring Bo ...

  9. 6.3 Spring Boot集成mongodb开发

    6.3 Spring Boot集成mongodb开发 本章我们通过SpringBoot集成mongodb,Java,Kotlin开发一个极简社区文章博客系统. 0 mongodb简介 Mongo 的主 ...

最新文章

  1. 神州易桥财税 java项目经理_【高级项目经理(体系运营)职责】2021年燕园财税高级项目经理(体系运营)岗位职责-看准网...
  2. 当你感到学习困难的时候,你在走上坡路!
  3. linux防火墙没看3306访问不,Linux配置防火墙,开启80端口、3306端口
  4. Python FastApi:快速建立docker容器/挂载共享文件夹/导入导出
  5. 现代软件工程 作业 最后一周总结
  6. 解决linux下QtCreator无法输入中文的情况
  7. html 中avi视频插件,JDG让一追二击败V5!Kanavi降维打击,逆版本选英雄不按常理出牌...
  8. flask-文件上传/下载
  9. 免费python编程自学网站-免费学习Python编程的3个优秀的网站资源
  10. linux类似everything的软件,安装linux下强大的文件工具fsearch,与windows下Everything类似...
  11. Manjaro 安装xmind 8
  12. Ecshop3.x漏洞复现
  13. android 本地数据库持久化框架,android数据库持久化框架, ormlite框架,
  14. 共享充电线项目市场分析报告
  15. 锚点的作用是什么?如何创建锚点?
  16. Poster Design
  17. Day_04 传智健康项目-预约管理-套餐管理
  18. 【进阶实战】用PaddlePaddle检测空气质量
  19. matlab中(:)的部分使用
  20. NOIP 2012 国王游戏

热门文章

  1. Xilinx SDx 2018.3安装
  2. 剑侠世界职业优缺点简介
  3. 一文带你了解,色环电阻失效相关知识
  4. 微博创作者网址及申请条件,微博创作者收益
  5. 大唐之路(4)一号店入驻要求
  6. Consul作为配置中心,配置Asp.Net Core应用程序 依据key/value动态更新
  7. 融合注意力机制和Bi-LSTM的旅游评价情感分析模型
  8. 软碟通 (UltraISO)制作启动盘
  9. 云超融合数据中心 CloudFabric
  10. 关于opencv fitLine直线拟合得斜率及截距