一 InputFormat主要作用:

#验证job的输入规范

#对输入的文件进行切分,形成多个InputSplit文件,每一个InputSplit对应着一个map任务

#创建RecordReader,从InputSplit分片中读取数据供map使用

二 有几个比较重要的实现

2.1FileInputFormat: 主要用于处理文件的一个InputFormat类,它包括子类:

2.1.1 FiexedLengthInputFormat: 读取输入文件的固定长度的记录,这种文件不该是文本文件,二进制文件比较多

2.1.2KeyValueInputFormat: 用于读取普通文本文件,文件按照行分割,每一行由key和value组成,key 和 value的分隔符若没有指定,那么整行为key,value为空

2.1.3TextInputFormat: 文件按照行划分,key就是这一行在文件中的偏移量,value就是这一行文本

2.1.5SequenceFileInputFormat:

=>SequenceFileAsBinaryInputFormat:InputFormat从sequenceFile读取以二进制格式读取key和value. 他只能读取SequenceFile

=>SequenceFileAsTextInputFormat:以文本格式读取

2.1.6NLineInputFormat: 是可以将N行数据划分为一个Split,作为MapTask输入

2.2DBInputFormat: 主要用于处理数据库数据的InputFormat类

2.3CombineFileInputFormat:

我们知道文件切分的时候,FileInputFormat默认情况下,如果一个文件小于blockSize,那么这个文件就是一个InputSplits, 如果大于blockSize,则是先按照文件大小/blockSize,如果有剩余单独成为一个InputSplit。

但是如果小文件太多,那么就会生成的切片太多,从而导致Map任务增多,Map任务增多所需资源就多。所以CombineFileInputFormat通过合并多个小文件成为一个分片,分片过程也考虑同一节点,同一机架的数据本地性,让每一个Mapper任务可以处理更多的数据

三 CombineFileInputFormat详解

我们知道,InputFormat接口,需要我们自己提供RecordReader.

它的原理就是:

我们通常大文件都是使用FileSplit,但是现在是包含多个文件的split,我们需要使用CombineFileSplit。它会将输入多个数据文件(小文件)元数据全部包装到CombineFileSplit里面。因为小文件都是单独的一个Block文件,一个CombineFileSplit包含一组文件的Block信息,涉及到每个文件的偏移,长度,block位置等元数据.

在执行Map-Reduce的时候,需要读取文本数据,那么对于Combine

FileSplit,你需要处理其包含的小文件block,就要对应设置RecordReader,才能正确读取文件数据内容。

编程流程:

#实现一个RecordReader来读取CombineFileSplit包装的Block

#继承CombineFileInputFormat实现一个我们自定义的RecordReader的类,用来读取每个文件的数据

public class MyCombineFileRecordReader extends RecordReader<LongWritable, BytesWritable> {

private CombineFileSplit combineFileSplit;//封装了小文件的meta信息,诸如长度,offset以及块的信息

private LineRecordReader lineRecordReader = new LineRecordReader();//用于读取行数据

private Path[] paths; //小文件的输入路径

private int length;

private int currentIndex;

private float currentProgress = 0;

private LongWritable currentKey;

private BytesWritable currentValue = new BytesWritable();

public MyCombineFileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index){

super();

this.combineFileSplit = combineFileSplit;

//当前要处理的小文件block在CombineFileSplit中的索引

this.currentIndex = index;

}

@Override

public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

this.combineFileSplit = (CombineFileSplit) split;

this.paths = combineFileSplit.getPaths();

this.length = paths.length;

// 处理CombineFileSplit中的一个小文件Block,因为使用LineRecordReader,

//需要构造一个FileSplit对象,然后才能够读取数据

//当前小文件的路径

Path path = combineFileSplit.getPath(currentIndex);

//当前小文件的偏移量

long start = combineFileSplit.getOffset(currentIndex);

//当前小文件的长度

long length = combineFileSplit.getLength(currentIndex);

//小文件block所在的节点信息

String[] hosts = combineFileSplit.getLocations();

FileSplit fileSplit = new FileSplit(path, start, length, hosts);

lineRecordReader.initialize(fileSplit, context);

context.getConfiguration().set("map.input.file.name", path.getName());

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

return currentIndex >= 0&& currentIndex < length && lineRecordReader.nextKeyValue();

}

@Override

public LongWritable getCurrentKey() throws IOException, InterruptedException {

return lineRecordReader.getCurrentKey();

}

@Override

public BytesWritable getCurrentValue() throws IOException, InterruptedException {

byte[] contents = lineRecordReader.getCurrentValue().getBytes();

int len = contents.length;

currentValue.set(contents, 0, len);

return currentValue;

}

@Override

public float getProgress() throws IOException, InterruptedException {

if (currentIndex >= 0&& currentIndex < length) {

currentProgress = (float) currentIndex / length;

return currentProgress;

}

return currentProgress;

}

@Override

public void close() throws IOException {

lineRecordReader.close();

}

}

public class MyCombineFileInputFormat extends CombineFileInputFormat<LongWritable,BytesWritable> {

@Override

public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

CombineFileSplit combineFileSplit = (CombineFileSplit) split;

CombineFileRecordReader<LongWritable,BytesWritable> recordReader =

new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, MyCombineFileRecordReader.class);

try {

recordReader.initialize(combineFileSplit, context);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return recordReader;

}

}

MapReduce之InputFormat理解相关推荐

  1. 大数据培训之核心知识点Hbase、Hive、Spark和MapReduce的概念理解、特点及机制等

    今天,上海尚学堂大数据培训班毕业的一位学生去参加易普软件公司面试,应聘的职位是大数据开发.面试官问了他10个问题,主要集中在Hbase.Spark.Hive和MapReduce上,基础概念.特点.应用 ...

  2. MapReduce之InputFormat、OutputFormat(三)

    文章目录 1. 祝大家中秋节快乐 2. MapReduce进阶 2.1 MapReduce类型 2.2 MapReduce输入格式 2.2.1 InputFormat接口 2.2.2 InputFor ...

  3. MapReduce优劣,理解MapReduce与Hadoop

    MapReduce是一种计算模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程 ...

  4. MapReduce之RecordReader理解

    RecordReader:其作用就是将数据切分成key/value的形式然后作为输入传给Mapper. 一 方法分析: 1.1initialize: 初始化RecordReader,只能被调用一次. ...

  5. MapReduce之OutputFormat理解

    一 OutputFormat作用 1校验job中指定输出路径是否存在 2将结果写入输出文件 二 OutputFormat的实现 2.1DBOutputFormat: 发送Reduce结果到SQL表中 ...

  6. MapReduce之RecordWriter理解

    RecordWriter:其实主要就是负责将task的key/value结果写入内存或者磁盘 一 方法分析 1.1 write:写key/value键值对 1.2 close: 关闭RecordWri ...

  7. hadoop之MapReduce学习教程

    hadoop之MapReduce学习 MapReduce概述 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析应用" ...

  8. JAVA大数据(二) Hadoop 分布式文件系统HDFS 架构,MapReduce介绍,Yarn资源调度

    文章目录 1.分布式文件系统HDFS 1.HDFS的来源 2.HDFS的架构图之基础架构 2.1 master/slave 架构 2.2 名字空间(NameSpace) 2.3 文件操作 2.4副本机 ...

  9. 【大数据实验】06:MapReduce操作

    MapReduce操作 OVERVIEW MapReduce操作 实验环境 一.WordCount单词计数 1.实验内容 2.实验原理 3.实验步骤 (1)启动Hadoop集群 (2)准备数据文件 ( ...

最新文章

  1. jQuery 2.0.3 源码分析core - 整体架构
  2. jQuery mobile 中div圆角弹出层
  3. C语言求积标识符,《C语言程序设计》模拟试卷四.doc
  4. python 货币合适_算法之Python实现 - 001 : 换钱的最少货币数
  5. windows控制台中文乱码解决方法
  6. yb3防爆电机型号含义_【产品信息】防爆充电机
  7. 【web前端】table的border属性解析(内联样式表和内部样式表中的区别)
  8. Python和Java哪个更好找工作?
  9. 【STM32H7教程】第39章 STM32H7的DMAMUX基础知识(重要)
  10. 关于CUDA,cuDNN,TF,CUDA驱动版本兼容问题
  11. Pytorch训练SSD网络时遇到的问题
  12. element-ui图标显示不出来问题
  13. 【无标题】SONET基本术语
  14. IDEA2022配置Tomcat服务器教程(超细致版)
  15. linux so lazyload,linux函数深入探索——open函数打开文件是否将文件内容加载到内存空间...
  16. 关于特殊后缀名如vue vm less等文件在DW中高亮显示并且代码提示的解决方案
  17. IT行业基础知识:云计算到底是什么
  18. oracle查询值的字符串长度、字节长度、大小写字母转换
  19. [BZOJ5020][THUWC 2017]在美妙的数学王国中畅游(LCT + 一点数学知识)
  20. 2015-11-4 html记事簿

热门文章

  1. iis10.0 php多版本,IIS7 IIS8 中多个版本php共存的方法
  2. python查找指定文件夹并重命名_python获取指定文件夹下的所有文件名,并删选指定类型文件进行重命名以及撤销重命名...
  3. 二甲医院云服务器,医院用上云计算 病情上传到云端可行否?
  4. linux怎么改目录位置,Linux下更改MySQL数据目录位置具体操作方法
  5. MySQL InnoDB 存储引擎文件
  6. swagger2使用步骤
  7. 带有哨兵的双向循环链表
  8. java volatile关键字的作用_java volatile关键字作用及使用场景详解
  9. 网页制作 css样式,网页设计与制作-CSS样式.ppt
  10. python3实用编程技巧_9.python3实用编程技巧进阶(四)