全网最详细SpringBatch批处理读取分区(Paratition)文件讲解
文章目录
- 一、分区Step
- 1、数据分区
- 2、分区处理
- 二、实现分区关键接口
- 1、Partitioner
- 2、StepExecutionSplitter
- 3、PartitionHandler
- 三、基本配置和属性说明
- 1、基本配置
- 2、属性说明
- 四、文件分区
- 1、定义分区文件Partitioner
- 2、定义文件读
- 3、定义分区job配置
- 4、定义processor
- 4、定义writer
- 4、定义step监听器
- 6、运行job
写在前面: 我是「境里婆娑」。我还是从前那个少年,没有一丝丝改变,时间只不过是考验,种在心中信念丝毫未减,眼前这个少年,还是最初那张脸,面前再多艰险不退却。
写博客的目的就是分享给大家一起学习交流,如果您对 Java感兴趣,可以关注我,我们一起学习。
前言:为什么要写这篇文章,在网上很难找到一篇关于SpringBatch批处理读取分区文件基于JavaBean配置的文章,因此我决定写一篇关于SpringBatch读取分区文件基于javaBean配置的文章,希望这篇文章可以帮助新手的你或者你有一定经验的可以加深印象。
一、分区Step
何为分区Step:
通过将任务进行分区,不同的Step处理不同任务数据达到提高Job效率功能。
分区作业可以分区两个处理阶段,数据分区、分区处理;
1、数据分区
数据分区:根据特殊的规则,将数据进行合理分片,为不同的数据切片生成数据执行上下文Execution Context、作业执行器Step Execution。可以通过接口Partitioner生成自定义分区逻辑,SpringBatch批处理框架默认对多文件实现MultiResourcePartititoner;也可以自行扩展接口Partitioner实现自定义分区逻辑。
2、分区处理
分区处理:通过数据分区后,不同的数据已经被分配到不同的作业执行器中,接下来需要交给分区处理器进行作业,分区处理器可以在本地或远程执行被划分的作业。接口PartitionHandler定义了分区处理逻辑,SpringBatch批处理框架默认实现了本地分区处理TaskExecutorPartitionHandler;也可以自行扩展接口PartitionHandler来实现自定义分区逻辑。
分区作业逻辑结构图:
二、实现分区关键接口
实现分区关键接口有如下:PartitionHandler、StepExecutionSplitter、Partitioner。
1、Partitioner
Partitoner接口定义了如何根据给定的分区规则进行创建作业执行分区的上下文。
Partitioner接口定义如下:
public interface Partitioner {Map<String, ExecutionContext> partition(int gridSize);
}
gridSize含义:根据给定的gridSize大小进行执行上下文划分。
2、StepExecutionSplitter
StepExecutionSplitter接口定义了如何根据给定的分区规则进行创建作业执行分区的执行器。
StepExecutionSplitter接口定义如下
public interface StepExecutionSplitter {String getStepName();Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException;
}
getStepName:获取当前定义的分区作业的名称。
split:根据给定的分区规则为每个分区生成对应的分区执行器。
3、PartitionHandler
PartitionHandler接口定义了分区处理的逻辑,根据给定的StepExecutionSplitter进行分区并执行,最后将执行的结果进行收集,反馈给前端。
PartitionHandler接口定义如下
public interface PartitionHandler {Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception;
}
三、基本配置和属性说明
上面两节基本知识已经介绍完毕,下面我们将讲一个例子来巩固之前知识。
1、基本配置
一个典型分区Job配置
@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}
2、属性说明
在配置分区Step之前,我们先看下分区Step的主要属性定义和元素定义
属性 | 说明 |
---|---|
step | 用于指定分区step名称 |
handler(属性) | 属性handler指定分区执行器,需要实现接口PartitionHandler |
handler(子元素) | 用于定义默认实现:TaskExecutorPartitionHandler |
task-executor | 生命使用的线程池 |
grid-size | 声明分区的HashMap的初始值大小 |
四、文件分区
SpringBatch框架提供了对文件分区的支持,实现类:MultiResourcePartitioner提供了对文件分区的默认支持,根据文件名将不同文件处理进行分区,提升处理速度和效率。本文将按照此例子给出如何配置多文件分区实现。
读取文件如下:
本节实例由于文件多,我们对文件进行分区,然后将文件的内容写入DB,逻辑示意图如下:
1、定义分区文件Partitioner
定义文件分区,将不同的文件分配到不同的作业中,使用自定义MyMultiResourcePartitioner分区。
自定义分区MyMultiResourcePartitioner如下:
/*** @author shuliangzhao* @date 2020/12/4 23:14*/
public class MyMultiResourcePartitioner implements Partitioner {private static final String DEFAULT_KEY_NAME = "fileName";private static final String PARTITION_KEY = "partition";private Resource[] resources = new Resource[0];private String keyName = DEFAULT_KEY_NAME;public void setResources(Resource[] resources) {this.resources = resources;}public void setKeyName(String keyName) {this.keyName = keyName;}@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);int i = 0;for (Resource resource : resources) {ExecutionContext context = new ExecutionContext();Assert.state(resource.exists(), "Resource does not exist: "+resource);try {context.putString(keyName, resource.getURI().getPath());}catch (IOException e) {throw new IllegalArgumentException("File could not be located for: "+resource, e);}map.put(PARTITION_KEY + i, context);i++;}return map;}
}
属性keyName:用于指定作业上文中属性名字,作用是在不同的作业上下文中可以获取设置的对于属性值。可以在读写阶段通过@Value("#{stepExecutionContext[fileName]}"方式获取。
2、定义文件读
配置好分区实现,需要在每个分区作业中读入不同文件,进而提供文件处理效率。
PartitionMultiFileReader 实现
public class PartitionMultiFileReader extends FlatFileItemReader {public PartitionMultiFileReader(Class clz,String fileName) {setResource(new FileSystemResource(fileName.substring(1)));Field[] declaredFields = clz.getDeclaredFields();List<String> list = new ArrayList<>();for (Field field:declaredFields) {list.add(field.getName());}String[] names = new String[list.size()];DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();delimitedLineTokenizer.setDelimiter(",");delimitedLineTokenizer.setNames(list.toArray(names));DefaultLineMapper defaultLineMapper = new DefaultLineMapper();defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);CommonFieldSetMapper commonFieldSetMapper = new CommonFieldSetMapper();commonFieldSetMapper.setTargetType(clz);defaultLineMapper.setFieldSetMapper(commonFieldSetMapper);setLineMapper(defaultLineMapper);setName(clz.getSimpleName());}
}
3、定义分区job配置
基于javabean方式实现job配置
package com.sl.config;
//包导入省略/*** @author shuliangzhao* @Title: PartitionFileConfiguration* @ProjectName spring-boot-learn* @Description: TODO* @date 2020/12/4 21:09*/
@Configuration
@EnableBatchProcessing
public class PartitionMultiFileConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate PartitonMultiFileProcessor partitonMultiFileProcessor;@Autowiredprivate PartitionMultiFileWriter partitionMultiFileWriter;@Beanpublic Job partitionMultiFileJob() {return jobBuilderFactory.get("partitionMultiFileJob").start(partitionMasterMultiFileStep()).build();}@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}@Beanpublic PartitionHandler multiFilePartitionHandler() {TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(2);handler.setStep(partitionSlaveMultiFileStep());handler.setTaskExecutor(new SimpleAsyncTaskExecutor());return handler;}@Beanpublic Step partitionSlaveMultiFileStep() {return stepBuilderFactory.get("partitionSlaveMultiFileStep").<CreditBill,CreditBill>chunk(1).reader(partitionMultiFileReader(null)).processor(partitonMultiFileProcessor).writer(partitionMultiFileWriter).build();}@Bean@StepScopepublic PartitionMultiFileReader partitionMultiFileReader(@Value("#{stepExecutionContext[fileName]}")String fileName) {return new PartitionMultiFileReader(CreditBill.class,fileName);}@Beanpublic MyMultiResourcePartitioner multiResourcePartitioner() {MyMultiResourcePartitioner multiResourcePartitioner = new MyMultiResourcePartitioner();multiResourcePartitioner.setKeyName("fileName");multiResourcePartitioner.setResources(getResource());return multiResourcePartitioner;}private Resource[] getResource() {String filePath = "D:\\aplus\\bill\\";File file = new File(filePath);List<Resource> resourceList = new ArrayList<>();if (file.isDirectory()) {String[] list = file.list();if (list != null) {for (String str : list) {String resource = file.getPath() + "\\" + str;FileSystemResource fileSystemResource = new FileSystemResource(resource);resourceList.add(fileSystemResource);}}}Resource[] resources = new Resource[resourceList.size()];return resourceList.toArray(resources);}}
4、定义processor
定义processor
/*** @author shuliangzhao* @date 2020/12/4 22:11*/
@Component
@StepScope
public class PartitonMultiFileProcessor implements ItemProcessor<CreditBill,CreditBill> {@Overridepublic CreditBill process(CreditBill item) throws Exception {CreditBill creditBill = new CreditBill();creditBill.setAcctid(item.getAcctid());creditBill.setAddress(item.getAddress());creditBill.setAmout(item.getAmout());creditBill.setDate(item.getDate());creditBill.setName(item.getName());return creditBill;}
}
4、定义writer
/*** @author shuliangzhao* @date 2020/12/4 22:29*/
@Component
@StepScope
public class PartitionMultiFileWriter implements ItemWriter<CreditBill> {@Autowiredprivate CreditBillMapper creditBillMapper;@Overridepublic void write(List<? extends CreditBill> items) throws Exception {if (items != null && items.size() > 0) {items.stream().forEach(item -> {creditBillMapper.insert(item);});}}
}
4、定义step监听器
定义step监听器目的是在处理作业之前打印线程名字和读取文件名字
@Component
public class PartitionStepListener implements StepExecutionListener {private static final Logger logger = LoggerFactory.getLogger(PartitionStepListener.class);@Overridepublic void beforeStep(StepExecution stepExecution) {logger.info("ThreadName={},steName={},FileName={}",Thread.currentThread().getName(),stepExecution.getStepName(),stepExecution.getExecutionContext().getString("fileName"));}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {return null;}
}
6、运行job
执行job查看结果,可以看出不同的文件有不同的线程来处理,并且被分配到不同的分区作业步中执行
2020-12-05 15:58:34.100 INFO 13208 --- [cTaskExecutor-1] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-1,steName=partitionSlaveMultiFileStep:partition1,FileName=/D:/aplus/bill/bill2.csv
2020-12-05 15:58:34.114 INFO 13208 --- [cTaskExecutor-3] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-3,steName=partitionSlaveMultiFileStep:partition0,FileName=/D:/aplus/bill/bill1.csv
2020-12-05 15:58:34.122 INFO 13208 --- [cTaskExecutor-2] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-2,steName=partitionSlaveMultiFileStep:partition2,FileName=/D:/aplus/bill/bill3.csv
至此,我们完成了对文件分区的处理。
如果想更详细查看以上所有代码请移步到github:文件分区详细代码
全网最详细SpringBatch批处理读取分区(Paratition)文件讲解相关推荐
- 全网最详细SpringBatch读(Reader)混合文件讲解
文章列表 一.读混合记录文件 1.FieldSetMapper实现 2.LineTokenizer实现 3.混合读reader实现类 4.读混合文件job配置 5.读混合文件processor 6.读 ...
- 全网最详细SpringBatch读(Reader)跨多行文件讲解
文章列表 读记录跨多行文件 1.读跨多行文件job配置 2.读跨多行文件reader 3.自定义FieldSetMapper 4.读跨多行文件processor 5.读跨多行文件writer 写在前面 ...
- 全网最详细 Python如何读取NIFTI格式图像(.nii文件)和 .npy格式文件和pkl标签文件内容
在医学图像处理中,我们经常使用一种NIFTI格式图像(.nii文件),现在我们来看看 什么是.nii文件? 该如何读取.nii文件? 1. NIFTI格式图像 什么是NIFTI(Neuroimagin ...
- 全网最详细笔记:张益唐北大讲解火热出炉!本质上已证明「零点猜想」
视学算法报道 编辑:编辑部 [导读]关于「零点猜想」问题,大海里的针我没捞到, 但海底地貌我探得差不多了. 一支马克笔,一张小白板. 刚刚,张益唐教授现身北大,在B站的直播平台上,给广大网友上 ...
- 全网最详细笔记:张益唐北大讲解火热出炉!证明「零点猜想」!
本文 新智元 公众号授权 编辑部 [导读]关于「零点猜想」问题,大海里的针我没捞到, 但海底地貌我探得差不多了. 一支马克笔,一张小白板. 刚刚,张益唐教授现身北大,在B站的直播平台上,给广大网友上 ...
- SpringBatch批处理框架入门(二)
这篇文章接上一篇SpringBatch批处理框架入门(一),继续讲解SpringBatch基础知识. 目录 SpringBatch 核心类介绍 SpringBatch 核心类Job SpringBat ...
- “是男人就下一百层”h5游戏全网最详细教学、全代码,js操作
"是男人就下一百层"h5游戏全网最详细教学.全代码,js操作 博主的话 游戏展示 编程工具介绍 游戏代码 代码讲解 js 第一步 切换div的显示与隐藏 js 第二步 在菜单页面用 ...
- 全网超详细的VMware虚拟机安装Kali Linux系统以及首次启动Kali Linux系统的注意事项
文章目录 1. 简述Kali Linux 2. 下载Kali Linux的镜像文件 3. 安装Kali Linux 4. 首次启动Kali Linux 5. 其他方法安装Kali Linux 1. 简 ...
- 玩机搞机---全网最详细的手机全机型 刷机教程 二
接上篇 玩机搞机---全网最详细的手机全机型 刷机教程一 玩机搞机---mtk芯片机型线刷救砖的一些基本解决方法和步骤解析 mtk报错代码 SP_Flash平台刷机 超详细的三星全系列机型线刷图文教程 ...
最新文章
- codeforces37C
- 【软件工程】CMMI 能力成熟度模型集成 ( 简介 | 相关术语 | CMMI 等级评估次序 )
- 什么是Vue?Vue的工作原理是什么?
- php 下载限制,php实现限制文件下载速度的代码实例
- 鸿蒙2.0都来了,快搭个环境玩起来吧!
- Gartner:容器采用将迅速增长,但不会很快有利可图
- python manage.py syncdb Unknown command: 'syncdb'问题解决方法
- 美国影视演员协会选择了Windows Azure
- 张小龙Linux微信,微信至今没有黑暗模式,原来是张小龙“全责”?
- Spring Cloud Hystrix 进行服务熔断设置时,报错找不到对应的服务熔断方法
- 26个数据分析案例——第三站:基于python的药店销售数据分析
- 计算机科学与技术代码0812,一级学科代码及名称0812计算机科学与技术(2007年)本.doc...
- 山地车中轴进水表现_4种自行车中轴的拆卸和保养方法
- 银行卡卡号归属地汇总
- VS Code:推荐插件 - HTML格式化(包括JS、CSS)
- 连通图 P3387 缩点 模板
- Oracle SQL:update更新语句总结
- android 友盟微信授权2002,友盟 2002错误
- 3.4 Docker最新入门教程-Docker入门-共享应用程序
- python爬取小说章节_python之如何爬取一篇小说的第一章内容
热门文章
- 本地图片转base64_从一道面试题说起:GET 请求能传图片吗?
- python循环语句打印矩形_Python中使用循环语句打印三角形、菱形
- 趣学python3(7)-循环语句(1)
- 【Python】7种方案,彻底实现可视化图片大小/分辨率控制自由
- 【Python】机器学习绘图神器Matplotlib首秀!
- 【Python相关】Vaex :突破pandas,快速分析100GB大数据集
- 【竞赛经验分享】2020腾讯广告算法大赛:如何突破分数瓶颈?
- AI For Everyone:Andrew Ng想用30分钟的非技术课程传达的内容
- 爸,这下你还敢抽烟么?
- 大盘点|卷积神经网络必读的 100 篇经典论文,包含检测 / 识别 / 分类 / 分割多个领域