案例一: MR实战之小文件合并(自定义inputFormat)

项目准备

  1. 需求

无论hdfs还是MapReduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案

  1. 测试数据

  2. 分析

小文件的优化无非以下几种方式:

  • a) 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
  • b) 在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并
  • c) 在MapReduce处理时,可采用combineInputFormat提高效率

项目实现

注意:本节实现的是上述第二种方式

  1. 程序的核心机制:

自定义一个InputFormat

改写RecordReader,实现一次读取一个完整文件封装为KV

在输出时使用SequenceFileOutPutFormat输出合并文件

  1. 代码如下

a) 自定义InputFromat

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description */
public class WholeFileInputFormat extendsFileInputFormat<NullWritable, BytesWritable> {//设置每个小文件不可分片,保证一个小文件生成一个key-value键值对@Overrideprotected boolean isSplitable(JobContext context, Path file) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {WholeFileRecordReader reader = new WholeFileRecordReader();reader.initialize(split, context);return reader;}
}
复制代码

b) 自定义RecordReader

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 自定义RecordReader*/
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit fileSplit;private Configuration conf;private BytesWritable value = new BytesWritable();private boolean processed = false;@Overridepublic void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.fileSplit = (FileSplit) split;this.conf = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!processed) {byte[] contents = new byte[(int) fileSplit.getLength()];Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(conf);FSDataInputStream in = null;try {in = fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents, 0, contents.length);} finally {IOUtils.closeStream(in);}processed = true;return true;}return false;}@Overridepublic NullWritable getCurrentKey() throws IOException,InterruptedException {return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {return value;}@Overridepublic float getProgress() throws IOException {return processed ? 1.0f : 0.0f;}@Overridepublic void close() throws IOException {// do nothing}
}
复制代码

c) 定义MapReduce处理流程

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 定义MapReduce处理流程*/
public class SmallFilesToSequenceFileConverter extends Configured implementsTool {static class SequenceFileMapper extendsMapper<NullWritable, BytesWritable, Text, BytesWritable> {private Text filenameKey;@Overrideprotected void setup(Context context) throws IOException,InterruptedException {InputSplit split = context.getInputSplit();Path path = ((FileSplit) split).getPath();filenameKey = new Text(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value,Context context) throws IOException, InterruptedException          {context.write(filenameKey, value);}}@Overridepublic int run(String[] args) throws Exception {Configuration conf = new Configuration();System.setProperty("HADOOP_USER_NAME", "root");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: combinefiles <in> <out>");System.exit(2);}Job job = Job.getInstance(conf,"combine small files to sequencefile");//     job.setInputFormatClass(WholeFileInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);System.exit(exitCode);}
}

全方位揭秘!大数据从0到1的完美落地之MapReduce实战案例(1)相关推荐

  1. 全方位揭秘!大数据从0到1的完美落地之大数据简介

    大数据简介 什么是大数据 ​ 最近几年,IT行业最火的名词中,少不了"大数据"."人工智能"."云计算"."物联网".& ...

  2. 全方位揭秘!大数据从0到1的完美落地之Hive自定义函数

    自定义函数 自定义函数介绍 hive的内置函数满足不了所有的业务需求.hive提供很多的模块可以自定义功能,比如:自定义函数.serde.输入输出格式等.而自定义函数可以分为以下三类: 1)UDF:u ...

  3. 全方位揭秘!大数据从0到1的完美落地之Mysql介绍

    导读 在大数据中,我们需要处理的数据来自不同的渠道,其中有一个很重要的渠道就是关系型数据库中存储的数据.在企业中,会把业务数据存储在关系型数据库中,一般以 MySQL 居多.另外,我们在后续的学习中需 ...

  4. 全方位揭秘!大数据从0到1的完美落地之Hive介绍

    Hive定义 Hive是一个基于Hadoop的数据仓库工具,可以将结构化的数据文件映射成一张数据表,并可以使用类似SQL的方式来对数据文件进行读写以及管理.这套Hive SQL 简称HQL.Hive的 ...

  5. 全方位揭秘!大数据从0到1的完美落地之Hive分桶

    分桶的概述 为什么要分桶 数据分区可能导致有些分区数据过多,有些分区数据极少.分桶是将数据集分解为若干部分(数据文件)的另一种技术. 分区和分桶其实都是对数据更细粒度的管理.当单个分区或者表中的数据越 ...

  6. 全方位揭秘!大数据从0到1的完美落地之运行流程和分片机制

    一个完整的MapReduce程序在分布式运行时有三类实例进程: MRAppMaster: 负责整个程序的过程调度及状态协调 MapTask: 负责Map阶段的整个数据处理流程 ReduceTask: ...

  7. 全方位揭秘!大数据从0到1的完美落地之Shuffle和调优

    MapReduce高级 shuffle阶段 概述 MapReduce会确保每个reducer的输入都是按键排序的.从map方法输出数据开始.到作为输入数据传给reduce方法的过程称为shuffle. ...

  8. 全方位揭秘!大数据从0到1的完美落地之HDFS块详解

    HDFS块详解 传统型分布式文件系统的缺点 现在想象一下这种情况:有四个文件 0.5TB的file1,1.2TB的file2,50GB的file3,100GB的file4:有7个服务器,每个服务器上有 ...

  9. 全方位揭秘!大数据从0到1的完美落地之Hive企业级调优

    Hive企业级调优 调优原则已经在MR优化阶段已经有核心描述,优化Hive可以按照MR的优化思路来执行 优化的主要考虑方面: 环境方面:服务器的配置.容器的配置.环境搭建 具体软件配置参数: 代码级别 ...

最新文章

  1. 蓝色梦想,再次起航 | 水下目标检测算法比赛正式开赛!
  2. js base_64 解密
  3. 多个线程同时运行,顺序打印问题
  4. 收到字节 Offer,月薪 45k,揭秘面试流程及考点
  5. ControllerContext分析
  6. 微信内测“群直播”;小米发布第三代屏下相机技术;马斯克宣布脑机接口重大突破 | 极客头条...
  7. mybatis(数据库增删改查)
  8. WebService开发常用功能详解
  9. iOS上传应用到AppStore出现Authenticating with the iTunes store
  10. Windows驱动开发技术详解——经典书评
  11. 屏幕录像软件camtasia2022汉化版好用的录屏软件
  12. r9270公版bios_换个BIOS再来一次
  13. 【番外】 使用@arcgis/cli脚手架进行ArcGIS JS API开发
  14. 你好,罗茜——爱要怎么说出口
  15. EXTJS开发过程遇到的一些问题的小结(转自麦田守望者)
  16. 批处理——提取SRT文字中的文字
  17. selenium+python自动抢购源码
  18. Expected commajson(514)
  19. oracle----存储过程
  20. 【CSS】CSS列表【CSS基础知识详解】

热门文章

  1. html 基础(1)
  2. 大宋国士——陈启文著
  3. 几行代码的播放器源代码——是真的能播放的
  4. sensor lvds接口介绍
  5. 支付宝商家分账--绑定,解绑,查询的简单实例。记录一下!!!
  6. 亿客CRM系统如何维护好客户关系?
  7. Android Studio官方视频教程笔记
  8. oracle授权时“with admin option”与“with grant option”的区别
  9. 程序员真是太太太太太有趣了!!!
  10. 思科认证的含金量如何?