一、前提:

无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,系统提供的InputFor就不适用了,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。

二、需求:

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

(1)输入数据:

(2)期望输出文件格式:

三、实现步骤:

1、自定义类实现继承FileInputFormat

(1)重写isSplitable()方法,返回 false不可分割。

(2)重写createRecordReader()方法,创建自定义的RecordReader对象,并初始化。

package com.c21.demo;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {//重写isSplitable()方法,返回 false不可分割。@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}//重写createRecordReader()方法,创建自定义的RecordReader对象,并初始化。@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {return new WholeRecordReader();}
}

2、自定义RecordReader,实现一次读取一个文件封装为kv

package com.c21.demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class WholeRecordReader extends RecordReader<Text, BytesWritable> {private Configuration configuration;private FileSplit split;private boolean isProgress= true;private BytesWritable value = new BytesWritable();private Text k = new Text();public WholeRecordReader() {super();}//初始化@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit)split;configuration = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (isProgress) {// 1 定义缓存区byte[] contents = new byte[(int)split.getLength()];FileSystem fs = null;FSDataInputStream fis = null;try {// 2 获取文件系统Path path = split.getPath();fs = path.getFileSystem(configuration);// 3 读取数据fis = fs.open(path);// 4 读取文件内容IOUtils.readFully(fis, contents, 0, contents.length);// 5 输出文件内容value.set(contents, 0, contents.length);// 6 获取文件路径及名称String name = split.getPath().toString();// 7 设置输出的key值k.set(name);} catch (Exception e) {}finally {IOUtils.closeStream(fis);}isProgress = false;return true;}return false;}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return k;}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}

3、自定义Mapper类:

package com.c21.demo;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);}
}

4、自定义Reducer类:

package com.c21.demo;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {context.write(key,values.iterator().next());}
}

5、Driver 类:

package com.c21.demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;public class SequenceFileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "e:/input/", "e:/output" };// 1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置jar包存储位置、关联自定义的mapper和reducerjob.setJarByClass(SequenceFileDriver.class);job.setMapperClass(SequenceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);// 7设置输入的inputFormatjob.setInputFormatClass(WholeFileInputformat.class);// 8设置输出的outputFormatjob.setOutputFormatClass(SequenceFileOutputFormat.class);// 3 设置map输出端的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);// 4 设置最终输出端的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);// 5 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

运行结果:

Hadoop之自定义InputFormat相关推荐

  1. 自定义InputFormat案例

    自定义InputFormat案例 背景说明 需求 1. 需求说明 2.文件 案例分析 1.需求 2.输入数据 3.输出数据 4.实现分析 代码实现 1.自定义InputFromat 2.自定义Reco ...

  2. 大数据教程(10.6)自定义inputFormat(小文件合并)

    2019独角兽企业重金招聘Python工程师标准>>> 上一篇文章分析了运营商流量日志解析增强的实现,至此,mapreduce的组件中除了inputFormat全都自定义写过了!博主 ...

  3. hive处理日志,自定义inputformat

    开放环境,hadoop-0.20.2,hive-0.6 1.日志分隔符 Xml代码   2010-05-31 10:50:17|||61.132.4.82|||http://www.360buy.co ...

  4. Hadoop_28_MapReduce_自定义 inputFormat

    1. 自定义inputFormat 1.1.需求: 无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件,此时就需要有相应解决方案; 1.2.分析: 小文件的优化 ...

  5. Hadoop之自定义数据类型

    2019独角兽企业重金招聘Python工程师标准>>> 1.写一个类实现Writable接口 2.重写write和readFilelds方法 3.自定义数据类型,提供相应的gette ...

  6. Hadoop之——自定义计数器

    转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46057909 1.Mapper类的实现 /*** KEYIN 即k1 表示行的偏移 ...

  7. Hadoop系列之InputFormat,OutputFormat用法

    首先看下InputFormat接口的代码 public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf var1, ...

  8. [转]自定义hadoop map/reduce输入文件切割InputFormat

    本文转载自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html hadoop会对原始输入文件进行文件切割,然后把每个s ...

  9. Hadoop之InputFormat数据输入详解

    Hadoop之InputFormat数据输入详解 Job提交流程和切片源码详解 FileInputFormat切片机制 CombineTextInputFormat切片机制 InputFormat接口 ...

  10. [Hadoop] - 自定义Mapreduce InputFormatOutputFormat

    在MR程序的开发过程中,经常会遇到输入数据不是HDFS或者数据输出目的地不是HDFS的,MapReduce的设计已经考虑到这种情况,它为我们提供了两个组建,只需要我们自定义适合的InputFormat ...

最新文章

  1. 6分钟完成ImageNet训练,NVIDIA创下六项AI性能新记录!
  2. 大数据小白系列——HDFS(3)
  3. 第十二章:二叉查找树(1)
  4. 测试龙芯 LoongArch .NET之 使用 FastTunnel 做内网穿透远程计算机
  5. 『无聊透顶』一篇很无聊的文章
  6. 10个给程序员的建议
  7. 电气专业学校排名全国计算机专业学校排名,电气工程及其自动化专业学校排名...
  8. SoundPool控件
  9. openwrt配置内核,加载air720 4G模块的USB串口设备
  10. BIOS知识枝桠——PCD
  11. 百度地图API之根据经纬度查询地址信息(Android) .(10)
  12. Java将String型字符串转换成int型(或int型数组)
  13. Matlab视觉处理模块定位控制全向轮小车运动:目标跟踪测试
  14. AndroidStudio(Idea) 快捷键错乱,insert键乱入,复制粘贴查找快捷键无效
  15. 并行计算求pi值C语言,使用并行计算求圆周率π.pdf
  16. 数据类型之列表与元组
  17. 江南春:在不确定的市场,找到确定性的增长
  18. Python Curses
  19. Proxy returns “HTTP/1.1 407 Proxy Authentication Required
  20. Wi-Fi理论基础概述

热门文章

  1. 首届技术播客月开播在即
  2. Pr 音频效果参考:降噪/恢复
  3. 2020年最新Java后端学习路线,适用于所有Java初学者!
  4. 第十六章 python Pygame的使用
  5. 高尔顿钉板实验的matlab代码动画演示
  6. 报表生成器FastReport .Net如何存储和加载报告
  7. 平均年薪 15 万,超 6 成本科学历,程序员薪资调查报告大曝光!
  8. 小米手机+MIUI系统开发版线刷到稳定版(小米8+MIUI10)
  9. 阿里云服务器远程桌面安装
  10. 阿里云弹性云桌面、传统PC和虚拟桌面VDI区别对比