MapReduce:Shuffle过程详解

1、Map任务处理

  

  1.1 读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数。                <0,hello you>   <10,hello me>

  1.2 覆盖map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。          <hello,1> <you,1> <hello,1> <me,1>

  1.3 对1.2输出的<k,v>进行分区。默认分为一个区。Partitioner

  • partitioner的作用是将mapper输出的键/值对拆分为分片(shard),每个reducer对应一个分片。 默认情况下,partitioner先计算目标的散列值(通常为md5值)。然后,通过reducer个数执行取模运算key.hashCode()%(reducer的个数)。 这种方式不仅能够随机地将整个键空间平均分发给每个reducer,同时也能确保不同mapper产生的相同键能被分发至同一个reducer。 如果用户自己对Partitioner有需求,可以订制并设置到job上。 job.setPartitionerClass(clz);

  1.4 溢写Split

  • map之后的key/value对以及Partition的结果都会被序列化成字节数组写入缓冲区,这个内存缓冲区是有大小限制的,默认是100MB。
  • 当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
  • 当溢写线程启动后,需要对这80MB空间内的key做排序sort(1.5)
  • 内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。即Combine(1.6)。

  1.5 对不同分区中的数据进行排序(按照k)Sort

  • 排序:每个分区内调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法,即字典排序。job.setSortComparatorClass(clz);       排序后:<hello,1> <hello,1> <me,1> <you,1>

  1.6 (可选)对分组后的数据进行归约。Combiner

  • combiner是一个可选的本地reducer,可以在map阶段聚合数据。combiner通过执行单个map范围内的聚合,减少通过网络传输的数据量。

  • 例如,一个聚合的计数是每个部分计数的总和,用户可以先将每个中间结果取和,再将中间结果的和相加,从而得到最终结果。

  • 求平均值的时候不能用,因为123的平均是2,12平均再和3平均结果就不对了。Combiner应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景,比如累加,最大值等。

  1.7 合并Merge

  • 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。
  • 最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。
  • “hello”从两个map task读取过来,因为它们有相同的key,所以得merge成group。什么是group。group中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。group后:<hello,{1,1}><me,{1}><you,{1}>
  • 因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果设置过Combiner,也会使用Combiner来合并相同的key。

2、Reduce任务处理

  

  2.1 拉取数据Fetch

  • Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。

  2.2 合并Merge

  • Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
  • 这里需要强调的是,merge有三种形式:1)内存到内存  2)内存到磁盘  3)磁盘到磁盘。
  • 默认情况下第一种形式不启用。
  • 当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,也有sort排序,如果设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束。
  • 然后启动第三种磁盘到磁盘的merge方式,有相同的key的键值队,merge成group,job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个分组的value放在一个迭代器里面(二次排序会重新设置分组规则)。如果未指定GroupingComparatorClass则则使用Key的实现的compareTo方法来对其分组。group中的值就是从不同溢写文件中读取出来的,group后:<hello,{1,1}><me,{1}><you,{1}>
  • 最终的生成的文件作为Reducer的输入,整个Shuffle才最终结束。

  2.3  Reduce

  • Reducer执行业务逻辑,产生新的<k,v>输出,将结果写到HDFS中。

3、WordCount代码

package mapreduce;import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class WordCountApp {static final String INPUT_PATH = "hdfs://chaoren:9000/hello";static final String OUT_PATH = "hdfs://chaoren:9000/out";public static void main(String[] args) throws Exception {Configuration conf = new Configuration();FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);Path outPath = new Path(OUT_PATH);if (fileSystem.exists(outPath)) {fileSystem.delete(outPath, true);}Job job = new Job(conf, WordCountApp.class.getSimpleName());// 指定读取的文件位于哪里
        FileInputFormat.setInputPaths(job, INPUT_PATH);// 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对//job.setInputFormatClass(TextInputFormat.class);// 指定自定义的map类job.setMapperClass(MyMapper.class);// map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略//job.setOutputKeyClass(Text.class);//job.setOutputValueClass(LongWritable.class);// 指定自定义reduce类job.setReducerClass(MyReducer.class);// 指定reduce的输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 指定写出到哪里
        FileOutputFormat.setOutputPath(job, outPath);// 指定输出文件的格式化类//job.setOutputFormatClass(TextOutputFormat.class);// 分区//job.setPartitionerClass(clz);// 排序、分组、归约  //job.setSortComparatorClass(clz);//job.setGroupingComparatorClass(clz);//job.setCombinerClass(clz);// 有一个reduce任务运行//job.setNumReduceTasks(1);// 把job提交给jobtracker运行job.waitForCompletion(true);}/*** * KEYIN     即K1     表示行的偏移量 * VALUEIN     即V1     表示行文本内容 * KEYOUT     即K2     表示行中出现的单词 * VALUEOUT 即V2        表示行中出现的单词的次数,固定值1* */static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException {String[] splited = v1.toString().split("\t");for (String word : splited) {context.write(new Text(word), new LongWritable(1));}};}/*** KEYIN     即K2     表示行中出现的单词 * VALUEIN     即V2     表示出现的单词的次数 * KEYOUT     即K3     表示行中出现的不同单词* VALUEOUT 即V3     表示行中出现的不同单词的总次数*/static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException {long times = 0L;for (LongWritable count : v2s) {times += count.get();}ctx.write(k2, new LongWritable(times));};}
}  

posted on 2018-11-20 18:03 ErBing 阅读(...) 评论(...) 编辑 收藏

转载于:https://www.cnblogs.com/erbing/p/9989480.html

MapReduce:Shuffle过程详解相关推荐

  1. Shuffle过程详解

    Shuffle过程详解 Shuffle过程是MapReduce的核心,最近看了很多资料,网上说法大体相同,但有些地方有一点点出入,就是各个阶段的执行顺序 总个shuffle过程可以看做是从map输出到 ...

  2. Hadoop学习之Mapreduce执行过程详解

    一.MapReduce执行过程 MapReduce运行时,首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,最后输出作为Reduce的输入,大体执行流程如下图所示: ...

  3. hadoop: Shuffle过程详解 (转载)

    原文地址:http://langyu.iteye.com/blog/992916 另一篇博文:http://www.cnblogs.com/gwgyk/p/3997849.html Shuffle过程 ...

  4. MapReduce模型过程详解

    @Author : Spinach | GHB @Link : http://blog.csdn.net/bocai8058 文章目录 MapReduce过程 从客户端.jobTracker.task ...

  5. MapReduce内部shuffle过程详解(Combiner的使用)

    Maptask调用一个组件FileInputFormat FileInputFormat有一个最高层的接口 --> InputFormat 我们不需要去写自己的实现类,使用的就是内部默认的组件: ...

  6. Hadoop之Shuffle机制详解

    Hadoop之Shuffle机制详解 目录 Shuffle机制 Partition分区 WritableComparable排序 Combiner合并 GroupingComparator分组(辅助排 ...

  7. 简历项目描述过程详解

    简历项目描述过程详解 一.项目分点 1.1 集群规模 1.2 框架结构,画出来 1.3 框架 1.3.1 第一个Flume 1.3.1.1 碰到的问题 1.3.2 kafka 1.3.2.1 框架介绍 ...

  8. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  9. Spark 3.2.0 版本新特性 push-based shuffle 论文详解(一)概要和介绍

    前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark 3.2.0 ...

最新文章

  1. Spring-boot国际化
  2. 用java调用oracle存储过程总结
  3. 计算机5G英语文献,无线通信英文参考文献精选
  4. np.random.seed的有效期及固定的种子会有固定的顺序
  5. Oracle 11g 新特性简介
  6. 第三次学JAVA再学不好就吃翔(part66)--Pattern类和Matcher类
  7. 数据校验器架构模式组
  8. Oracle主要概念汇总
  9. Android系列之Fragment(三)----Fragment和Activity之间的通信(含接口回调)
  10. FLEX中restrict限定TextInput输入
  11. ACM题目————STL练习之众数问题
  12. 嘉兴 机器人仓库 菜鸟_投资 2.1 亿!又一高端智能装备项目落户嘉兴科技城
  13. nero linux刻录教程,nero刻录数据光盘的图文教程
  14. 普林斯顿微积分读本小记(未完待续)
  15. MatLab:数字图像处理实验
  16. c语言中while循环语句的作用,c语言中while的用法
  17. 计算机大作业的范本,计算机应用基础大作业.doc
  18. 安装texthero踩过的坑
  19. 区块链通证经济——资产流动性的变革
  20. Word操作之Mathtype自动进行公式编号

热门文章

  1. Redis源码分析(一)--Redis结构解析
  2. mariadb数据库文件的组成
  3. mysql5.5主从同步复制配置
  4. 计算机等级考试绝对应用,96年4月至210年全国计算机等级考试绝对全收集.docx
  5. 计算机专业买r7000,2020年双十一有哪些游戏本值得买-7千到1万游戏本排行
  6. springboot的api_【粉丝投稿】无需额外注解的 SpringBoot API文档生成工具
  7. Spring高级之注解@DependsOn详解(超详细)
  8. Java虚拟机(五)——程序计数器
  9. 5-8经典卷子神经网络结构介绍
  10. OpenCV计算机视觉实战(Python版)_003阈值与平滑处理