MapReduce:Shuffle过程详解
1、Map任务处理
1.1 读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数。 <0,hello you> <10,hello me>
1.2 覆盖map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。 <hello,1> <you,1> <hello,1> <me,1>
1.3 对1.2输出的<k,v>进行分区。默认分为一个区。Partitioner
- partitioner的作用是将mapper输出的键/值对拆分为分片(shard),每个reducer对应一个分片。 默认情况下,partitioner先计算目标的散列值(通常为md5值)。然后,通过reducer个数执行取模运算key.hashCode()%(reducer的个数)。 这种方式不仅能够随机地将整个键空间平均分发给每个reducer,同时也能确保不同mapper产生的相同键能被分发至同一个reducer。 如果用户自己对Partitioner有需求,可以订制并设置到job上。 job.setPartitionerClass(clz);
1.4 溢写Split
- map之后的key/value对以及Partition的结果都会被序列化成字节数组写入缓冲区,这个内存缓冲区是有大小限制的,默认是100MB。
- 当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
- 当溢写线程启动后,需要对这80MB空间内的key做排序sort(1.5)
- 内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。即Combine(1.6)。
1.5 对不同分区中的数据进行排序(按照k)Sort。
- 排序:每个分区内调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法,即字典排序。job.setSortComparatorClass(clz); 排序后:<hello,1> <hello,1> <me,1> <you,1>
1.6 (可选)对分组后的数据进行归约。Combiner
combiner是一个可选的本地reducer,可以在map阶段聚合数据。combiner通过执行单个map范围内的聚合,减少通过网络传输的数据量。
例如,一个聚合的计数是每个部分计数的总和,用户可以先将每个中间结果取和,再将中间结果的和相加,从而得到最终结果。
求平均值的时候不能用,因为123的平均是2,12平均再和3平均结果就不对了。Combiner应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景,比如累加,最大值等。
1.7 合并Merge
- 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。
- 最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。
- “hello”从两个map task读取过来,因为它们有相同的key,所以得merge成group。什么是group。group中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。group后:<hello,{1,1}><me,{1}><you,{1}>
- 因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果设置过Combiner,也会使用Combiner来合并相同的key。
2、Reduce任务处理
2.1 拉取数据Fetch
- Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
2.2 合并Merge
- Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
- 这里需要强调的是,merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。
- 默认情况下第一种形式不启用。
- 当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,也有sort排序,如果设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束。
- 然后启动第三种磁盘到磁盘的merge方式,有相同的key的键值队,merge成group,job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个分组的value放在一个迭代器里面(二次排序会重新设置分组规则)。如果未指定GroupingComparatorClass则则使用Key的实现的compareTo方法来对其分组。group中的值就是从不同溢写文件中读取出来的,group后:<hello,{1,1}><me,{1}><you,{1}>
- 最终的生成的文件作为Reducer的输入,整个Shuffle才最终结束。
2.3 Reduce
- Reducer执行业务逻辑,产生新的<k,v>输出,将结果写到HDFS中。
3、WordCount代码
package mapreduce;import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class WordCountApp {static final String INPUT_PATH = "hdfs://chaoren:9000/hello";static final String OUT_PATH = "hdfs://chaoren:9000/out";public static void main(String[] args) throws Exception {Configuration conf = new Configuration();FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);Path outPath = new Path(OUT_PATH);if (fileSystem.exists(outPath)) {fileSystem.delete(outPath, true);}Job job = new Job(conf, WordCountApp.class.getSimpleName());// 指定读取的文件位于哪里 FileInputFormat.setInputPaths(job, INPUT_PATH);// 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对//job.setInputFormatClass(TextInputFormat.class);// 指定自定义的map类job.setMapperClass(MyMapper.class);// map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略//job.setOutputKeyClass(Text.class);//job.setOutputValueClass(LongWritable.class);// 指定自定义reduce类job.setReducerClass(MyReducer.class);// 指定reduce的输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 指定写出到哪里 FileOutputFormat.setOutputPath(job, outPath);// 指定输出文件的格式化类//job.setOutputFormatClass(TextOutputFormat.class);// 分区//job.setPartitionerClass(clz);// 排序、分组、归约 //job.setSortComparatorClass(clz);//job.setGroupingComparatorClass(clz);//job.setCombinerClass(clz);// 有一个reduce任务运行//job.setNumReduceTasks(1);// 把job提交给jobtracker运行job.waitForCompletion(true);}/*** * KEYIN 即K1 表示行的偏移量 * VALUEIN 即V1 表示行文本内容 * KEYOUT 即K2 表示行中出现的单词 * VALUEOUT 即V2 表示行中出现的单词的次数,固定值1* */static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException {String[] splited = v1.toString().split("\t");for (String word : splited) {context.write(new Text(word), new LongWritable(1));}};}/*** KEYIN 即K2 表示行中出现的单词 * VALUEIN 即V2 表示出现的单词的次数 * KEYOUT 即K3 表示行中出现的不同单词* VALUEOUT 即V3 表示行中出现的不同单词的总次数*/static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException {long times = 0L;for (LongWritable count : v2s) {times += count.get();}ctx.write(k2, new LongWritable(times));};} }
转载于:https://www.cnblogs.com/erbing/p/9989480.html
MapReduce:Shuffle过程详解相关推荐
- Shuffle过程详解
Shuffle过程详解 Shuffle过程是MapReduce的核心,最近看了很多资料,网上说法大体相同,但有些地方有一点点出入,就是各个阶段的执行顺序 总个shuffle过程可以看做是从map输出到 ...
- Hadoop学习之Mapreduce执行过程详解
一.MapReduce执行过程 MapReduce运行时,首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,最后输出作为Reduce的输入,大体执行流程如下图所示: ...
- hadoop: Shuffle过程详解 (转载)
原文地址:http://langyu.iteye.com/blog/992916 另一篇博文:http://www.cnblogs.com/gwgyk/p/3997849.html Shuffle过程 ...
- MapReduce模型过程详解
@Author : Spinach | GHB @Link : http://blog.csdn.net/bocai8058 文章目录 MapReduce过程 从客户端.jobTracker.task ...
- MapReduce内部shuffle过程详解(Combiner的使用)
Maptask调用一个组件FileInputFormat FileInputFormat有一个最高层的接口 --> InputFormat 我们不需要去写自己的实现类,使用的就是内部默认的组件: ...
- Hadoop之Shuffle机制详解
Hadoop之Shuffle机制详解 目录 Shuffle机制 Partition分区 WritableComparable排序 Combiner合并 GroupingComparator分组(辅助排 ...
- 简历项目描述过程详解
简历项目描述过程详解 一.项目分点 1.1 集群规模 1.2 框架结构,画出来 1.3 框架 1.3.1 第一个Flume 1.3.1.1 碰到的问题 1.3.2 kafka 1.3.2.1 框架介绍 ...
- hadoop作业初始化过程详解(源码分析第三篇)
(一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...
- Spark 3.2.0 版本新特性 push-based shuffle 论文详解(一)概要和介绍
前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark 3.2.0 ...
最新文章
- Spring-boot国际化
- 用java调用oracle存储过程总结
- 计算机5G英语文献,无线通信英文参考文献精选
- np.random.seed的有效期及固定的种子会有固定的顺序
- Oracle 11g 新特性简介
- 第三次学JAVA再学不好就吃翔(part66)--Pattern类和Matcher类
- 数据校验器架构模式组
- Oracle主要概念汇总
- Android系列之Fragment(三)----Fragment和Activity之间的通信(含接口回调)
- FLEX中restrict限定TextInput输入
- ACM题目————STL练习之众数问题
- 嘉兴 机器人仓库 菜鸟_投资 2.1 亿!又一高端智能装备项目落户嘉兴科技城
- nero linux刻录教程,nero刻录数据光盘的图文教程
- 普林斯顿微积分读本小记(未完待续)
- MatLab:数字图像处理实验
- c语言中while循环语句的作用,c语言中while的用法
- 计算机大作业的范本,计算机应用基础大作业.doc
- 安装texthero踩过的坑
- 区块链通证经济——资产流动性的变革
- Word操作之Mathtype自动进行公式编号
热门文章
- Redis源码分析(一)--Redis结构解析
- mariadb数据库文件的组成
- mysql5.5主从同步复制配置
- 计算机等级考试绝对应用,96年4月至210年全国计算机等级考试绝对全收集.docx
- 计算机专业买r7000,2020年双十一有哪些游戏本值得买-7千到1万游戏本排行
- springboot的api_【粉丝投稿】无需额外注解的 SpringBoot API文档生成工具
- Spring高级之注解@DependsOn详解(超详细)
- Java虚拟机(五)——程序计数器
- 5-8经典卷子神经网络结构介绍
- OpenCV计算机视觉实战(Python版)_003阈值与平滑处理