十六、FileInputFormat介绍,切片源码分析
一、InputFormat介绍
InputFormat,从单词意思解读分为输入、格式,也就是数据来源与加载数据的方式是决定MR编程的map阶段的任务并行度。
数据来源划分:其实也就是他的子类,由于我目前只使用了如下三种方式,其实还有很多子类。
HLogInputFormat:从hbase加载数据编写mr程序计算
FileInputFormat:主要从hdfs或本地加载数据
自定义实现:可以编写从mysql或oracle中加载数据
InputFormat它是一个抽象类,定义了获取切片与切片划分的抽象函数,具体工作有它的子类来完成。
public abstract class InputFormat<K, V> {public InputFormat() {}//切片:确定每个片的大小public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;//创建数据片的数据加载:读取当前切片的数据public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
什么是切片?
大家看到了上面提到的切片,相信会疑问什么是切片,它有什么意义?
大家都知道MR是分布是计算,那么它肯定是可以并行多个MapTask,但是MapTask的并行度到底是多少这就不清楚了,如下问题:
思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。
那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask
并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
MapTask并行度决定机制:
数据块:Block是HDFS物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
具体切片工作机制请看下图:
逻辑分片: 假设某个文件有200M,默认按照128M切片,那么就开启两个maptask,一个加载0-128M的数据,一个加载129M-200M的数据。并不是将数据切分为两个文件,这就叫逻辑分片,也就是逻辑切片。
二、FileInputFormat 介绍
相信编写mr程序的都不陌生,它是加载hdfs/本地数据的输入格式,其实它也是一个抽象类,mr运行时具体工作由它子类完成。但是它相比InputFormat完善了一些工作。InputFormat只是定义了数据切片,FileInputFormat完善了具体切片方式,下面就来介绍FileInputFormat切片机制。
1、FileInputFormat切片机制
(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于Block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
具体切片过程看上图
2、切片大小设置
1)源码中默认计算切片公式
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue因此,默认情况下,切片大小=blocksize。
完整代码:protected long computeSplitSize(long blockSize, long minSize, long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}
(2)切片大小设置
maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。
(3)获取切片信息API
// 获取切片的文件名称
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
3、FileInputFormat实现类
4、源码分析注释
//分片机制
public List<InputSplit> getSplits(JobContext job) throws IOException {Stopwatch sw = (new Stopwatch()).start();long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);//分的每个片集合List<InputSplit> splits = new ArrayList();//获取当前路径下的所有文件List<FileStatus> files = this.listStatus(job);//迭代每个文件Iterator i$ = files.iterator();while(true) {while(true) {while(i$.hasNext()) {FileStatus file = (FileStatus)i$.next();Path path = file.getPath();long length = file.getLen();//判断文件是否有为空if (length != 0L) {//文件不为空BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus)file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0L, length);}//判断文件是否分片if (this.isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining;int blkIndex;//遍历为文件分片,按照字节偏移量for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));}//如果文件大小不是片的倍数,则余下一块比块小的部分单独作为一片if (bytesRemaining != 0L) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));}} else {//如果文件过小则不用分片splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));}} else {//splits.add(this.makeSplit(path, 0L, length, new String[0]));}}job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());}return splits;}}}
1、获取路径下所有文件
2、迭代处理每个文件
3、判断文件大小是否为空,为空则创建一个片信息
4、文件不为空则判断文件是否需要分片
5、文件不为空且小于128M则直接作为一个片
6、文件大于128M,则按照字节偏移量为文件分片
7、文件分片完成后最后一部分小于128M的数据单独作为一片
十六、FileInputFormat介绍,切片源码分析相关推荐
- Hadoop MapReduce Splits 切片源码分析及切片机制
本文从Job提交,逐步分析Splits相关源码. 数据块:Block是HDFS物理上把数据分成一块一块的. 数据切片:数据切片只是在物理上输入进行分片,并不会在磁盘上将其分成片进行存储. 文件路径 o ...
- 大数据之-Hadoop3.x_MapReduce_切片源码分析---大数据之hadoop3.x工作笔记0104
然后我们开始来看看,切片的源码,看看到底是怎么切片的 然后我们继续调试,我们快速走就可以了,上节,已经调试一遍了. 然后进入job提交方法
- SpringBoot实现过滤器、拦截器与切片源码分析
过滤器Filter 过滤器概念 Filter是J2E中来的,可以看做是Servlet的一种"加强版",它主要用于对用户请求进行预处理和后处理,拥有一个典型的处理链.Filter也可 ...
- FileInputFormat切片源码解析
文章目录 FileInputFormat切片源码解析 1.MapTask并行度决定机制 2.源码步骤 3.FileInputFormat切片机制 3.1 源代码中计算切片大小的公式 3.2 获取切片信 ...
- Stable Diffusion 原理介绍与源码分析(一)
Stable Diffusion 原理介绍与源码分析(一) 文章目录 Stable Diffusion 原理介绍与源码分析(一) 前言(与正文无关,可以忽略) 总览 说明 Stable Diffusi ...
- GAT 算法原理介绍与源码分析
GAT 算法原理介绍与源码分析 文章目录 GAT 算法原理介绍与源码分析 零. 前言 (与正文无关, 请忽略) 广而告之 一. 文章信息 二. 核心观点 三. 核心观点解读 四. 源码分析 4.1 G ...
- Redis 的 Sentinel哨兵介绍与源码分析(1):初始化部分
http://www.redis.cn/topics/sentinel.html redis-6.0.8 本文是在官方中文文档的基础上进行的源码分析,其中包含完整的原文,并在此基础上,添加源码介绍. ...
- SDIO_WiFi驱动学习之SDIO架构介绍及源码分析
一.引言 因为WiFi驱动比较复杂,所以WiFi驱动的博客将多分几篇来写. 本篇博客主要介绍Linux下的SDIO架构及源码分析. 本文部分内容摘抄自网络,若有侵权,请联系删除. 二.SDIO WiF ...
- ThreadLocal介绍以及源码分析
ThreadLocal 线程主变量 前面部分引用其他优秀博客,后面源码自己分析的,如有冒犯请私聊我. 用Java语言开发的同学对 ThreadLocal 应该都不会陌生,这个类的使用场景很多,特别是在 ...
最新文章
- 跨域资源共享 CORS
- Linux怎么查询全部容器时间,docker容器与Linux主机环境获取时间不一致
- DPDK — OvS-DPDK
- 当你再面对大多数需求时能够说这些问题我以前做过,那你就。。。
- 出现“Could not resolve host: www.github.com; Unknown error”错误解决
- 记录每次更新到仓库 —— Git 学习笔记 10
- Net Core平台灵活简单的日志记录框架NLog+Mysql组合初体验
- 【MySQL】基于MySQL的SQL核心语法实战演练(三)
- tftp工具_tftp,tftp等八款最佳的FTP客户端工具
- snakeyaml java_JAVA使用SnakeYAML解析与序列化YAML
- 常见摄像机外部接口类型
- 20155304《网络对抗》信息搜集与漏洞扫描
- 支付宝退款,支付宝提现转账
- CSS 教程(全)+代码
- iso 开发学习--简易音乐播放器(基于iPhone4s屏幕尺寸)
- 台式计算机怎么连接蓝牙 win10,win10台式电脑蓝牙怎么开启(开启电脑蓝牙的步骤图)...
- 【Tool】资料搜索:百度网盘资料检索
- mysql master sevler_零零星星
- hdu 5755 Gambler Bo【gauss】
- 阿里五年测试工程师的一些小建议,让你少走弯路
热门文章
- 为什么越来越多人觉得“元宇宙”是个骗局?
- php include path pear,安装PHP程序提示“include_path=.;c:/php5/pear”解决办法
- kafka 单机配置外网无法访问
- 点成分享丨细胞培养三步骤——复苏、传代、冻存
- Alfred安装与使用
- 机器学习:支持向量机(SVM)
- 唯教育标准化和教育标准化,真可谓:差之毫厘,谬以千里
- 2018社区计算机考试题,2018年社区工作者考试模拟题―行测全卷试题(4.10)
- Web前端发展方向有哪些?可以做什么岗位?
- 记录自己Flutter配环境失误