spark core源码分析16 Shuffle详解-读流程
博客地址: http://blog.csdn.net/yueqian_zhu/
shuffle的读流程也是从compute方法开始的
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]}
目前来说,不管是sortShuffleManager还是hashShuffleManager,getReader方法返回的都是HashShuffleReader。
接着调用read方法,如下:
/** Read the combined key-values for this reduce task */override def read(): Iterator[Product2[K, C]] = {val ser = Serializer.getSerializer(dep.serializer)val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {if (dep.mapSideCombine) {new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))} else {new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))}} else {require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")// Convert the Product2s to pairs since this is what downstream RDDs currently expectiter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))}// Sort the output if there is a sort ordering defined.dep.keyOrdering match {case Some(keyOrd: Ordering[K]) =>// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,// the ExternalSorter won't spill to disk.val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))sorter.insertAll(aggregatedIter)context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)sorter.iteratorcase None =>aggregatedIter}}
该方法首先调用了fetch方法,介绍一下
1、在task运行那节介绍过,shuffleMapTask运行完成后,会将shuffleId及mapstatus的映射注册到mapOutputTracker中
2、fetch方法首先尝试在本地mapstatuses中查找是否有该shuffleId的信息,有则本地取;否则想master的mapOutputTracker请求并读取,返回块管理器的地址和对应partition的文件长度
3、然后根据我们得到的shuffleId等信息去remote或者local通过netty/nio读取,返回一个迭代器
4、返回的迭代器中的数据并不是全部在内存中的,读取时会根据配置的内存最大值来读取。内存不够的话,下一个待读取
fetch方法返回一个迭代器后,根据是否mapSideCombine来区分时候需要将读取到的数据进行合并操作。合并过程与写流程类似,内存放不下就写入本地磁盘。
如果还需要keyOrdering的,new一个ExternalSorter进行外部排序。之后也是同shuffle写流程的insertAll。
spark core源码分析16 Shuffle详解-读流程相关推荐
- javaweb_笔记2(Servlet源码分析;request详解;请求域;转发和重定向;WebServlet注解;jsp基础语法,JavaBean。)
1.HttpServlet源码分析 HttpServlet类是专门为HTTP协议准备的.比GenericServlet更加适合HTTP协议下的开发. HttpServlet在哪个包下? jakarta ...
- nginx源码分析configure脚本详解
一.前言 在分析源码时,经常可以看到类似 #if (NGX_PCRE) .... #endif 这样的代码段,这样的设计可以在不改动源码的情况下,通过简单的定义宏的方式来实现功能的打开与关闭,但是在n ...
- linux lsof 源码分析,linux lsof详解
lsof全名list openedfiles,也就是列举系统中已经被打开的文件.我们都知道,linux环境中,任何事物都是文件,设备是文件,目录是文件,甚至sockets也是文件.所以,用好lsof命 ...
- Xposed源码剖析——app_process作用详解
Xposed源码剖析--app_process作用详解 首先吐槽一下CSDN的改版吧,发表这篇文章之前其实我已经将此篇文章写过了两三次了.就是发表不成功.而且CSDN将我的文章草稿也一带>删除掉 ...
- 【SemiDrive源码分析】【X9芯片启动流程】30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一)
[SemiDrive源码分析][X9芯片启动流程]30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一) 一.Android Kernel 启动流程分析 ...
- php+mysql案例含源码_【专注】Zabbix源码安装教程—步骤详解(1)安装前准备
一.实验环境准备 Rhel 7.6 x86_64(server) 192.168.163.72 Rhel 6.5 x86_64(agent) 192.168.163.61 均已配置操作安装光盘为YUM ...
- Android 系统(78)---《android framework常用api源码分析》之 app应用安装流程
<android framework常用api源码分析>之 app应用安装流程 <android framework常用api源码分析>android生态在中国已经发展非常庞大 ...
- React 源码系列 | React Context 详解
目前来看 Context 是一个非常强大但是很多时候不会直接使用的 api.大多数项目不会直接使用 createContext 然后向下面传递数据,而是采用第三方库(react-redux). 想想项 ...
- 【SemiDrive源码分析】【X9芯片启动流程】19 - MailBox 核间通信机制介绍(理论篇)
[SemiDrive源码分析][X9芯片启动流程]19 - MailBox 核间通信机制介绍(理论篇) 一.核间通信 二.核间通信软件架构 三.Mailbox 设备驱动 3.1 Mailbox for ...
- Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用
Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程 第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...
最新文章
- 近期活动盘点:心电数据标注系统和深度学习诊断算法研究、2019年第六届清华大学大数据社会科学讲习班...
- linux删除U盘分区、创建分区和格式化
- 数据结构(四)---栈的顺序存储的实现---java版
- 校友会2019中国大学计算机,校友会2019中国计算机类一流专业排名,清华大学排名第一...
- mavenspringboot项目打包引入lib目录下jar包
- Python学习笔记--CSV模块读写数据(转)
- 使用Eclipse开发Android应用程序
- 【亲测】Ripro子主题美化X系列主题(夏系列)-开源未加密
- Mysql数据库常用命令总结
- 南开大学2017年数学分析高等代数考研试题
- CSP201903-1 小中大 (Python)
- 教程|最新空白昵称与透明头像设置方法教程,让自己在微信中隐藏掉
- 机械键盘win键和alt键反了
- Javascript的事件驱动
- oCPC实践录 | 广告算法工程师的自我修养
- 解决 plt.savefig() 生成空白图片的问题
- android原生见缝插针游戏自定义控件源码
- 4-3指令系统-CISC和RISC
- kalilinux链接蓝牙音响_Kali-Linux安驱动、使用Blueman连接蓝牙耳机
- Android系统架构与分层
热门文章
- 《SpringCloud Alibaba 微服务架构》专题(二十三)-Seat工作原理
- 第五节、Vim编辑器与Shell入门
- 智能视频抠图_抠图扣视频,AI人工智能还能这样玩!
- rovio环境配置与测试笔记
- matlab rtw 生成c代码,MATLAB Coder ——从MATLAB代码生成C/C++代码
- 云解析DNS使用教程
- MATLAB怎么求非线性度误差,用Matlab进行最小二乘法线性拟合(求传感器非线性误差、灵敏度)...
- 创业关于抖音的技术分析与同类产品如何弯道超车的几个问题
- 打印机连接计算机主机的,电脑怎样连接打印机,详细教您电脑如何连接网络打印机...
- 最值得收藏的 搜狗输入法 常用快捷键使用, 让你的效率成倍增加