我一直在努力将Podcastpedia.org的一些批处理作业迁移到Spring Batch。 以前,这些工作是以我自己的方式开发的,我认为现在是时候使用一种更“标准化”的方法了。 因为我以前从未在Java配置中使用过Spring,所以我认为通过在Java中配置Spring Batch作业,这是学习它的好机会。 而且由于我都在尝试使用Spring进行新的事物,所以为什么不把Spring Boot扔进船里呢?

注意:
在开始本教程之前,我建议您首先阅读Spring的入门-创建批处理服务 ,因为此处提供的结构和代码均基于该原始版本。

1.我要建立的

因此,如前所述,在这篇文章中,我将在配置Spring Batch和为Podcastpedia.org开发一些批处理作业的背景下介绍Spring Batch。 这是Podcastpedia-batch项目当前一部分的两个工作的简短描述:

  1. addNewPodcastJob

    1. 平面文件 读取播客元数据(提要URL,标识符,类别等)
    2. 转换(解析并准备要通过Http Apache Client插入的情节)数据
    3. 最后一步, 插入 Podcastpedia 数据库,通过电子邮件 告知提交者
  2. notifyEmailSubscribersJob –人们可以通过电子邮件在Podcastpedia.org上订阅自己喜欢的播客。 对于那些做过的人,会定期(每日,每周,每月)检查是否有新的情节出现,是否通过电子邮件通知订户是否有新情节; 从数据库中读取,通过JPA扩展读取的数据,将其重新分组并通过电子邮件 通知订户

源代码:
本教程的源代码可在GitHub- Podcastpedia-batch上获得。

注意:在开始之前,我还强烈建议您阅读Batch的域语言 ,以免使“ Jobs”,“ Steps”或“ ItemReaders”等术语听起来很陌生。

2.你需要什么

  • 最喜欢的文本编辑器或IDE
  • JDK 1.7或更高版本
  • Maven 3.0+

3.设置项目

该项目是使用Maven构建的。 它使用Spring Boot,这使创建可“运行”的基于独立Spring的应用程序变得容易。 您可以通过访问项目的网站来了解有关Spring Boot的更多信息。

Maven构建文件

因为它使用Spring Boot,所以它将使用spring-boot-starter-parent作为其父级,另外还有几个其他spring-boot-starters将为我们提供项目中所需的一些库:

podcastpedia-batch项目的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.podcastpedia.batch</groupId><artifactId>podcastpedia-batch</artifactId><version>0.1.0</version><properties><sprinb.boot.version>1.1.6.RELEASE</sprinb.boot.version><java.version>1.7</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.1.6.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency>  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>         </dependency>        <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.3.5</version></dependency>      <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.3.2</version></dependency><!-- velocity --><dependency><groupId>org.apache.velocity</groupId><artifactId>velocity</artifactId><version>1.7</version>        </dependency><dependency><groupId>org.apache.velocity</groupId><artifactId>velocity-tools</artifactId><version>2.0</version><exclusions><exclusion><groupId>org.apache.struts</groupId><artifactId>struts-core</artifactId></exclusion></exclusions>                </dependency><!-- Project rome rss, atom --><dependency><groupId>rome</groupId><artifactId>rome</artifactId><version>1.0</version></dependency><!-- option this fetcher thing --><dependency><groupId>rome</groupId><artifactId>rome-fetcher</artifactId><version>1.0</version></dependency><dependency><groupId>org.jdom</groupId><artifactId>jdom</artifactId><version>1.1</version></dependency>       <!-- PID 1 --><dependency><groupId>xerces</groupId><artifactId>xercesImpl</artifactId><version>2.9.1</version></dependency><!-- MySQL JDBC connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId>   </dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-remote-shell</artifactId> <exclusions><exclusion><groupId>javax.mail</groupId><artifactId>mail</artifactId></exclusion></exclusions>              </dependency><dependency><groupId>javax.mail</groupId><artifactId>mail</artifactId><version>1.4.7</version></dependency>      <dependency><groupId>javax.inject</groupId><artifactId>javax.inject</artifactId><version>1</version></dependency>       <dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-core</artifactId><version>[4.0,)</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies><build><plugins><plugin> <artifactId>maven-compiler-plugin</artifactId> </plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

注意:
使用spring-boot-starter-parent作为项目的父项的一大优势是,您只需升级父项的版本,它将为您提供“最新”的库。 当我开始该项目时,spring boot的版本为1.1.3.RELEASE ,而在撰写本文时,其版本已经是1.1.6.RELEASE

项目目录结构

我以以下方式构造项目:

项目目录结构

└── src └── main └── java └── org └── podcastpedia └── batch └── common └── jobs └── addpodcast └── notifysubscribers

注意:

  • org.podcastpedia.batch.jobs软件包包含子软件包,这些子软件包具有针对特定作业的特定类。
  • org.podcastpedia.batch.jobs.common包包含所有作业使用的类,例如,当前两个作业都需要的JPA实体。

4.创建一个批处理作业配置

我将首先介绍第一个批处理作业的Java配置类:

批处理作业配置

package org.podcastpedia.batch.jobs.addpodcast;import org.podcastpedia.batch.common.configuration.DatabaseAccessConfiguration;
import org.podcastpedia.batch.common.listeners.LogProcessListener;
import org.podcastpedia.batch.common.listeners.ProtocolListener;
import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException;@Configuration
@EnableBatchProcessing
@Import({DatabaseAccessConfiguration.class, ServicesConfiguration.class})
public class AddPodcastJobConfiguration {@Autowiredprivate JobBuilderFactory jobs;@Autowiredprivate StepBuilderFactory stepBuilderFactory;// tag::jobstep[]@Beanpublic Job addNewPodcastJob(){return jobs.get("addNewPodcastJob").listener(protocolListener()).start(step()).build();} @Beanpublic Step step(){return stepBuilderFactory.get("step").<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read.reader(reader()).processor(processor()).writer(writer()).listener(logProcessListener()).faultTolerant().skipLimit(10) //default is set to 0.skip(MySQLIntegrityConstraintViolationException.class).build();}  // end::jobstep[]// tag::readerwriterprocessor[]@Beanpublic ItemReader<SuggestedPodcast> reader(){FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();reader.setLinesToSkip(1);//first line is title definition reader.setResource(new ClassPathResource("suggested-podcasts.txt"));reader.setLineMapper(lineMapper());return reader; }@Beanpublic LineMapper<SuggestedPodcast> lineMapper() {DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();lineTokenizer.setDelimiter(";");lineTokenizer.setStrict(false);lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();fieldSetMapper.setTargetType(SuggestedPodcast.class);lineMapper.setLineTokenizer(lineTokenizer);lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());return lineMapper;}@Beanpublic SuggestedPodcastFieldSetMapper suggestedPodcastFieldSetMapper() {return new SuggestedPodcastFieldSetMapper();}/** configure the processor related stuff */@Beanpublic ItemProcessor<SuggestedPodcast, SuggestedPodcast> processor() {return new SuggestedPodcastItemProcessor();}@Beanpublic ItemWriter<SuggestedPodcast> writer() {return new Writer();}// end::readerwriterprocessor[]@Beanpublic ProtocolListener protocolListener(){return new ProtocolListener();}@Beanpublic LogProcessListener logProcessListener(){return new LogProcessListener();}    }

@EnableBatchProcessing批注添加了许多支持作业的关键bean,并节省了我们的配置工作。 例如,您还可以@Autowired一些有用的东西到您的上下文中:

  • JobRepository (bean名称为“ jobRepository”)
  • JobLauncher (bean名称为“ jobLauncher”)
  • JobRegistry (bean名称为“ jobRegistry”)
  • 一个PlatformTransactionManager (bean名称为“ transactionManager”)
  • 一个JobBuilderFactory (bean名称为“ jobBuilders”)是一种便利,可以防止您不得不将作业存储库注入到每个作业中,如上例所示
  • StepBuilderFactory (bean名称为“ stepBuilders”)是一种便利,可防止您不得不将作业存储库和事务管理器注入到每个步骤中

第一部分着重于实际的作业配置:

批处理作业和步骤配置

@Bean
public Job addNewPodcastJob(){return jobs.get("addNewPodcastJob").listener(protocolListener()).start(step()).build();
}   @Bean
public Step step(){return stepBuilderFactory.get("step").<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read.reader(reader()).processor(processor()).writer(writer()).listener(logProcessListener()).faultTolerant().skipLimit(10) //default is set to 0.skip(MySQLIntegrityConstraintViolationException.class).build();
}

第一种方法定义了一个工作,第二种方法定义了一个步骤。 正如您在“批处理的域语言”中所读到的一样 ,作业是从步骤构建的,其中每个步骤都可以涉及阅读器,处理器和编写器。

在步骤定义中,您定义一次要写入多少数据(在本例中,一次要写入1条记录)。 接下来,您指定读取器,处理器和写入器。

5. Spring Batch处理单元

大部分批处理可描述为读取数据,对其进行一些转换,然后将结果写出。 如果您对此有所了解,这将以某种方式反映提取,转换,加载(ETL)的过程。 Spring Batch提供了三个关键接口来帮助执行批量读取和写入: ItemReaderItemProcessorItemWriter

读者群

ItemReader是一种抽象,它提供了从许多不同类型的输入中检索数据的方法: 平面文件xml文件数据库jms等,一次仅一项。 有关可用项目阅读器的完整列表, 请参见附录A. ItemReaders和ItemWriters列表。

在Podcastpedia批处理作业中,我使用以下专用的ItemReader:

5.1.1。 FlatFileItemReader

顾名思义,它从一个平面文件中读取数据行,这些文件通常描述记录,这​​些记录的数据字段由文件中的固定位置定义或由某些特殊字符(例如逗号)分隔。 这种类型的ItemReader在第一个批处理作业中使用,addNewPodcastJob。 所使用的输入文件名为“ suggested-podcasts.in” ,位于类路径( src / main / resources )中,其外观类似于以下内容:

FlatFileItemReader的输入文件

FEED_URL; IDENTIFIER_ON_PODCASTPEDIA; CATEGORIES; LANGUAGE; MEDIA_TYPE; UPDATE_FREQUENCY; KEYWORDS; FB_PAGE; TWITTER_PAGE; GPLUS_PAGE; NAME_SUBMITTER; EMAIL_SUBMITTER
http://www.5minutebiographies.com/feed/; 5minutebiographies; people_society, history; en; Audio; WEEKLY; biography, biographies, short biography, short biographies, 5 minute biographies, five minute biographies, 5 minute biography, five minute biography; https://www.facebook.com/5minutebiographies; https://twitter.com/5MinuteBios; ; Adrian Matei; adrianmatei@gmail.com
http://notanotherpodcast.libsyn.com/rss; NotAnotherPodcast; entertainment; en; Audio; WEEKLY; Comedy, Sports, Cinema, Movies, Pop Culture, Food, Games; https://www.facebook.com/notanotherpodcastusa; https://twitter.com/NAPodcastUSA; https://plus.google.com/u/0/103089891373760354121/posts; Adrian Matei; adrianmatei@gmail.com

如您所见,第一行定义“列”的名称,随后几行包含实际数据(以“;”分隔),需要转换为上下文中相关的域对象。

现在让我们看看如何配置FlatFileItemReader

FlatFileItemReader示例

@Bean
public ItemReader<SuggestedPodcast> reader(){FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();reader.setLinesToSkip(1);//first line is title definition reader.setResource(new ClassPathResource("suggested-podcasts.in"));reader.setLineMapper(lineMapper());return reader;
}

除其他外,您可以指定输入资源,要跳过的行数和行映射器。

5.1.1.1。 LineMapper

LineMapper是用于将线(字符串)映射到域对象的接口,通常用于将从文件读取的线映射到每行的域对象。 对于Podcastpedia作业,我使用DefaultLineMapper ,这是两阶段的实现,包括将行的标记化为FieldSet然后映射到item:

LineMapper默认实现示例

@Bean
public LineMapper<SuggestedPodcast> lineMapper() {DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();lineTokenizer.setDelimiter(";");lineTokenizer.setStrict(false);lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();fieldSetMapper.setTargetType(SuggestedPodcast.class);lineMapper.setLineTokenizer(lineTokenizer);lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());return lineMapper;
}
  • DelimitedLineTokenizer通过“;”分割输入字符串 定界符。
  • 如果将strict标志设置为false则将容忍具有较少令牌的行并用空列填充,而具有更多令牌的行将被截断。
  • 第一行的列名称设置为lineTokenizer.setNames(...);
  • 并设置了fieldMapper (第14行)

注意:
FieldSet是“接口”,平面文件输入源使用它来封装将字符串数组转换为Java本机类型的担忧。 就像JDBC中ResultSet扮演的角色一样,客户端将知道他们要提取的强类型字段的名称或位置。”

FieldSetMapper

FieldSetMapper是一个接口,用于将从FieldSet获得的数据FieldSet到对象中。 这是将fieldSet映射到SuggestedPodcast域对象的实现,该对象将进一步传递给处理器:

FieldSetMapper的实现

public class SuggestedPodcastFieldSetMapper implements FieldSetMapper<SuggestedPodcast> {@Overridepublic SuggestedPodcast mapFieldSet(FieldSet fieldSet) throws BindException {SuggestedPodcast suggestedPodcast = new SuggestedPodcast();suggestedPodcast.setCategories(fieldSet.readString("CATEGORIES"));suggestedPodcast.setEmail(fieldSet.readString("EMAIL_SUBMITTER"));suggestedPodcast.setName(fieldSet.readString("NAME_SUBMITTER"));suggestedPodcast.setTags(fieldSet.readString("KEYWORDS"));//some of the attributes we can map directly into the Podcast entity that we'll insert later into the databasePodcast podcast = new Podcast();podcast.setUrl(fieldSet.readString("FEED_URL"));podcast.setIdentifier(fieldSet.readString("IDENTIFIER_ON_PODCASTPEDIA"));podcast.setLanguageCode(LanguageCode.valueOf(fieldSet.readString("LANGUAGE")));podcast.setMediaType(MediaType.valueOf(fieldSet.readString("MEDIA_TYPE")));podcast.setUpdateFrequency(UpdateFrequency.valueOf(fieldSet.readString("UPDATE_FREQUENCY")));podcast.setFbPage(fieldSet.readString("FB_PAGE"));podcast.setTwitterPage(fieldSet.readString("TWITTER_PAGE"));podcast.setGplusPage(fieldSet.readString("GPLUS_PAGE"));suggestedPodcast.setPodcast(podcast);return suggestedPodcast;}}

JdbcCursorItemReader

在第二个作业notifyRmailSubscribersJob中 ,在阅读器中,我仅从单个数据库表中读取电子邮件订阅者,但在处理器中,进一步执行了更详细的读取(通过JPA),以检索用户订阅的播客的所有新片段。 。 这是批处理环境中使用的常见模式。 单击此链接以获取更多常见批处理模式。

对于初始读取,我选择了JdbcCursorItemReader ,这是一个简单的阅读器实现,它打开JDBC游标并连续检索ResultSet的下一行:

JdbcCursorItemReader示例

@Bean
public ItemReader<User> notifySubscribersReader(){JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();String sql = "select * from users where is_email_subscriber is not null";reader.setSql(sql);reader.setDataSource(dataSource);reader.setRowMapper(rowMapper());     return reader;
}

注意我必须设置sql ,要读取的datasourceRowMapper

5.2.1。 行映射器

RowMapperJdbcTemplate使用的接口,用于按行映射Result'set的行。 我对该接口的实现执行将每一行映射到结果对象的实际工作,但是我不必担心异常处理:

RowMapper的实现

public class UserRowMapper implements RowMapper<User> {@Overridepublic User mapRow(ResultSet rs, int rowNum) throws SQLException {User user = new User();user.setEmail(rs.getString("email"));return user;}}

作家

ItemWriter是一种抽象,表示一次Step的输出,每次一批或大块的项目。 通常,项目编写者不知道下一步将要接收的输入,仅知道在当前调用中传递的项目。

提出的两项工作的作者非常简单。 他们只是使用外部服务来发送电子邮件通知并在Podcastpedia的帐户上发布推文。 这是第一个任务的ItemWriter的实现– addNewPodcast

ItemWriter的Writer实现

package org.podcastpedia.batch.jobs.addpodcast;import java.util.Date;
import java.util.List;import javax.inject.Inject;
import javax.persistence.EntityManager;import org.podcastpedia.batch.common.entities.Podcast;
import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;
import org.podcastpedia.batch.jobs.addpodcast.service.EmailNotificationService;
import org.podcastpedia.batch.jobs.addpodcast.service.SocialMediaService;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;public class Writer implements ItemWriter<SuggestedPodcast>{@Autowiredprivate EntityManager entityManager;@Injectprivate EmailNotificationService emailNotificationService;@Injectprivate SocialMediaService socialMediaService;@Overridepublic void write(List<? extends SuggestedPodcast> items) throws Exception {if(items.get(0) != null){SuggestedPodcast suggestedPodcast = items.get(0);//first insert the data in the database Podcast podcast = suggestedPodcast.getPodcast();podcast.setInsertionDate(new Date());entityManager.persist(podcast);entityManager.flush();//notify submitter about the insertion and post a twitt about it String url = buildUrlOnPodcastpedia(podcast);emailNotificationService.sendPodcastAdditionConfirmation(suggestedPodcast.getName(), suggestedPodcast.getEmail(),url);if(podcast.getTwitterPage() != null){socialMediaService.postOnTwitterAboutNewPodcast(podcast,url);              }                   }}private String buildUrlOnPodcastpedia(Podcast podcast) {StringBuffer urlOnPodcastpedia = new StringBuffer("http://www.podcastpedia.org");if (podcast.getIdentifier() != null) {urlOnPodcastpedia.append("/" + podcast.getIdentifier());} else {urlOnPodcastpedia.append("/podcasts/");urlOnPodcastpedia.append(String.valueOf(podcast.getPodcastId()));urlOnPodcastpedia.append("/" + podcast.getTitleInUrl());}      String url = urlOnPodcastpedia.toString();return url;}}

如您所见,这里没有什么特别之处,除了必须重写write方法之外,这是注入的外部服务EmailNotificationServiceSocialMediaService用于通过电子邮件向播客提交者告知播客目录添加内容以及Twitter是否可用的地方。提交的页面上,将有一则推文张贴在播客的墙上 。 您可以在以下文章中找到有关如何通过Velocity发送电子邮件以及如何从Java在Twitter上发布的详细说明:

  • 如何使用Spring和Velocity在Java中编写HTML电子邮件
  • 如何在10分钟内使用Java从Twitter4J发布到Twittter

处理器

ItemProcessor是代表项目业务处理的抽象。 当ItemReader读取一个项目,而ItemWriter写入一个项目时, ItemProcessor提供访问以转换或应用其他业务处理。 使用自己的Processors ,必须实现ItemProcessor<I,O>接口,其唯一方法O process(I item) throws Exception ,返回可能被修改的或新的项目以继续处理。 如果返回的结果为null,则认为该项目的处理不应继续。

尽管第一项工作的处理器需要更多的逻辑,但是因为我必须设置etaglast-modified标头属性,播客的feed属性,情节,类别和关键字:

作业addNewPodcast的ItemProcessor实现

public class SuggestedPodcastItemProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> {private static final int TIMEOUT = 10;@AutowiredReadDao readDao;@AutowiredPodcastAndEpisodeAttributesService podcastAndEpisodeAttributesService;@Autowiredprivate PoolingHttpClientConnectionManager poolingHttpClientConnectionManager;  @Autowiredprivate SyndFeedService syndFeedService;/*** Method used to build the categories, tags and episodes of the podcast*/@Overridepublic SuggestedPodcast process(SuggestedPodcast item) throws Exception {if(isPodcastAlreadyInTheDirectory(item.getPodcast().getUrl())) {return null;}String[] categories = item.getCategories().trim().split("\\s*,\\s*");        item.getPodcast().setAvailability(org.apache.http.HttpStatus.SC_OK);//set etag and last modified attributes for the podcastsetHeaderFieldAttributes(item.getPodcast());//set the other attributes of the podcast from the feed podcastAndEpisodeAttributesService.setPodcastFeedAttributes(item.getPodcast());//set the categoriesList<Category> categoriesByNames = readDao.findCategoriesByNames(categories);item.getPodcast().setCategories(categoriesByNames);//set the tagssetTagsForPodcast(item);//build the episodes setEpisodesForPodcast(item.getPodcast());return item;}......
}

第二个工作的处理器使用“驱动查询”方法 ,在该方法中 ,我用另一个“ JPA读取”扩展了从阅读器中检索的数据,并用情节对播客中的项目进行了分组,以便在我所用的电子邮件中看起来不错发送给订户:

ItemProcessor实现的第二项工作– notifySubscribers

@Scope("step")
public class NotifySubscribersItemProcessor implements ItemProcessor<User, User> {@AutowiredEntityManager em;@Value("#{jobParameters[updateFrequency]}")String updateFrequency;@Overridepublic User process(User item) throws Exception {String sqlInnerJoinEpisodes = "select e from User u JOIN u.podcasts p JOIN p.episodes e WHERE u.email=?1 AND p.updateFrequency=?2 AND"+ " e.isNew IS NOT NULL  AND e.availability=200 ORDER BY e.podcast.podcastId ASC, e.publicationDate ASC";TypedQuery<Episode> queryInnerJoinepisodes = em.createQuery(sqlInnerJoinEpisodes, Episode.class);queryInnerJoinepisodes.setParameter(1, item.getEmail());queryInnerJoinepisodes.setParameter(2, UpdateFrequency.valueOf(updateFrequency));       List<Episode> newEpisodes = queryInnerJoinepisodes.getResultList();return regroupPodcastsWithEpisodes(item, newEpisodes);}.......
}

注意:
如果您想了解更多有关如何使用Apache Http Client,获取etaglast-modified标头的信息,可以看一下我的文章– 如何使用新的Apache Http Client进行HEAD请求

6.执行批处理应用程序

批处理可以嵌入到Web应用程序和WAR文件中,但是在一开始我选择了一种创建独立应用程序的简单方法,该方法可以通过Java main()方法启动:

批处理Java main()方法

package org.podcastpedia.batch;
//imports ...;@ComponentScan
@EnableAutoConfiguration
public class Application {private static final String NEW_EPISODES_NOTIFICATION_JOB = "newEpisodesNotificationJob";private static final String ADD_NEW_PODCAST_JOB = "addNewPodcastJob";public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException {Log log = LogFactory.getLog(Application.class);SpringApplication app = new SpringApplication(Application.class);app.setWebEnvironment(false);ConfigurableApplicationContext ctx= app.run(args);JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);if(ADD_NEW_PODCAST_JOB.equals(args[0])){//addNewPodcastJobJob addNewPodcastJob = ctx.getBean(ADD_NEW_PODCAST_JOB, Job.class);JobParameters jobParameters = new JobParametersBuilder().addDate("date", new Date()).toJobParameters();  JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);BatchStatus batchStatus = jobExecution.getStatus();while(batchStatus.isRunning()){log.info("*********** Still running.... **************");Thread.sleep(1000);}log.info(String.format("*********** Exit status: %s", jobExecution.getExitStatus().getExitCode()));JobInstance jobInstance = jobExecution.getJobInstance();log.info(String.format("********* Name of the job %s", jobInstance.getJobName()));log.info(String.format("*********** job instance Id: %d", jobInstance.getId()));System.exit(0);} else if(NEW_EPISODES_NOTIFICATION_JOB.equals(args[0])){JobParameters jobParameters = new JobParametersBuilder().addDate("date", new Date()).addString("updateFrequency", args[1]).toJobParameters();  jobLauncher.run(ctx.getBean(NEW_EPISODES_NOTIFICATION_JOB,  Job.class), jobParameters);   } else {throw new IllegalArgumentException("Please provide a valid Job name as first application parameter");}System.exit(0);}}

从源头获得的有关SpringApplication -, @ComponentScan @EnableAutoConfiguration@EnableAutoConfiguration的最佳解释-入门-创建批处理服务:

main()方法SpringApplication helper类,将Application.class作为其run()方法的参数提供。 这告诉Spring从Application读取注释元数据,并将其作为Spring应用程序上下文中的组件进行管理。

@ComponentScan批注告诉Spring通过org.podcastpedia.batch包及其子级进行递归搜索,以查找直接或间接用Spring的@Component批注标记的@Component 。 该指令确保Spring查找并注册BatchConfiguration ,因为它被标记为@Configuration ,而@Configuration则是一种@Component注释。

@EnableAutoConfiguration批注根据您的类路径的内容打开合理的默认行为。 例如,它将查找实现CommandLineRunner接口并调用其run()方法的任何类。”

执行构建步骤:

  • JobLauncher是用于控制作业的简单界面,是从ApplicationContext中检索的。 请记住,这是通过@EnableBatchProcessing注释自动提供的。
  • 现在基于应用程序的第一个参数( args[0] ),我将从ApplicationContext检索相应的Job
  • 然后准备JobParameters ,在这里使用当前日期– .addDate("date", new Date()) ,以便作业执行始终是唯一的。
  • 一旦一切就绪,就可以执行作业: JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
  • 您可以使用返回的jobExecution来访问BatchStatus ,退出代码或作业名称和ID。

注意:我强烈建议您阅读和理解Spring Batch的元数据架构 。 它还将帮助您更好地了解Spring Batch Domain对象。

在开发和生产环境中运行应用程序

为了能够在不同的环境上运行Spring Batch / Spring Boot应用程序,我使用了Spring Profiles功能。 默认情况下,应用程序使用开发数据(数据库)运行。 但是,如果我想让工作使用生产数据库,则必须执行以下操作:

  • 提供以下环境参数-Dspring.profiles.active=prod
  • 在默认的application.properties文件旁边,在类路径的application-prod.properties文件中配置了生产数据库属性

摘要

在本教程中,我们学习了如何使用Spring Boot和Java配置来配置Spring Batch项目,如何在批处理中使用一些最普通的阅读器,如何配置一些简单的作业,以及如何从A程序启动Spring Batch作业。主要方法。

翻译自: https://www.javacodegeeks.com/2014/09/spring-batch-tutorial-with-spring-boot-and-java-configuration.html

具有Spring Boot和Java配置的Spring Batch教程相关推荐

  1. Spring Boot笔记-自动配置(Spring Boot封装成jar被其他项目引用)

    特点: 这里也就是自己写个Service注册到别人的SpringBoot项目中,然后别人来调用这个Service,这个Service,先读取自己的application.properties,再读取引 ...

  2. java spring sqlite,Spring Boot+MyBatis+SQLite配置

    Spring Boot+MyBatis+SQLite配置例子参考下面 创建新项目 项目类型务必选择箭头指定的类型,否则不会自动生成代码模版 增加依赖项 junit junit 4.13.1 test ...

  3. 自定义spring boot的自动配置

    文章目录 添加Maven依赖 创建自定义 Auto-Configuration 添加Class Conditions 添加 bean Conditions Property Conditions Re ...

  4. Spring Boot的自动化配置原理

    转载自 Spring Boot的自动化配置原理 随着Ruby.Groovy等动态语言的流行,相比较之下Java的开发显得格外笨重.繁多的配置.低下的开发效率.复杂的部署流程以及第三方技术集成难度大等问 ...

  5. 在Spring Boot中使用配置元数据来配置您的配置

    Spring Boot 1.3.0中发布了许多更新,但是其中一个对我很突出,因为我以前并不了解此更新,它的状态使其成为一项真正有用的功能(不幸的是,撰写本文时仅在Spring Boot中可用)这个). ...

  6. Spring Boot –现代Java应用程序的基础

    Spring Boot是Spring.io中一个相对较新的项目. 其目的是简化创建新的基于Spring Framework的项目,并通过应用一些约定来统一其配置. 这种关于配置的方法约定已经成功地应用 ...

  7. Spring boot的Maven配置依赖

    Spring boot 的 Maven 配置依赖 springboot spring pom maven models 我们通过引用spring-boot-starter-parent,添加sprin ...

  8. spring boot java app_利用spring boot创建java app

    利用spring boot创建java app 背景 在使用spring框架开发的过程中,随着功能以及业务逻辑的日益复杂,应用伴随着大量的XML配置和复杂的bean依赖关系,特别是在使用mvc的时候各 ...

  9. Spring Boot 针对 Java 开发人员的安装指南

    Spring Boot 可以使用经典的开发工具或者使用安装的命令行工具.不管使用何种方式,你都需要确定你的 Java 版本为 Java SDK v1.8 或者更高的版本.在你开始安装之前,你需要确定你 ...

最新文章

  1. 深入浅出多线程系列之四:简单的同步 lock
  2. spring图片转视频_一直在用的 Spring,你知道它的加载原理吗?
  3. fatal error C1083: 无法打开包括文件:“stdio.h
  4. 深入理解计算机操作系统:链接笔记
  5. anaconda 怎么安装xlrd_Anaconda 安装 tensorflow 和 keras
  6. table取tr对象 vue_Vue笔记
  7. python2.7虚拟环境
  8. 为什么电脑安装不了python_windows电脑安装python教程
  9. CVE-2012-0158 MSCOMCTL.ocx栈溢出漏洞分析
  10. 来不及解释!python字符串常用方法大全,先收藏再说
  11. 前端代码编辑器 sublime text 4 for Mac v4.0(4114) 中文设置
  12. 是德科技N9020A 频谱分析仪技术资料说明
  13. linux du 查看文件夹大小
  14. 推荐一款开源的加解密算法 --- XXTEA
  15. Word标题不显示目录数字章节
  16. 证监会计算机类笔试上岸经验,公务员考试笔试166分上岸经验(全干货)
  17. Android侧滑原来可以这么优雅
  18. django html 插入网页背景图片
  19. 2022年十三届蓝桥杯国赛(C/C++大学B组)个人题解
  20. 信用社计算机考试真题,农村信用社计算机考试试题.doc

热门文章

  1. 银行营业网点管理系统——entity类(CityArea)
  2. 新闻发布项目——实体类(comment)
  3. Python MySQL 插入表
  4. docker下安装nacos 并使用mysql数据库
  5. ping 命令使用代理_网络检测知识篇:ping命令使用知识,你知道几点?
  6. (实模式+保护模式)模式切换的过程步骤(代码+文字解析)
  7. java 指令重拍_我发现我的Java重拍了!
  8. jvm 错误_JVM因“ OutOfMemory”错误而关闭-我该怎么办?
  9. forge插件_使用Forge插件在现有Java EE项目上启用Arquillian
  10. JMetro版本8.6.11和11.6.11已发布