spark stage 划分 源码
Spark在任务提交后首先会在DAGScheduler中根据任务划分为不同的stage,起点在DAGScheduler的handleJobSubmitted()方法中。
private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {var finalStage: ResultStage = nulltry {// New stage creation may throw an exception if, for example, jobs are run on a// HadoopRDD whose underlying HDFS files have been deleted.finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
这里的finalRDD是action算子之前的最后一个transform算子。
这里会以逻辑上的最后一个transform算子开始从后往前重新根据宽窄依赖构建stage,其中,最末端的stage称为finalStage,获得其的createResultStage()也是整个dag开始构造的起点。
private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage = {checkBarrierStageWithDynamicAllocation(rdd)checkBarrierStageWithNumSlots(rdd)checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)val parents = getOrCreateParentStages(rdd, jobId)val id = nextStageId.getAndIncrement()val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage
}
在createResultStage(),首先通过getOrCreateParentStages()方法不断从前置算子构造stage。
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {getShuffleDependencies(rdd).map { shuffleDep =>getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList
}
在整个dag的流程中,在于区分宽窄依赖,窄依赖联系的上下游rdd可作为同一个stage,而宽依赖之间的上下游rdd就需要区分为不同的stage。
在首次进入getOrCreateParentStages()方法中,作为参数的是最后的算子,首先会通过getShuffleDependencies()方法获取所有的shuffle依赖,也就是获取所有宽依赖,在这个方法,实则在归纳宽依赖的同时归并所有遇到宽依赖之前的窄依赖。
private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new ArrayStack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents
}
以逻辑最末的算子为起点,此处存在两个set分别用来存放已经扫描过的算子和得到的宽依赖,而需要扫描的算子都会存放到堆栈中依次弹出对其上游依赖进行分析。
此处,如果其依次往上游不断检查依赖关系,直到全部到达宽依赖或者扫描完毕,得到的宽依赖集合返回,作为区分stage的依据。
回到getOrCreateParentStages(),得到的宽依赖算子会依次通过getOrCreateShuffleMapStage()中的createShuffleMapStage()方法来构造stage。
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {val rdd = shuffleDep.rddcheckBarrierStageWithDynamicAllocation(rdd)checkBarrierStageWithNumSlots(rdd)checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)val numTasks = rdd.partitions.lengthval parents = getOrCreateParentStages(rdd, jobId)val id = nextStageId.getAndIncrement()val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)stageIdToStage(id) = stageshuffleIdToMapStage(shuffleDep.shuffleId) = stageupdateJobIdStageIdMaps(jobId, stage)if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {// Kind of ugly: need to register RDDs with the cache and map output tracker here// since we can't do it in the RDD constructor because # of partitions is unknownlogInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)}stage
}
具体的构造和一开始的resultStage一样,还是通过getOrCreateParentStages()方法不断重复上述的过程首先构造上游的stage,再构造当前的stage。
按照上述的流程,将会根据宽依赖的情况,从上游开始不断生成stage直到一开始的最末尾的算子生成finalStage结束。
以下面这段代码为例子。
def main(args: Array[String]) {val spark = SparkSession.builder.appName("GroupBy Test").getOrCreate()val numMappers = if (args.length > 0) args(0).toInt else 2val numKVPairs = if (args.length > 1) args(1).toInt else 1000val valSize = if (args.length > 2) args(2).toInt else 1000val numReducers = if (args.length > 3) args(3).toInt else numMappersval pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>val ranGen = new Randomval arr1 = new Array[(Int, Array[Byte])](numKVPairs)for (i <- 0 until numKVPairs) {val byteArr = new Array[Byte](valSize)ranGen.nextBytes(byteArr)arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)}arr1}println(pairs1.groupByKey(numReducers).count())spark.stop()
}
其分为三个transform算子,从逻辑起始开始分别为parallel,map,groupByKey,其中map到groupByKey为宽依赖,经过dag的划分,将会被分为两个stage,finalStage只有一个shuffle算子,而其父stage将会有两个算子分别为map和parallel,符合上述的dag划分的流程。
在完成stage的划分之后,就会将最后得到的finalStage提交,准备交给taskScheduler进行调度。
private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)if (missing.isEmpty) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {for (parent <- missing) {submitStage(parent)}waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id, None)}
}
在作为参数的finalStage进入之后将会不断得到其父stage进行递归提交,直到到最上端的stage,而子stage只会在其父stage全部提交完才会进行提交。
提交会通过submitMissingTasks()方法。
val tasks: Seq[Task[_]] = try {val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()stage match {case stage: ShuffleMapStage =>stage.pendingPartitions.clear()partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = partitions(id)stage.pendingPartitions += idnew ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())}case stage: ResultStage =>partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptNumber,taskBinary, part, locs, id, properties, serializedTaskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,stage.rdd.isBarrier())}}
} catch {case NonFatal(e) =>abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))runningStages -= stagereturn
}if (tasks.size > 0) {logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
}
在这里会根据分区数量将stage转为对应数量的task,之后将其作为taskSet交由taskScheduler进行调度。
spark stage 划分 源码相关推荐
- spark任务运行源码
spark任务运行源码 spark是一个分布式计算引擎,任务的运行是计算引擎的核心. 一个spark任务怎么能运行起来呢? 1 spark服务启动(Master,Worker): 2 应用程序提交 3 ...
- Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x
文章目录 Spark ALS recommendForAll源码解析实战 1. 软件版本: 2. 本文要解决的问题 3. 源码分析实战 3.1 Spark2.2.2 ALS recommendForA ...
- 第25课 Spark Hash Shuffle源码解读与剖析
第25课: Spark Hash Shuffle源码解读与剖析 Spark 2.1x 现在的版本已经没有Hash Shuffle的方式,那为什么我们还要讲解HashShuffle源码的内容呢?原因有3 ...
- Spark RPC框架源码分析(二)RPC运行时序
前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...
- Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法
Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...
- 《Spark商业案例与性能调优实战100课》第25课:Spark Hash Shuffle源码解读与剖析
<Spark商业案例与性能调优实战100课>第25课:Spark Hash Shuffle源码解读与剖析
- Spark的stage划分算法源码分析
Spark Application中可以有不同的Action触发多个Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前 ...
- Spark存储机制源码剖析
一.Shuffle结果的写入和读取 通过之前的文章Spark源码解读之Shuffle原理剖析与源码分析我们知道,一个Shuffle操作被DAGScheduler划分为两个stage,第一个stage是 ...
- spark读取文件源码分析-3
本篇是spark read一个parquet源码分析的第三篇,这一篇主要介绍spark的默认的partition的设置逻辑,当然,这一篇实际上算不上源码分析了 第一篇 第二篇 1 . userProf ...
最新文章
- zend studio调试
- Perl学习笔记(十)--通过DBI访问数据库
- Leet Code OJ 125. Valid Palindrome [Difficulty: Easy]
- p7zip的解压和压缩
- python 获取li的内容_Python开发案例:爬取四川省统计局数据Matplotlib绘图
- dubbo源码-服务发现
- java parseint(12.0)_java的parseint
- HyperLedger Composer 如何安装、小白入门教程
- 用计算机为题目写作400字,电脑课作文400字
- 网上怎么下载ug软件ug怎样下载安装ug安装包免费领取
- 4999以内阿拉伯数字转罗马字符
- c++三子棋游戏程序
- 个人微信开发api文档
- CXK, 出来打球!
- shuipFCMS收集2
- MySQL 报错InnoDB: Cannot allocate memory for the buffer poo处理方法
- IP Forwarding打开
- Mysql插入数据 Incorrect string value: '\xF0\x9F\x98\x84
- java与软件测试哪个好?
- 12864图片显示操作