Spark Application中可以有不同的Action触发多个Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
然而Stage划分的依据就是宽依赖,什么时候产生宽依赖(产生shuffle)呢?例如reduceByKey,groupByKey等等。
DAGScheduler的handleJobSubmitted方法主要是用来创建最后一个stage,同时将job划分成多个stage。

一、stage的划分算法’

/**** 来处理这次提交的Job来处理这次提交的Job*/private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],allowLocal: Boolean,callSite: CallSite,listener: JobListener,properties: Properties = null){// 一、使用触发job的RDD最后一个stagevar finalStage: Stage = nulltry {// New stage creation may throw an exception if, for example, jobs are run on a// HadoopRDD whose underlying HDFS files have been deleted.// stage的划分是从最后一个stage往前倒序划分的,最后一个就是一个stage// 并将stage放入DAGschedule的缓存中finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)} catch {case e: Exception =>logWarning("Creating new stage failed due to exception - job: " + jobId, e)listener.jobFailed(e)return}if (finalStage != null) {// 用finalStage创建job,也就是说这个job最后一个stage,肯定就是finalstage// 这里就创建了一个job了val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)clearCacheLocs()logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(job.jobId, callSite.shortForm, partitions.length, allowLocal))logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")logInfo("Parents of final stage: " + finalStage.parents)logInfo("Missing parents: " + getMissingParentStages(finalStage))val shouldRunLocally =localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1val jobSubmissionTime = clock.getTimeMillis()if (shouldRunLocally) {// Compute very short actions like first() or take() with no parent stages locally.listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))runLocally(job)} else {// 三、将job添加到内存缓存中jobIdToActiveJob(jobId) = jobactiveJobs += jobfinalStage.resultOfJob = Some(job)val stageIds = jobIdToStageIds(jobId).toArrayval stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))// 四、提交最后一个stage// 这个方法会提交第一个stage 并把其余的stage放在缓存中submitStage(finalStage)}}// 在提交等待队列的stagesubmitWaitingStages()}
 /*** 提交stage的方法* 同时包含stage的划分算法* @param stage*/private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {// 这个很重要,获取某个stage 的父stageval missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)// 如果返回为空if (missing == Nil) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {// 继续递归调用划分stagefor (parent <- missing) {submitStage(parent)}// 同时将stage加入到等待队列waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id)}}

获取某个stage的父stage

 /*** 获取stage划分的父stage* @param stage* @return*/private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage]val visited = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visiting// 压栈的方式 先入后出val waitingForVisit = new Stack[RDD[_]]def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rddif (getCacheLocs(rdd).contains(Nil)) {// 遍历rdd的依赖for (dep <- rdd.dependencies) {dep match {// 宽依赖(shuffle依赖)case shufDep: ShuffleDependency[_, _, _] =>// 使用那个宽依赖创建一个ShuffleMapStage,并且会将isshuffleMap设置为true// 那么默认最后一个stage,不是shuffleMap stage// 但是finalStage之前所有的stage,都是shuffleMap stageval mapStage = getShuffleMapStage(shufDep, stage.jobId)if (!mapStage.isAvailable) {missing += mapStage}// 窄依赖,直接压栈case narrowDep: NarrowDependency[_] =>waitingForVisit.push(narrowDep.rdd)}}}}}// 首先往栈中压入一个RDDwaitingForVisit.push(stage.rdd)//遍历RDDwhile (!waitingForVisit.isEmpty) {visit(waitingForVisit.pop())}missing.toList}

通过以上两个方法递归循环调用将所有的stage保存在waitingStages缓存中。

循环调用下面的方法将stage提交

private def submitWaitingStages() {// TODO: We might want to run this less often, when we are sure that something has become// runnable that wasn't before.logTrace("Checking for newly runnable parent stages")logTrace("running: " + runningStages)logTrace("waiting: " + waitingStages)logTrace("failed: " + failedStages)val waitingStagesCopy = waitingStages.toArraywaitingStages.clear()// 循环提交stagefor (stage <- waitingStagesCopy.sortBy(_.jobId)) {submitStage(stage)}}

stage划分算法的总结:
1、stage从finalstage倒推
2、通过宽依赖,来对新的stage进行划分提交
3、通过递归的方式,优先提交父stage
对于产生shuffle的算子,底层会产生三个RDD,分别是MappartitionRDD、shuffleRDD和MappartitionRDD,第一个MappartitionRDD和ShuffleRDD之间会产生shuffle,所以这个就是stage分配的分割点。

二、task的最佳位置计算算法

/*** 提交stage,为stage创建一批task,task 的数量与partition数量相同*/private def submitMissingTasks(stage: Stage, jobId: Int) {logDebug("submitMissingTasks(" + stage + ")")// Get our pending tasks and remember them in our pendingTasks entrystage.pendingTasks.clear()// First figure out the indexes of partition ids to compute.// 获取创建task的数量val partitionsToCompute: Seq[Int] = {if (stage.isShuffleMap) {(0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)} else {val job = stage.resultOfJob.get(0 until job.numPartitions).filter(id => !job.finished(id))}}val properties = if (jobIdToActiveJob.contains(jobId)) {jobIdToActiveJob(stage.jobId).properties} else {// this stage will be assigned to "default" poolnull}// 将stage加入到runningStagesrunningStages += stage// SparkListenerStageSubmitted should be posted before testing whether tasks are// serializable. If tasks are not serializable, a SparkListenerStageCompleted event// will be posted, which should always come after a corresponding SparkListenerStageSubmitted// event.stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))outputCommitCoordinator.stageStart(stage.id)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast// the serialized copy of the RDD and for each task we will deserialize it, which means each// task gets a different copy of the RDD. This provides stronger isolation between tasks that// might modify state of objects referenced in their closures. This is necessary in Hadoop// where the JobConf/Configuration object is not thread-safe.var taskBinary: Broadcast[Array[Byte]] = nulltry {// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).// For ResultTask, serialize and broadcast (rdd, func).val taskBinaryBytes: Array[Byte] =if (stage.isShuffleMap) {closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()} else {closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()}taskBinary = sc.broadcast(taskBinaryBytes)} catch {// In the case of a failure during serialization, abort the stage.case e: NotSerializableException =>abortStage(stage, "Task not serializable: " + e.toString)runningStages -= stagereturncase NonFatal(e) =>abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")runningStages -= stagereturn}// 为stage创建指定数量的task// task最佳位置的计算算法// 最后一个stage的Task是ResultTask,其他的都是ShuffleMapTaskval tasks: Seq[Task[_]] = if (stage.isShuffleMap) {partitionsToCompute.map { id =>// 给每个partition创建一个task// 并计算task的最佳位置val locs = getPreferredLocs(stage.rdd, id)val part = stage.rdd.partitions(id)// 对于finalStage之外的stage,他的isshuffleMap设置为true// 所以会创建ShuffleMapTasknew ShuffleMapTask(stage.id, taskBinary, part, locs)}} else {// 不是ShuffleMapTask,那就是finalStage// finalStage,是用来创建ResultTask的val job = stage.resultOfJob.getpartitionsToCompute.map { id =>val p: Int = job.partitions(id)val part = stage.rdd.partitions(p)val locs = getPreferredLocs(stage.rdd, p)new ResultTask(stage.id, taskBinary, part, locs, id)}}if (tasks.size > 0) {logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")stage.pendingTasks ++= taskslogDebug("New pending tasks: " + stage.pendingTasks)// 最后通过taskScheduler提交task settaskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())} else {// Because we posted SparkListenerStageSubmitted earlier, we should post// SparkListenerStageCompleted here in case there are no tasks to run.outputCommitCoordinator.stageEnd(stage.id)listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))logDebug("Stage " + stage + " is actually done; %b %d %d".format(stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))runningStages -= stage}}

总结:
1、一个stage内部会有很多个task来执行
2、task的位置是根据cache和checkpoint决定的。
3、从stage的最后一个RDD开始,去查找RDD的partition上寻找是不是被cache了还是checkpoint了,如果有的话那么最佳位置就是cache或者checkpoint的partition上,因为在这个partition上,就不需要计算以前的父RDD了
4、如果既有cache还有checkpoint,那么以cache的partition为准。
5、如果没有查找到最佳位置,那么最后由taskschedule来决定。
6、只有最后一个stage的task是resultTask,其他的都是shuffleMaptTask

Spark的stage划分算法源码分析相关推荐

  1. 《深入理解Spark:核心思想与源码分析》——1.2节Spark初体验

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第1章,第1.2节Spark初体验,作者耿嘉安,更多章节内容可以访问云栖社区"华章社区"公众号查看 ...

  2. 《深入理解Spark:核心思想与源码分析》——第1章环境准备

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第1章环境准备,作者耿嘉安,更多章节内容可以访问云栖社区"华章社区"公众号查看 第1章 环 境 准 ...

  3. 《深入理解Spark:核心思想与源码分析》——3.10节创建和启动ExecutorAllocationManager...

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第3章,第3.10节创建和启动ExecutorAllocationManager,作者耿嘉安,更多章节内容可以访问云栖 ...

  4. 《深入理解Spark:核心思想与源码分析》——1.3节阅读环境准备

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第1章,第1.3节阅读环境准备,作者耿嘉安,更多章节内容可以访问云栖社区"华章社区"公众号查看 1 ...

  5. Spark MLlib: Decision Tree源码分析

    http://spark.apache.org/docs/latest/mllib-decision-tree.html 以决策树作为开始,因为简单,而且也比较容易用到,当前的boosting或ran ...

  6. Spark MLlib矩阵分解源码分析

    基础知识 特征值分解 如果一个向量 vv 是方阵 AA 的特征向量,可以表示成下面的形式: Av=λv Av = \lambda v 其中, λ\lambda 为特征向量 vv 对应的特征值,矩阵 A ...

  7. spark读取文件源码分析-3

    本篇是spark read一个parquet源码分析的第三篇,这一篇主要介绍spark的默认的partition的设置逻辑,当然,这一篇实际上算不上源码分析了 第一篇 第二篇 1 . userProf ...

  8. 深入理解Spark:核心思想与源码分析

    大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术 ...

  9. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

最新文章

  1. 萨默尔机器人_沣东新城王寺街道《民法典》企业宣传活动走进西安萨默尔科技...
  2. 广义互相关的公式,这一文都搜集全了
  3. python基础代码-python基础代码
  4. 将STM32的标准库编译成lib
  5. 064_let关键字
  6. oracle record is locked by another user
  7. github标星11600+:最全的吴恩达机器学习课程资源(完整笔记、中英文字幕视频、python作业,提供百度云镜像!)...
  8. ROS中配置主从机需注意的几点
  9. Tomcat源码解析三:tomcat的启动过程
  10. 移植开源QT软件-SameGame
  11. Java 8 到 Java 14,改变了哪些你写代码的方式?
  12. [七]JavaIO之 PipedInputStream 和 PipedInputStream
  13. [转载] Python中NumPy简介及使用举例
  14. [转载]幂等和高并发在电商系统中的使用
  15. C 标准库中输出到字符串、到文件的相关函数
  16. 两台服务器怎么发文件,两台服务器怎么发文件
  17. ELF、BIN、HEX、AXF的区别,资料整理
  18. 电商行业短信平台选择,电商行业短信通道选择考虑的问题
  19. 有哪些在家健身的软件?Mac健身软件推荐
  20. 股票市场行情走势图绘制

热门文章

  1. 一年有几个月几个季度_胎教几个月开始 注意事项有哪些?
  2. arm优化编译参数选项解释
  3. html中的expand属性,expand的用法总结大全
  4. 恒生java开发复试_2019恒生电子面试经验(JAVA开发人员,实施工程师等)
  5. Altium designer中元器件库(SCHLIB)元件引脚上文字(标号)大小及距离边缘位置设置
  6. ubuntu如何安装linux驱动程序,Ubuntu下如何安装驱动程序和应用软件?
  7. 手把手教学电瓶车进电梯检测、多类别车辆追踪、异常行为检测产业级应用
  8. dart和python混编,Flutter与iOS混编(一)
  9. .net 新添加的项目未加载_JDK 13 新特性一览
  10. JAVA揭竿而起总要有名号