1. MapReduce数据流向分析

MapReduce是一种并行数据处理框架,首先我们需要关注系统中数据流向问题,也就是从输入到输出过程中的数据传输问题。

1)从HDFS到Mpper节点输入文件。在一般情况下,存储数据的节点是Mapper运行的节点,不需要在节点之间进行数据传输,也就是尽量让存储靠近计算。但是由于用户的数据文件往往不是均衡的分布在整个集群中,则MapReduce的计算槽位资源却是均衡的分布在整个集群中的,因此某些计算节点就需要从数据存储节点获取数据到自己的计算节点,这样就会存在数据从HDFS上的存储节点到另一个计算节点的数据传输,为了提高集群资源的利用率,Hadoop会从距离计算节点最近的数据副本进行数据传输。

2)Mapper输出到内存缓冲区。Mapper的输出并不是直接写入本地文件系统,而是先写入内存缓冲区。

3)当缓冲区达到一定的阈值时就将缓冲区中的数据以临时文件的形式写入本地磁盘。默认的缓冲区大小是100MB,溢写比例默认是0.8 (可通过spill.percent参数来调节)

当达到阈值时,溢写线程就会启动并锁定这80MB内存执行溢写过程,这一过程称为spill。溢写线程启动的同时还会对这80MB的内存数据依据key的序列化字节做排序。当整个map任务结束后,会对这个map任务产生的所有临时文件进行合并,并产生最终的输出文件。

需要注意:在写入内存缓冲区的同时执行Partition分区。

如果用户作业设置了Combiner,那么在溢写到磁盘之前会对Map输出的键值对调用Combiner归约,这样可以减少溢写到本地磁盘文件的数据量。

4)从Mapper端的本地文件系统流入Reduce端,也就是 Reduce中的Shuffle阶段 分三种情况:

  1. 多个Reduce,需要将Mapper输出中的分区Region文件远程复制到相应的Reduce节点,如4-1
  2. Mapper节点所在机器有Reduce槽位,则会直接写入本机Reduce缓冲区,如4-2
  3. 本机的Reduce还会接受其他Mapper输出的分区Region文件,如4-3

需要注意的是Reduce端的这个内存缓冲区也有一个阈值,当相应的Region文件大于这个阈值便写入磁盘。

5)从Reduce端内存缓冲区流向本地磁盘的过程就是Reduce中Merge和Sort阶段。Merge分为内存文件合并和磁盘文件合并,同时还会以key为键排序,最终生成已经对相同key的value进行聚集并排序好的输出文件。

6)流向Reduce函数进行归约处理

7)写入HDFS中,生成输出文件。

2. MapTask实现分析

MapTask的总逻辑流程,包括以下几个阶段:

2.1 Read阶段

首先通过taskContext.getInputFormatClass()得到用户指定的InputFormatClass来创建InputFormat对象实例;其次,创建InputSplit对象,这个对象负责对文件进行数据块的逻辑切分;最后,创建RecordReader对象。InputFormat对象会提供getSplit()重要方法,通过getSplit()将输入文件切分成若干个逻辑InputSplit实例对象会把InputSplit提供的输入文件转化为Mapper需要的keys/vaule键值对集合形式。

2.2 Map阶段

对输入的键值对调用用户编写的Map函数进行处理,输出<key,value>键值对

2.3 Collector和Partitioner阶段

收集Mapper输出,在OutputCollector函数内部对键值对进行Partitioner分区,以便确定相应的Reducer处理,这个阶段将最终的键值对集合输出到内存缓冲区。

2.4 Spill阶段

包含Sort和Combiner阶段,当内存缓冲区达到阈值后写入本地磁盘,在这个阶段会对Mapper的输出键值对进行Sort排序,如果设置了Combiner会执行Combiner函数

Spill阶段有两个重要的逻辑,Sort和Combiner(如果用户设置了Combiner)。

每个mapTask都有一个内存的环形缓存区(默认是100MB),存储着map的输出结果。当缓存区块满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘(Spill)。溢写时由单线程来完成的,不影响往缓冲区写map结果的线程(spill.percent,默认是0.8)

2.5 Merge阶段

对Spill阶段在本地磁盘生成的小文件进行多次合并,最终生成一个大文件。

(3),(4),(5)也称Map端的Shuffle

3. ReduceTask 实现分析

ReduceTask的总逻辑流程,包括以下几个阶段:

3.1 Shuffle阶段

这个阶段就是Reduce中的Copy阶段,运行Reducer的TaskTracker需要从各个Mapper节点远程复制属于自己处理的一段数据。

Mapper的输出是写入本地磁盘的,并且是按照partition分区号进行组织的,Reduce的输入便是分布在集群中多个Mapper任务输出数据中同一partition段的数据,Map任务可能会在不同的时间内完成,只要其中一个Map任务完成了,ReduceTask就开始复制它的数据了。

3.2 Merge阶段

由于执行Shuffle阶段时会从各个Mapper节点复制很多同一partition段的数据,因此需要进行多次合并,以防止ReduceTask节点上内存使用过多或小文件过多。

在Shuffle阶段中启动数据复制线程MapOutputCopier后就开始进行Merge阶段。Merge包括两种情况:基于内存的合并和基于磁盘的合并,其分别对应的线程为InMemFSMergeTheread和LocalFSMerge。

3.3 Sort阶段

虽然每个Mapper的输出是按照key排序好的,但是经过Shuffle和Merge阶段后并不是统一有序的,因此还需要在Reduce端进行多轮归并排序。

3.4 Reduce 阶段

Reduce的输入要求是按照key排序的,因此只有在Sort阶段执行完成之后才可以对数据调用用户编写的Reduce类进行归约处理。

Hadoop详解(八):MapReduce深度分析相关推荐

  1. Hadoop详解(三)——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写

    MapReduce概述 MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,用户只需要实现map()和Re ...

  2. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  3. spark 调度模块详解及源码分析

    spark 调度模块详解及源码分析 @(SPARK)[spark] spark 调度模块详解及源码分析 一概述 一三个主要的类 1class DAGScheduler 2trait TaskSched ...

  4. Batch Normalization详解(原理+实验分析)

    Batch Normalization详解(原理+实验分析) 1. 计算过程 2. 前向传播过程 3. 反向传播过程 4. 实验分析 4.1 实验一:验证有没有BatchNorm下准确率的区别 4.2 ...

  5. Hadoop详解以及历史版本介绍

    Hadoop详解 Hadoop的介绍以及发展历史 Hadoop之父Doug Cutting Hadoop最早起源于lucene下的Nutch.Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页 ...

  6. 方案详解|AARRR+八角行为分析=用游戏化思维实现用户增长

    我们需要一套基于AARRR模型,围绕增长成本.效率.质量三个话题来针对每一层转化漏斗提炼可操作的运营方案,以AARRR模型+八角行为分析法为理论框架的,游戏化运营增长策略应运而生.随着互联网线上流量的 ...

  7. 【小白入门】超详细的OCRnet详解(含代码分析)

    [小白入门]超详细的OCRnet详解(含代码分析) OCRnet 简介 网络结构 具体实现(含代码分析) 实验结果 本文仅梳理总结自己在学习过程中的一些理解和思路,不保证绝对正确,请酌情参考.如果各位 ...

  8. SpringMVC异常处理机制详解[附带源码分析]

    SpringMVC异常处理机制详解[附带源码分析] 参考文章: (1)SpringMVC异常处理机制详解[附带源码分析] (2)https://www.cnblogs.com/fangjian0423 ...

  9. Hadoop详解(十二):Yarn资源调度策略

    在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler. FIFO Scheduler FIFO Scheduler把应用 ...

  10. spark RDD详解及源码分析

    spark RDD详解及源码分析 @(SPARK)[spark] spark RDD详解及源码分析 一基础 一什么是RDD 二RDD的适用范围 三一些特性 四RDD的创建 1由一个已经存在的scala ...

最新文章

  1. oracle user_scheduler_jobs,Oracle 定时任务dbms_scheduler
  2. github删除文件夹
  3. 类的方法(通过引用来传递参数)
  4. 修改引入表打造穿透KIS6的下载者(转)
  5. Redis学习---(10)Redis 集合(Set)
  6. Windows Server 2008 R2 MSDN ISO镜像简体中文版和英文版下载
  7. 数字的眼光看世界(常见常量、数值大下)
  8. python生成词云_词云制作没那么难,Python 10 行代码就实现了!
  9. 用Visual Studio写PHP代码
  10. sql server 表结构信息查询
  11. Java实现抓取百度识图结果的实现和思路-1-创造百度识图的URL链接
  12. 常见拓展名--的含义(扩宽知识面)
  13. 教室录播系统方案_校园录播教室搭建方案?
  14. 容联云通讯发送短信java实现
  15. matlab——整数规划
  16. python基于PHP+MySQL的在线音乐点歌系统
  17. 框架—— Serverlet
  18. Speedoffice(PPT)怎么自定义设置幻灯片大小
  19. State Threads 回调终结者
  20. android微博分块,微信、微博刷屏利器 图片切割9Cut安卓版

热门文章

  1. 对比两个表中,字段名不一样的SQL
  2. 实战:向GitHub提交代码时触发Jenkins自动构建
  3. CSS之 :before :after的用法,伪类和伪元素的区别
  4. Entity Framework
  5. 你的.net 2.0 真的能与1.1 安全正确地运行在同一台电脑上吗? 小心Server Application Unavailable 错误...
  6. Python机器学习及分析工具:Scipy篇
  7. 利用you-get批量下载bilibili等网站的视频
  8. 利用74LS138实现4-16译码器,并在QuartusⅡ上进行仿真
  9. 报错笔记:cannot convert parameter 1 from 'char [1024]' to 'unsigned char *'
  10. vc6.0打开工程出现程序崩了的原因