MapReduce之InputFormat理解
一 InputFormat主要作用:
#验证job的输入规范
#对输入的文件进行切分,形成多个InputSplit文件,每一个InputSplit对应着一个map任务
#创建RecordReader,从InputSplit分片中读取数据供map使用
二 有几个比较重要的实现
2.1FileInputFormat: 主要用于处理文件的一个InputFormat类,它包括子类:
2.1.1 FiexedLengthInputFormat: 读取输入文件的固定长度的记录,这种文件不该是文本文件,二进制文件比较多
2.1.2KeyValueInputFormat: 用于读取普通文本文件,文件按照行分割,每一行由key和value组成,key 和 value的分隔符若没有指定,那么整行为key,value为空
2.1.3TextInputFormat: 文件按照行划分,key就是这一行在文件中的偏移量,value就是这一行文本
2.1.5SequenceFileInputFormat:
=>SequenceFileAsBinaryInputFormat:InputFormat从sequenceFile读取以二进制格式读取key和value. 他只能读取SequenceFile
=>SequenceFileAsTextInputFormat:以文本格式读取
2.1.6NLineInputFormat: 是可以将N行数据划分为一个Split,作为MapTask输入
2.2DBInputFormat: 主要用于处理数据库数据的InputFormat类
2.3CombineFileInputFormat:
我们知道文件切分的时候,FileInputFormat默认情况下,如果一个文件小于blockSize,那么这个文件就是一个InputSplits, 如果大于blockSize,则是先按照文件大小/blockSize,如果有剩余单独成为一个InputSplit。
但是如果小文件太多,那么就会生成的切片太多,从而导致Map任务增多,Map任务增多所需资源就多。所以CombineFileInputFormat通过合并多个小文件成为一个分片,分片过程也考虑同一节点,同一机架的数据本地性,让每一个Mapper任务可以处理更多的数据
三 CombineFileInputFormat详解
我们知道,InputFormat接口,需要我们自己提供RecordReader.
它的原理就是:
我们通常大文件都是使用FileSplit,但是现在是包含多个文件的split,我们需要使用CombineFileSplit。它会将输入多个数据文件(小文件)元数据全部包装到CombineFileSplit里面。因为小文件都是单独的一个Block文件,一个CombineFileSplit包含一组文件的Block信息,涉及到每个文件的偏移,长度,block位置等元数据.
在执行Map-Reduce的时候,需要读取文本数据,那么对于Combine
FileSplit,你需要处理其包含的小文件block,就要对应设置RecordReader,才能正确读取文件数据内容。
编程流程:
#实现一个RecordReader来读取CombineFileSplit包装的Block
#继承CombineFileInputFormat实现一个我们自定义的RecordReader的类,用来读取每个文件的数据
public class MyCombineFileRecordReader extends RecordReader<LongWritable, BytesWritable> {
private CombineFileSplit combineFileSplit;//封装了小文件的meta信息,诸如长度,offset以及块的信息
private LineRecordReader lineRecordReader = new LineRecordReader();//用于读取行数据
private Path[] paths; //小文件的输入路径
private int length;
private int currentIndex;
private float currentProgress = 0;
private LongWritable currentKey;
private BytesWritable currentValue = new BytesWritable();
public MyCombineFileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index){
super();
this.combineFileSplit = combineFileSplit;
//当前要处理的小文件block在CombineFileSplit中的索引
this.currentIndex = index;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.combineFileSplit = (CombineFileSplit) split;
this.paths = combineFileSplit.getPaths();
this.length = paths.length;
// 处理CombineFileSplit中的一个小文件Block,因为使用LineRecordReader,
//需要构造一个FileSplit对象,然后才能够读取数据
//当前小文件的路径
Path path = combineFileSplit.getPath(currentIndex);
//当前小文件的偏移量
long start = combineFileSplit.getOffset(currentIndex);
//当前小文件的长度
long length = combineFileSplit.getLength(currentIndex);
//小文件block所在的节点信息
String[] hosts = combineFileSplit.getLocations();
FileSplit fileSplit = new FileSplit(path, start, length, hosts);
lineRecordReader.initialize(fileSplit, context);
context.getConfiguration().set("map.input.file.name", path.getName());
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return currentIndex >= 0&& currentIndex < length && lineRecordReader.nextKeyValue();
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return lineRecordReader.getCurrentKey();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
byte[] contents = lineRecordReader.getCurrentValue().getBytes();
int len = contents.length;
currentValue.set(contents, 0, len);
return currentValue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (currentIndex >= 0&& currentIndex < length) {
currentProgress = (float) currentIndex / length;
return currentProgress;
}
return currentProgress;
}
@Override
public void close() throws IOException {
lineRecordReader.close();
}
}
public class MyCombineFileInputFormat extends CombineFileInputFormat<LongWritable,BytesWritable> {
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
CombineFileSplit combineFileSplit = (CombineFileSplit) split;
CombineFileRecordReader<LongWritable,BytesWritable> recordReader =
new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, MyCombineFileRecordReader.class);
try {
recordReader.initialize(combineFileSplit, context);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return recordReader;
}
}
MapReduce之InputFormat理解相关推荐
- 大数据培训之核心知识点Hbase、Hive、Spark和MapReduce的概念理解、特点及机制等
今天,上海尚学堂大数据培训班毕业的一位学生去参加易普软件公司面试,应聘的职位是大数据开发.面试官问了他10个问题,主要集中在Hbase.Spark.Hive和MapReduce上,基础概念.特点.应用 ...
- MapReduce之InputFormat、OutputFormat(三)
文章目录 1. 祝大家中秋节快乐 2. MapReduce进阶 2.1 MapReduce类型 2.2 MapReduce输入格式 2.2.1 InputFormat接口 2.2.2 InputFor ...
- MapReduce优劣,理解MapReduce与Hadoop
MapReduce是一种计算模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程 ...
- MapReduce之RecordReader理解
RecordReader:其作用就是将数据切分成key/value的形式然后作为输入传给Mapper. 一 方法分析: 1.1initialize: 初始化RecordReader,只能被调用一次. ...
- MapReduce之OutputFormat理解
一 OutputFormat作用 1校验job中指定输出路径是否存在 2将结果写入输出文件 二 OutputFormat的实现 2.1DBOutputFormat: 发送Reduce结果到SQL表中 ...
- MapReduce之RecordWriter理解
RecordWriter:其实主要就是负责将task的key/value结果写入内存或者磁盘 一 方法分析 1.1 write:写key/value键值对 1.2 close: 关闭RecordWri ...
- hadoop之MapReduce学习教程
hadoop之MapReduce学习 MapReduce概述 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析应用" ...
- JAVA大数据(二) Hadoop 分布式文件系统HDFS 架构,MapReduce介绍,Yarn资源调度
文章目录 1.分布式文件系统HDFS 1.HDFS的来源 2.HDFS的架构图之基础架构 2.1 master/slave 架构 2.2 名字空间(NameSpace) 2.3 文件操作 2.4副本机 ...
- 【大数据实验】06:MapReduce操作
MapReduce操作 OVERVIEW MapReduce操作 实验环境 一.WordCount单词计数 1.实验内容 2.实验原理 3.实验步骤 (1)启动Hadoop集群 (2)准备数据文件 ( ...
最新文章
- jQuery 2.0.3 源码分析core - 整体架构
- jQuery mobile 中div圆角弹出层
- C语言求积标识符,《C语言程序设计》模拟试卷四.doc
- python 货币合适_算法之Python实现 - 001 : 换钱的最少货币数
- windows控制台中文乱码解决方法
- yb3防爆电机型号含义_【产品信息】防爆充电机
- 【web前端】table的border属性解析(内联样式表和内部样式表中的区别)
- Python和Java哪个更好找工作?
- 【STM32H7教程】第39章 STM32H7的DMAMUX基础知识(重要)
- 关于CUDA,cuDNN,TF,CUDA驱动版本兼容问题
- Pytorch训练SSD网络时遇到的问题
- element-ui图标显示不出来问题
- 【无标题】SONET基本术语
- IDEA2022配置Tomcat服务器教程(超细致版)
- linux so lazyload,linux函数深入探索——open函数打开文件是否将文件内容加载到内存空间...
- 关于特殊后缀名如vue vm less等文件在DW中高亮显示并且代码提示的解决方案
- IT行业基础知识:云计算到底是什么
- oracle查询值的字符串长度、字节长度、大小写字母转换
- [BZOJ5020][THUWC 2017]在美妙的数学王国中畅游(LCT + 一点数学知识)
- 2015-11-4 html记事簿
热门文章
- iis10.0 php多版本,IIS7 IIS8 中多个版本php共存的方法
- python查找指定文件夹并重命名_python获取指定文件夹下的所有文件名,并删选指定类型文件进行重命名以及撤销重命名...
- 二甲医院云服务器,医院用上云计算 病情上传到云端可行否?
- linux怎么改目录位置,Linux下更改MySQL数据目录位置具体操作方法
- MySQL InnoDB 存储引擎文件
- swagger2使用步骤
- 带有哨兵的双向循环链表
- java volatile关键字的作用_java volatile关键字作用及使用场景详解
- 网页制作 css样式,网页设计与制作-CSS样式.ppt
- python3实用编程技巧_9.python3实用编程技巧进阶(四)