当触发一个RDD的action后,以count为例,调用关系如下:

  1. org.apache.spark.rdd.RDD#count

  2. org.apache.spark.SparkContext#runJob

  3. org.apache.spark.scheduler.DAGScheduler#runJob

  4. org.apache.spark.scheduler.DAGScheduler#submitJob

  5. org.apache.spark.scheduler.DAGSchedulerEventProcessActor#receive(JobSubmitted)

  6. org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

其 中步骤五的DAGSchedulerEventProcessActor是DAGScheduler 的与外部交互的接口代理,DAGScheduler在创建时会创建名字为eventProcessActor的actor。这个actor的作用看它的实 现就一目了然了:

[java] view plaincopy

  1. /**

  2. * The main event loop of the DAG scheduler.

  3. */

  4. def receive = {

  5. case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>

  6. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,

  7. listener, properties) // 提交job,来自与RDD->SparkContext->DAGScheduler的消息。之所以在这需要在这里中转一下,是为了模块功能的一致性。

  8. case StageCancelled(stageId) => // 消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上显示一个SparkContext的Job的执行状态。

  9. // 用户可以cancel一个Stage,会通过SparkContext->DAGScheduler 传递到这里。

  10. dagScheduler.handleStageCancellation(stageId)

  11. case JobCancelled(jobId) => // 来自于org.apache.spark.scheduler.JobWaiter的消息。取消一个Job

  12. dagScheduler.handleJobCancellation(jobId)

  13. case JobGroupCancelled(groupId) => // 取消整个Job Group

  14. dagScheduler.handleJobGroupCancelled(groupId)

  15. case AllJobsCancelled => //取消所有Job

  16. dagScheduler.doCancelAllJobs()

  17. case ExecutorAdded(execId, host) =& gt; // TaskScheduler得到一个Executor被添加的消息。具体来自 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers

  18. dagScheduler.handleExecutorAdded(execId, host)

  19. case ExecutorLost(execId) => //来自TaskScheduler

  20. dagScheduler.handleExecutorLost(execId)

  21. case BeginEvent(task, taskInfo) => // 来自TaskScheduler

  22. dagScheduler.handleBeginEvent(task, taskInfo)

  23. case GettingResultEvent(taskInfo) => //处理获得TaskResult信息的消息

  24. dagScheduler.handleGetTaskResult(taskInfo)

  25. case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => //来自TaskScheduler,报告task是完成或者失败

  26. dagScheduler.handleTaskCompletion(completion)

  27. case TaskSetFailed(taskSet, reason) => //来自TaskScheduler,要么TaskSet失败次数超过阈值或者由于Job Cancel。

  28. dagScheduler.handleTaskSetFailed(taskSet, reason)

  29. case ResubmitFailedStages => //当一个Stage处理失败时,重试。来自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion

  30. dagScheduler.resubmitFailedStages()

  31. }

总 结一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成 DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将DAGScheduler的比较 复杂功能接口化。

handleJobSubmitted

org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted 首先会根据RDD创建finalStage。finalStage,顾名思义,就是最后的那个Stage。然后创建job,最后提交。提交的job如果满 足一下条件,那么它将以本地模式运行:

1)spark.localExecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalStage的没有父Stage 并且 4)仅有一个partition

3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。

要 理解什么是Stage,首先要搞明白什么是Task。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的 多个patition会分别由不同的Task去处理。当然了这些Task的处理逻辑完全是一致的。这一组Task就组成了一个Stage。有两种 Task:

  1. org.apache.spark.scheduler.ShuffleMapTask

  2. org.apache.spark.scheduler.ResultTask

ShuffleMapTask 根据Task的partitioner将计算结果放到不同的bucket中。而ResultTask将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组 ResultTask。

在用户触发了一个action后, 比如count,collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的event processor 传递到DAGScheduler本身的handleJobSubmitted,它首先会划分Stage,提交Stage,提交Task。至此,Task就 开始在运行在集群上了。

一个Stage的开始就是从外部存储或者shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。

创建finalStage

handleJobSubmitted 通过调用newStage来创建finalStage:

[java] view plaincopy

  1. finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)

创 建一个result stage,或者说finalStage,是通过调用 org.apache.spark.scheduler.DAGScheduler#newStage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。

[java] view plaincopy

  1. private def newStage(

  2. rdd: RDD[_],

  3. numTasks: Int,

  4. shuffleDep: Option[ShuffleDependency[_, _, _]],

  5. jobId: Int,

  6. callSite: CallSite)

  7. : Stage =

  8. {

  9. val id = nextStageId.getAndIncrement()

  10. val stage =

  11. new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)

  12. stageIdToStage(id) = stage

  13. updateJobIdStageIdMaps(jobId, stage)

  14. stage

  15. }

对于result 的final stage来说,传入的shuffleDep是None。

我们知道,RDD通过org.apache.spark.rdd.RDD#getDependencies可以获得它依赖的parent RDD。而Stage也可能会有parent Stage。看一个RDD论文的Stage划分吧:

一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是Job的结果(result task对应的stage)或者shuffle的结果。

上图的话stage3的输入则是RDD A和RDD F shuffle的结果。而A和F由于到B和G需要shuffle,因此需要划分到不同的stage。

从源码实现的角度来看,通过触发action也就是最后一个RDD创建final stage(上图的stage 3),我们注意到new Stage的第五个参数就是该Stage的parent Stage:通过rdd和job id获取:

[java] view plaincopy

  1. // 生成rdd的parent Stage。没遇到一个ShuffleDependency,就会生成一个Stage

  2. private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {

  3. val parents = new HashSet[Stage] //存储parent stage

  4. val visited = new HashSet[RDD[_]] //存储已经被访问到得RDD

  5. // We are manually maintaining a stack here to prevent StackOverflowError

  6. // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。

  7. val waitingForVisit = new Stack[RDD[_]]

  8. def visit(r: RDD[_]) {

  9. if (!visited(r)) {

  10. visited += r

  11. // Kind of ugly: need to register RDDs with the cache here since

  12. // we can't do it in its constructor because # of partitions is unknown

  13. for (dep <- r.dependencies) {

  14. dep match {

  15. case shufDep: ShuffleDependency[_, _, _] => // 在ShuffleDependency时需要生成新的stage

  16. parents += getShuffleMapStage(shufDep, jobId)

  17. case _ =>

  18. waitingForVisit.push(dep.rdd) //不是ShuffleDependency,那么就属于同一个Stage

  19. }

  20. }

  21. }

  22. }

  23. waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd

  24. while (!waitingForVisit.isEmpty) { //只要stack不为空,则一直处理。

  25. visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage

  26. }

  27. parents.toList

  28. }

生成了finalStage后,就需要提交Stage了。

[java] view plaincopy

  1. // 提交Stage,如果有parent Stage没有提交,那么递归提交它。

  2. private def submitStage(stage: Stage) {

  3. val jobId = activeJobForStage(stage)

  4. if (jobId.isDefined) {

  5. logDebug("submitStage(" + stage + ")")

  6. // 如果当前stage不在等待其parent stage的返回,并且 不在运行的状态, 并且 没有已经失败(失败会有重试机制,不会通过这里再次提交)

  7. if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

  8. val missing = getMissingParentStages(stage).sortBy(_.id)

  9. logDebug("missing: " + missing)

  10. if (missing == Nil) { // 如果所有的parent stage都已经完成,那么提交该stage所包含的task

  11. logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

  12. submitMissingTasks(stage, jobId.get)

  13. } else {

  14. for (parent <- missing) { // 有parent stage为完成,则递归提交它

  15. submitStage(parent)

  16. }

  17. waitingStages += stage

  18. }

  19. }

  20. } else {

  21. abortStage(stage, "No active job for stage " + stage.id)

  22. }

  23. }

DAGScheduler将Stage划分完成后,提交实际上是通过把Stage转换为TaskSet,然后通过TaskScheduler将计算任务最终提交到集群。其所在的位置如下图所示。

接下来,将分析Stage是如何转换为TaskSet,并最终提交到Executor去运行的。

转载于:https://www.cnblogs.com/wzyxidian/p/4853618.html

Spark技术内幕:Stage划分及提交源码分析相关推荐

  1. Spark技术内幕: Task向Executor提交的源代码解析

    在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓 ...

  2. 【华为云技术分享】Linux内核补丁源码分析(1)

    在上一期中,我们介绍了Linux内核编程环境,在这一期中,我们将通过实例来介绍如何分析Linux内核的补丁. 一.Linux内核补丁 在"Linux内核发展史"中,我们简要介绍了L ...

  3. spark shell 删除失效_Spark任务提交源码解析

    1. 前言 反反复复捣鼓了很久,终于开始学习Spark的源码了,果不其然,那真的很有趣.这里我打算一本正经的胡说八道来讲一下Spark作业的提交过程. 基础mac系统基础环境如下: JDK 1.8 I ...

  4. Mysql如何实现隔离级别 - 可重复读和读提交 源码分析

    Abstract 本文会(1) 演示Mysql的两种隔离级别.  (2) 跟着mysql的源代码来看看它是怎么实现这两种隔离级别的. Mysql的隔离级别 当有多个事务并发执行时, 我们需要考虑他们之 ...

  5. Mapreduce 任务提交源码分析1

    2019独角兽企业重金招聘Python工程师标准>>> 提交过程 一般我们mapreduce任务是通过如下命令进行提交的 $HADOOP_HOME/bin/hadoop jar $M ...

  6. spring源码分析之spring-core总结篇

    1.spring-core概览 spring-core是spring框架的基石,它为spring框架提供了基础的支持. spring-core从源码上看,分为6个package,分别是asm,cgli ...

  7. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  8. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

  9. 站长技术导航二开美化网站源码 网站提交自动秒收录

    今天发现一款还挺不错的导航网源码,分享给大家,源码无BUG,无任何后门,导入数据库,然后修改为自己的信息就可以了,数据库里面也有收录接近一百个网站了. 安装教程: PHP版本需要选择5.6才能运行程序 ...

最新文章

  1. C#/Net代码精简优化技巧(3)
  2. Python Inotify 监视LINUX文件系统事件
  3. ORACLE SQL:经典查询练手第一篇
  4. Android开发继承webview,WebView如何从当前的Android主题继承颜色?
  5. 教育|我在美国读博士才发现,美国高等教育如此残酷,以前的感觉完全是扯淡...
  6. SpringMVC工作总结001_SpringMVC拦截器(资源和权限管理)
  7. 用macport安装nginx
  8. pyhon基础(一)
  9. Angularjs的ng-repeat中去除重复的数据
  10. 学生计算机如何用数字小游戏,【数学思维】小学生数学趣味游戏:你身上的计算器...
  11. fat32文件系统详细介绍_文件系统介绍
  12. 51单片机-串行口通信实验
  13. java集成kettle 8.2 获取转换的执行日志与步骤度量
  14. JAVA拳皇jar_拳皇(Java简单的小程序)源码示例
  15. 基于opencv和pillow实现人脸识别系统(附demo)
  16. oracle通过DBlink连接神通数据库方法教程
  17. 利用Apple Developer申请苹果开发者账号(支付宝微信付款)
  18. ffmpeg的中文文档(二)
  19. Python 石墨烯边缘磁性Hatree_Fock计算
  20. 【历史上的今天】4 月 14 日:Ruby 之父诞生;GDPR 首次颁布;Lindows 更名为 Linspire

热门文章

  1. docker设置镜像源 树莓派_树莓派上 Docker 的安装和使用
  2. python小爬虫(爬取职位信息和博客文章信息)
  3. linux-shell命令之chgrp(change group)【更改群组】
  4. Sping boot系列--redis之2 -- RedisKeyValueTemplate处理Model对象
  5. 数据科学自动化_数据科学会自动化吗?
  6. mask rcnn实例分割_使用Mask-RCNN的实例分割
  7. 支付宝18年账单已出,你消费了多少钱?
  8. qt开发环境 - c++类
  9. python中的字体英文名_对python opencv 添加文字 cv2.putText 的各参数介绍
  10. java rc4_nodejs 和 java 进行 rc4 加密得到的结果不一样