[源码] Spark如何划分Stage

文章目录

  • [源码] Spark如何划分Stage
    • 大家好,我是一拳就能打爆你A柱的猛男
    • 1、Stage的介绍
      • 1.1 Stage是什么概念以及Stage的划分
      • 1.3 Stage的意义
    • 2、从SparkPi定位Stage划分代码
    • 3、Spark划分Stage代码解读
    • 4、总结

大家好,我是一拳就能打爆你A柱的猛男

太久不写博客了,今天写的博客可能有点生疏,各位将就看一下。不知道有没有人跟我一样好奇Spark如何划分Stage的,今天翻一下源码给大家看看。

1、Stage的介绍

这部分还是给一点前置知识,如果对Stage有不了解或者了解不清楚的同学可以看一下,如果比较熟悉可以直接跳到第二节。

1.1 Stage是什么概念以及Stage的划分

我们接触Spark的时候一定会接触到下面几个名词:

  • 1、Application:Application就是用户所提交的应用程序,比如写了一个wordcount打成jar提交,这就是一个Application
  • 2、Job:一个Application是由多个Job组成的,Job可以理解成一连串连续的动作(自己的理解,后续会解释)
  • 3、Stage:一个Job里会包含多个Stage,Stage以shuffle切分,其中Stage又会分为ShuffleMapStage和ResultStage两种
  • 4、Task:Task是存在于Stage内的概念,是实际执行的单元,也分为ShuffleMapTask和ResultTask两种

这几个介绍说实话,我看了好多次了,都背下来了。但是我相信很多人还是没有真真切切的搞明白,下面再用一张图解释(也是很多博客 的做法):

这里讲的比较啰嗦,但是应该能帮助大家更好理解 。上面这张图可以看作是一个Job,包含了三个虚线框,这就是三个Stage。每个大写字母旁边的圆角矩形就是一个RDD,RDD内部的小矩形就是一个RDD块。从RDD A到RDD B的笛卡尔积映射关系就是shuffle操作,而RDD C到RDD D这种一对一的映射就是map操作。

Stage的划分从最后一个RDD开始,且按照深度优先的方式遍历,这一点很重要,所以我们从RDD G开始往前看。步骤如下:

  • 1、RDD G -> RDD B是一个map操作,所以会并入G对应的Stage
  • 2、RDD B -> RDD A是一个shuffle操作,所以会不会并入B的Stage,也不是G的Stage的成员
  • 3、RDD A往前就没有RDD了,所以RDD A自成一个Stage,这时StageId=1
  • 4、回到G,G -> F是一个 shuffle操作,所以不会 并入G的Stage
  • 5、F->E是一个map操作,所以并入F的Stage
  • 6、E往前没有RDD,所以停止遍历
  • 7、F->D是一个map操作,所以并入F的Stage
  • 8、D->C是一个map操作,所以并入F的Stage
  • 9、C往前没有RDD,所以停止遍历,F为起点的遍历结束,此时StageId=2
  • 10、G的遍历因为其他遍历的结束而结束,StageId=3

得到下面的表格:

StageId Rdds
1 A
2 C D E F
3 B G

此时可以很清楚的看到,Stage包含了 RDD,而且因为RDD之间有血缘关系,即前一个RDD“计算”结束后才能进行下一个RDD的操作。所以 Stage的执行也有顺序,即Stage从后往前划分,从前往后执行。

1.3 Stage的意义

那么经过上面的操作到底有什么意义呢?为什么不直接按照用户代码给的顺序直接执行下来就完了呢?

观察上面这个图可以发现,shuffle的操作需要所有的RDD块都到位之后才能执行,但是map操作好像并不需要。Stage2中分两台机执行的话,一台机执行要执行两次map操作然后在F处等待即可,另一台机则执行一次map操作就可以等待,全部执行完毕后就可以做shuffle了。

大家可以想象一下一个工厂的流水线上,有的流水线只需要盖个章,而有的流水线需要将商品分发到不同的流水线上。在Spark中数据就想流水线上的商品,盖章的(map)操作哪怕再多,只要不需要跟其他机器的数据有交互,都可以一次性执行下来。而分发的(shuffle)操作,则需要等待这一批的商品(数据)都到齐后,才能分发。

Stage就是按照shuffle划分的,而Stage划分的意义就是将所有的盖章(map)操作放在一起,不做停留的执行,从而提高效率。

这一段比喻我也不知道恰不恰当,各位辩证看待。

2、从SparkPi定位Stage划分代码

我手头刚好是spark-2.2.3的工程,所以就用这套代码了,如果是用的更新的工程也没关系,因为基本上没有大改动。

打开Spark工程找到examples模块下的SparkPi:

object SparkPi {def main(args: Array[String]) {val spark = SparkSession.builder.master("local[*]").appName("Spark Pi").getOrCreate()val slices = if (args.length > 0) args(0).toInt else 2val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflowval count = spark.sparkContext.parallelize(1 until n, slices).map { i =>val x = random * 2 - 1val y = random * 2 - 1if (x * x + y * y <= 1) 1 else 0}.reduce(_ + _) // action算子提交任务println("Pi is roughly " + 4.0 * count / (n - 1))spark.stop()}
}

可以看到只有一个reduce这一个Action算子,即负责提交Job的算子。从这里进入看到下面的代码:

def reduce(f: (T, T) => T): T = withScope {...// 其他都可以不看先,先看runjobsc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

可以看到runJob就是提交Job,而我们知道Job中包含多个Stage,所以划分Stage的代码必然在里面,点进入看到下面代码:

def runJob[T, U: ClassTag](rdd: RDD[T],processPartition: Iterator[T] => U,resultHandler: (Int, U) => Unit)
{val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
}继续点:
def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {...dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)progressBar.foreach(_.finishAll())rdd.doCheckpoint()
}

这里可以看到dagScheduler.runJob,也就是说在SparkContext类里维护了一个DAGScheduler对象,提交Job的时候实际上是调用了dagScheduler.runJob函数,点进去:
可以看到dagScheduler.runJob里做了三件事:

  • 1、记录Job执行时间
  • 2、执行Job
  • 3、对执行结果做处理
def runJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): Unit = {val start = System.nanoTimeval waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)waiter.completionFuture.value.get match {case scala.util.Success(_) =>logInfo("Job %d finished: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))case scala.util.Failure(exception) =>logInfo("Job %d failed: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.val callerStackTrace = Thread.currentThread().getStackTrace.tailexception.setStackTrace(exception.getStackTrace ++ callerStackTrace)throw exception}
}

重点在执行Job的步骤,val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

跳转到submitJob函数中,这里还是没到划分的代码:

def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = {...eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))waiter
}

这里提交给eventProcessLoop去执行JobSubmitted这个事件了。

注:Spark的RPC通讯(Spark各节点的通讯)以及*EventLoop都是通过事件来发送命令的,也没那么神奇,这些事件就是一个case class,当然事件这个说法是我自己想的,反正就是那么个东西。

接上面的 eventProcessLoop.post,发送了一个JobSubmitted事件,下面点击eventProcessLoop

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

接着点击DAGSchedulerEventProcessLoop

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {...private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

可以看到doOnReceive函数里有对事件类型的判断,第一个就是JobSubmitted,可以看到实际上调用了dagScheduler.handleJobSubmitted函数,继续点击到这个函数内:

private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {...val stageIds = jobIdToStageIds(jobId).toArrayval stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))submitStage(finalStage)}

**最后一行就是提交Stage的代码了。终于找到了,蛮不容易的。**走这一遍过程其实还是有收获的,除了找到Stage划分的位置之外,也可以无意间了解到Spark有一种特别的调用机制,就是事件调用的方式。其实通过名字可以看出来*EventProcessLoop就是事件处理循环,是*EventLoop的具体实现类。这些类实际上就是一个阻塞线程等待处理相应的事件:

private[spark] abstract class EventLoop[E](name: String) extends Logging {...private val eventThread = new Thread(name) {setDaemon(true)override def run(): Unit = {try {while (!stopped.get) {val event = eventQueue.take()try {onReceive(event)} catch {case NonFatal(e) =>try {onError(e)} catch {case NonFatal(e) => logError("Unexpected error in " + name, e)}}}} catch {case ie: InterruptedException => // exit even if eventQueue is not emptycase NonFatal(e) => logError("Unexpected error in " + name, e)}}}

3、Spark划分Stage代码解读

接下来开始读Stage的划分的代码了,其实这部分蛮简单的,也可以作为刚了解Spark的同学想要读Spark源码的练手册,提升信心。

接下来的代码我尽量逐行的注释,同时会附上完整的代码,可能代码块会长一点,各位注意。点开submitStage(finalStage)函数,进入函数内部:

private def submitStage(stage: Stage) {// 找到Stage对应的JobIdval jobId = activeJobForStage(stage)// 若找得到则继续划分Stage,执行提交Stage的动作if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")// 这里有三个HashSet,分别注释一下:// 1、waitingStages:用来装父Stage没有执行完毕的Stage// 2、runningStages:用来装正在执行的Stage// 3、failedStages:用来装执行失败,将要重新执行的Stage// 下面的判断显然是针对第一次进入的Stage,假如Stage有未执行的父Stage、或者Stage正在运行或者需要重新执行,则没法进入 if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {// 下面getMissingParentStages函数是找到当前Stage的所有父Stage,并按照StageId排序val missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)// 若当前Stage的父Stage为空,则说明走到头了,可以执行if (missing.isEmpty) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")// 执行Stage的函数在这里submitMissingTasks(stage, jobId.get)} else {// 若找得到父Stage,则还需要递归的进入当前函数for (parent <- missing) {submitStage(parent)}// 并且将当前stage加入等待执行的set中waitingStages += stage}}// 若是没找到则终止所有需要使用到当前 入参Stage的Job} else {abortStage(stage, "No active job for stage " + stage.id, None)}
}

经过观察上面的代码,可以看到几个细节:

  • 1、针对Stage的执行,Spark分为3类

    • 1 等待执行的Stage
    • 2 正在执行的Stage
    • 3 需要重新执行的Stage
  • 2、Stage的执行是按照递归进行的,也就是说针对当前入参Stage,Spark会往前找父Stage,一直到没有父Stage的时候才会执行Stage的操作(这点证实了Stage从前往后执行
  • 3、针对找不到JobId的Stage,Spark可能会对所有需要该Stage的Job都做终止操作(不知道理解的对不对)
  • 4、其实这里并没有划分Stage的过程,而划分过程显然放在找父Stage的函数getMissingParentStages
  • 5、具体提交Stage的代码在submitMissingTasks函数里

点击getMissingParentStages进入函数:

// 这里还是蛮简单的,就是用栈做一个深度优先搜索,根据RDD的血缘关系以及依赖类型划分Stage
private def getMissingParentStages(stage: Stage): List[Stage] = {// 装父Stageval missing = new HashSet[Stage]// 用于DFS的setval visited = new HashSet[RDD[_]]// 用于DFS的栈val waitingForVisit = new Stack[RDD[_]]// 这里定义了一个访问函数,具体的划分代码在这里def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rdd// 这个函数从名字来看就是确定RDD内各个分区(partition)所处的节点位置val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)if (rddHasUncachedPartitions) {// 从这开始遍历RDD所有的父RDD,并且对RDD的依赖做判断// 若当前RDD到父RDD的依赖是ShuffleDependency,则证明是一个Stage的划分点// 若当前RDD到父RDD的依赖是NarrowDependency,则证明当前RDD和父RDD属于同一个Stagefor (dep <- rdd.dependencies) {dep match {case shufDep: ShuffleDependency[_, _, _] =>// 下面这行操作还没搞清楚,不敢乱说// 还是乱说一下把,这行蛮重要的,就是如果当前Stage是已经访问过的,就直接返回已经构造好的Stage// 如果没访问过,也就是Stage第一次出现,则会构造一个带血缘关系的Stage,当然在提交的时候会根据血缘// 重新构造父Stageval mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {missing += mapStage}case narrowDep: NarrowDependency[_] =>// 若是窄依赖,则将父RDD放入栈中等待进一步的遍历waitingForVisit.push(narrowDep.rdd)}}}}}// DFS第一步,先把Job里最尾巴的Stage的rdd放进等待访问的栈waitingForVisit.push(stage.rdd)// 接下来就是弹栈,访问,划分,压栈的操作,全在visit里while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}// 经过完整的血缘遍历,得到当前Stage所有的父Stage,返回missing.toList
}

所以具体的划分代码已经看完了,有下面几个细节可以注意一下:

  • 1、Stage通过RDD血缘关系的方式往前遍历(证实Stage从后往前遍历
  • 2、Stage的划分依据是Shuffle依赖
  • 3、划分Stage的具体方式是用深度优先遍历实现的

在这里还是留了一点遗憾,就是getOrCreateShuffleMapStage函数的具体内容没有搞清楚,接下来有机会再仔仔细细的看一遍。

4、总结

其实Stage划分的代码是相对来说很简单的,只要找对位置,基本上就能够看得懂。通过去看Spark-core中Stage划分的代码,可以帮助刚接触Spark的同学更好地了解Stage的概念,也可以提高看源码的信心。

总结一下,Stage到底是什么:

感性的解释:

Stage就像是工厂流水线中的一组流水线,而map操作就好像其中的一个流水线。玩过戴森球计划的同学肯定很清楚流水线设计的重要性。map操作就像给商品做一个单一的操作,不需要与其他流水线上的商品有交集,所以可以无顾忌的马力全开。而Shuffle操作就好像一套流水线走完之后的装车、分发操作,将map做好的商品分发到各个节点(流水线)进行下一组的封装。(这里可以再想想一个 工厂做手机屏幕,另一个工厂做手机中框,几百个工厂 同时火力全开做单个零件,最终装车shuffle到苹果、华为的富士康流水线上做组装)

根据代码来的解释:

Stage就是Job里的一个阶段,这个Stage会负责一些操作,然后等所有数据准备好后,shuffle到各节点。Stage划分从后往前划分,而执行的时候却要从前往后执行,因为Stage有前后的依赖关系。

太久不写博客了,欢迎各位大佬指教。

[源码] Spark如何划分Stage相关推荐

  1. CC00082.spark——|HadoopSpark.V08|——|Spark.v08|Spark 原理 源码|Spark Context|

    一.SparkContext启动流程 ### --- sparkContext启动流程~~~ SparkContext 涉及到的组件多,源码比较庞大. ~~~ 有些边缘性的模块主要起到辅助的功能,暂时 ...

  2. spark的java源码,Spark源码包的编译

    Spark源码包的编译和部署生成,其本质只有两种:Maven和SBT (Simple Build Tool), 只不过针对不同的场景而已: Maven编译 SBT编译 IntelliJ IDEA编译( ...

  3. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  4. spark任务运行源码

    spark任务运行源码 spark是一个分布式计算引擎,任务的运行是计算引擎的核心. 一个spark任务怎么能运行起来呢? 1 spark服务启动(Master,Worker): 2 应用程序提交 3 ...

  5. Spark源码阅读(五) --- Spark的支持的join方式以及join策略

    版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...

  6. spark shell 删除失效_Spark任务提交源码解析

    1. 前言 反反复复捣鼓了很久,终于开始学习Spark的源码了,果不其然,那真的很有趣.这里我打算一本正经的胡说八道来讲一下Spark作业的提交过程. 基础mac系统基础环境如下: JDK 1.8 I ...

  7. 这样读源码,不牛X也难

    程序员在工作过程中,会遇到很多需要阅读源码的场景,比如技术预研.选择技术框架.接手以前的项目.review他人的代码.维护老产品等等.可以说,阅读源代码是程序员的基本功,这项基本功是否扎实,会在很大程 ...

  8. Java集合---Arrays类源码解析

    一.Arrays.sort()数组排序 Java Arrays中提供了对所有类型的排序.其中主要分为Primitive(8种基本类型)和Object两大类. 基本类型:采用调优的快速排序: 对象类型: ...

  9. linux boost 1.53编译,boost-1.53源码编译

    1.下载源码.首先得明白,boost大多数库都只有头文件,链接时不需Lib文件:以下库需要单独编译成库: Boost.Filesystem Boost.GraphParallel Boost.IOSt ...

最新文章

  1. 电脑ip地址设置_关于电脑的远程开机(唤醒)
  2. 搜索引擎优化 SEO
  3. 5-35 城市间紧急救援 (25分) pat 数据结构
  4. SvsUtil.exe生成服务文件
  5. 启动Activity的两种方式startActivity和startActivityForResult(一)
  6. VC资源分配、释放表
  7. 【MySQL】Linux端-实现Mysql数据定时自动备份
  8. 一个月读完6本书?这些烧脑神书,你能读完1本,就是学霸!
  9. 电力系统分析第三版 第四章 电力系统潮流的计算机算法 习题答案,电力系统分析 第四章 电力系统潮流的计算机算法.ppt...
  10. 一路走来之“华为网络技术大赛”(新含2018年资料)
  11. ffmpeg解码器使用
  12. 3D打印机USB联机打印是如何实现的?(以Cura插件USBPrinting为例)
  13. LAMP环境的搭建与部署
  14. Tech Talk| Redmi K50 电竞版手机极致散热技术详解
  15. [Unity] UGUI学习笔记
  16. 【动态规划】图像压缩问题
  17. 58同城、前程无忧在前,快手做招聘能否弯道超车?
  18. 绝望的主妇第八季/Desperate Housewives迅雷下载
  19. 宇宙代码与磁子计算机,新认识!宇宙产生什么最强磁铁?比人类产生的最强磁场强一亿倍...
  20. 风控模型—WOE与IV指标的深入理解应用

热门文章

  1. python绘制风向玫瑰图
  2. 宏杰文件夹加密V2878【永久免费的文件加密软件】
  3. tcping 命令 工具
  4. html语言可以写微信小程序吗,微信小程序 语句
  5. 苹果笔记本没有计算机管理员,苹果电脑忘记管理员密码怎么办_mac电脑忘记管理员密码的处理办法...
  6. 不说话的“狼人杀”游戏
  7. 基于matlab电力系统的牛顿-拉夫逊法进行潮流计算
  8. 七牛云存储_入门及使用
  9. 快速搭建自己的 KMS 服务器
  10. 学python数据挖掘自学路线图_python零基础学习路线图