回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出;而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一个Stage,需要输出结果。这里的Stage,在运行时的时候就是可以以pipeline的方式运行的一组Task,除了最后一个Stage对应的是ResultTask,其余的Stage对应的都是ShuffleMap Task。

而除了需要从外部存储读取数据和RDD已经做过cache或者checkpoint的Task,一般Task的开始都是从ShuffledRDD的ShuffleRead开始的。本节将详细讲解Shuffle Read的过程。

先看一下ShuffleRead的整体架构图。

org.apache.spark.rdd.ShuffledRDD#compute 开始,通过调用org.apache.spark.shuffle.ShuffleManager的getReader方法,获取到org.apache.spark.shuffle.ShuffleReader,然后调用其read()方法进行读取。在Spark1.2.0中,不管是Hash BasedShuffle或者是Sort BasedShuffle,内置的Shuffle Reader都是 org.apache.spark.shuffle.hash.HashShuffleReader。核心实现:

 override def read(): Iterator[Product2[K, C]] = {
val ser =Serializer.getSerializer(dep.serializer)
// 获取结果val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId,startPartition, context, ser)// 处理结果val aggregatedIter: Iterator[Product2[K, C]] = if(dep.aggregator.isDefined) {//需要聚合if (dep.mapSideCombine) {//需要map side的聚合new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))} else {//只需要reducer端的聚合new InterruptibleIterator(context,dep.aggregator.get.combineValuesByKey(iter, context))}}else { // 无需聚合操作iter.asInstanceOf[Iterator[Product2[K,C]]].map(pair => (pair._1, pair._2))}// Sort the output if there is a sort ordering defined.dep.keyOrdering match {//判断是否需要排序case Some(keyOrd: Ordering[K]) => //对于需要排序的情况// 使用ExternalSorter进行排序,注意如果spark.shuffle.spill是false,那么数据是// 不会spill到硬盘的val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd),serializer= Some(ser))sorter.insertAll(aggregatedIter)context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilledcontext.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilledsorter.iteratorcase None => //无需排序aggregatedIter}}

org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher#fetch会获得数据,它首先会通过

org.apache.spark.MapOutputTracker#getServerStatuses来获得数据的meta信息,这个过程有可能需要向org.apache.spark.MapOutputTrackerMasterActor发送读请求,这个读请求是在org.apache.spark.MapOutputTracker#askTracker发出的。在获得了数据的meta信息后,它会将这些数据存入Seq[(BlockManagerId,Seq[(BlockId, Long)])]中,然后调用org.apache.spark.storage.ShuffleBlockFetcherIterator最终发起请求。ShuffleBlockFetcherIterator根据数据的本地性原则进行数据获取。如果数据在本地,那么会调用org.apache.spark.storage.BlockManager#getBlockData进行本地数据块的读取。而getBlockData对于shuffle类型的数据,会调用ShuffleManager的ShuffleBlockManager的getBlockData。

如果数据在其他的Executor上,那么如果用户使用的spark.shuffle.blockTransferService是netty,那么就会通过org.apache.spark.network.netty.NettyBlockTransferService#fetchBlocks获取;如果使用的是nio,那么就会通过org.apache.spark.network.nio.NioBlockTransferService#fetchBlocks获取。

数据读取策略的划分

org.apache.spark.storage.ShuffleBlockFetcherIterator会通过splitLocalRemoteBlocks划分数据的读取策略:如果在本地有,那么可以直接从BlockManager中获取数据;如果需要从其他的节点上获取,那么需要走网络。由于Shuffle的数据量可能会很大,因此这里的网络读有以下的策略:

1)       每次最多启动5个线程去最多5个节点上读取数据

2)       每次请求的数据大小不会超过spark.reducer.maxMbInFlight(默认值为48MB)/5

这样做的原因有几个:

1)  避免占用目标机器的过多带宽,在千兆网卡为主流的今天,带宽还是比较重要的。如果机器使用的万兆网卡,那么可以通过设置spark.reducer.maxMbInFlight来充分利用带宽。

2)  请求数据可以平行化,这样请求数据的时间可以大大减少。请求数据的总时间就是请求中耗时最长的。这样可以缓解一个节点出现网络拥塞时的影响。

主要的实现:

private[this] def splitLocalRemoteBlocks():ArrayBuffer[FetchRequest] = {val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)val remoteRequests = new ArrayBuffer[FetchRequest]for ((address, blockInfos) <- blocksByAddress) {if (address.executorId == blockManager.blockManagerId.executorId) {// Block在本地,需要过滤大小为0的block。localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)numBlocksToFetch += localBlocks.size} else { //需要远程获取的Blockval iterator = blockInfos.iteratorvar curRequestSize = 0Lvar curBlocks = new ArrayBuffer[(BlockId, Long)]while (iterator.hasNext) {//blockId 是org.apache.spark.storage.ShuffleBlockId,// 格式:"shuffle_" +shuffleId + "_" + mapId + "_" + reduceIdval (blockId, size) = iterator.next()// Skip empty blocksif (size > 0) {curBlocks += ((blockId, size))remoteBlocks += blockIdnumBlocksToFetch += 1curRequestSize += size}if (curRequestSize >= targetRequestSize) {// 当前总的size已经可以批量放入一次request中remoteRequests += new FetchRequest(address, curBlocks)curBlocks = new ArrayBuffer[(BlockId, Long)]curRequestSize = 0}}// 剩余的请求组成一次requestif (curBlocks.nonEmpty) {remoteRequests += new FetchRequest(address, curBlocks)}}}remoteRequests}

本地读取

fetchLocalBlocks() 负责本地Block的获取。在splitLocalRemoteBlocks中,已经将本地的Block列表存入了localBlocks:private[this] val localBlocks = newArrayBuffer[BlockId]()

具体过程如下:

  val iter = localBlocks.iteratorwhile (iter.hasNext) {val blockId = iter.next()try {val buf = blockManager.getBlockData(blockId)shuffleMetrics.localBlocksFetched += 1buf.retain()results.put(new SuccessFetchResult(blockId, 0, buf))} catch {}}

而blockManager.getBlockData(blockId)的实现是:

override def getBlockData(blockId:BlockId): ManagedBuffer = {if (blockId.isShuffle) {shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
}

这就调用了ShuffleBlockManager的getBlockData方法。在Shuffle Pluggable框架中我们介绍了实现一个Shuffle Service之一就是要实现ShuffleBlockManager。

以Hash BasedShuffle为例,它的ShuffleBlockManager是org.apache.spark.shuffle.FileShuffleBlockManager。FileShuffleBlockManager有两种情况,一种是File consolidate的,这种的话需要根据Map ID和 Reduce ID首先获得FileGroup的一个文件,然后根据在文件中的offset和size来获取需要的数据;如果是没有File consolidate,那么直接根据Shuffle Block ID直接读取整个文件就可以。

override def getBlockData(blockId:ShuffleBlockId): ManagedBuffer = {if (consolidateShuffleFiles) {val shuffleState = shuffleStates(blockId.shuffleId)val iter = shuffleState.allFileGroups.iterator
while(iter.hasNext) {// 根据Map ID和Reduce ID获取File Segment的信息val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId,blockId.reduceId)if (segmentOpt.isDefined) {val segment = segmentOpt.get// 根据File Segment的信息,从FileGroup中找到相应的File和Block在// 文件中的offset和sizereturn new FileSegmentManagedBuffer(transportConf, segment.file, segment.offset, segment.length)}}throw new IllegalStateException("Failed to find shuffle block:" + blockId)}else {val file = blockManager.diskBlockManager.getFile(blockId) //直接获取文件句柄new FileSegmentManagedBuffer(transportConf, file, 0, file.length)}}

对于Sort BasedShuffle,它需要通过索引文件来获得数据块在数据文件中的具体位置信息,从而读取这个数据。

具体实现在org.apache.spark.shuffle.IndexShuffleBlockManager#getBlockData中。

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {// 根据ShuffleID和MapID从org.apache.spark.storage.DiskBlockManager 获取索引文件val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)val in = new DataInputStream(new FileInputStream(indexFile))try {ByteStreams.skipFully(in, blockId.reduceId * 8) //跳到本次Block的数据区val offset = in.readLong() //数据文件中的开始位置val nextOffset = in.readLong() //数据文件中的结束位置new FileSegmentManagedBuffer(transportConf,getDataFile(blockId.shuffleId, blockId.mapId),offset,nextOffset - offset)}finally {in.close()}}
如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。
点我投票

Spark技术内幕:Shuffle Read的整体流程相关推荐

  1. Spark技术内幕: Task向Executor提交的源代码解析

    在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓 ...

  2. Spring技术内幕:设计理念和整体架构概述

    为什么80%的码农都做不了架构师?>>>    程序员都很崇拜技术大神,很大一部分是因为他们发现和解决问题的能力,特别是线上出现紧急问题时,总是能够快速定位和解决. 一方面,他们有深 ...

  3. 我的第一本著作:Spark技术内幕上市!

    现在各大网站销售中! 京东:http://item.jd.com/11770787.html 当当:http://product.dangdang.com/23776595.html 亚马逊:http ...

  4. Spark技术内幕:Stage划分及提交源码分析

    当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJo ...

  5. Spark技术内幕:究竟什么是RDD

    RDD是Spark最基本,也是最根本的数据抽象.http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 是关于RDD的论文.如果觉得英 ...

  6. spark源码-shuffle原理分析-1-ShuffleWriter

    ShuffleWriter 1.概述 2.ShuffleHandle注册 2.1.注册时间点 2.2.向shuffleManager注册shuffle 2.2.1.BypassMergeSortShu ...

  7. 王家林大咖新书预发布:清华大学出版社即将出版《Spark大数据商业实战三部曲:内核解密|商业案例|性能调优》第二版 及《企业级AI技术内幕讲解》

    王家林大咖新书预发布:清华大学出版社即将出版<Spark大数据商业实战三部曲:内核解密|商业案例|性能调优>第二版,新书在第一版的基础上以Spark 2.4.3版本全面更新源码,并以Ten ...

  8. 技术内幕 | StarRocks Community Champion、阿里云技术专家解读 Optimizer 实现

    作者:范振(花名辰繁),阿里云计算平台-开源大数据-OLAP方向负责人,高级技术专家,StarRocks Community Champion 随着阿里云EMR StarRocks 上线,在和用户交流 ...

  9. Spark技术在京东智能供应链预测的应用

    Spark技术在京东智能供应链预测的应用 原创 2017-03-06 杨冬越 郭景瞻 大数据杂谈 大家晚上好,做一个简单的介绍:我叫郭景瞻,来自京东,著有<图解Spark:核心技术与案例实战&g ...

最新文章

  1. Windows系统运维转linux系统运维的经历
  2. mootools1.3.1源码解读
  3. 住房要注意用电安全-记录一下失火
  4. python小游戏源码-python21点小游戏源码免费下载
  5. 【数字信号处理】相关函数 ( 卷积与交换性 | 相关函数不具有交换性 | 推导过程 )
  6. emacs org-mode文件转html文件
  7. cdh mysql sqoop 驱动_大数据技术之Sqoop学习——原理、安装、使用案例、常用命令...
  8. 除了云原生,2021 年还有这八大趋势值得关注
  9. Centos7yum源配置PID锁定问题
  10. xxx.pch(No such file or directory)
  11. Oracle监听注册和sqlnet,Oracle监听配置(四)--如何实现静态、动态注册
  12. Java的static关键字使用
  13. 常见的web前端面试试题(含答案)
  14. WordPress文章页面获取评论次数
  15. 天下数据解析域名及域名转向
  16. Android11 手动屏幕亮度调节流程代码追踪;
  17. 菜鸟学网站开发入门之(一)—— html 入门
  18. 中船嘉年华邮轮揭幕全新企业品牌标识;美国运通全球商务旅行完成对Expedia集团旗下易信达的收购 | 全球旅报...
  19. 3.摄像模组之Golden模组
  20. MVC之前的那点事儿系列(6):动态注册HttpModule

热门文章

  1. 使用Apache Commons Configuration读取配置信息
  2. 基于最优化方法的超宽带通信信号设计
  3. samba网络服务的搭建和配置
  4. 为保障处理器平稳运行请“三知”cpu
  5. 父窗口控制弹出窗口快捷键ctrl+c关闭
  6. oracle系列(三)oracle的配置与管理
  7. 微信小程序开发之选项卡
  8. LINUX分区空间扩容操作
  9. Hibernate关系映射 一对一双向外键关联@OneToOne Annotation方式
  10. sybase备份问题