一:简单认识InputFormat类

InputFormat主要用于描述输入数据的格式,提供了以下两个功能:

1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定Map Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task;

2)为Mapper提供输入数据,即给定一个split,(使用其中的RecordReader对象)将之解析为一个个的key/value键值对。

下面我们先来看以下1.0版本中的老的InputFormat接口:

Java代码 
  1. public interface InputFormat<K,V>{
  2. //获取所有的split分片
  3. public InputSplit[] getSplits(JobConf job,int numSplits) throws IOException;
  4. //获取读取split的RecordReader对象,实际上是由RecordReader对象将
  5. //split解析成一个个的key/value对儿
  6. public RecordReader<K,V> getRecordReader(InputSplit split,
  7. JobConf job,
  8. Reporter reporter) throws IOException;
  9. }

InputSplit 
        getSplit(...)方法主要用于切分数据,它会尝试浙江输入数据且分成numSplits个InputSplit的栓皮栎split分片。InputSplit主要有以下特点: 
        1)、逻辑分片,之前我们已经学习过split和block的对应关系和区别,split只是在逻辑上对数据分片,并不会在磁盘上讲数据切分成split物理分片,实际上数据在HDFS上还是以block为基本单位来存储数据的。InputSplit只记录了Mapper要处理的数据的元数据信息,如起始位置、长度和所在的节点;

2)、可序列化,在Hadoop中,序列化主要起两个作用,进程间通信和数据持久化存储。在这里,InputSplit主要用于进程间的通信。 
         在作业被提交到JobTracker之前,Client会先调用作业InputSplit中的getSplit()方法,并将得到的分片信息序列化到文件中,这样,在作业在JobTracker端初始化时,便可并解析出所有split分片,创建对象应的Map Task。 
         InputSplit也是一个interface,具体返回什么样的implement,这是由具体的InputFormat来决定的。InputSplit也只有两个接口函数:

Java代码 
  1. public interface InputSplit extends Writable {
  2. /**
  3. * 获取split分片的长度
  4. *
  5. * @return the number of bytes in the input split.
  6. * @throws IOException
  7. */
  8. long getLength() throws IOException;
  9. /**
  10. * 获取存放这个Split的Location信息(也就是这个Split在HDFS上存放的机器。它可能有
  11. * 多个replication,存在于多台机器上
  12. *
  13. * @return list of hostnames where data of the <code>InputSplit</code> is
  14. *         located as an array of <code>String</code>s.
  15. * @throws IOException
  16. */
  17. String[] getLocations() throws IOException;
  18. }

在需要读取一个Split的时候,其对应的InputSplit会被传递到InputFormat的第二个接口函数getRecordReader,然后被用于初始化一个RecordReader,以便解析输入数据,描述Split的重要信息都被隐藏了,只有具体的InputFormat自己知道,InputFormat只需要保证getSplits返回的InputSplit和getRecordReader所关心的InputSplit是同样的implement就行了,这给InputFormat的实现提供了巨大的灵活性。 
         在MapReduce框架中最常用的FileInputFormat为例,其内部使用的就是FileSplit来描述InputSplit。我们来看一下FileSplit的一些定义信息:

Java代码  
  1. /** A section of an input file.  Returned by {@link
  2. * InputFormat#getSplits(JobConf, int)} and passed to
  3. * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}.
  4. */
  5. public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
  6. implements InputSplit {
  7. // Split所在的文件
  8. private Path file;
  9. // Split的起始位置
  10. private long start;
  11. // Split的长度
  12. private long length;
  13. // Split所在的机器名称
  14. private String[] hosts;
  15. FileSplit() {}
  16. /** Constructs a split.
  17. * @deprecated
  18. * @param file the file name
  19. * @param start the position of the first byte in the file to process
  20. * @param length the number of bytes in the file to process
  21. */
  22. @Deprecated
  23. public FileSplit(Path file, long start, long length, JobConf conf) {
  24. this(file, start, length, (String[])null);
  25. }
  26. /** Constructs a split with host information
  27. *
  28. * @param file the file name
  29. * @param start the position of the first byte in the file to process
  30. * @param length the number of bytes in the file to process
  31. * @param hosts the list of hosts containing the block, possibly null
  32. */
  33. public FileSplit(Path file, long start, long length, String[] hosts) {
  34. this.file = file;
  35. this.start = start;
  36. this.length = length;
  37. this.hosts = hosts;
  38. }
  39. /** The file containing this split's data. */
  40. public Path getPath() { return file; }
  41. /** The position of the first byte in the file to process. */
  42. public long getStart() { return start; }
  43. /** The number of bytes in the file to process. */
  44. public long getLength() { return length; }
  45. public String toString() { return file + ":" + start + "+" + length; }
  46. // Writable methods
  47. public void write(DataOutput out) throws IOException {
  48. UTF8.writeString(out, file.toString());
  49. out.writeLong(start);
  50. out.writeLong(length);
  51. }
  52. public void readFields(DataInput in) throws IOException {
  53. file = new Path(UTF8.readString(in));
  54. start = in.readLong();
  55. length = in.readLong();
  56. hosts = null;
  57. }
  58. public String[] getLocations() throws IOException {
  59. if (this.hosts == null) {
  60. return new String[]{};
  61. } else {
  62. return this.hosts;
  63. }
  64. }
  65. }

从上面的代码中我们可以看到,FileSplit就是InputSplit接口的一个实现。InputFormat使用的RecordReader将从FileSplit中获取信息,解析FileSplit对象从而获得需要的数据的起始位置、长度和节点位置。

RecordReader 
         对于getRecordReader(...)方法,它返回一个RecordReader对象,该对象可以讲输入的split分片解析成一个个的key/value对儿。在Map Task的执行过程中,会不停的调用RecordReader对象的方法,迭代获取key/value并交给map()方法处理:

Java代码 
  1. //调用InputFormat的getRecordReader()获取RecordReader<K,V>对象,
  2. //并由RecordReader对象解析其中的input(split)...
  3. K1 key = input.createKey();
  4. V1 value = input.createValue();
  5. while(input.next(key,value)){//从input读取下一个key/value对
  6. //调用用户编写的map()方法
  7. }
  8. input.close();

RecordReader主要有两个功能: 
         ●定位记录的边界:由于FileInputFormat是按照数据量对文件进行切分,因而有可能会将一条完整的记录切成2部分,分别属于两个split分片,为了解决跨InputSplit分片读取数据的问题,RecordReader规定每个分片的第一条不完整的记录划给前一个分片处理。 
         ●解析key/value:定位一条新的记录,将记录分解成key和value两部分供Mapper处理。

InputFormat 
         MapReduce自带了一些InputFormat的实现类:

下面我们看几个有代表性的InputFormat: 
         FileInputFormat 
         FileInputFormat是一个抽象类,它最重要的功能是为各种InputFormat提供统一的getSplits()方法,该方法最核心的是文件切分算法和Host选择算法:

Java代码 
  1. /** Splits files returned by {@link #listStatus(JobConf)} when
  2. * they're too big.*/
  3. @SuppressWarnings("deprecation")
  4. public InputSplit[] getSplits(JobConf job, int numSplits)
  5. throws IOException {
  6. FileStatus[] files = listStatus(job);
  7. // Save the number of input files in the job-conf
  8. job.setLong(NUM_INPUT_FILES, files.length);
  9. long totalSize = 0;                           // compute total size
  10. for (FileStatus file: files) {                // check we have valid files
  11. if (file.isDir()) {
  12. throw new IOException("Not a file: "+ file.getPath());
  13. }
  14. totalSize += file.getLen();
  15. }
  16. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  17. long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
  18. minSplitSize);
  19. // 定义要生成的splits(FileSplit)的集合
  20. ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  21. NetworkTopology clusterMap = new NetworkTopology();
  22. for (FileStatus file: files) {
  23. Path path = file.getPath();
  24. FileSystem fs = path.getFileSystem(job);
  25. long length = file.getLen();
  26. BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  27. if ((length != 0) && isSplitable(fs, path)) {
  28. long blockSize = file.getBlockSize();
  29. //获取最终的split分片的大小,该值很可能和blockSize不相等
  30. long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  31. long bytesRemaining = length;
  32. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  33. //获取split分片所在的host的节点信息
  34. String[] splitHosts = getSplitHosts(blkLocations,
  35. length-bytesRemaining, splitSize, clusterMap);
  36. //最终生成所有分片
  37. splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
  38. splitHosts));
  39. bytesRemaining -= splitSize;
  40. }
  41. if (bytesRemaining != 0) {
  42. splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
  43. blkLocations[blkLocations.length-1].getHosts()));
  44. }
  45. } else if (length != 0) {
  46. //获取split分片所在的host的节点信息
  47. String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
  48. //最终生成所有分片
  49. splits.add(new FileSplit(path, 0, length, splitHosts));
  50. } else {
  51. //Create empty hosts array for zero length files
  52. //最终生成所有分片
  53. splits.add(new FileSplit(path, 0, length, new String[0]));
  54. }
  55. }
  56. LOG.debug("Total # of splits: " + splits.size());
  57. return splits.toArray(new FileSplit[splits.size()]);
  58. }

1)、文件切分算法 
          文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段,FileInputSplit以文件为单位切分生成InputSplit。有三个属性值来确定InputSplit的个数: 
          ●goalSize:该值由totalSize/numSplits来确定InputSplit的长度,它是根据用户的期望的InputSplit个数计算出来的;numSplits为用户设定的Map Task的个数,默认为1。 
          ●minSize:由配置参数mapred.min.split.size决定的InputFormat的最小长度,默认为1。 
          ●blockSize:HDFS中的文件存储块block的大小,默认为64MB。 
          这三个参数决定一个InputFormat分片的最终的长度,计算方法如下: 
                      splitSize = max{minSize,min{goalSize,blockSize}} 
计算出了分片的长度后,也就确定了InputFormat的数目。

2)、host选择算法 
          InputFormat的切分方案确定后,接下来就是要确定每一个InputSplit的元数据信息。InputSplit元数据通常包括四部分,<file,start,length,hosts>其意义为: 
          ●file标识InputSplit分片所在的文件; 
          ●InputSplit分片在文件中的的起始位置; 
          ●InputSplit分片的长度; 
          ●分片所在的host节点的列表。 
          InputSplit的host列表的算作策略直接影响到运行作业的本地性。我们知道,由于大文件存储在HDFS上的block可能会遍布整个Hadoop集群,而一个InputSplit分片的划分算法可能会导致一个split分片对应多个不在同一个节点上的blocks,这就会使得在Map Task执行过程中会涉及到读其他节点上的属于该Task的block中的数据,从而不能实现数据本地性,而造成更多的网络传输开销。 
          一个InputSplit分片对应的blocks可能位于多个数据节点地上,但是基于任务调度的效率,通常情况下,不会把一个分片涉及的所有的节点信息都加到其host列表中,而是选择包含该分片的数据总量的最大的前几个节点,作为任务调度时判断是否具有本地性的主要凭证。 
         FileInputFormat使用了一个启发式的host选择算法:首先按照rack机架包含的数据量对rack排序,然后再在rack内部按照每个node节点包含的数据量对node排序,最后选取前N个(N为block的副本数)node的host作为InputSplit分片的host列表。当任务地调度Task作业时,只要将Task调度给host列表上的节点,就可以认为该Task满足了本地性。 
         从上面的信息我们可以知道,当InputSplit分片的大小大于block的大小时,Map Task并不能完全满足数据的本地性,总有一本分的数据要通过网络从远程节点上读数据,故为了提高Map Task的数据本地性,减少网络传输的开销,应尽量是InputFormat的大小和HDFS的block块大小相同。

TextInputFormat 
          默认情况下,MapReduce使用的是TextInputFormat来读分片并将记录数据解析成一个个的key/value对,其中key为该行在整个文件(注意而不是在一个block)中的偏移量,而行的内容即为value。 
          CombineFileInputFormat 
          CombineFileInputFormat的作用是把许多文件合并作为一个map的输入,它的主要思路是把输入目录下的大文件分成多个map的输入, 并合并小文件, 做为一个map的输入。适合在处理多个小文件的场景。 
          SequenceFileInputFormat 
          SequenceFileInputFormat是一个顺序的二进制的FileInputFormat,内部以key/value的格式保存数据,通常会结合LZO或Snappy压缩算法来读取或保存可分片的数据文件。

MapReduce InputFormat之FileInputFormat相关推荐

  1. Hadoop MapReduce InputFormat基础

    有时候你可能想要用不同的方法从input data中读取数据.那么你就需要创建一个自己的InputFormat类.   InputFormat是一个只有两个函数的接口.   1 public inte ...

  2. hadoop mapreduce相关类 FileInputFormat

    hadoop mapreduce相关类 FileInputFormat 官方链接 http://hadoop.apache.org/docs/r2.9.1/api/ 功能 InputFormat会生成 ...

  3. MapReduce源码之InputFormat

    2019独角兽企业重金招聘Python工程师标准>>> InputFormat主要做了一下事情: 验证作业的输入配置(即检查数据是否存在) 按照InputSplit类型将输入块和文件 ...

  4. MapReduce操作时Error:The method setInputPaths(JobConf, String) in the type FileInputFormat is not

    问题描述 今天在写java API的MapReduce操作时,出现了Error:The method setInputPaths(JobConf, String) in the type FileIn ...

  5. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  6. MapReduce编程系列 — 2:计算平均分

    1.项目名称: 2.程序代码: package com.averagescorecount;import java.io.IOException; import java.util.Iterator; ...

  7. 自定义InputFormat案例

    自定义InputFormat案例 背景说明 需求 1. 需求说明 2.文件 案例分析 1.需求 2.输入数据 3.输出数据 4.实现分析 代码实现 1.自定义InputFromat 2.自定义Reco ...

  8. 从源码的角度分析MapReduce的map-input流程

    前言 之前我们对MapReduce中Client提交Job作业的流程进行了源码分析(点击查看Client提交Job作业源码分析),今天我们来分析一下map-input阶段的源码. 源码位置 hadoo ...

  9. Hadoop学习笔记:MapReduce框架详解

    原文:http://blog.jobbole.com/84089/ 原文出处: 夏天的森林 开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手, ...

最新文章

  1. android 各种控件颜色值的设置(使用Drawable,Color)
  2. 屏幕滚动控件Scrollview
  3. laravel中单独获取一个错误信息的方法
  4. 就业技术书文件表格_429页标准指南,教你如何管理工程监理文件资料,丰富图表一看就会...
  5. 【POJ 1845】 Sumdiv (整数唯分+约数和公式+二分等比数列前n项和+同余)
  6. vue 组件中的钩子函数 不能直接写this
  7. 如何打造标签式IE浏览器 (共享源码)
  8. 【瑕疵检测】基于matlab GUI灰度共生矩阵痕迹检测【含Matlab源码 863期】
  9. StretchDIBits函数显示图片
  10. linux logo程序设计,Android开机LOGO的修改
  11. Appium原理及使用
  12. Flash相关知识总结
  13. 13行列式02---余子式与代数余子式、行列式按行(列)展开法则、行列式计算、范德蒙行列式
  14. 到底什么是爬虫技术?简谈爬虫概念
  15. MongoDB-day1:MongoDB安装使用、python操作、虚拟环境、windows下pip豆瓣源配置
  16. 均值定理最大值最小值公式_数学均值定理怎么求不等式的最大值最小值,求教会(ฅω*ฅ)...
  17. 学会提问——批判性思维指南
  18. Pdf文档在线编辑控件源码及演示
  19. 图像处理---亚像素
  20. linux报错Loading mirror speeds from cached hostfile解决方法

热门文章

  1. Codeigniter基础
  2. HTML5css3学习总结(2)
  3. css中auto的用法
  4. C# WinForm窗口最小化到系统托盘
  5. Enterprise Library2.0研究(一)日志组件的使用场景
  6. Django使用消息提示简单的弹出个对话框
  7. 启动tomcat的startup.bat闪退问题
  8. *** is required and cannot be removed from the server
  9. c++ class struct同名_如何把C++的源代码改写成C代码?而C改C++只需一步!
  10. 程序代码移植和烧录需要注意什么_法人变更需要注意什么