MapReduce总结 + 相关Hadoop权威指南读书笔记(未完......欢迎补充,互相学习)
文章目录
- MapReduce概述
- MapReduce优缺点
- MapReduce核心思想
- MapReduce进程
- MapReduce编程规范
- WordCount 案例实操
- 本地测试
- 集群测试
- Hadoop序列化
- MapReduce框架原理
- InputFormat数据输入
- 切片与MapTask并行度决定机制
- Shuffle过程
- Hadoop权威指南 中这样解释Shuffle
- map端
- reduce端
- Partition分区
- WritableComparable 排序
- 对于MapTask:
- 对于ReduceTask
- 排序分类
- 部分排序
- 全排序
- 辅助排序
- 二次排序
- 全排序案例
- 区内排序案例
- Combiner合并
- 自定义Combiner类
- Combiner合并案例
- OutputFormat数据输出
- OutputFormat接口实现类
- 自定义OutputFormat
- 自定义OutputFormat案例
- MapTask工作机制
- ReduceTask工作机制
- 数据清洗
- 数据清洗案例
MapReduce概述
MapReduce是一种可用于数据处理的编程模型
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
MapReduce优缺点
优点
MapReduce易于编程
- 它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价PC机器上运行。
良好的扩展性
- 当计算资源不能得到满足的时候,可以通过简单地增加机器来扩展它的计算能力。
高容错性
- MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如,其中其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
适合PB级以上海量数据的离线处理
- 可实现上千台服务器集群的并发工作,提供数据处理能力
缺点
不擅长实时计算
- MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果
不擅长流式计算
- 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的
不擅长DAG(有向无环图)计算
- 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下
MapReduce核心思想
MapReduce任务过程分为两个处理阶段:
map阶段,第一阶段的MapTask并发实例,完全并行运行,互不相干
map阶段可以作为数据准备阶段,通过这种方式来准备数据,使reduce函数能够继续对它进行处理
map阶段还是一个比较适合出去已损记录的地方
reduce阶段,第二阶段的ReduceTask并发实例,互不相干,但是它们的输入数据依赖于上一阶段的所有MapTask并发实例的输出
(每阶段都以键值对作为输入和输出,其类型由程序员来选择。
程序员还需要写两个函数:map函数和reduce函数。)
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能够多个MapReduce程序,串行运行。
MapReduce进程
一个完整的MapReduce程序在分布式运行时由三类实例进程:
MrAppMaster,负责整个程序的的过程调度及状态协调
MapTask,负责Map阶段的整个数据处理流程
ReduceTask,负责Reduce阶段的整个数据处理流程
MapReduce编程规范
用户编写的程序分为三个部分:Mapper、Reducer、Driver
Mapper阶段:
用户自定义的Mapper要继承自己的父类
Mapper的输入数据是KV对的形式
- 一般 K 是偏移量,V是这一行的内容
Mapper中的业务逻辑写在map()方法中
Mapper的输出数据是KV对的形式
- K、V的类型与计算逻辑有关系
map()方法(MapTask进程)对每一个K,V调用一次
- 一般K 是偏移量,V是这一行的内容
Reducer阶段
用户自定义的Reducer,要继承自己的父类
Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
Reducer的业务逻辑写在reduce()方法中
reduce()方法(ReduceTask进程)对每一组相同k的<K,V>组调用一次reduce()方法
Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
WordCount 案例实操
本地测试
WordCount Windows本地案例实操
集群测试
用maven打jar包,显示BUILD SUCCESS
,就可以在target目录下,看到两个jar包,一个包含依赖,另一个不包含依赖
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
选择不包含依赖的jar包,修改jar包的名字(方便,改不改都行),上传到集群,启动hadoop集群
sbin/start-dfs.sh
sbin/start-yarn.sh
然后执行指令
注意,wcinput下,事先上传了测试数据
hadoop jar wc.jar mapreduce.wordcount.WordCountDriver /wcinput / wcoutput
执行成功以后,就会在hdfs的根目录下,发现一个新创建的wcoutput文件夹,其中的part-r-00000文件记录了统计结果
Hadoop序列化
【Hadoop】序列化、反序列化、序列化案例实操(包括Windows本地运行,hadoop集群运行)
MapReduce框架原理
InputFormat数据输入
切片与MapTask并行度决定机制
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储单位
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
- 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
- 每一个split切片分配一个MapTask并行实时处理
- 默认情况下,切片大小=BlockSize
- 切片时不考虑数据整体,而是逐个针对每一个文件单独切片
FileInputFormat切片机制
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
源码中,计算切片大小的公式:
Math.max(minSize,Math.min(maxSize,blockSize))
maxSize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置得这个参数的值
minSize(切片最小值):参数调得比blockSize大,则可以让切片变得比blockSize还大
在运行MapReduce程序时,输入得文件格式包括:
- 基于行的日志文件
- 二进制格式文件
- 数据库表
- …
针对不同的数据类型,MapReduce使用FileInputFormat接口的实现类来读取这些数据,FileInputFormat常见的接口实现类包括:
- TextInputFormat
- 默认的FileInputFormat实现类
- 按行读取每条记录
- 键是存储该行在整个文件中的起始字节偏移量,LongWritable类型
- 值是这行的内容,不包括任何行终止符(回车换行符)
- KeyValueInputFormat
- NLineInputFormat
- CombineTextInputFormat
- 自定义InputFormat
CombineTextInputFormat切片机制
默认的切片机制存在的问题:
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小、都会是一个单独的切片,都会交给一个MapTask,如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多样多个小文件就可以交给一个MapTask处理
虚拟存储切片最大值设置:
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);//4m
注意:虚拟存储切片最大值最好根据实际的小文件大小情况来设置具体的值
Shuffle过程
map方法之后,reduce方法之前的数据处理过程称之为Shuffle
- MapTask收集map()方法输出的kv对,放到内存缓冲区中
- 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出过程及合并的过程中,都要调用Partition进行分区和针对key进行排序
- ReduceTask根据自己的分区号,去各个MapTask机器上拉取相应的结果分区数据
- Reducetask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
- 合并成大文件后,Shuffle的过程也就结束了,后面进去ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
- Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘IO的次数越少,执行速度就越快
- 缓冲区大小可以通过参数调整,参数:mapreduce.task.io.sort.mb,默认100M
Hadoop权威指南 中这样解释Shuffle
MapReduce确保每个reducer的输入都是按键排序的。系统执行排序、将map输出作为输入传给reduce的过程称为shuffle。
map端
map函数开始产生输出时,并不是简单地将它写到磁盘,而是利用缓冲的方式写到内存并出于效率的考虑进行预排序。
每个map任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲区的大小为100M。一旦缓冲区达到阈值(默认为80%),一个后台线程便开始把内容溢出到磁盘。在溢出写到磁盘过程中,map输出继续写到缓冲区,但如果在此期间缓冲区被填满,map会被阻塞,直到写磁盘过程完成。
在写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区。在每个分区中,后台线程按键进行内存中的排序,如果有一个combiner函数,它就在排序后的输出上运行。运行combiner函数使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。
每次内存缓冲区达到溢出阈值,就会新建一个溢出文件,因此在map任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。
如果至少存在三个溢出文件,则combiner就会在输出文件写到磁盘之前再次运行。如果只有一到两个溢出文件,那么由于map输出规模减少,因而不值得调用combiner带来的开销,因此不会为该map输出再次运行combiner。
reduce端
reduce任务需要在集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。--------------------复制阶段
Q:Reducer如何知道从哪台机器取得map输出
A:map任务成功完成后,它们会使用心跳机制通知它们的application master。对于指定作业,application master直到map输出和主机位置之间的映射关系。reducer中的一个线程定期询问master以便获取map输出主机的位置,直到获得所有输出位置。
如果map的输出相当小,会被复制到reduce任务JVM的内存,否则,map输出被复制到磁盘。
随着磁盘上副本增多,后台线程会将它们合并为更大的、排序好的文件。这会为后面的合并节省一些时间。
复制完所有map输出后,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个节点将合并map的输出,并维持其顺序。
在reduce阶段,对已排序的每个键调用reduce函数,此阶段的输出直接写到输出文件系统,一般为HDFS。
Partition分区
- 如果ReduceTask的数量 > getPartition的结果数,则会产生几个空的输出文件part-r-000xx
- 如果1 < ReduceTask数量 < getPartition的结果数,则有一部分分区数据无处安放,会报错
- 如果ReduceTask数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给一个ReduceTask,最终也只会产生一个结果文件 part-r-00000
- 分区号必须从0开始,逐一累加
WritableComparable 排序
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask:
它会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,将这些有序数据溢写到磁盘,而当数据处理完毕后,它会对磁盘上的所有文件进行归并排序。
对于ReduceTask
它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
排序分类
部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置了一个ReduceTask。但该方法在处理大型文件时,效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
辅助排序
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较,不相同的)的key进入同一个reduce方法时,可以采用分组排序。
二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
自定义排序原理
bean对象作为key传输,需要实现WritableComparable接口,重写compareTo方法,就可以实现排序
全排序案例
全排序案例 - 视频教程
把【Hadoop】序列化、反序列化、序列化案例实操(包括Windows本地运行,hadoop集群运行) 这个案例的输出作为输入
排序的规则是:
- 当总流量不同时,按照总流量的降序排列
- 当总流量相同时,按照总上行流量升序排列
让FlowBean继承WritableComparable接口
在compareTo方法中编写排序的逻辑
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow;private long downFlow;private long sumFlow;public FlowBean() {}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//重写toString方法@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}//实现序列化和反序列化方法,注意顺序一定要一致@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}//二次排序@Overridepublic int compareTo(FlowBean o) {if (this.sumFlow > o.sumFlow){//按照降序排列return -1;}else if (this.sumFlow < o.sumFlow){return 1;}else {//在总流量相同的情况下,按上行流量的升序排序if (this.upFlow > o.upFlow){return 1;}else if (this.upFlow < o.upFlow){return -1;}else {return 0;}}}
}
Mapper类中,map方法是要获取每一行的数据,提取出每个电话号码的总上行流量,总下行流量,总流量,封装到FlowBean对象中,然后context.write()写出,需要注意的是:
排序是对key进行排序,所以,Mapper的输出的key得是FlowBean,输出的Value是 Text类型的电话号码
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {private FlowBean outK = new FlowBean();private Text outV = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {String str = value.toString();String[] split = str.split("\t");outK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();outV.set(split[0]);context.write(outK, outV);}
}
相同的key会传入一个Reduce方法,总上行流量和总流量相同的FlowBean对象,会进入同一个reduce方法,为了避免出现有多个相同的情况,使用for循环
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {//在FlowBean中写了两个比较条件//当总流量不同的时候,按照总流量的降序排列//当总流量相同的时候,按照总上行流量的升序排列//所以这里传进来的,应该是总流量和上行流量相同的for (Text value : values) {context.write(value,key);}}
}
Driver类中需要指出Map阶段的输出类型,和序列化、反序列化时的案例输出类型刚好颠倒
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(FlowDriver.class);job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}
区内排序案例
在全排序案例的基础上,又增加了一个需求,即根据手机号的前三位数字,将最终的结果分别写入多个文件中
增加自定义分区类
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class PhonePartitoner extends Partitioner<FlowBean, Text> {@Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions) {String prePhone = text.toString().substring(0, 3);switch (prePhone) {case "136":return 0;case "137":return 1;case "138":return 2;case "139":return 3;default:return 4;}}
}
在驱动类中设置自定义分区类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(FlowDriver.class);job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定自定义分区的类job.setPartitionerClass(PhonePartitoner.class);job.setNumReduceTasks(5);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}
Combiner合并
- Combiner是MR程序中Mapper和Reducer之外的一种组件
- Combiner组件的父类就是Reducer
- Combiner和Reducer的区别在于运行的位置
- Combiner是在每一个MapTask所在的节点运行
- Reducer是接收全局所有Mapper的输出结果
- Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输
- Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner输出的KV应该跟Reducer的输出KV类型要对应起来
自定义Combiner类
需要继承Reducer类,重写reduce方法
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}outV.set(sum);context.write(key, outV);}
}
Combiner合并案例
需求:统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量,即采用Combiner功能。
期望:Combiner输入数据越多,输出时经过合并,输出数据降低。
具体步骤:
- 在已有的WordCount基础上,增加一个Combiner类,继承Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}outV.set(sum);context.write(key, outV);}
}
- 在WordCountDriver中驱动类中,指定Combiner
job.setCombinerClass(WordCountCombiner.class);
OutputFormat数据输出
OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。
下面是几种常见的OutputFormat实现类:
- MapFileOuputFormat
- SequenceFileOutputFormat
- TextOutputFormat
- DBOutputFormat
自定义OutputFormat
步骤:
- 自定义一个类继承FileOutputFormat
- 改写RecordWriter,具体改写输出数据的方法write()
自定义OutputFormat案例
1 需求
过滤输入的log日志,包含atguigu的网站输出到 atguigu.log 这个文件中,不包含atguigu的网站输出到other.log 文件中
2 输入数据
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
3 输出数据
atguigu.log 文件中,包含
http://www.atguigu.com
other.log 文件中,包含
MapReduce默认对key进行排序,所以输出的内容是有序的,默认按照字典序
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
4 自定义一个OutputFormat类
创建一个类LogRecordWriter继承RecordWriter
- 创建两个文件的输出流:atguiguOut、otherOut
- 如果输出数据包含atguigu,输出到atguiguOut流
- 如果输出数据不包含atguigu,输出到otherOut流
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {return new LogRecordWriter(job);}
}
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private FSDataOutputStream atguiguOut;private FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {//获取文件系统对象try {FileSystem fs = FileSystem.get(job.getConfiguration());//用文件系统创建两个流对应的目录atguiguOut = fs.create(new Path("D:\\output\\log\\atguigu.log"));otherOut = fs.create(new Path("D:\\output\\log\\other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();if (log.contains("atguigu")) {atguiguOut.writeBytes(log + "\n");} else {otherOut.writeBytes(log + "\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {//关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}
5 驱动类
要将自定义的输出格式组件设置到job中
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Job job = Job.getInstance(new Configuration());job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);//设置的输入和输出路径FileInputFormat.setInputPaths(job, new Path("D:\\input\\log\\"));//虽然定义了outputformat,但是因为自定义的outputformat继承自fileoutputformat//而fileoutputformat需要输出一个_SUCCESS文件,所以还是得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\output\\log\\"));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}
MapTask工作机制
MapTask可分为五个阶段:
Read阶段
- MapTask通过InputFormat获得RecordReader,从输入InputSplit中解析出一个个key/value
Map阶段
- 该阶段主要是将解析出的key/value交给用户编写的map()函数处理,并产生一系列新的key/value
Collect收集阶段
- 在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它将会生成key/value分区(调用Patitioner),并写入一个环形内存缓冲区中
Spill(溢写)阶段
- 当缓冲区满后,或所有文件全部读取完之后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,需要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
Merge阶段
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index
在文件合并的过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
溢写阶段详情
利用快速排序算法对缓存区内的数据进行排序,排序方式是:先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key排序。
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写的次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括:在临时文件中的偏移量、压缩前数据大小、压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到output/spillN.out.index中。
ReduceTask工作机制
ReduceTask分为三个阶段:
Copy阶段
- ReduceTask从各个MapTask上远程拷贝一片数据(这是ReduceTask主动拉取的过程),并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
Sort阶段
- 在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用基于排序的策略。由于各个MapTask已经实现了对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
Reduce阶段
- reduce()函数将计算结果写到HDFS上。
数据清洗
ETL Extract-Transform-Load,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只运行Mapper程序,不需要运行Reduce程序。
数据清洗案例
需求 : 去除日志中字段个数小于等于11的日志
期望 : 每行字段长度都大于11
需求分析 : 需要在Map阶段对输入的数据根据规则进行过滤清洗
Mapper
package mapreduce.etl;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {String line = value.toString();boolean res = parseLog(line, context);if (!res) {return;}context.write(value, NullWritable.get());}private boolean parseLog(String line, Context context) {String[] split = line.split(" ");return split.length > 11;}
}
Driver
package mapreduce.etl;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WebLogDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {args = new String[]{"D:\\input\\etl\\", "D:\\output\\etl\\"};Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WebLogDriver.class);job.setMapperClass(WebLogMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setNumReduceTasks(0);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//前者表示运行进度信息将输出给用户,后者表示仅仅等待作业结束.boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}
MapReduce总结 + 相关Hadoop权威指南读书笔记(未完......欢迎补充,互相学习)相关推荐
- mysql数据库权威指南_MySQL_MySQL权威指南读书笔记(三),第二章:MYSQL数据库里面的数 - phpStudy...
MySQL权威指南读书笔记(三) 第二章:MYSQL数据库里面的数据 用想用好MYSQL,就必须透彻理解MYSQL是如何看待和处理数据的.本章主要讨论了两个问题:一是SQL所能处理的数据值的类型:二是 ...
- MongoDB权威指南读书笔记——CRUD
插入并保存文档 插入是向MongoDB中添加数据的基本方法.可以使用Insert方法向目标集合插入一个文档:db.foo.insert({"bar" : "baz&quo ...
- HTTP权威指南读书笔记
<<HTTP权威指南>>读书笔记 第一部分:Web的基础 第1章:HTTP概述 主要内容 1.什么是HTTP 2.HTTP的基本组件 HTTP HTTP:HTTP(Hypert ...
- HTML5权威指南----读书笔记
<!DOCTYPE html> <html> <head><meta name = 'keywords' content="HTML5权威指南--- ...
- 计算机网络和http权威指南 读书笔记
计算机网络笔记 网络层 网络层向上提供无连接的,尽最大努力交付的数据报服务 网络层不提供数据质量承诺 物理层使用的中间设备叫转发器repeater 数据链路层叫网桥bridge 网络层叫路由器rout ...
- mongodb权威指南读书笔记
mongo命令 近期抽了一点时间,从头到尾梳理了这本书,其中书中的一些例子结合日常使用的基础,尽量的复现出来了.因为我使用的是mongo4.4.5的版本.所以部分例如group和mapreduce不再 ...
- 《Hadoop 权威指南 - 大数据的存储与分析》学习笔记
第一章 初识Hadoop 1.2 数据的存储与分析 对多个硬盘中的数据并行进行读/写数据,有以下两个重要问题: 硬件故障问题.解决方案:复制(replication),系统保存数据的副本(replic ...
- Hadoop权威指南阅读笔记
2019独角兽企业重金招聘Python工程师标准>>> 1.MR和关系型数据 MR和传统的关系型数据库处理的数据是不同,传统关系型数据库处理的是较结构化数据,对于半结构化和非机构话数 ...
- WINDOWS+PE权威指南读书笔记(27)
目录 软件安装自动化 基本思路: 补丁程序 patch.exe: 执行线程函数: 简单测试: 消息发送器 _Message.exe: 窗口枚举回调函数: 调用窗口枚举函数: 向指定窗口发送消息: 消息 ...
最新文章
- 让浏览器不记住表单元素输入过的内容
- 本科理工男如何学习Linux
- 等价类、边界值的概念及划分
- python怎么字体加阴影_如何添加阴影到tkinter帧?
- 如何使用 Istio 进行多集群部署管理:单控制平面 Gateway 连接拓扑
- cisco交换机堆叠
- 饿了么商家电脑版_饿了么企业版荣膺“2020中国十大影响力人力资源品牌”大奖...
- python中for循环和while循环else语句的执行过程和陷阱
- Unity3D笔记 愤怒的小鸟四 实现Selelction界面
- Sqlmap脱库之“你的数据我所见”
- 谷歌google chrome浏览器Chrome版本太旧无法更新chrome无法更新至最新版本怎么办
- Android利用NotificationListenerService实现消息盒子功能
- 成为“高维空间”的人
- 【Python】MySQLdb库的使用以及格式化输出字段中的值
- 关于WIN10开机启动慢的一些问题解决
- [深入理解SSD 为SSD编程] SSD的架构和基准
- ctfshow crypto funnyrsa3 RSA之dp泄露
- 提示:Run-time error ‘339’:Cmponent 'MSCOMCTL.OCX'or one of its dependent..........的时候该怎么办?
- Apollo使用方法
- 视频批量添加透明水印,必须用这个方法