博客地址: http://blog.csdn.net/yueqian_zhu/

shuffle的读流程也是从compute方法开始的

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]}

目前来说,不管是sortShuffleManager还是hashShuffleManager,getReader方法返回的都是HashShuffleReader。

接着调用read方法,如下:

/** Read the combined key-values for this reduce task */override def read(): Iterator[Product2[K, C]] = {val ser = Serializer.getSerializer(dep.serializer)val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {if (dep.mapSideCombine) {new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))} else {new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))}} else {require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")// Convert the Product2s to pairs since this is what downstream RDDs currently expectiter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))}// Sort the output if there is a sort ordering defined.dep.keyOrdering match {case Some(keyOrd: Ordering[K]) =>// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,// the ExternalSorter won't spill to disk.val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))sorter.insertAll(aggregatedIter)context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)sorter.iteratorcase None =>aggregatedIter}}

该方法首先调用了fetch方法,介绍一下

1、在task运行那节介绍过,shuffleMapTask运行完成后,会将shuffleId及mapstatus的映射注册到mapOutputTracker中

2、fetch方法首先尝试在本地mapstatuses中查找是否有该shuffleId的信息,有则本地取;否则想master的mapOutputTracker请求并读取,返回块管理器的地址和对应partition的文件长度

3、然后根据我们得到的shuffleId等信息去remote或者local通过netty/nio读取,返回一个迭代器

4、返回的迭代器中的数据并不是全部在内存中的,读取时会根据配置的内存最大值来读取。内存不够的话,下一个待读取

fetch方法返回一个迭代器后,根据是否mapSideCombine来区分时候需要将读取到的数据进行合并操作。合并过程与写流程类似,内存放不下就写入本地磁盘。

如果还需要keyOrdering的,new一个ExternalSorter进行外部排序。之后也是同shuffle写流程的insertAll。

spark core源码分析16 Shuffle详解-读流程相关推荐

  1. javaweb_笔记2(Servlet源码分析;request详解;请求域;转发和重定向;WebServlet注解;jsp基础语法,JavaBean。)

    1.HttpServlet源码分析 HttpServlet类是专门为HTTP协议准备的.比GenericServlet更加适合HTTP协议下的开发. HttpServlet在哪个包下? jakarta ...

  2. nginx源码分析configure脚本详解

    一.前言 在分析源码时,经常可以看到类似 #if (NGX_PCRE) .... #endif 这样的代码段,这样的设计可以在不改动源码的情况下,通过简单的定义宏的方式来实现功能的打开与关闭,但是在n ...

  3. linux lsof 源码分析,linux lsof详解

    lsof全名list openedfiles,也就是列举系统中已经被打开的文件.我们都知道,linux环境中,任何事物都是文件,设备是文件,目录是文件,甚至sockets也是文件.所以,用好lsof命 ...

  4. Xposed源码剖析——app_process作用详解

    Xposed源码剖析--app_process作用详解 首先吐槽一下CSDN的改版吧,发表这篇文章之前其实我已经将此篇文章写过了两三次了.就是发表不成功.而且CSDN将我的文章草稿也一带>删除掉 ...

  5. 【SemiDrive源码分析】【X9芯片启动流程】30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一)

    [SemiDrive源码分析][X9芯片启动流程]30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一) 一.Android Kernel 启动流程分析 ...

  6. php+mysql案例含源码_【专注】Zabbix源码安装教程—步骤详解(1)安装前准备

    一.实验环境准备 Rhel 7.6 x86_64(server) 192.168.163.72 Rhel 6.5 x86_64(agent) 192.168.163.61 均已配置操作安装光盘为YUM ...

  7. Android 系统(78)---《android framework常用api源码分析》之 app应用安装流程

    <android framework常用api源码分析>之 app应用安装流程 <android framework常用api源码分析>android生态在中国已经发展非常庞大 ...

  8. React 源码系列 | React Context 详解

    目前来看 Context 是一个非常强大但是很多时候不会直接使用的 api.大多数项目不会直接使用 createContext 然后向下面传递数据,而是采用第三方库(react-redux). 想想项 ...

  9. 【SemiDrive源码分析】【X9芯片启动流程】19 - MailBox 核间通信机制介绍(理论篇)

    [SemiDrive源码分析][X9芯片启动流程]19 - MailBox 核间通信机制介绍(理论篇) 一.核间通信 二.核间通信软件架构 三.Mailbox 设备驱动 3.1 Mailbox for ...

  10. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

最新文章

  1. 近期活动盘点:心电数据标注系统和深度学习诊断算法研究、2019年第六届清华大学大数据社会科学讲习班...
  2. linux删除U盘分区、创建分区和格式化
  3. 数据结构(四)---栈的顺序存储的实现---java版
  4. 校友会2019中国大学计算机,校友会2019中国计算机类一流专业排名,清华大学排名第一...
  5. mavenspringboot项目打包引入lib目录下jar包
  6. Python学习笔记--CSV模块读写数据(转)
  7. 使用Eclipse开发Android应用程序
  8. 【亲测】Ripro子主题美化X系列主题(夏系列)-开源未加密
  9. Mysql数据库常用命令总结
  10. 南开大学2017年数学分析高等代数考研试题
  11. CSP201903-1 小中大 (Python)
  12. 教程|最新空白昵称与透明头像设置方法教程,让自己在微信中隐藏掉
  13. 机械键盘win键和alt键反了
  14. Javascript的事件驱动
  15. oCPC实践录 | 广告算法工程师的自我修养
  16. 解决 plt.savefig() 生成空白图片的问题
  17. android原生见缝插针游戏自定义控件源码
  18. 4-3指令系统-CISC和RISC
  19. kalilinux链接蓝牙音响_Kali-Linux安驱动、使用Blueman连接蓝牙耳机
  20. Android系统架构与分层

热门文章

  1. 《SpringCloud Alibaba 微服务架构》专题(二十三)-Seat工作原理
  2. 第五节、Vim编辑器与Shell入门
  3. 智能视频抠图_抠图扣视频,AI人工智能还能这样玩!
  4. rovio环境配置与测试笔记
  5. matlab rtw 生成c代码,MATLAB Coder ——从MATLAB代码生成C/C++代码
  6. 云解析DNS使用教程
  7. MATLAB怎么求非线性度误差,用Matlab进行最小二乘法线性拟合(求传感器非线性误差、灵敏度)...
  8. 创业关于抖音的技术分析与同类产品如何弯道超车的几个问题
  9. 打印机连接计算机主机的,电脑怎样连接打印机,详细教您电脑如何连接网络打印机...
  10. 最值得收藏的 搜狗输入法 常用快捷键使用, 让你的效率成倍增加