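Processing large data sets is one of the most important problems in the software world. Spring Batch is a lightweight and robust batch framework for processing such data sets.

The Spring Batch framework offers "TaskletStep Oriented" and "Chunk Oriented" processing styles. This article explains the Chunk Oriented Processing model. The article TaskletStep Oriented Processing in Spring Batch is also recommended for learning how to develop TaskletStep Oriented Processing in Spring Batch.

The Chunk Oriented Processing feature was introduced with Spring Batch v2.0. It refers to reading the data one item at a time and creating "chunks" that are written out within a transaction boundary. Each item is read from an ItemReader, handed to an ItemProcessor and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter and the transaction is committed.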
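The read, process, write and commit cycle described above can be sketched in plain Java. This is a simplified model under stated assumptions, not Spring Batch's actual implementation: the class name ChunkLoopSketch and the method run are hypothetical, items are plain strings, "processing" is an upper-case conversion, and each inner list of the result stands for one committed transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

/** Simplified model of a chunk-oriented step (hypothetical, no Spring Batch involved). */
public class ChunkLoopSketch {

    /**
     * Reads items one at a time, groups them into chunks of at most
     * commitInterval items, processes each chunk and "writes" it as a whole.
     * Every inner list of the result represents one committed chunk.
     */
    public static List<List<String>> run(List<String> source, int commitInterval) {
        List<List<String>> committedChunks = new ArrayList<>();
        Iterator<String> reader = source.iterator();
        boolean exhausted = false;

        while (!exhausted) {
            // Read phase: collect items until the commit interval is reached
            // or the reader has no more data.
            List<String> readItems = new ArrayList<>();
            while (readItems.size() < commitInterval) {
                if (!reader.hasNext()) {
                    exhausted = true;
                    break;
                }
                readItems.add(reader.next());
            }
            if (readItems.isEmpty()) {
                break;
            }

            // Process phase: transform every item of the chunk.
            List<String> processed = new ArrayList<>();
            for (String item : readItems) {
                processed.add(item.toUpperCase(Locale.ENGLISH));
            }

            // Write phase: the whole chunk is written at once, then committed.
            committedChunks.add(processed);
        }
        return committedChunks;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("firstname_0", "firstname_1", "firstname_2");
        // Prints [[FIRSTNAME_0, FIRSTNAME_1], [FIRSTNAME_2]]
        System.out.println(run(items, 2));
    }
}
```

With three items and a commit interval of 2, the sketch produces two chunks: a full one and a final, shorter one.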

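Basically, this feature should be used when both reading and writing of at least one data item are required. Otherwise, if only reading or only writing of data items is required, TaskletStep Oriented Processing can be used.

The Chunk Oriented Processing model exposes three important interfaces via the org.springframework.batch.item package: ItemReader, ItemProcessor and ItemWriter.

  • ItemReader: This interface is used for providing the data. It reads the data to be processed.
  • ItemProcessor: This interface is used for item transformation. It processes an input object and transforms it into an output object.
  • ItemWriter: This interface is used for generic output operations. It writes the data transformed by the ItemProcessor. For example, the data can be written to a database, memory or an output stream. In this sample application, the data is written to a database.

Let us take a look at how to develop the Chunk Oriented Processing model.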

Used Technologies:

  • JDK 1.7.0_09
  • Spring 3.1.3
  • Spring Batch 2.1.9
  • Hibernate 4.1.8
  • Tomcat JDBC 7.0.27
  • MySQL 5.5.8
  • MySQL Connector 5.1.17
  • Maven 3.0.4

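Step 1: Create the Maven Project

A Maven project is created. (It can be created by using Maven or an IDE plug-in.)

Step 2: Create the User Table

A new USER table is created by executing the following script:

CREATE TABLE ONLINETECHVISION.USER (
    id int(11) NOT NULL AUTO_INCREMENT,
    name varchar(45) NOT NULL,
    surname varchar(45) NOT NULL,
    PRIMARY KEY (`id`)
);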

Step 3: Libraries

First, the dependencies are added to Maven's pom.xml.

<properties>
    <spring.version>3.1.3.RELEASE</spring.version>
    <spring-batch.version>2.1.9.RELEASE</spring-batch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-orm</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>${spring-batch.version}</version>
    </dependency>

    <!-- Hibernate dependencies -->
    <dependency>
        <groupId>org.hibernate</groupId>
        <artifactId>hibernate-core</artifactId>
        <version>4.1.8.Final</version>
    </dependency>

    <!-- Tomcat DBCP -->
    <dependency>
        <groupId>org.apache.tomcat</groupId>
        <artifactId>tomcat-jdbc</artifactId>
        <version>7.0.27</version>
    </dependency>

    <!-- MySQL Java Connector library -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.17</version>
    </dependency>

    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>

The maven-compiler-plugin (a Maven plug-in) is used to compile the project with JDK 1.7.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.0</version>
    <configuration>
        <source>1.7</source>
        <target>1.7</target>
    </configuration>
</plugin>

The following Maven plug-in can be used to create a runnable jar.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Sets the main class in the jar manifest -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.onlinetechvision.exe.Application</mainClass>
                    </transformer>
                    <!-- Merges the Spring namespace handler and schema mappings of all jars -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

Step 4: Create the User Entity

The User entity is created. This entity is stored after processing.

package com.onlinetechvision.user;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

/**
 * User Entity
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
@Entity
@Table(name = "USER")
public class User {

    private int id;
    private String name;
    private String surname;

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "ID", unique = true, nullable = false)
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Column(name = "NAME", unique = true, nullable = false)
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Column(name = "SURNAME", unique = true, nullable = false)
    public String getSurname() {
        return surname;
    }

    public void setSurname(String surname) {
        this.surname = surname;
    }

    @Override
    public String toString() {
        StringBuilder strBuff = new StringBuilder();
        strBuff.append("id : ").append(getId());
        strBuff.append(", name : ").append(getName());
        strBuff.append(", surname : ").append(getSurname());
        return strBuff.toString();
    }
}

Step 5: Create the IUserDAO Interface

The IUserDAO interface is created to expose the data access functionality.

package com.onlinetechvision.user.dao;

import java.util.List;

import com.onlinetechvision.user.User;

/**
 * User DAO Interface
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public interface IUserDAO {

    /**
     * Adds User
     *
     * @param user the user to add
     */
    void addUser(User user);

    /**
     * Gets User List
     *
     * @return List - User list
     */
    List<User> getUsers();
}

Step 6: Create the UserDAO Implementation

The UserDAO class is created by implementing the IUserDAO interface.

package com.onlinetechvision.user.dao;

import java.util.List;

import org.hibernate.SessionFactory;

import com.onlinetechvision.user.User;

/**
 * User DAO
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class UserDAO implements IUserDAO {

    private SessionFactory sessionFactory;

    /**
     * Gets Hibernate Session Factory
     *
     * @return SessionFactory - Hibernate Session Factory
     */
    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    /**
     * Sets Hibernate Session Factory
     *
     * @param sessionFactory Hibernate Session Factory
     */
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Adds User
     *
     * @param user the user to add
     */
    @Override
    public void addUser(User user) {
        getSessionFactory().getCurrentSession().save(user);
    }

    /**
     * Gets User List
     *
     * @return List - User list
     */
    @SuppressWarnings({ "unchecked" })
    @Override
    public List<User> getUsers() {
        List<User> list = getSessionFactory().getCurrentSession().createQuery("from User").list();
        return list;
    }
}

Step 7: Create the IUserService Interface

The IUserService interface is created for the service layer.

package com.onlinetechvision.user.service;

import java.util.List;

import com.onlinetechvision.user.User;

/**
 * User Service Interface
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public interface IUserService {

    /**
     * Adds User
     *
     * @param user the user to add
     */
    void addUser(User user);

    /**
     * Gets User List
     *
     * @return List - User list
     */
    List<User> getUsers();
}

Step 8: Create the UserService Implementation

The UserService class is created by implementing the IUserService interface.

package com.onlinetechvision.user.service;

import java.util.List;

import org.springframework.transaction.annotation.Transactional;

import com.onlinetechvision.user.User;
import com.onlinetechvision.user.dao.IUserDAO;

/**
 * User Service
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
@Transactional(readOnly = true)
public class UserService implements IUserService {

    private IUserDAO userDAO;

    /**
     * Adds User
     *
     * @param user the user to add
     */
    @Transactional(readOnly = false)
    @Override
    public void addUser(User user) {
        getUserDAO().addUser(user);
    }

    /**
     * Gets User List
     *
     * @return List - User list
     */
    @Override
    public List<User> getUsers() {
        return getUserDAO().getUsers();
    }

    public IUserDAO getUserDAO() {
        return userDAO;
    }

    public void setUserDAO(IUserDAO userDAO) {
        this.userDAO = userDAO;
    }
}

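Step 9: Create the TestReader Implementation

The TestReader class is created by implementing the ItemReader interface. This class is called to read items. When the read method returns null, the reading operation is complete. The following steps explain in detail how firstJob is executed.

The commit-interval value of firstJob is 2, and the following steps are executed:

1) firstTestReader is called to read the first item (firstname_0, firstsurname_0).
2) firstTestReader is called again to read the second item (firstname_1, firstsurname_1).
3) testProcessor is called to process the first item (FIRSTNAME_0, FIRSTSURNAME_0).
4) testProcessor is called to process the second item (FIRSTNAME_1, FIRSTSURNAME_1).
5) testWriter is called to write the first item (FIRSTNAME_0, FIRSTSURNAME_0) to the database.
6) testWriter is called to write the second item (FIRSTNAME_1, FIRSTSURNAME_1) to the database.
7) The first and second items are committed and the transaction is closed.
8) firstTestReader is called to read the third item (firstname_2, firstsurname_2).
9) firstTestReader is called again. Its index (3) now exceeds its maxIndex value (2), so the read method returns null and the item reading operation is complete.
10) testProcessor is called to process the third item (FIRSTNAME_2, FIRSTSURNAME_2).
11) testWriter is called to write the third item (FIRSTNAME_2, FIRSTSURNAME_2) to the database.
12) The third item is committed and the transaction is closed.

firstStep is completed with COMPLETED status and secondStep is started. secondJob and thirdJob are executed in the same way.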

package com.onlinetechvision.item;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.onlinetechvision.user.User;

/**
 * TestReader Class is created to read items which will be processed
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class TestReader implements ItemReader<User> {

    private int index;
    private int maxIndex;
    private String namePrefix;
    private String surnamePrefix;

    /**
     * Reads items one by one
     *
     * @return User, or null when all items have been read
     *
     * @throws Exception
     * @throws UnexpectedInputException
     * @throws ParseException
     * @throws NonTransientResourceException
     */
    @Override
    public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Returning null tells Spring Batch that the input is exhausted.
        if (index > getMaxIndex()) {
            return null;
        }

        User user = new User();
        user.setName(getNamePrefix() + "_" + index);
        user.setSurname(getSurnamePrefix() + "_" + index);

        incrementIndex();

        return user;
    }

    /**
     * Increments index which defines read-count
     *
     * @return int
     */
    private int incrementIndex() {
        return index++;
    }

    public int getMaxIndex() {
        return maxIndex;
    }

    public void setMaxIndex(int maxIndex) {
        this.maxIndex = maxIndex;
    }

    public String getNamePrefix() {
        return namePrefix;
    }

    public void setNamePrefix(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    public String getSurnamePrefix() {
        return surnamePrefix;
    }

    public void setSurnamePrefix(String surnamePrefix) {
        this.surnamePrefix = surnamePrefix;
    }
}
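Because TestReader stops only when index is greater than maxIndex, a reader configured with maxIndex = 2 yields three items (indexes 0, 1 and 2). The following plain-Java sketch reproduces just that index logic. ReaderBoundarySketch and readAll are hypothetical names introduced for illustration, and items are simplified to strings instead of User objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for the reader's index logic (hypothetical, no Spring Batch involved). */
public class ReaderBoundarySketch {

    private final String namePrefix;
    private final int maxIndex;
    private int index;

    public ReaderBoundarySketch(String namePrefix, int maxIndex) {
        this.namePrefix = namePrefix;
        this.maxIndex = maxIndex;
    }

    /** Returns the next generated name, or null once index has passed maxIndex. */
    public String read() {
        if (index > maxIndex) {
            return null;
        }
        return namePrefix + "_" + index++;
    }

    /** Drains the reader the way a step would: read() is called until it returns null. */
    public static List<String> readAll(String namePrefix, int maxIndex) {
        ReaderBoundarySketch reader = new ReaderBoundarySketch(namePrefix, maxIndex);
        List<String> items = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        // Prints [firstname_0, firstname_1, firstname_2]
        System.out.println(readAll("firstname", 2));
    }
}
```

In general, this reader produces maxIndex + 1 items before signalling the end of input.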

Step 10: Create the FailedCaseTestReader Implementation

The FailedCaseTestReader class is created to simulate a failed job status. In this sample application, when thirdJob is processed at fifthStep, failedCaseTestReader is called and an exception is thrown, so the status of the job will be FAILED.

package com.onlinetechvision.item;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.onlinetechvision.user.User;

/**
 * FailedCaseTestReader Class is created in order to simulate the failed job status.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class FailedCaseTestReader implements ItemReader<User> {

    private int index;
    private int maxIndex;
    private String namePrefix;
    private String surnamePrefix;

    /**
     * Reads items one by one
     *
     * @return User
     *
     * @throws Exception
     * @throws UnexpectedInputException
     * @throws ParseException
     * @throws NonTransientResourceException
     */
    @Override
    public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Instead of returning null, this reader fails once maxIndex is reached.
        if (index >= getMaxIndex()) {
            throw new Exception("Unexpected Error!");
        }

        User user = new User();
        user.setName(getNamePrefix() + "_" + index);
        user.setSurname(getSurnamePrefix() + "_" + index);

        incrementIndex();

        return user;
    }

    /**
     * Increments index which defines read-count
     *
     * @return int
     */
    private int incrementIndex() {
        return index++;
    }

    public int getMaxIndex() {
        return maxIndex;
    }

    public void setMaxIndex(int maxIndex) {
        this.maxIndex = maxIndex;
    }

    public String getNamePrefix() {
        return namePrefix;
    }

    public void setNamePrefix(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    public String getSurnamePrefix() {
        return surnamePrefix;
    }

    public void setSurnamePrefix(String surnamePrefix) {
        this.surnamePrefix = surnamePrefix;
    }
}
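The effect of a reader failure on the current chunk can be modeled in plain Java. This is a rough sketch under stated assumptions, not Spring Batch's implementation: FailedChunkSketch and run are hypothetical names, items are strings, "commit" means copying the chunk into a result list, and an exception during reading discards the chunk that is still being filled while earlier committed chunks stay in place.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a chunk rollback after a reader failure (hypothetical). */
public class FailedChunkSketch {

    /** Stand-in for FailedCaseTestReader: throws once index reaches maxIndex. */
    static String read(int index, int maxIndex) throws Exception {
        if (index >= maxIndex) {
            throw new Exception("Unexpected Error!");
        }
        return "failedcasename_" + index;
    }

    /**
     * Runs one simplified step and returns the items that were committed.
     * Items buffered in the unfinished chunk are dropped when read() throws.
     */
    public static List<String> run(int maxIndex, int commitInterval) {
        List<String> committed = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        int index = 0;
        try {
            while (true) {
                chunk.add(read(index++, maxIndex));
                if (chunk.size() == commitInterval) {
                    committed.addAll(chunk); // write the chunk and commit
                    chunk.clear();
                }
            }
        } catch (Exception e) {
            chunk.clear(); // rollback: the partial chunk is never written
        }
        return committed;
    }

    public static void main(String[] args) {
        // maxIndex = 1, commit-interval = 2: the first chunk never completes.
        System.out.println(run(1, 2)); // prints []
    }
}
```

With the values used for failedCaseTestReader in this application (maxIndex 1, commit-interval 2), the exception is raised before the first chunk is complete, so no item of that step reaches the database.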

Step 11: Create the TestProcessor Implementation

The TestProcessor class is created by implementing the ItemProcessor interface. This class is called to process items. A User item is received from TestReader, processed and handed on to TestWriter.

package com.onlinetechvision.item;

import java.util.Locale;

import org.springframework.batch.item.ItemProcessor;

import com.onlinetechvision.user.User;

/**
 * TestProcessor Class is created to process items.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class TestProcessor implements ItemProcessor<User, User> {

    /**
     * Processes items one by one
     *
     * @param user the item to process
     * @return User
     * @throws Exception
     */
    @Override
    public User process(User user) throws Exception {
        // Convert name and surname to upper case with a fixed locale.
        user.setName(user.getName().toUpperCase(Locale.ENGLISH));
        user.setSurname(user.getSurname().toUpperCase(Locale.ENGLISH));

        return user;
    }
}
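TestProcessor passes Locale.ENGLISH to toUpperCase. The article does not say why, but one plausible reason is that upper-casing is locale sensitive: under a Turkish locale, for example, the letter "i" becomes the dotted capital "İ" (U+0130). The sketch below illustrates the difference. LocaleUpperCaseSketch and its methods are hypothetical names used only for this example.

```java
import java.util.Locale;

/** Shows why a fixed locale gives predictable upper-case results (illustrative sketch). */
public class LocaleUpperCaseSketch {

    /** Upper-cases with a fixed English locale, as TestProcessor does. */
    public static String upperEnglish(String value) {
        return value.toUpperCase(Locale.ENGLISH);
    }

    /** Upper-cases with the Turkish locale, where 'i' maps to the dotted capital 'İ'. */
    public static String upperTurkish(String value) {
        return value.toUpperCase(Locale.forLanguageTag("tr"));
    }

    public static void main(String[] args) {
        System.out.println(upperEnglish("firstname_0")); // FIRSTNAME_0
        System.out.println(upperTurkish("firstname_0")); // contains U+0130 instead of 'I'
    }
}
```

Fixing the locale keeps the generated names identical regardless of the default locale of the machine that runs the batch.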

Step 12: Create the TestWriter Implementation

The TestWriter class is created by implementing the ItemWriter interface. This class is called to write items to a database, memory, etc.

package com.onlinetechvision.item;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.onlinetechvision.user.User;
import com.onlinetechvision.user.service.IUserService;

/**
 * TestWriter Class is created to write items to DB, memory etc...
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class TestWriter implements ItemWriter<User> {

    private IUserService userService;

    /**
     * Writes items via list
     *
     * @throws Exception
     */
    @Override
    public void write(List<? extends User> userList) throws Exception {
        // The whole chunk arrives as one list and is stored item by item.
        for (User user : userList) {
            getUserService().addUser(user);
        }
        System.out.println("User List : " + getUserService().getUsers());
    }

    public IUserService getUserService() {
        return userService;
    }

    public void setUserService(IUserService userService) {
        this.userService = userService;
    }
}

Step 13: Create the FailedStepTasklet Class

FailedStepTasklet is created by implementing the Tasklet interface. It illustrates the business logic of a failed step.

package com.onlinetechvision.tasklet;

import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * FailedStepTasklet Class illustrates a failed job.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class FailedStepTasklet implements Tasklet {

    private static final Logger logger = Logger.getLogger(FailedStepTasklet.class);

    private String taskResult;

    /**
     * Executes FailedStepTasklet
     *
     * @param stepContribution the step contribution
     * @param chunkContext the chunk context
     * @return RepeatStatus
     * @throws Exception
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        logger.debug("Task Result : " + getTaskResult());
        throw new Exception("Error occurred!");
    }

    public String getTaskResult() {
        return taskResult;
    }

    public void setTaskResult(String taskResult) {
        this.taskResult = taskResult;
    }
}

Step 14: Create the BatchProcessStarter Class

The BatchProcessStarter class is created to launch the jobs. It also logs their execution results.

package com.onlinetechvision.spring.batch;

import org.apache.log4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;

/**
 * BatchProcessStarter Class launches the jobs and logs their execution results.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class BatchProcessStarter {

    private static final Logger logger = Logger.getLogger(BatchProcessStarter.class);

    private Job firstJob;
    private Job secondJob;
    private Job thirdJob;
    private JobLauncher jobLauncher;
    private JobRepository jobRepository;

    /**
     * Starts the jobs and logs their execution results.
     */
    public void start() {
        JobExecution jobExecution = null;
        JobParametersBuilder builder = new JobParametersBuilder();

        try {
            // Each job is launched, then its last execution is read back and logged.
            getJobLauncher().run(getFirstJob(), builder.toJobParameters());
            jobExecution = getJobRepository().getLastJobExecution(getFirstJob().getName(), builder.toJobParameters());
            logger.debug(jobExecution.toString());

            getJobLauncher().run(getSecondJob(), builder.toJobParameters());
            jobExecution = getJobRepository().getLastJobExecution(getSecondJob().getName(), builder.toJobParameters());
            logger.debug(jobExecution.toString());

            getJobLauncher().run(getThirdJob(), builder.toJobParameters());
            jobExecution = getJobRepository().getLastJobExecution(getThirdJob().getName(), builder.toJobParameters());
            logger.debug(jobExecution.toString());

        } catch (JobExecutionAlreadyRunningException
                | JobRestartException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            logger.error(e);
        }
    }

    public Job getFirstJob() {
        return firstJob;
    }

    public void setFirstJob(Job firstJob) {
        this.firstJob = firstJob;
    }

    public Job getSecondJob() {
        return secondJob;
    }

    public void setSecondJob(Job secondJob) {
        this.secondJob = secondJob;
    }

    public Job getThirdJob() {
        return thirdJob;
    }

    public void setThirdJob(Job thirdJob) {
        this.thirdJob = thirdJob;
    }

    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    public JobRepository getJobRepository() {
        return jobRepository;
    }

    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }
}

Step 15: Create jdbc.properties

jdbc.properties is created. It defines the data source information and is read via dataContext.xml.

jdbc.db.driverClassName=com.mysql.jdbc.Driver
jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision
jdbc.db.username=root
jdbc.db.password=root
jdbc.db.initialSize=10
jdbc.db.minIdle=3
jdbc.db.maxIdle=10
jdbc.db.maxActive=10
jdbc.db.testWhileIdle=true
jdbc.db.testOnBorrow=true
jdbc.db.testOnReturn=true
jdbc.db.initSQL=SELECT 1 FROM DUAL
jdbc.db.validationQuery=SELECT 1 FROM DUAL
jdbc.db.timeBetweenEvictionRunsMillis=30000
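In the application, Spring resolves these keys through the property placeholder. As a quick way to inspect such a file outside Spring, the following sketch parses properties text with java.util.Properties. JdbcPropertiesSketch, parse and intValue are hypothetical helpers introduced for this example, and the properties text is passed in as a string instead of being loaded from the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Parses jdbc.properties-style text with the JDK only (illustrative sketch). */
public class JdbcPropertiesSketch {

    /** Parses properties text; in the application the text would come from jdbc.properties. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads a numeric setting such as jdbc.db.maxActive. */
    public static int intValue(Properties props, String key) {
        return Integer.parseInt(props.getProperty(key).trim());
    }

    public static void main(String[] args) {
        Properties props = parse(
                "jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision\n"
                + "jdbc.db.maxActive=10\n"
                + "jdbc.db.validationQuery=SELECT 1 FROM DUAL\n");

        System.out.println(props.getProperty("jdbc.db.url"));
        System.out.println(intValue(props, "jdbc.db.maxActive"));
        System.out.println(props.getProperty("jdbc.db.validationQuery"));
    }
}
```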

Step 16: Create dataContext.xml

The Spring configuration file dataContext.xml is created. It covers the dataSource, sessionFactory and transactionManager definitions.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/batch
                           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
                           http://www.springframework.org/schema/tx
                           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <context:property-placeholder location="classpath:jdbc.properties"/>

    <!-- Enable the configuration of transactional behavior based on annotations -->
    <tx:annotation-driven transaction-manager="transactionManager"/>

    <!-- Data Source Declaration -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource" destroy-method="close"
          p:driverClassName="${jdbc.db.driverClassName}"
          p:url="${jdbc.db.url}"
          p:username="${jdbc.db.username}"
          p:password="${jdbc.db.password}"
          p:initialSize="${jdbc.db.initialSize}"
          p:minIdle="${jdbc.db.minIdle}"
          p:maxIdle="${jdbc.db.maxIdle}"
          p:maxActive="${jdbc.db.maxActive}"
          p:testWhileIdle="${jdbc.db.testWhileIdle}"
          p:testOnBorrow="${jdbc.db.testOnBorrow}"
          p:testOnReturn="${jdbc.db.testOnReturn}"
          p:initSQL="${jdbc.db.initSQL}"
          p:validationQuery="${jdbc.db.validationQuery}"
          p:timeBetweenEvictionRunsMillis="${jdbc.db.timeBetweenEvictionRunsMillis}"/>

    <!-- Session Factory Declaration -->
    <bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="annotatedClasses">
            <list>
                <value>com.onlinetechvision.user.User</value>
            </list>
        </property>
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.dialect">org.hibernate.dialect.MySQLDialect</prop>
                <prop key="hibernate.show_sql">true</prop>
            </props>
        </property>
    </bean>

    <!-- Transaction Manager Declaration -->
    <bean id="transactionManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager">
        <property name="sessionFactory" ref="sessionFactory"/>
    </bean>

</beans>

Step 17: Create jobContext.xml

The Spring configuration file jobContext.xml is created. It covers the jobRepository, jobLauncher, item reader, item processor, item writer, tasklet and job definitions.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/batch
                           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <import resource="dataContext.xml"/>

    <!-- jobRepository Declaration -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>

    <!-- jobLauncher Declaration -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <!-- Reader Bean Declarations -->
    <bean id="firstTestReader" class="com.onlinetechvision.item.TestReader">
        <property name="maxIndex" value="2"/>
        <property name="namePrefix" value="firstname"/>
        <property name="surnamePrefix" value="firstsurname"/>
    </bean>

    <bean id="secondTestReader" class="com.onlinetechvision.item.TestReader">
        <property name="maxIndex" value="2"/>
        <property name="namePrefix" value="secondname"/>
        <property name="surnamePrefix" value="secondsurname"/>
    </bean>

    <bean id="thirdTestReader" class="com.onlinetechvision.item.TestReader">
        <property name="maxIndex" value="3"/>
        <property name="namePrefix" value="thirdname"/>
        <property name="surnamePrefix" value="thirdsurname"/>
    </bean>

    <bean id="fourthTestReader" class="com.onlinetechvision.item.TestReader">
        <property name="maxIndex" value="3"/>
        <property name="namePrefix" value="fourthname"/>
        <property name="surnamePrefix" value="fourthsurname"/>
    </bean>

    <bean id="fifthTestReader" class="com.onlinetechvision.item.TestReader">
        <property name="maxIndex" value="3"/>
        <property name="namePrefix" value="fifthname"/>
        <property name="surnamePrefix" value="fifthsurname"/>
    </bean>

    <bean id="failedCaseTestReader" class="com.onlinetechvision.item.FailedCaseTestReader">
        <property name="maxIndex" value="1"/>
        <property name="namePrefix" value="failedcasename"/>
        <property name="surnamePrefix" value="failedcasesurname"/>
    </bean>

    <!-- Processor Bean Declaration -->
    <bean id="testProcessor" class="com.onlinetechvision.item.TestProcessor" />

    <!-- Writer Bean Declaration -->
    <bean id="testWriter" class="com.onlinetechvision.item.TestWriter">
        <property name="userService" ref="userService"/>
    </bean>

    <!-- Failed Step Tasklet Declaration -->
    <bean id="failedStepTasklet" class="com.onlinetechvision.tasklet.FailedStepTasklet">
        <property name="taskResult" value="Error occurred!" />
    </bean>

    <!-- Batch Job Declarations -->
    <batch:job id="firstJob">
        <batch:step id="firstStep" next="secondStep">
            <batch:tasklet>
                <batch:chunk reader="firstTestReader" processor="testProcessor" writer="testWriter" commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
        <batch:step id="secondStep">
            <batch:tasklet>
                <batch:chunk reader="secondTestReader" processor="testProcessor" writer="testWriter" commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <batch:job id="secondJob">
        <batch:step id="thirdStep">
            <batch:tasklet>
                <batch:chunk reader="thirdTestReader" processor="testProcessor" writer="testWriter" commit-interval="2"/>
            </batch:tasklet>
            <batch:next on="*" to="fourthStep" />
            <batch:next on="FAILED" to="firstFailedStep" />
        </batch:step>
        <batch:step id="fourthStep">
            <batch:tasklet>
                <batch:chunk reader="fourthTestReader" processor="testProcessor" writer="testWriter" commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
        <batch:step id="firstFailedStep">
            <batch:tasklet ref="failedStepTasklet" />
        </batch:step>
    </batch:job>

    <batch:job id="thirdJob">
        <batch:step id="fifthStep">
            <batch:tasklet>
                <batch:chunk reader="failedCaseTestReader" processor="testProcessor" writer="testWriter" commit-interval="2"/>
            </batch:tasklet>
            <batch:next on="*" to="sixthStep" />
            <batch:next on="FAILED" to="secondFailedStep" />
        </batch:step>
        <batch:step id="sixthStep">
            <batch:tasklet>
                <batch:chunk reader="fifthTestReader" processor="testProcessor" writer="testWriter" commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
        <batch:step id="secondFailedStep">
            <batch:tasklet ref="failedStepTasklet" />
        </batch:step>
    </batch:job>

</beans>
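The conditional transitions of thirdStep and fifthStep declare the wildcard "*" before "FAILED". According to the Spring Batch reference documentation, transitions are ordered from most specific to least specific, so the declaration order does not let "*" shadow "FAILED". The sketch below models only that rule for exact patterns and "*". TransitionSketch and its methods are hypothetical names, and the real framework also supports the "?" wildcard, which is not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of exit-code based step transitions (hypothetical, no Spring Batch involved). */
public class TransitionSketch {

    /**
     * Picks the next step for an exit code. An exact pattern wins over the
     * "*" wildcard regardless of the order in which the patterns were declared.
     * Returns null when neither an exact pattern nor a wildcard is declared.
     */
    public static String nextStep(Map<String, String> transitions, String exitCode) {
        if (transitions.containsKey(exitCode)) {
            return transitions.get(exitCode);
        }
        return transitions.get("*");
    }

    /** Transitions of thirdStep, in the order they appear in jobContext.xml. */
    public static Map<String, String> thirdStepTransitions() {
        Map<String, String> transitions = new LinkedHashMap<>();
        transitions.put("*", "fourthStep");
        transitions.put("FAILED", "firstFailedStep");
        return transitions;
    }

    public static void main(String[] args) {
        System.out.println(nextStep(thirdStepTransitions(), "COMPLETED")); // fourthStep
        System.out.println(nextStep(thirdStepTransitions(), "FAILED"));    // firstFailedStep
    }
}
```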

Step 18: Create applicationContext.xml

The Spring configuration file applicationContext.xml is created. It covers the bean definitions.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/batch
                           http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <import resource="jobContext.xml"/>

    <!-- User DAO Declaration -->
    <bean id="userDAO" class="com.onlinetechvision.user.dao.UserDAO">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>

    <!-- User Service Declaration -->
    <bean id="userService" class="com.onlinetechvision.user.service.UserService">
        <property name="userDAO" ref="userDAO" />
    </bean>

    <!-- BatchProcessStarter Declaration -->
    <bean id="batchProcessStarter" class="com.onlinetechvision.spring.batch.BatchProcessStarter">
        <property name="jobLauncher" ref="jobLauncher"/>
        <property name="jobRepository" ref="jobRepository"/>
        <property name="firstJob" ref="firstJob"/>
        <property name="secondJob" ref="secondJob"/>
        <property name="thirdJob" ref="thirdJob"/>
    </bean>

</beans>

Step 19: Create the Application Class

The Application class is created to run the application.

package com.onlinetechvision.exe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.onlinetechvision.spring.batch.BatchProcessStarter;

/**
 * Application Class starts the application.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 */
public class Application {

    /**
     * Starts the application
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        BatchProcessStarter batchProcessStarter = (BatchProcessStarter) appContext.getBean("batchProcessStarter");
        batchProcessStarter.start();
    }
}

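Step 20: Build the Project

After the OTV_SpringBatch_Chunk_Oriented_Processing project is built, OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar is created.

Step 21: Run the Project

After the created OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar file is run, the processed users are stored in the database and the following console output logs are shown:

[Figure: database screenshot]

Console output of firstJob: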

16.12.2012 19:30:41  INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=firstJob]] launched with the following parameters: [{}]
16.12.2012 19:30:41 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=0, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:41 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]
User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2]
16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:43) - JobExecution: id=0, version=2, startTime=Sun Dec 16 19:30:41 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]

Console output of secondJob:

16.12.2012 19:30:42  INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=secondJob]] launched with the following parameters: [{}]
16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=1, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]
User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2, id : 187, name : THIRDNAME_0, surname : THIRDSURNAME_0, id : 188, name : THIRDNAME_1, surname : THIRDSURNAME_1, id : 189, name : THIRDNAME_2, surname : THIRDSURNAME_2, id : 190, name : THIRDNAME_3, surname : THIRDSURNAME_3, id : 191, name : FOURTHNAME_0, surname : FOURTHSURNAME_0, id : 192, name : FOURTHNAME_1, surname : FOURTHSURNAME_1, id : 193, name : FOURTHNAME_2, surname : FOURTHSURNAME_2, id : 194, name : FOURTHNAME_3, surname : FOURTHSURNAME_3]
16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:47) - JobExecution: id=1, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]

Console output of thirdJob:

16.12.2012 19:30:42  INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=thirdJob]] launched with the following parameters: [{}]
16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=2, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]
16.12.2012 19:30:42 DEBUG (TransactionTemplate.java:159) - Initiating transaction rollback on application exception
org.springframework.batch.repeat.RepeatException: Exception in batch process; nested exception is java.lang.Exception: Unexpected Error!
...
16.12.2012 19:30:43 DEBUG (BatchProcessStarter.java:51) - JobExecution: id=2, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:43 GMT 2012, lastUpdated=Sun Dec 16 19:30:43 GMT 2012, status=FAILED, exitStatus=exitCode=FAILED;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]

Step 22: Download
https://github.com/erenavsarogullari/OTV_SpringBatch_Chunk_Oriented_Processing

Resources:

Chunk Oriented Processing in Spring Batch

Reference: Chunk Oriented Processing in Spring Batch from our JCG partner Eren Avsarogullari at the Online Technology Vision blog.

Translated from: https://www.javacodegeeks.com/2012/12/chunk-oriented-processing-in-spring-batch.html
