前言

在某些业务场景下,需要对原始的数据进行合理的分类输出,减少后续的程序处理数据带来的麻烦,其实这也属于ETL中的一种,比如,我们收集到了一份原始的日志,主体字段为区域编码,需要根据这个编码将这份日志分割输出到不同的文件中

在前面的一篇自定义分区中,可以将原始的文件在自定义的Partioner类中结合实际业务需求,将数据最终输出到不同的分区文件下,这属于一种解决方案,但使用这种方式也有一定的弊端,因为分区数量增大了,必然带来MapTask 数量的增加,带来的是服务器资源的更多开销

与分区输出不同的是,自定义OutputFormat更加灵活,相当于是MapReduce之外的扩展,可以在自定义OutputFormat中编写更复杂的业务逻辑定制化业务场景,同时,更重要的是,真正做到按照业务字段,将原始的文件最终输出到不同的文件中去。

OutputFormat使用场景

输出数据到中间件,例如mysql,es,hbase等

自定义OutputFormat步骤

  • 自定义一个OutputFormat类
  • 重写里面的RecordWriter方法,具体改造输出数据的write方法

业务场景

有下面的一个日志文件

需求是,将这份文件中,包含 "www.wangyi.com"的内容输出到一个文件,其他的输出到另一个文件

编码实现

1、自定义Mapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable,Text,Text,NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value,NullWritable.get());}
}

2、自定义Reduce类

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<Text,NullWritable,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {for (NullWritable nullWritable : values){context.write(key,NullWritable.get());}}}

3、自定义OutputFormat类

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text,NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {LogRecordWriter recordWriter = new LogRecordWriter(taskAttemptContext);return recordWriter;}}

LogRecordWriter

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text,NullWritable> {private FSDataOutputStream data1 ;private FSDataOutputStream data2 ;public LogRecordWriter(TaskAttemptContext context){try {FileSystem fileSystem = FileSystem.get(context.getConfiguration());data1 = fileSystem.create(new Path("F:\\网盘\\csv\\logs\\wangyi.log"));data2 = fileSystem.create(new Path("F:\\网盘\\csv\\logs\\other.log"));} catch (IOException e) {e.printStackTrace();}}/*** 执行写逻辑* @param text* @param nullWritable* @throws IOException* @throws InterruptedException*/@Overridepublic void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {String value = text.toString();if(value.trim().contains("www.wangyi.com")){data1.writeBytes(value.trim() + "\n");}else {data2.writeBytes(value.trim() + "\n");}}@Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {IOUtils.closeStream(data1);IOUtils.closeStream(data2);}
}

4、job类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogJob {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogJob.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("F:\\网盘\\csv\\website.txt"));//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("F:\\网盘\\csv\\logs"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}

运行上面的程序,观察输出目录文件,这样就可以满足上面的需求

hadoop 自定义OutputFormat相关推荐

  1. hadoop入门8:自定义OutputFormat,根据需求数据输出不同的路径

    在有些情况下,我们需要数据分开输出,即指定输出路径,这时就需要重写outputformat. 测试数据(部分): 1374609798.19 1374609798.20 1374609798.20 1 ...

  2. Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)

    1.需求: 现有一些原始日志需要做增强解析处理,流程: 1. 从原始日志文件中读取数据(日志文件:https://pan.baidu.com/s/12hbDvP7jMu9yE-oLZXvM_g) 2. ...

  3. Hadoop之OutputFormat数据输出详解

    Hadoop之OutputFormat数据输出详解 目录 OutputFormat接口实现类 自定义OutputFormat 1. OutputFormat接口实现类 OutputFormat是Map ...

  4. [Hadoop] - 自定义Mapreduce InputFormatOutputFormat

    在MR程序的开发过程中,经常会遇到输入数据不是HDFS或者数据输出目的地不是HDFS的,MapReduce的设计已经考虑到这种情况,它为我们提供了两个组建,只需要我们自定义适合的InputFormat ...

  5. 自定义OutputFormat案例实操

    自定义OutputFormat案例实操 文章目录 1)需求 2)需求分析 3)编程实现 1.创建Mapper类 2.创建Reducer类 3.创建OutputFormat类 4.创建RecordWri ...

  6. Hadoop3中实现MapReduce自定义OutputFormat

    由于MapReduce中默认的OutputFormat是TextOutputFormat,按行写入输出文件.但是对于我们实际应用场景中,对于Reduce的输出结果可能想要放到各种各样的输出目的地,可能 ...

  7. Hadoop的MR编程实现partition、sort和自定义outputformat

    Partitioner public abstract class Partitioner<KEY, VALUE> (key.hashCode() & Integer.MAX_VA ...

  8. 关于spark写入文件至文件系统并制定文件名之自定义outputFormat

    引言: spark项目中通常我们需要将我们处理之后数据保存到文件中,比如将处理之后的RDD保存到hdfs上指定的目录中,亦或是保存在本地 spark保存文件: 1.rdd.saveAsTextFile ...

  9. hadoop自定义类型注意问题

    自定义类型要实现WritableComparable 接口,(之前只实现Writable  ,结果报错) 问题的主要原因是因为自定义类型在Partitioners 阶段要用到hashCode() 方法 ...

最新文章

  1. python 同步 互斥 信号量 锁 简介
  2. php中cookie的用法
  3. ASP.net 網站和Web Application的區別(轉)
  4. springboot + redis
  5. layui 折叠面板使用无效问题
  6. 编程输出2的90次方的精确值
  7. 【华为云技术分享】一文讲清C语言核心要点
  8. vue、cnpm不是内部文件_解决vue不是内部或者外部命令
  9. html刘海屏高度,iphone刘海屏网页适配方法
  10. 依赖 netty spring_面试官:如何写好一个 Spring 组件?懵圈!
  11. 在串口输入input keyevent发送按键值给机器
  12. Go语言:数组练习—冒泡排序
  13. 全局配置_再次强调:必须站在全局的角度去考虑客厅的内机配置
  14. webuploader多图片上传php,PHP 多图上传,图片批量上传插件,webuploader.js,百度文件上传插件...
  15. 【ELMAN回归预测】基于matlab鲸鱼算法优化ELMAN回归预测【含Matlab源码 1667期】
  16. 因果图法测试中国象棋马
  17. JHOST邀请码,2012年7月31日申请,2012年8月31日过期
  18. 正则表达式判断手机号码运营商
  19. web项目上云_联想Filez携手浙江中烟,发力“云”端,打造“烟草上云”新势能...
  20. 卷积神经网络 图像识别,卷积神经网络 图像处理

热门文章

  1. systemd 开机无法启动privoxy
  2. Winform Echarts 显示百度地图的用法(3)
  3. Calypso - Android和Evolution下的CalDAV/CardDAV/Web...
  4. Windows结束某个端口的进程
  5. 一对一软件开发:在一对一社交app源码中加入这个功能,很有用...
  6. php面试题之一——PHP核心技术(高级部分)
  7. linux下mycat测试安装
  8. dataTransfer对象
  9. 敏捷开发 我的经验(一)基本概念
  10. WebAPI 和 WebService的区别