全方位揭秘!大数据从0到1的完美落地之MapReduce实战案例(1)
案例一: MR实战之小文件合并(自定义inputFormat)
项目准备
- 需求
无论hdfs还是MapReduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案
测试数据
分析
小文件的优化无非以下几种方式:
- a) 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
- b) 在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并
- c) 在MapReduce处理时,可采用combineInputFormat提高效率
项目实现
注意:本节实现的是上述第二种方式
- 程序的核心机制:
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件
- 代码如下
a) 自定义InputFromat
/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description */
public class WholeFileInputFormat extendsFileInputFormat<NullWritable, BytesWritable> {//设置每个小文件不可分片,保证一个小文件生成一个key-value键值对@Overrideprotected boolean isSplitable(JobContext context, Path file) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {WholeFileRecordReader reader = new WholeFileRecordReader();reader.initialize(split, context);return reader;}
}
复制代码
b) 自定义RecordReader
/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 自定义RecordReader*/
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit fileSplit;private Configuration conf;private BytesWritable value = new BytesWritable();private boolean processed = false;@Overridepublic void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.fileSplit = (FileSplit) split;this.conf = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!processed) {byte[] contents = new byte[(int) fileSplit.getLength()];Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(conf);FSDataInputStream in = null;try {in = fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents, 0, contents.length);} finally {IOUtils.closeStream(in);}processed = true;return true;}return false;}@Overridepublic NullWritable getCurrentKey() throws IOException,InterruptedException {return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {return value;}@Overridepublic float getProgress() throws IOException {return processed ? 1.0f : 0.0f;}@Overridepublic void close() throws IOException {// do nothing}
}
复制代码
c) 定义MapReduce处理流程
/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 定义MapReduce处理流程*/
public class SmallFilesToSequenceFileConverter extends Configured implementsTool {static class SequenceFileMapper extendsMapper<NullWritable, BytesWritable, Text, BytesWritable> {private Text filenameKey;@Overrideprotected void setup(Context context) throws IOException,InterruptedException {InputSplit split = context.getInputSplit();Path path = ((FileSplit) split).getPath();filenameKey = new Text(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value,Context context) throws IOException, InterruptedException {context.write(filenameKey, value);}}@Overridepublic int run(String[] args) throws Exception {Configuration conf = new Configuration();System.setProperty("HADOOP_USER_NAME", "root");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: combinefiles <in> <out>");System.exit(2);}Job job = Job.getInstance(conf,"combine small files to sequencefile");// job.setInputFormatClass(WholeFileInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);System.exit(exitCode);}
}
全方位揭秘!大数据从0到1的完美落地之MapReduce实战案例(1)相关推荐
- 全方位揭秘!大数据从0到1的完美落地之大数据简介
大数据简介 什么是大数据 最近几年,IT行业最火的名词中,少不了"大数据"."人工智能"."云计算"."物联网".& ...
- 全方位揭秘!大数据从0到1的完美落地之Hive自定义函数
自定义函数 自定义函数介绍 hive的内置函数满足不了所有的业务需求.hive提供很多的模块可以自定义功能,比如:自定义函数.serde.输入输出格式等.而自定义函数可以分为以下三类: 1)UDF:u ...
- 全方位揭秘!大数据从0到1的完美落地之Mysql介绍
导读 在大数据中,我们需要处理的数据来自不同的渠道,其中有一个很重要的渠道就是关系型数据库中存储的数据.在企业中,会把业务数据存储在关系型数据库中,一般以 MySQL 居多.另外,我们在后续的学习中需 ...
- 全方位揭秘!大数据从0到1的完美落地之Hive介绍
Hive定义 Hive是一个基于Hadoop的数据仓库工具,可以将结构化的数据文件映射成一张数据表,并可以使用类似SQL的方式来对数据文件进行读写以及管理.这套Hive SQL 简称HQL.Hive的 ...
- 全方位揭秘!大数据从0到1的完美落地之Hive分桶
分桶的概述 为什么要分桶 数据分区可能导致有些分区数据过多,有些分区数据极少.分桶是将数据集分解为若干部分(数据文件)的另一种技术. 分区和分桶其实都是对数据更细粒度的管理.当单个分区或者表中的数据越 ...
- 全方位揭秘!大数据从0到1的完美落地之运行流程和分片机制
一个完整的MapReduce程序在分布式运行时有三类实例进程: MRAppMaster: 负责整个程序的过程调度及状态协调 MapTask: 负责Map阶段的整个数据处理流程 ReduceTask: ...
- 全方位揭秘!大数据从0到1的完美落地之Shuffle和调优
MapReduce高级 shuffle阶段 概述 MapReduce会确保每个reducer的输入都是按键排序的.从map方法输出数据开始.到作为输入数据传给reduce方法的过程称为shuffle. ...
- 全方位揭秘!大数据从0到1的完美落地之HDFS块详解
HDFS块详解 传统型分布式文件系统的缺点 现在想象一下这种情况:有四个文件 0.5TB的file1,1.2TB的file2,50GB的file3,100GB的file4:有7个服务器,每个服务器上有 ...
- 全方位揭秘!大数据从0到1的完美落地之Hive企业级调优
Hive企业级调优 调优原则已经在MR优化阶段已经有核心描述,优化Hive可以按照MR的优化思路来执行 优化的主要考虑方面: 环境方面:服务器的配置.容器的配置.环境搭建 具体软件配置参数: 代码级别 ...
最新文章
- 蓝色梦想,再次起航 | 水下目标检测算法比赛正式开赛!
- js base_64 解密
- 多个线程同时运行,顺序打印问题
- 收到字节 Offer,月薪 45k,揭秘面试流程及考点
- ControllerContext分析
- 微信内测“群直播”;小米发布第三代屏下相机技术;马斯克宣布脑机接口重大突破 | 极客头条...
- mybatis(数据库增删改查)
- WebService开发常用功能详解
- iOS上传应用到AppStore出现Authenticating with the iTunes store
- Windows驱动开发技术详解——经典书评
- 屏幕录像软件camtasia2022汉化版好用的录屏软件
- r9270公版bios_换个BIOS再来一次
- 【番外】 使用@arcgis/cli脚手架进行ArcGIS JS API开发
- 你好,罗茜——爱要怎么说出口
- EXTJS开发过程遇到的一些问题的小结(转自麦田守望者)
- 批处理——提取SRT文字中的文字
- selenium+python自动抢购源码
- Expected commajson(514)
- oracle----存储过程
- 【CSS】CSS列表【CSS基础知识详解】