项目需求

近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西。

我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取)。然后将处理后的数据存入数据库(Mysql)。

所以分为三步:

读取文档获得数据

对获得的数据进行处理

更新数据库(新增或更新)

考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch。

实现步骤

本文假设读者已经能够使用 SpringBoot 连接处理 Mysql,所以这部分文中会省略。

1、创建 Maven 项目,并在 pom.xml 中添加依赖

org.springframework.boot

spring-boot-starter-parent

1.5.2.RELEASE

1.8

org.springframework.boot

spring-boot-starter-batch

org.springframework.boot

spring-boot-starter-data-jpa

org.springframework.boot

spring-boot-starter-test

test

org.mybatis.spring.boot

mybatis-spring-boot-starter

1.2.0

org.projectlombok

lombok

1.12.6

org.apache.commons

commons-lang3

3.4

mysql

mysql-connector-java

runtime

com.alibaba

druid

1.0.26

org.springframework.boot

spring-boot-starter-web

这里是这个小项目中用到的所有依赖,包括连接数据库的依赖以及工具类等。

2、编写 Model 类

我们要从文档中读取的有效列就是 uid,tag,type,就是用户 ID,用户可能包含的标签(用于推送),用户类别(用户用户之间互相推荐)。

UserMap.java 中的 @Entity,@Column 注解,是为了利用 JPA 生成数据表而写的,可要可不要。

UserMap.java

@Data

@EqualsAndHashCode

@NoArgsConstructor

@AllArgsConstructor

//@Entity(name = "user_map")

public class UserMap extends BaseModel {

@Column(name = "uid", unique = true, nullable = false)

private Long uid;

@Column(name = "tag")

private String tag;

@Column(name = "type")

private Integer type;

}

3、实现批处理配置类

BatchConfiguration.java

@Configuration

@EnableBatchProcessing

public class BatchConfiguration {

@Autowired

public JobBuilderFactory jobBuilderFactory;

@Autowired

public StepBuilderFactory stepBuilderFactory;

@Autowired

@Qualifier("prodDataSource")

DataSource prodDataSource;

@Bean

public FlatFileItemReader reader() {

FlatFileItemReader reader = new FlatFileItemReader<>();

reader.setResource(new ClassPathResource("c152.txt"));

reader.setLineMapper(new DefaultLineMapper() {{

setLineTokenizer(new DelimitedLineTokenizer("|") {{

setNames(new String[]{"uid", "tag", "type"});

}});

setFieldSetMapper(new BeanWrapperFieldSetMapper() {{

setTargetType(UserMap.class);

}});

}});

return reader;

}

@Bean

public JdbcBatchItemWriter importWriter() {

JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>();

writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());

writer.setSql("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)");

writer.setDataSource(prodDataSource);

return writer;

}

@Bean

public JdbcBatchItemWriter updateWriter() {

JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>();

writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());

writer.setSql("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)");

writer.setDataSource(prodDataSource);

return writer;

}

@Bean

public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) {

return new UserMapItemProcessor(processStatus);

}

@Bean

public Job importUserJob(JobCompletionNotificationListener listener) {

return jobBuilderFactory.get("importUserJob")

.incrementer(new RunIdIncrementer())

.listener(listener)

.flow(importStep())

.end()

.build();

}

@Bean

public Step importStep() {

return stepBuilderFactory.get("importStep")

.chunk(100)

.reader(reader())

.processor(processor(IMPORT))

.writer(importWriter())

.build();

}

@Bean

public Job updateUserJob(JobCompletionNotificationListener listener) {

return jobBuilderFactory.get("updateUserJob")

.incrementer(new RunIdIncrementer())

.listener(listener)

.flow(updateStep())

.end()

.build();

}

@Bean

public Step updateStep() {

return stepBuilderFactory.get("updateStep")

.chunk(100)

.reader(reader())

.processor(processor(UPDATE))

.writer(updateWriter())

.build();

}

}

prodDataSource 是假设用户已经设置好的,如果不知道怎么配置,也可以参考之前的文章进行配置:Springboot 集成 Mybatis。

reader(),这方法从文件中读取数据,并且设置了一些必要的参数。紧接着是写操作 importWriter() 和 updateWriter(),读者看其中一个就好,因为我这里是需要更新或者修改的,所以分为两个。

processor(ProcessStatus status) ,该方法是对我们处理数据的类进行实例化,这里我根据 status 是 IMPORT 还是 UPDATE 来获取不同的处理结果。

其他的看代码就可以看懂了,哈哈,不详细说了。

4、将获得的数据进行清洗

UserMapItemProcessor.java

public class UserMapItemProcessor implements ItemProcessor {

private static final int MAX_TAG_LENGTH = 200;

private ProcessStatus processStatus;

public UserMapItemProcessor(ProcessStatus processStatus) {

this.processStatus = processStatus;

}

@Autowired

IUserMapService userMapService;

private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\\u4E00-\\u9FA5_-]+$";

public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR);

private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor.class);

@Override

public UserMap process(UserMap userMap) throws Exception {

Long uid = userMap.getUid();

String tag = cleanTag(userMap.getTag());

Integer label = userMap.getType() == null ? Integer.valueOf(0) : userMap.getType();

if (StringUtils.isNotBlank(tag)) {

Map params = new HashMap<>();

params.put("uid", uid);

UserMap userMapFromDB = userMapService.selectOne(params);

if (userMapFromDB == null) {

if (this.processStatus == ProcessStatus.IMPORT) {

return new UserMap(uid, tag, label);

}

} else {

if (this.processStatus == ProcessStatus.UPDATE) {

if (!tag.equals(userMapFromDB.getTag()) && !label.equals(userMapFromDB.getType())) {

userMapFromDB.setType(label);

userMapFromDB.setTag(tag);

return userMapFromDB;

}

}

}

}

return null;

}

/**

* 清洗标签

*

* @param tag

* @return

*/

private static String cleanTag(String tag) {

if (StringUtils.isNotBlank(tag)) {

try {

tag = tag.substring(tag.indexOf("{") + 1, tag.lastIndexOf("}"));

String[] tagArray = tag.split(",");

Optional reduce = Arrays.stream(tagArray).parallel()

.map(str -> str.split(":")[0])

.map(str -> str.replaceAll("\'", ""))

.map(str -> str.replaceAll(" ", ""))

.filter(str -> TAG_PATTERN.matcher(str).matches())

.reduce((x, y) -> x + "," + y);

Function str = (s -> s.length() > MAX_TAG_LENGTH ? s.substring(0, MAX_TAG_LENGTH) : s);

return str.apply(reduce.get());

} catch (Exception e) {

LOG.error(e.getMessage(), e);

}

}

return null;

}

protected enum ProcessStatus {

IMPORT,

UPDATE;

}

public static void main(String[] args) {

String distinctTag = cleanTag("Counter({'《重新定义》': 3, '轻想上的轻小说': 3, '小说': 2, 'Fate': 2, '同人小说': 2, '雪狼八组': 1, " +

"'社会': 1, '人文': 1, '短篇': 1, '重新定义': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六组》': 1, '战争': 1, '《灰羽联盟》': 1, " +

"'谁说轻想没人写小说': 1})");

System.out.println(distinctTag);

}

}

读取到的数据格式如 main()方法所示,清理之后的结果如:

轻想上的轻小说,小说,Fate,同人小说,雪狼八组,社会,人文,短篇,重新定义,AMV,战争,谁说轻想没人写小说 。

去掉了特殊符号以及数字等。使用了 Java8 的 Lambda 表达式。

并且这里在处理的时候,判断如果该数据用户已经存在,则进行更新,如果不存在,则新增。

5、Job 执行结束回调类

JobCompletionNotificationListener.java

@Component

public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

private final JdbcTemplate jdbcTemplate;

@Autowired

public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {

this.jdbcTemplate = jdbcTemplate;

}

@Override

public void afterJob(JobExecution jobExecution) {

System.out.println("end .....");

}

}

具体的逻辑可自行实现。

完成以上几个步骤,运行项目,就可以读取并写入数据到数据库了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

本文标题: Spring Batch读取txt文件并写入数据库的方法教程

本文地址: http://www.cppcns.com/ruanjian/java/189254.html

springbatch读取文件_Spring Batch读取txt文件并写入数据库的方法教程相关推荐

  1. Java中使用字符流读取UTF-8和写出txt文件 乱码 问题

    乱码问题一直都是非常难受的问题,本文解决Java中使用字符流读取UTF-8和写出txt文件 乱码 话不多说,直接上图 输出结果: 使用代码: 解决:

  2. Python读取txt文件画Loss曲线图---txt文件转xls文件---xls文件转txt文件--xml文件转txt文件

    任务1- 读取.txt文件画折线图(曲线图) 任务2- 将.txt文件转换成.xls(excel)文件 任务3- 将.xls(excel)文件转换成.txt文件 任务4- 读取.txt文件画折线图(曲 ...

  3. matlab读取二进制文件字符串,matlab读取内容为二进制的TXT文件

    本方法同样适合读取十六进制和二进制以外的其他进制文件, txt使用一个最简单的命令就可以读取 textread 这是一个十分有用,简便的函数(对于fopen fscanf而言) 读取二进制txt文件: ...

  4. python读取多个文件夹下所有txt_Python实现合并同一个文件夹下所有txt文件的方法示例...

    本文实例讲述了Python实现合并同一个文件夹下所有txt文件的方法.分享给大家供大家参考,具体如下: 一.需求分析 合并一个文件夹下所有txt文件 二.合并效果 三.python实现代码 # -*- ...

  5. c++ 按行读取txt文件并赋值_python操作txt文件中数据教程[3]python读取文件夹中所有txt文件并将数据转为csv文件...

    觉得有用的话,请点击右下角 推荐给更多小伙伴 neoken_xuAsurada2015Evacloud 参考文献 python 操作 txt 文件中数据教程[1]-使用 python 读写 txt 文 ...

  6. matlab读int16读文件_Matlab文件操作及读txt文件(fopen,fseek,fread,fclose)

    Matlab 文件操作及读 txt 文件 (fopen,fseek,fread,fclose) matlab 文件操作 文件操作是一种重要的输入输出方式,即从数据文件读取数据或将 结果写入数据文件. ...

  7. ubuntu中获取文件名称并生成txt文件

    简介: 在机器视觉学习过程中,通常会经常批量处理一些图片,在Ubuntu下可以使用find命令,来实现将文件名全部读取出来,生成列表txt文件,作为标签使用 (1)find命令格式如下: find / ...

  8. java怎样输出一个文件夹,java合并一个文件夹下所有txt文件,输出到另一个txt,...

    java合并一个文件夹下所有txt文件,输出到另一个txt,最近写了个单元测试,递归调用方法,把同一个文件夹里所有的txt合并输出到一个txt文件.参考了两个博客,分别是已有的方法,还有个就是检测tx ...

  9. python把csv文件转换txt_Python实现txt文件转csv格式

    码农公社  210.net.cn  210= 1024  10月24日一个重要的节日--码农(程序员)节 把txt文件转成成csv文件格式,通过手动打开excel文件,然后导入txt来生产csv文件. ...

最新文章

  1. SpringBoot学习笔记(3):静态资源处理
  2. USTC English Club Note20211110
  3. DNS中的七大资源记录介绍
  4. 一文看懂卷积神经网络CNN的核心
  5. 移动互联网APP测试流程及测试点(转载) (二)
  6. Linux下使用wget下载FTP服务器文件
  7. python函数定义及调用-Python函数的基本定义和调用以及内置函数
  8. Microsoft 365 for Mac(原Office 365)
  9. Vue导出页面为PDF格式,解决PDF中图片不显示(跨域)
  10. Incremental Learning of Object Detectors without Catastrophic Forgetting详解
  11. GNU ARM汇编--(二十)总结
  12. Python中and和or的运算规则,短路计算
  13. LC-3 指令集注释规范
  14. 笔记本电脑怎么写日记
  15. Cadence Allegro调整丝印技巧-先自动调整再手动微调图文教程及视频演示
  16. python人生的不同阶段_完整的人生有四个阶段,绝大多数人只活过一个阶段
  17. 在Power BI中对Error值进行替换
  18. 美国电商是如何用大数据玩转“双十一”的?
  19. BurnInTest
  20. SSM在线学习系统毕业设计源码131843

热门文章

  1. 一些值得关注的云计算资源
  2. 凯恩斯与艺术品投资的更大笨蛋理论
  3. SpringCloud整合Seata(Docker)版本异常处理
  4. 网络基础--ARP技术介绍
  5. centos7 安装Zabbix3.0
  6. dinic 最大流费用流模板
  7. 七牛云CNAME设置,七牛云绑定域名。
  8. 程序员的算法趣题Q25: 时髦的鞋带系法
  9. 怪物农场2修改日志3 - 年轮
  10. Python(第一章)版本介绍,环境准备及IDLE使用