
The previous article analyzed the implementation of enhanced parsing for carrier traffic logs. At this point, every MapReduce component except InputFormat has been custom-written, so today I will continue by sharing a custom InputFormat.

1. Requirements

Small files hurt the efficiency of both HDFS and MapReduce, yet in practice we often have to process large numbers of them, so a corresponding solution is needed.

2. Analysis

Small-file optimization generally comes down to the following approaches:
    1. At data-collection time, merge small files or small batches of data into large files before uploading them to HDFS.
    2. Before business processing, run a MapReduce program on HDFS to merge the small files.
    3. At MapReduce processing time, use a CombineFileInputFormat subclass (such as CombineTextInputFormat) to improve efficiency.
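The third approach needs no custom code: Hadoop's built-in CombineTextInputFormat packs many small files into a few splits. A minimal driver fragment might look like the following sketch, which assumes an already-configured `org.apache.hadoop.mapreduce.Job` named `job`; the 4 MB limit is illustrative:

```java
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Pack many small files into few splits instead of one split per file
job.setInputFormatClass(CombineTextInputFormat.class);
// Upper bound on the combined split size, in bytes (value is illustrative)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB
```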

3. Implementation

This article implements the second approach. The core mechanism: define a custom InputFormat whose RecordReader reads one complete file per call and wraps it as a key-value pair, then use SequenceFileOutputFormat to write the merged output file.
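The mechanism above — one whole file in, one key-value pair out, many pairs appended into one container file — can be sketched without any Hadoop dependency in plain Java. `MergeSmallFiles` and its record layout are hypothetical illustrations, not the article's code; each record mirrors the (Text path, BytesWritable contents) pairs the real job writes to its SequenceFile:

```java
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class MergeSmallFiles {

    // Merge every regular file under inputDir into one container file of
    // (key-length, key-bytes, value-length, value-bytes) records, where the
    // key is the file path and the value is the file's entire contents.
    // Returns the number of files merged.
    public static int merge(Path inputDir, Path outputFile) throws IOException {
        int records = 0;
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(outputFile.toFile()));
             DirectoryStream<Path> files = Files.newDirectoryStream(inputDir)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) continue;
                byte[] key = file.toString().getBytes(StandardCharsets.UTF_8); // key: file path
                byte[] value = Files.readAllBytes(file);                       // value: whole file
                out.writeInt(key.length);
                out.write(key);
                out.writeInt(value.length);
                out.write(value);
                records++;
            }
        }
        return records;
    }
}
```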

The code:

WholeFileRecordReader (the custom RecordReader)

package com.empire.hadoop.mr.combinefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Core working logic of a RecordReader: nextKeyValue() reads the data and
 * builds the key and value to return; getCurrentKey() and getCurrentValue()
 * hand back what was built there.
 */
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit     fileSplit;
    private Configuration conf;
    private BytesWritable value     = new BytesWritable();
    private boolean       processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the entire file into memory as the single value of this split
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Report current progress: 1.0 once the single record has been read.
     */
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to close
    }
}

WholeFileInputFormat (the custom InputFormat)

package com.empire.hadoop.mr.combinefile;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Mark every small file as non-splittable, so each one yields exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

SmallFilesToSequenceFileConverter (the small-file merge driver)

package com.empire.hadoop.mr.combinefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the source file's path as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /* System.setProperty("HADOOP_USER_NAME", "hadoop"); */
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setJarByClass(SmallFilesToSequenceFileConverter.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        // Use the arguments left over after GenericOptionsParser has consumed the generic options
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}

4. Running the program

# Upload the jar (press Alt+p to open the sftp session)
lcd d:/
put combinefile.jar

# Prepare the input files for Hadoop
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /combinefile/smallinput
hdfs dfs -put xxx.txt xxx1.txt xxx2.txt xxx3.txt /combinefile/smallinput

# Run the combinefile job
hadoop jar combinefile.jar com.empire.hadoop.mr.combinefile.SmallFilesToSequenceFileConverter /combinefile/smallinput /combinefile/smalloutput

5. Run log

[hadoop@centos-aaron-h1 ~]$ hadoop jar combinefile.jar  com.empire.hadoop.mr.combinefile.SmallFilesToSequenceFileConverter /combinefile/smallinput /combinefile/smalloutput
18/12/30 07:45:25 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/12/30 07:45:26 INFO input.FileInputFormat: Total input files to process : 3
18/12/30 07:45:26 INFO mapreduce.JobSubmitter: number of splits:3
18/12/30 07:45:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/30 07:45:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546126874346_0001
18/12/30 07:45:27 INFO impl.YarnClientImpl: Submitted application application_1546126874346_0001
18/12/30 07:45:27 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1546126874346_0001/
18/12/30 07:45:27 INFO mapreduce.Job: Running job: job_1546126874346_0001
18/12/30 07:45:36 INFO mapreduce.Job: Job job_1546126874346_0001 running in uber mode : false
18/12/30 07:45:36 INFO mapreduce.Job:  map 0% reduce 0%
18/12/30 07:45:45 INFO mapreduce.Job:  map 33% reduce 0%
18/12/30 07:45:57 INFO mapreduce.Job:  map 100% reduce 0%
18/12/30 07:45:58 INFO mapreduce.Job:  map 100% reduce 100%
18/12/30 07:45:59 INFO mapreduce.Job: Job job_1546126874346_0001 completed successfully
18/12/30 07:45:59 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=74230
		FILE: Number of bytes written=938941
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=74393
		HDFS: Number of bytes written=74324
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Killed map tasks=1
		Launched map tasks=3
		Launched reduce tasks=1
		Data-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=41278
		Total time spent by all reduces in occupied slots (ms)=10813
		Total time spent by all map tasks (ms)=41278
		Total time spent by all reduce tasks (ms)=10813
		Total vcore-milliseconds taken by all map tasks=41278
		Total vcore-milliseconds taken by all reduce tasks=10813
		Total megabyte-milliseconds taken by all map tasks=42268672
		Total megabyte-milliseconds taken by all reduce tasks=11072512
	Map-Reduce Framework
		Map input records=3
		Map output records=3
		Map output bytes=74213
		Map output materialized bytes=74242
		Input split bytes=371
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=74242
		Reduce input records=3
		Reduce output records=3
		Spilled Records=6
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=808
		CPU time spent (ms)=8390
		Physical memory (bytes) snapshot=754876416
		Virtual memory (bytes) snapshot=3381936128
		Total committed heap usage (bytes)=380628992
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=74022
	File Output Format Counters 
		Bytes Written=74324
[hadoop@centos-aaron-h1 ~]$ 

6. Results

[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /combinefile/smalloutput
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-30 07:45 /combinefile/smalloutput/_SUCCESS
-rw-r--r--   2 hadoop supergroup      74324 2018-12-30 07:45 /combinefile/smalloutput/part-r-00000
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /combinefile/smalloutput/part-r-00000
bbs
ttx tttt yyyy hhahh
shame jime apple kelly
chelly Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
hellow  hello
hellowx hellowbbb  mail
shell bananer vival value
bbs
ttx tttt yyyy hhahh
shame jime apple kelly
chelly Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
hellow  hello
hellowx hellowbbb  mail
shell bananer vival value
bbs
ttx tttt yyyy hhahh
shame jime apple kelly
chelly Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.
[hadoop@centos-aaron-h1 ~]$

7. Summary

From the run above you can see that the three input files were merged into a single output file. The driver extends Configured and implements Tool, so it can be launched through ToolRunner, which parses the generic Hadoop options before handing the remaining arguments to run() — a convenient pattern for running MR programs. Note also that part-r-00000 is a SequenceFile: `hdfs dfs -cat` dumps its records as raw bytes, while `hdfs dfs -text` decodes them properly.

A final word: that is all for this article. If you found it useful, please give it a like; if you are interested in the blogger's other server and big-data topics, follow the blog, and feel free to reach out any time.

Reprinted from: https://my.oschina.net/u/2371923/blog/2995319

Big Data Tutorial (10.6): Custom InputFormat (small-file merging)
