代码存于github:https://github.com/zuodaoyong/Hadoop

在运行MapReduce程序时,输入的文件格式有:日志文件,二进制文件,数据库表等,那么针对不同的数据类型,MapReduce提供了相应的读取数据接口实现类

TextInputFormat,KeyValueTextInputFormat,NLineInputFormat,CombineTextInputFormat和自定义InputFormat

1、TextInputFormat(Text类型)

TextInputFormat是默认的FileInputFormat实现类,按行读取每条记录。

键是存储该行在整个文件中的起始字节偏移量,LongWritable类型,值是该行内容,不包含任何终止符(换行符和回车符)

2、KeyValueTextInputFormat

每一行均为一条记录,被分隔符分割成key,value,在驱动类中设定分隔符,默认分隔符是tab(\t)

//获取配置信息,job对象实例
Configuration configuration=new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t");

(3)NLineInputFormat

每个map进程处理的InputSplit不再按照Block块划分,而是按照NLineInputFormat指定的行数N来划分,即输入文件的总行数/N=切片数。如果不能够整除,切片数=商+1

(4)自定义InputFormat

Hadoop自带的InputFormat类型不能满足所有应用场景时,需要自定义InputFormat来解决

自定义InputFormat步骤

(1)自定义InputFormat类继承FileInputFormat

i)重写isSplitable方法,返回false不可切割

ii)重写RecordReader方法,创建自定义的RecordReader对象并初始化

public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable>{@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)throws IOException, InterruptedException {WholeRecordReader recordReader=new WholeRecordReader();recordReader.initialize(null, context);return recordReader;}}

(2)改写RecordReader,实现一次读取一个完整文件封装为KV

i)采用IO流一次读取一个文件输出到value中,因为设置了不可切片,最终把所有文件都封装在value中

ii)获取文件路径信息+名称,并设置key

public class WholeRecordReader extends RecordReader<Text,BytesWritable>{private BytesWritable value=new BytesWritable();private Text key=new Text();private boolean isProcess=false;private FileSplit fileSplit;private Configuration configuration;@Overridepublic void close() throws IOException {}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return isProcess?1:0;}@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {fileSplit=(FileSplit) inputSplit;configuration = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if(!isProcess){FSDataInputStream inputStream=null;FileSystem fileSystem=null;try {byte[] bs=new byte[(int) fileSplit.getLength()];//获取文件系统Path path = fileSplit.getPath();fileSystem = path.getFileSystem(configuration);//打开文件流inputStream = fileSystem.open(path);IOUtils.readFully(inputStream, bs, 0,bs.length);value.set(bs, 0, bs.length);key.set(path.toString());}catch(Exception e){e.printStackTrace();}finally {if(inputStream!=null){inputStream.close();}if(fileSystem!=null){fileSystem.close();}}isProcess=true;return true;}return false;}}

(3)在输出时使用SequenceFileOutputFormat输出合并文件

job.setInputFormatClass(WholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

Driver端

public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "root");Configuration configuration=new Configuration();Job job = Job.getInstance(configuration);//设置输入的inputFormatjob.setInputFormatClass(WholeInputFormat.class);//设置输出的outputFormatjob.setOutputFormatClass(SequenceFileOutputFormat.class);job.setMapperClass(SequenceFileMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);job.setReducerClass(SequenceFileReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);FileInputFormat.setInputPaths(job, new Path("/mapreduce/inputformat/sequencefiles"));FileOutputFormat.setOutputPath(job, new Path("/mapreduce/inputformat/output"));boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion==true?0:1);
}

Hadoop的FileInputFormat解析相关推荐

  1. hadoop MapReduce实例解析

    1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然 ...

  2. Hadoop源码解析之: TextInputFormat如何处理跨split的行

    Hadoop源码解析之: TextInputFormat如何处理跨split的行 转载于:https://blog.51cto.com/taikongren/1742425

  3. Hadoop源码解析之Mapper数量计算公式

    前言 据说,自0.20.0版本开始,Hadoop同时提供了新旧两套MapReduce API,并在后续版本中也同时支持这两种API的使用.新版本MR API在旧的基础进行了扩展,也制定了新的split ...

  4. Hadoop的FileInputFormat.getSplits()方法的解析

    public List<InputSplit> getSplits(JobContext job) throws IOException {//记录分片过程的开始时间StopWatch s ...

  5. Hadoop常见错误解析

    1:Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out  Answer: 程序里面需要打开多个文件,进行分析,系统一般默认数量 ...

  6. Hadoop中Partition解析

    1.解析Partition Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类 ...

  7. Hadoop源码解析

    一.hadoop的Job 提交流程源码 流程图: 1.从我们编写的mapreduce的代码中进入job提交源码 支线一:进入connect(); 2.支线二:进入submitter.submitJob ...

  8. MapReduce中FileInputFormat解析

    1.概述 在map阶段,文件先被切分成split块,而后每一个split切片对应一个Mapper任务. FileInputFormat这个类先对输入文件进行逻辑上的划分,以128M为单位,将原始数据从 ...

  9. 超详细单机版搭建hadoop环境图文解析

    转自:http://weixiaolu.iteye.com/blog/1401931 安装过程: 一.安装Linux操作系统 二.在Ubuntu下创建hadoop用户组和用户 三.在Ubuntu下安装 ...

最新文章

  1. SAP物料标准成本估算结果
  2. apt来安装mysql5.7,linux系统ubuntu18.04安装mysql 5.7
  3. [原创] 指针操作程序答案 — 谭浩强C语言习题答案
  4. abap中的弹出窗体函数
  5. sublime text3 package control 报错
  6. 开源框架openresty+nginx 实现web应用防火墙(WAF)
  7. 非计算机专业《Python程序设计基础》教学参考大纲
  8. qt 两种按钮点击事件应用
  9. 《Python分布式计算》第2章 异步编程 (Distributed Computing with Python)
  10. [Silverlight动画]转向行为 - 路径跟随
  11. (转)AssetBundle系列——游戏资源打包(二)
  12. php在HTML的遍历,php里面怎么循环嵌入html元素
  13. 《微机原理与应用》题库
  14. cmd字体推荐-更纱黑体
  15. 前端利用jQuery设置日期选择框
  16. 这一年,我“生病”了
  17. ubuntu下发送邮件到外部邮箱
  18. (附源码)ssm学校疫情服务平台 毕业设计 291202
  19. 超简单的配置java环境变量(绝对路径)
  20. 一款好用的tomcat插件---TomcatPlugin插件

热门文章

  1. TKmybatis的使用,MyBatis的Mapper接口、Example方法
  2. 【proteus】proteus界面介绍
  3. Pycharm设置自动代码提示(超详细)
  4. Spring5-Spring的基本配置
  5. 万维网运行原理分析实验
  6. lucas定理 学习笔记
  7. Live800:客户服务无小事,别让服务击溃企业口碑
  8. 使用两年之后,我为什么卸载了Istio?
  9. 工作三年,我不仅脱发,还得了职业病———百格活动
  10. jquery实现拖动效果(代码+解释)