MapReduce

提示:请先看完Yarn知识点yarn知识


文章目录

  • MapReduce
  • 前言
  • 一、MapReduce 优缺点
    • 1) 优点
    • 2) 缺点
  • 二 MapReduce核心思想
    • 1) MapReduce流程
    • 2) 序列化类型
  • 三、MapReduce API操作
    • 1.环境搭建
  • 三 Hadoop 序列化
  • 四 MapReduce 框架原理
    • 1)切块原理
    • 2)Shuffle 机制(重点面试)
    • 3)MapTask工作机制
    • 4)ReduceTask
  • 五 Join 应用
    • 1)需求
    • 2)代码实现
  • 六 数据清洗(ETL)
  • 总结
    • 1)输入数据接口:InputFormat
    • 2)逻辑处理接口:Mapper
    • 3)Partitioner 分区
    • 4)Comparable 排序
    • 5)Combiner 合并
    • 6)逻辑处理接口:Reducer
    • 7)输出数据接口:OutputFormat

前言

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析
应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的
分布式运算程序,并发运行在一个 Hadoop 集群上

一、MapReduce 优缺点

1) 优点

(1)MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量
廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一
样的。就是因为这个特点使得 MapReduce 编程变得非常流行。
(2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高
的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,
不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
(4)适合 PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。

2) 缺点

(1)不擅长实时计算MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
(2)不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。
这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
(3)不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,
MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,
会造成大量的磁盘 IO,导致性能非常的低下。

二 MapReduce核心思想

1) MapReduce流程

(1)分布式的运算程序往往需要分成至少 2 个阶段。
(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段
的所有 MapTask 并发实例的输出。
(4)MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业
务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

采用反编译工具反编译源码,发现 WordCount 案例有 Map 类、Reduce 类和驱动类。且
数据的类型是 Hadoop 自身封装的序列化类型。

用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。
1.Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义) (3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义) (5)map()方法(MapTask进程)对每一个<K,V>调用一次
2.Reducer阶段
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是
封装了MapReduce程序相关运行参数的job对象

2) 序列化类型

三、MapReduce API操作

1.环境搭建

需求
在给定的文本文件中统计输出每一个单词出现的总次数
(1)输入数据

hadoop
lalalal
ssssss
dieawedas
hadooop
hadoop

(2)期望输出数据
hadoop 2
lalalal 1
ssssss 1
dieawedas 1
hadooop 1

(3) 创建一个maven 并加入下列maven

     <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency>

(4)在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在
文件中填入

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(5)编写 Mapper 类

package com.example.mapreduce;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 获取一行String line = value.toString();
// 2 切割String[] words = line.split(" ");
// 3 输出for (String word : words) {k.set(word);context.write(k, v);}}
}

** (6)编写 Reducer 类**

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text,
IntWritable>{int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context
context) throws IOException, InterruptedException {// 1 累加求和
sum = 0;
for (IntWritable count : values) {sum += count.get();
}
// 2 输出v.set(sum);
context.write(key,v);
} }

(7)编写 Driver 驱动类

package com.example.mapreduce;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取配置信息以及获取 job 对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);
// 2 关联本 Driver 程序的 jarjob.setJarByClass(WordCountDriver.class);
// 3 关联 Mapper 和 Reducer 的 jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

打成jar包去linux上运行

hadoop jar 包名 /work /outfile

在yarn-size-xml配置文件下配置以下,以下是实现HA高可用时的配置参数,分布式非HA的配置请自行修改参数

<property><!-- RM HTTP访问地址 默认:${yarn.resourcemanager.hostname}:8088--><name>yarn.resourcemanager.webapp.address.rm2</name><value>hadoop2:8088</value>
</property>
<property><!-- RM HTTP访问地址 默认:${yarn.resourcemanager.hostname}:8088--><name>yarn.resourcemanager.webapp.address.rm3</name><value>hadoop3:8088</value>
</property>

mapreduce-size.xml

    <property><name>yarn.application.classpath</name><value>#hadoop classpath 下的值
/exper/ha/hadoop-3.1.3/etc/hadoop,/exper/ha/hadoop-3.1.3/share/hadoop/common/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/common/*,/exper/ha/hadoop-3.1.3/share/hadoop/hdfs,/exper/ha/hadoop-3.1.3/share/hadoop/hdfs/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/hdfs/*,/exper/ha/hadoop-3.1.3/share/hadoop/mapreduce/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/mapreduce/*,/exper/ha/hadoop-3.1.3/share/hadoop/yarn,/exper/ha/hadoop-3.1.3/share/hadoop/yarn/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/yarn/*</value></property><property><name>mapreduce.application.classpath</name><value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>

如果代码出错可以试试官网的示范

import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
 bin/hadoop jar jar包地址 WordCount /user/joe/wordcount/input /user/joe/wordcount/output

注意:jdk要和hadoop里的jdk相同

三 Hadoop 序列化

1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁
盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换
成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能
由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”
对象,可以将“活的”对象发送到远程计算机。
3)为什么不用 Java 的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带
很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,
Hadoop 自己开发了一套序列化机制(Writable)。
4)Hadoop 序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互
简单说就是当我们需要处理的数据写成一个bean对象时,我们的类型和hadoop的类型不同,所以hadoop有了一套序列化的过程。


import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//1 继承 Writable 接口
public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//2 提供无参构造public FlowBean() {}//3 提供三个参数的 getter 和 setter 方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//4 实现序列化和反序列化方法,注意顺序一定要保持一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}//5 重写 ToString@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;} }

四 MapReduce 框架原理

1)切块原理


数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行
存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask
CombineTextInputFormat切片机制
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会
是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的
MapTask,处理效率极其低下。

(a)判断虚拟存储的文件大小是否大于
setMaxInputSplitSize值,大于等于则单独形成
一个切片。
(b)如果不大于则跟下一个虚拟存储文件
进行合并,共同形成一个切片。
源码流程:

2)Shuffle 机制(重点面试)

Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。

1) Partitioner分区
默认分区Partitioner分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个
key存储到哪个分区。
所以我们可以自定义分区类
(1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner<Text, FlowBean> {@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {// 控制分区代码逻辑
… …
//partition返回数字几就分到哪个分区
return partition;}
}

(2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

分区注意
(1)如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
(4)分区号必须从零开始,逐一累加。
2) WritableComparable 排序
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于
Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是
否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
(1)增加自定义分区类

package com.atguigu.mapreduce.partitionercompable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {@Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions)
{//获取手机号前三位String phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量 partition,根据 prePhone 设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号 partitionreturn partition;} }

(2)在驱动类中添加分区类
// 设置自定义分区器

job.setPartitionerClass(ProvincePartitioner2.class);

3) Combiner合并
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv
应该跟Reducer的输入kv类型要对应起来。
(a)自定义一个 Combiner 继承 Reducer,重写 Reduce 方法

public class WordCountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable> {private IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}outV.set(sum);context.write(key,outV);} }

(b)在 Job 驱动类中设置:

job.setCombinerClass(WordCountCombiner.class);

因为 WordcountReducer 和 Combiner 基本上一样 所有可以将直接用 WordcountReducer

job.setCombinerClass(WordCountReducer.class);

(3)自定义一个 LogOutputFormat 类

package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable>
{@Overridepublic RecordWriter<Text, NullWritable>
getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException {//创建一个自定义的 RecordWriter 返回LogRecordWriter logRecordWriter = new LogRecordWriter(job);return logRecordWriter;} }

(4)编写 LogRecordWriter 类

package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private FSDataOutputStream atguiguOut;private FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {try {//获取文件系统对象FileSystem fs = FileSystem.get(job.getConfiguration());//用文件系统对象创建两个输出流对应不同的目录atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));otherOut = fs.create(new Path("d:/hadoop/other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException,
InterruptedException {String log = key.toString();//根据一行的 log 数据是否包含 atguigu,判断两条输出流输出的内容if (log.contains("atguigu")) {atguiguOut.writeBytes(log + "\n");} else {otherOut.writeBytes(log + "\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException,
InterruptedException {//关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);} }

(5)编写 LogDriver 类

package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的 outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\input"));// 虽 然 我 们 自 定 义 了 outputformat , 但 是 因 为 我 们 的 outputformat 继承自
fileoutputformat//而 fileoutputformat 要输出一个_SUCCESS 文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);} }

3)MapTask工作机制


(1)Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中
解析出一个个 key/value。
(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并
产生一系列新的 key/value。
(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用
OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用
Partitioner),并写入一个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,
生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。 溢写阶段详情:
步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号
Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在
一起,且同一分区内所有数据按照 key 有序。
步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文
件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之
前,对每个分区中的数据进行一次聚集操作。
步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元
信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大
小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
(5)Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,
以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件
output/file.out 中,同时生成相应的索引文件 output/file.out.index。 在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多
轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文
件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量
小文件产生的随机读取带来的开销。

4)ReduceTask


(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数
据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁
盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用
户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一
起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了
局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
(3)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上

五 Join 应用

1)需求

2)代码实现

(1)创建商品和订单合并后的 TableBean 类

package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TableBean implements Writable {private String id; //订单 idprivate String pid; //产品 idprivate int amount; //产品数量private String pname; //产品名称private String flag; //判断是 order 表还是 pd 表的标志字段public TableBean() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic String toString() {return id + "\t" + pname + "\t" + amount;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readUTF();this.pid = in.readUTF();this.amount = in.readInt();this.pname = in.readUTF();this.flag = in.readUTF();} }

(2)编写 TableMapper 类

package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean>
{private String filename;private Text outK = new Text();private TableBean outV = new TableBean();@Overrideprotected void setup(Context context) throws IOException,
InterruptedException {//获取对应文件名称InputSplit split = context.getInputSplit();FileSplit fileSplit = (FileSplit) split;filename = fileSplit.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {//获取一行String line = value.toString();//判断是哪个文件,然后针对文件进行不同的操作if(filename.contains("order")){ //订单表的处理String[] split = line.split("\t");//封装 outKoutK.set(split[1]);//封装 outVoutV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2]));outV.setPname("");outV.setFlag("order");}else { //商品表的处理String[] split = line.split("\t");//封装 outKoutK.set(split[0]);//封装 outVoutV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setPname(split[1]);outV.setFlag("pd");}//写出 KVcontext.write(outK,outV);} }

(3)编写 TableReducer 类

package com.atguigu.mapreduce.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class TableReducer extends Reducer<Text,TableBean,TableBean,
NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context
context) throws IOException, InterruptedException {ArrayList<TableBean> orderBeans = new ArrayList<>();TableBean pdBean = new TableBean();for (TableBean value : values) {//判断数据来自哪个表if("order".equals(value.getFlag())){ //订单表//创建一个临时 TableBean 对象接收 valueTableBean tmpOrderBean = new TableBean();try {BeanUtils.copyProperties(tmpOrderBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}//将临时 TableBean 对象添加到集合 orderBeansorderBeans.add(tmpOrderBean);}else { //商品表try {BeanUtils.copyProperties(pdBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}//遍历集合 orderBeans,替换掉每个 orderBean 的 pid 为 pname,然后写出for (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());//写出修改后的 orderBean 对象context.write(orderBean,NullWritable.get());}} }

(4)编写 TableDriver 类

package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TableDriver {public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration());job.setJarByClass(TableDriver.class);job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("D:\\input"));FileOutputFormat.setOutputPath(job, new Path("D:\\output"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);} }

六 数据清洗(ETL)

“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取
(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL 一词较常用在数据仓
库,但其对象并不限于数据仓库
在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户
要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
(1)编写 WebLogMapper 类

package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WebLogMapper extends Mapper<LongWritable, Text, Text,
NullWritable>{@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {// 1 获取 1 行数据
String line = value.toString();
// 2 解析日志
boolean result = parseLog(line,context);
// 3 日志不合法退出
if (!result) {return;
}
// 4 日志合法就直接写出
context.write(value, NullWritable.get());
}
// 2 封装解析日志的方法
private boolean parseLog(String line, Context context) {// 1 截取
String[] fields = line.split(" ");
// 2 日志长度大于 11 的为合法
if (fields.length > 11) {return true;
}else {return false;
} } }

(2)编写 WebLogDriver 类

package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WebLogDriver {public static void main(String[] args) throws Exception {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "D:/input/inputlog", "D:/output1" };
// 1 获取 job 信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加载 jar 包
job.setJarByClass(LogDriver.class);
// 3 关联 map
job.setMapperClass(WebLogMapper.class);
// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置 reducetask 个数为 0
job.setNumReduceTasks(0);
// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);
} }

总结

1)输入数据接口:InputFormat

(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为
key,行内容作为 value 返回。
(3)CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。

2)逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map() setup() cleanup ()

3)Partitioner 分区

(1)有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个
分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。

4)Comparable 排序

(1)当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接
口,重写其中的 compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个 Reduce。 (4)二次排序:排序的条件有两个。

5)Combiner 合并

Combiner 合并可以提高程序执行效率,减少 IO 传输。但是使用时必须不能影响原有的
业务处理结果。

6)逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

7)输出数据接口:OutputFormat

(1)默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件
输出一行。
(2)用户还可以自定义 OutputFormat。

MapReduce重点知识相关推荐

  1. 计算机二级vb重点知识,计算机二级《VB》历年考试重点知识

    计算机二级<VB>历年考试重点知识 一.变量或常量的命名规则 1)必须以字母或汉字开头,由字母.汉字.数字或下划线组成,长度≤255个字符; 2)不能使用VB中的关键字,并尽量不与VB中标 ...

  2. 【springmvc+mybatis项目实战】杰信商贸-6.重点知识回顾

    1.重点知识回顾 Maven 1)覆盖仓库文件,实际企业开发,公司会架一个测试服务器,在测试服务器中架私服.我们开发人员的程序,都连接私服.当本地没有项目中要使用的jar,Myeclipse mave ...

  3. Hadoop之MapReduce面试知识复习

    Hadoop之MapReduce面试知识复习 目录 谈谈Hadoop序列化和反序列化及自定义bean对象实现序列化? FileInputFormat切片机制 在一个运行的Hadoop 任务中,什么是I ...

  4. 计算机的概念重点,2019考研计算机重点知识:十二大基础概念释义

    2019考研复习正在紧张的进行中,为了更好的帮助同学们学习.新东方在线为大家整理了"2019计算机考研重点知识:十二大基础概念释义"的相关信息,提醒各位考生要合理安排复习时间,做好 ...

  5. 计算机vf的讲解,计算机二级vf重点知识讲解.doc

    计算机二级vf重点知识讲解 谢谢浏览 一.算法的基本概念 算法是指解题方案的准确而完整的描述.注意:算法不等于程序,也不等于计算方法. 算法的基本特征:1)可行性 2)确定性 3)有穷性 4)拥有足够 ...

  6. 计算机系统具有整体性质,2017年计算机二级《公共基础》重点知识

    2017年计算机二级<公共基础>重点知识 计算机系统实现自动维护和诊断的技术.实施维护诊断自动化的主要软件为功能检查程序和自动诊断程序.下面是小编整理的关于计算机二级<公共基础> ...

  7. 《计算机操作系统》重点知识笔记整理(一)

    <计算机操作系统>重点知识总结1(1-4章)

  8. ajax心得体会论文,AJAX重点知识的心得体会

    下面就为大家带来一篇 AJAX重点知识的心得体会.学习还是有点帮助的,给大家做个参考吧. AJAX是什么? 是Asynchronous Javascript And XML的首字母的缩写, 它不是一门 ...

  9. C#重点知识详解(转)

    C#重点知识详解 在微软的.NET推出后,关于C#的有关文章也相继出现,作为微软的重要的与JAVA抗衡的语言,C#具有很多优点.本文将选一些C#语言中的重要知识详细介绍, 第一章:参数 1.1 IN ...

最新文章

  1. 使用Jasper Reports以Java创建报告
  2. linux 进程的vss rss uss,内核/内存管理中的VSS/RSS/PSS/USS
  3. html5表白页面3d,七夕节表白3d相册制作(html5+css3)
  4. Python:通过SNMP协议获取华为交换机的ARP地址表
  5. jquery获取radio的值,a标签传值
  6. 【clickhouse】clickhouse 同时查询数过多 Too many simultaneous queries
  7. 1087 1 10 100 1000
  8. Mac上恢复已删除或未保存的Word文档该怎么做
  9. Python爬虫 | 手把手教你扒一扒贝壳网成交房源数据
  10. 163-H桥电机驱动电路
  11. gcc编译部分编译选项
  12. 前端-埋点-理念-通识-浅谈
  13. AHRS 原理算法+代码实现(好文记录)
  14. web work 。。。
  15. Lumerical MODE solution FBG光栅透射谱的仿真,官网fbg.lms例子的详解
  16. macos 打开终端弹出:(eval):export:1: not valid in this context: Fusion.app/Contents/Public
  17. 《设计进化论日本版式设计速查手查手册》菜单版式
  18. 如何解决 fs.renameSync() 跨区移动文件的问题
  19. 无情剑梦断危情java_无情剑梦断危情java版下载-无情剑梦断危情最新版下载v1.1.0 安卓版 - 星光下载...
  20. 批处理怎么调用计算机名,Reg命令使用详解 批处理操作注册表必备

热门文章

  1. 广东省谷歌地球高程DEM等高线下载
  2. 项目经理start法则_开放科学项目的7条经验法则
  3. 这届年轻人,是最孤独的一代吗?
  4. Python案例之QQ空间自动登录程序实现
  5. JSON.parse 转换字符串样式的数组
  6. 径向基函数和粗糙集在进化多目标优化中的应用
  7. 数学建模——多元统计分析例题及程序
  8. 【2020-09-30】一个适合爬虫练手的网站--中国土地市场网
  9. 计算机硬盘做u盘启动不了,u盘启动盘读不出来硬盘如何解决_电脑U盘启动找不到硬盘的解决教程...
  10. linux cups工作原理,Linux打印系统CUPS原理分析