Hadoop的FileInputFormat解析
代码存于github:https://github.com/zuodaoyong/Hadoop
在运行MapReduce程序时,输入的文件格式有:日志文件,二进制文件,数据库表等,那么针对不同的数据类型,MapReduce提供了相应的读取数据接口实现类
TextInputFormat,KeyValueTextInputFormat,NLineInputFormat,CombineTextInputFormat和自定义InputFormat
1、TextInputFormat(Text类型)
TextInputFormat是默认的FileInputFormat实现类,按行读取每条记录。
键是存储该行在整个文件中的起始字节偏移量,LongWritable类型,值是该行内容,不包含任何终止符(换行符和回车符)
2、KeyValueTextInputFormat
每一行均为一条记录,被分隔符分割成key,value,在驱动类中设定分隔符,默认分隔符是tab(\t)
//获取配置信息,job对象实例
Configuration configuration=new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t");
(3)NLineInputFormat
每个map进程处理的InputSplit不再按照Block块划分,而是按照NLineInputFormat指定的行数N来划分,即输入文件的总行数/N=切片数。如果不能够整除,切片数=商+1
(4)自定义InputFormat
Hadoop自带的InputFormat类型不能满足所有应用场景时,需要自定义InputFormat来解决
自定义InputFormat步骤
(1)自定义InputFormat类继承FileInputFormat
i)重写isSplitable方法,返回false不可切割
ii)重写RecordReader方法,创建自定义的RecordReader对象并初始化
public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable>{@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)throws IOException, InterruptedException {WholeRecordReader recordReader=new WholeRecordReader();recordReader.initialize(null, context);return recordReader;}}
(2)改写RecordReader,实现一次读取一个完整文件封装为KV
i)采用IO流一次读取一个文件输出到value中,因为设置了不可切片,最终把所有文件都封装在value中
ii)获取文件路径信息+名称,并设置key
public class WholeRecordReader extends RecordReader<Text,BytesWritable>{private BytesWritable value=new BytesWritable();private Text key=new Text();private boolean isProcess=false;private FileSplit fileSplit;private Configuration configuration;@Overridepublic void close() throws IOException {}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return isProcess?1:0;}@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {fileSplit=(FileSplit) inputSplit;configuration = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if(!isProcess){FSDataInputStream inputStream=null;FileSystem fileSystem=null;try {byte[] bs=new byte[(int) fileSplit.getLength()];//获取文件系统Path path = fileSplit.getPath();fileSystem = path.getFileSystem(configuration);//打开文件流inputStream = fileSystem.open(path);IOUtils.readFully(inputStream, bs, 0,bs.length);value.set(bs, 0, bs.length);key.set(path.toString());}catch(Exception e){e.printStackTrace();}finally {if(inputStream!=null){inputStream.close();}if(fileSystem!=null){fileSystem.close();}}isProcess=true;return true;}return false;}}
(3)在输出时使用SequenceFileOutputFormat输出合并文件
job.setInputFormatClass(WholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
Driver端
public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "root");Configuration configuration=new Configuration();Job job = Job.getInstance(configuration);//设置输入的inputFormatjob.setInputFormatClass(WholeInputFormat.class);//设置输出的outputFormatjob.setOutputFormatClass(SequenceFileOutputFormat.class);job.setMapperClass(SequenceFileMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);job.setReducerClass(SequenceFileReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);FileInputFormat.setInputPaths(job, new Path("/mapreduce/inputformat/sequencefiles"));FileOutputFormat.setOutputPath(job, new Path("/mapreduce/inputformat/output"));boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion==true?0:1);
}
Hadoop的FileInputFormat解析相关推荐
- hadoop MapReduce实例解析
1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然 ...
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
Hadoop源码解析之: TextInputFormat如何处理跨split的行 转载于:https://blog.51cto.com/taikongren/1742425
- Hadoop源码解析之Mapper数量计算公式
前言 据说,自0.20.0版本开始,Hadoop同时提供了新旧两套MapReduce API,并在后续版本中也同时支持这两种API的使用.新版本MR API在旧的基础进行了扩展,也制定了新的split ...
- Hadoop的FileInputFormat.getSplits()方法的解析
public List<InputSplit> getSplits(JobContext job) throws IOException {//记录分片过程的开始时间StopWatch s ...
- Hadoop常见错误解析
1:Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out Answer: 程序里面需要打开多个文件,进行分析,系统一般默认数量 ...
- Hadoop中Partition解析
1.解析Partition Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类 ...
- Hadoop源码解析
一.hadoop的Job 提交流程源码 流程图: 1.从我们编写的mapreduce的代码中进入job提交源码 支线一:进入connect(); 2.支线二:进入submitter.submitJob ...
- MapReduce中FileInputFormat解析
1.概述 在map阶段,文件先被切分成split块,而后每一个split切片对应一个Mapper任务. FileInputFormat这个类先对输入文件进行逻辑上的划分,以128M为单位,将原始数据从 ...
- 超详细单机版搭建hadoop环境图文解析
转自:http://weixiaolu.iteye.com/blog/1401931 安装过程: 一.安装Linux操作系统 二.在Ubuntu下创建hadoop用户组和用户 三.在Ubuntu下安装 ...
最新文章
- SAP物料标准成本估算结果
- apt来安装mysql5.7,linux系统ubuntu18.04安装mysql 5.7
- [原创] 指针操作程序答案 — 谭浩强C语言习题答案
- abap中的弹出窗体函数
- sublime text3 package control 报错
- 开源框架openresty+nginx 实现web应用防火墙(WAF)
- 非计算机专业《Python程序设计基础》教学参考大纲
- qt 两种按钮点击事件应用
- 《Python分布式计算》第2章 异步编程 (Distributed Computing with Python)
- [Silverlight动画]转向行为 - 路径跟随
- (转)AssetBundle系列——游戏资源打包(二)
- php在HTML的遍历,php里面怎么循环嵌入html元素
- 《微机原理与应用》题库
- cmd字体推荐-更纱黑体
- 前端利用jQuery设置日期选择框
- 这一年,我“生病”了
- ubuntu下发送邮件到外部邮箱
- (附源码)ssm学校疫情服务平台 毕业设计 291202
- 超简单的配置java环境变量(绝对路径)
- 一款好用的tomcat插件---TomcatPlugin插件