Hadoop之自定义InputFormat
一、前提:
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,系统提供的InputFor就不适用了,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
二、需求:
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
(1)输入数据:
(2)期望输出文件格式:
三、实现步骤:
1、自定义类实现继承FileInputFormat
(1)重写isSplitable()方法,返回 false不可分割。
(2)重写createRecordReader()方法,创建自定义的RecordReader对象,并初始化。
package com.c21.demo;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {//重写isSplitable()方法,返回 false不可分割。@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}//重写createRecordReader()方法,创建自定义的RecordReader对象,并初始化。@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {return new WholeRecordReader();}
}
2、自定义RecordReader,实现一次读取一个文件封装为kv
package com.c21.demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class WholeRecordReader extends RecordReader<Text, BytesWritable> {private Configuration configuration;private FileSplit split;private boolean isProgress= true;private BytesWritable value = new BytesWritable();private Text k = new Text();public WholeRecordReader() {super();}//初始化@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit)split;configuration = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (isProgress) {// 1 定义缓存区byte[] contents = new byte[(int)split.getLength()];FileSystem fs = null;FSDataInputStream fis = null;try {// 2 获取文件系统Path path = split.getPath();fs = path.getFileSystem(configuration);// 3 读取数据fis = fs.open(path);// 4 读取文件内容IOUtils.readFully(fis, contents, 0, contents.length);// 5 输出文件内容value.set(contents, 0, contents.length);// 6 获取文件路径及名称String name = split.getPath().toString();// 7 设置输出的key值k.set(name);} catch (Exception e) {}finally {IOUtils.closeStream(fis);}isProgress = false;return true;}return false;}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return k;}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}
3、自定义Mapper类:
package com.c21.demo;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);}
}
4、自定义Reducer类:
package com.c21.demo;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {context.write(key,values.iterator().next());}
}
5、Driver 类:
package com.c21.demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;public class SequenceFileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "e:/input/", "e:/output" };// 1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置jar包存储位置、关联自定义的mapper和reducerjob.setJarByClass(SequenceFileDriver.class);job.setMapperClass(SequenceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);// 7设置输入的inputFormatjob.setInputFormatClass(WholeFileInputformat.class);// 8设置输出的outputFormatjob.setOutputFormatClass(SequenceFileOutputFormat.class);// 3 设置map输出端的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);// 4 设置最终输出端的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);// 5 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
运行结果:
Hadoop之自定义InputFormat相关推荐
- 自定义InputFormat案例
自定义InputFormat案例 背景说明 需求 1. 需求说明 2.文件 案例分析 1.需求 2.输入数据 3.输出数据 4.实现分析 代码实现 1.自定义InputFromat 2.自定义Reco ...
- 大数据教程(10.6)自定义inputFormat(小文件合并)
2019独角兽企业重金招聘Python工程师标准>>> 上一篇文章分析了运营商流量日志解析增强的实现,至此,mapreduce的组件中除了inputFormat全都自定义写过了!博主 ...
- hive处理日志,自定义inputformat
开放环境,hadoop-0.20.2,hive-0.6 1.日志分隔符 Xml代码 2010-05-31 10:50:17|||61.132.4.82|||http://www.360buy.co ...
- Hadoop_28_MapReduce_自定义 inputFormat
1. 自定义inputFormat 1.1.需求: 无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件,此时就需要有相应解决方案; 1.2.分析: 小文件的优化 ...
- Hadoop之自定义数据类型
2019独角兽企业重金招聘Python工程师标准>>> 1.写一个类实现Writable接口 2.重写write和readFilelds方法 3.自定义数据类型,提供相应的gette ...
- Hadoop之——自定义计数器
转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46057909 1.Mapper类的实现 /*** KEYIN 即k1 表示行的偏移 ...
- Hadoop系列之InputFormat,OutputFormat用法
首先看下InputFormat接口的代码 public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf var1, ...
- [转]自定义hadoop map/reduce输入文件切割InputFormat
本文转载自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html hadoop会对原始输入文件进行文件切割,然后把每个s ...
- Hadoop之InputFormat数据输入详解
Hadoop之InputFormat数据输入详解 Job提交流程和切片源码详解 FileInputFormat切片机制 CombineTextInputFormat切片机制 InputFormat接口 ...
- [Hadoop] - 自定义Mapreduce InputFormatOutputFormat
在MR程序的开发过程中,经常会遇到输入数据不是HDFS或者数据输出目的地不是HDFS的,MapReduce的设计已经考虑到这种情况,它为我们提供了两个组建,只需要我们自定义适合的InputFormat ...
最新文章
- 6分钟完成ImageNet训练,NVIDIA创下六项AI性能新记录!
- 大数据小白系列——HDFS(3)
- 第十二章:二叉查找树(1)
- 测试龙芯 LoongArch .NET之 使用 FastTunnel 做内网穿透远程计算机
- 『无聊透顶』一篇很无聊的文章
- 10个给程序员的建议
- 电气专业学校排名全国计算机专业学校排名,电气工程及其自动化专业学校排名...
- SoundPool控件
- openwrt配置内核,加载air720 4G模块的USB串口设备
- BIOS知识枝桠——PCD
- 百度地图API之根据经纬度查询地址信息(Android) .(10)
- Java将String型字符串转换成int型(或int型数组)
- Matlab视觉处理模块定位控制全向轮小车运动:目标跟踪测试
- AndroidStudio(Idea) 快捷键错乱,insert键乱入,复制粘贴查找快捷键无效
- 并行计算求pi值C语言,使用并行计算求圆周率π.pdf
- 数据类型之列表与元组
- 江南春:在不确定的市场,找到确定性的增长
- Python Curses
- Proxy returns “HTTP/1.1 407 Proxy Authentication Required
- Wi-Fi理论基础概述