由于MapReduce中默认的OutputFormat是TextOutputFormat,按行写入输出文件。但是对于我们实际应用场景中,对于Reduce的输出结果可能想要放到各种各样的输出目的地,可能是想要放到某个指定文件名的文件中,也可能是想写入HBase/Elasticsearch等等数据存储系统中,而hadoop自身提供的几种OutputFormat无法满足我们的需求时,可能就需要我们自定义这样的一个输出 类。

在这个案例中我们想要实现的是,对于一个订单文件中的订单数据进行过滤,将订单中的不同类别商品输出到不同的文件中,这里与分区不同的是,分区无法指定写入的输出文件名,而这里我们可以指定水果放在fruit.txt中,冷冻食品放到frozen.txt中,蔬菜放到vegetable.txt中。

  • 准备数据文件
[root@hadoop301 testdata]# pwd
/usr/local/wyh/software/hadoop-3.1.3/testdata
[root@hadoop301 testdata]# cat testorder.txt
apple-1
dumpling-2
orange-1
grape-1
potato-3
tofu-3
grape-1
tomato-3#注意这里可能会有重复商品,比如:grape-1

商品后面的数字表示类别表示,"1"表示水果,"2"表示冷冻食品,"3"表示蔬菜。

上传至HDFS:

[root@hadoop301 testdata]# hdfs dfs -mkdir /test_order
[root@hadoop301 testdata]# hdfs dfs -put testorder.txt /test_order

新建project:

  • 引入pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>test.wyh</groupId><artifactId>TestOrder</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><packaging>jar</packaging><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><minimizeJar>true</minimizeJar></configuration></execution></executions></plugin></plugins></build></project>
  • 创建自定义Mapper类
package test.wyh.order;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class OrderMapper extends Mapper<LongWritable, Text, Text, NullWritable> {/*** Key:行偏移量* Value:apple-1* 输出Key:apple-1* 输出Value:置空*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}
}
  • 自定义Reducer类
package test.wyh.order;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class OrderReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {/*** 这里使用for循环是为了避免如果原始数据中有相同的key,合并之后会变成一条记录,但是我们不希望原始数据中的数据合并,希望每条数据都能写入输出文件中。* 所以这里对每个key对应的value进行遍历,因为相同的key对应的value是会写在一个集合中的。* 比如apple-1,合并后是apple-1 <NullWritable.get(), NullWritable.get()>,只不过这里的value是置空的。* 再举一个例子word count中的合并:hello <1,1,1>* 那我们这里对每个value进行遍历,就可以保证相同的key原来有几条数据,那么每条数据都执行一下write()。*/for (NullWritable value : values) {context.write(key, NullWritable.get());}}
}
  • 自定义RecordWriter类

虽然说我们需要自定义OutputFormat去实现输出类,但真正写数据的实现是由RecordWriter完成的,素以我们需要自定义一个类去实现RecordWriter中的方法。

package test.wyh.order;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class OrderRecordWriter extends RecordWriter<Text, NullWritable> {private FSDataOutputStream fruitOutputStream;private FSDataOutputStream frozenOutputStream;private FSDataOutputStream vegetableOutputStream;public OrderRecordWriter(TaskAttemptContext taskAttemptContext) {try {FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());//对于三个输出文件,需要创建三个输出流。输出目录必须不存在fruitOutputStream = fileSystem.create(new Path("hdfs://hadoop301:8020/orderoutput/fruit.txt"));frozenOutputStream = fileSystem.create(new Path("hdfs://hadoop301:8020/orderoutput/frozen.txt"));vegetableOutputStream = fileSystem.create(new Path("hdfs://hadoop301:8020/orderoutput/vegetable.txt"));} catch (IOException e) {e.printStackTrace();}}//设置具体的写规则@Overridepublic void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {String key = text.toString();if(key.contains("1")){//水果fruitOutputStream.writeBytes(key+"\n");}else if(key.contains("2")){//冷冻frozenOutputStream.writeBytes(key+"\n");}else{//蔬菜vegetableOutputStream.writeBytes(key+"\n");}}@Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {//关闭流IOUtils.closeStream(fruitOutputStream);IOUtils.closeStream(frozenOutputStream);IOUtils.closeStream(vegetableOutputStream);}
}
  • 自定义OutputFormat类
package test.wyh.order;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;//这里的泛型与reduce输出的类型保持一致
public class OrderOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {OrderRecordWriter orw = new OrderRecordWriter(taskAttemptContext);return orw;}
}
  • 自定义主类
package test.wyh.order;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class OrderJobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {Job job = Job.getInstance(super.getConf(), "testCombineJob");//!!!!!!!!!!    集群必须要设置    !!!!!!!!job.setJarByClass(OrderJobMain.class);//配置job具体要执行的任务步骤//指定要读取的文件的路径,这里写了目录,就会将该目录下的所有文件都读取到FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop301:8020/test_order"));//指定map处理逻辑类job.setMapperClass(OrderMapper.class);//指定map阶段输出的k2类型job.setMapOutputKeyClass(Text.class);//指定map阶段输出的v2类型job.setMapOutputValueClass(NullWritable.class);//指定reduce处理逻辑类job.setReducerClass(OrderReducer.class);//设置reduce之后输出的k3类型job.setOutputKeyClass(Text.class);//设置reduce之后输出的v3类型job.setOutputValueClass(NullWritable.class);//设置使用自定义输出类job.setOutputFormatClass(OrderOutputFormat.class);//虽然在自定义OutputFormat中已经指定了输出路径,但是在整个过程中还是会输出一个标记文件_SUCCESS,所以需要在这里设置一个路径存放mapreduce过程中产生的其他文件FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop301:8020/testorderoutputmark"));//返回执行状态boolean status = job.waitForCompletion(true);//使用三目运算,将布尔类型的返回值转换为整型返回值,其实这个地方的整型返回值就是返回给了下面main()中的runStatusreturn status ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();/*** 参数一是一个Configuration对象,参数二是Tool的实现类对象,参数三是一个String类型的数组参数,可以直接使用main()中的参数args.* 返回值是一个整型的值,这个值代表了当前这个任务执行的状态.* 调用ToolRunner的run方法启动job任务.*/int runStatus = ToolRunner.run(configuration, new OrderJobMain(), args);/*** 任务执行完成后退出,根据上面状态值进行退出,如果任务执行是成功的,那么就是成功退出,如果任务是失败的,就是失败退出*/System.exit(runStatus);}
}
  • 打包并上传至服务器

  • 运行jar
[root@hadoop301 testjar]# hadoop jar TestOrder-1.0-SNAPSHOT.jar test.wyh.order.OrderJobMain
  • 查看执行结果

这样就简单地实现了自定义OutputFormat。

Hadoop3中实现MapReduce自定义OutputFormat相关推荐

  1. Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)

    1.需求: 现有一些原始日志需要做增强解析处理,流程: 1. 从原始日志文件中读取数据(日志文件:https://pan.baidu.com/s/12hbDvP7jMu9yE-oLZXvM_g) 2. ...

  2. hadoop 自定义OutputFormat

    前言 在某些业务场景下,需要对原始的数据进行合理的分类输出,减少后续的程序处理数据带来的麻烦,其实这也属于ETL中的一种,比如,我们收集到了一份原始的日志,主体字段为区域编码,需要根据这个编码将这份日 ...

  3. 自定义OutputFormat案例实操

    自定义OutputFormat案例实操 文章目录 1)需求 2)需求分析 3)编程实现 1.创建Mapper类 2.创建Reducer类 3.创建OutputFormat类 4.创建RecordWri ...

  4. Hadoop中的MapReduce框架原理、数据清洗(ETL)、MapReduce开发总结、常见错误及解决方案

    文章目录 13.MapReduce框架原理 13.7 数据清洗(ETL) 13.7.1 需求 13.7.1.1 输入数据 13.7.1.2 期望输出数据 13.7.2 需求分析 13.7.3实现代码 ...

  5. hadoop中使用MapReduce编程实例

    原文链接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html 从网上搜到的一篇hadoop的编程实例,对于初学者真是帮助太大 ...

  6. MapReduce自定义Partitioner

    Shuffle过程是会按照Map中输出的key,把数据默认分到一个分区中,那么默认的是如何实现的? HashPartitioner是Partitioner默认的分区规则,其中numReduceTask ...

  7. vue里面_Vue中如何使用自定义插件(plugin)

    Vue中如何使用自定义插件(plugin) 1.在根目录src下创建一个libs文件夹,在libs文件夹下面创建一个myPlugins文件夹,用来存放我们的自定义插件,在myPlugins文件夹下面再 ...

  8. 如何在React Native中写一个自定义模块

    前言 在 React Native 项目中可以看到 node_modules 文件夹,这是存放 node 模块的地方,Node.js 的包管理器 npm 是全球最大的开源库生态系统.提到npm,一般指 ...

  9. spring cloud中通过配置文件自定义Ribbon负载均衡策略

    2019独角兽企业重金招聘Python工程师标准>>> spring cloud中通过配置文件自定义Ribbon负载均衡策略 博客分类: 微服务 一.Ribbon中的负载均衡策略 1 ...

最新文章

  1. Fertility of Soils:根系C/P计量比影响水稻残根周际酶活的时空动态分布特征
  2. Microsoft Dynamics SL (SOLOMON) 博客、新闻组和论坛
  3. 如何使用canvas绘图
  4. bzoj 2109: [Noi2010]Plane 航空管制
  5. 分布式光伏补贴_光伏发电上网电价政策综述
  6. 显示和快速隐藏Mac桌面所有图标
  7. Java8 Stream详解~Stream 创建
  8. 手把手教你如何逐步安装OpenStack
  9. 华为荣耀笔记本linux怎么下载软件,华为magic book笔记本怎么下载软件
  10. Android知识点 121 —— AlarmManager与RTC唤醒
  11. Word2016 页码从任意页开始
  12. 附件2-2保密承诺书.docx
  13. 使用LocalDate, LocalTime 和 LocalDateTime
  14. 江恩 计算机,江恩理论基础篇
  15. 阿里云轻量服务器怎么设置密码?
  16. MySQL审核神器Inception
  17. R语言关联规则及其可视化(Foodmart数据)
  18. DSP6455开发: dsp.lib库使用总结
  19. python提取前几行数据_python读取文件的前几行
  20. 正宗干货!关于Apple Watch官方提到的10个设计小技巧

热门文章

  1. 简述Math类中的常用方法,包括ceil,floor,min,max,round,random,double等方法的概述
  2. 融媒宝入门教程(二):融媒宝如何如何做自媒体一键发布?
  3. react-router4.0 访问其他页面未登录时跳到登录页
  4. 4.const和difine以及用const和define去初始化数组的问题
  5. 一维条码和二维码扫描有什么区别?
  6. slab/slob/slub的区别
  7. 边缘计算的现状与挑战:从理论到实践
  8. 机器学习进阶day1
  9. docker 镜像一执行状态就变成 Exited 的原因
  10. 斌哥走访1--百思买