DAG Generation

Overview

A DAG (Directed Acyclic Graph) is formed as the original RDDs go through a series of transformations. Based on the dependencies between the RDDs, the DAG is divided into different Stages. For narrow dependencies, the partition transformations are computed entirely within a single Stage. For wide dependencies, a Shuffle is involved, so the downstream computation can only begin after the parent RDD has been fully processed; wide dependencies are therefore the basis for dividing Stages.

A narrow dependency means that each partition of the parent RDD is used by at most one partition of the child RDD.
A wide dependency means that partitions of multiple child RDDs depend on the same partition of the parent RDD.
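A quick way to see the distinction from the spark-shell is to inspect rdd.dependencies: map produces a OneToOneDependency (narrow), while reduceByKey produces a ShuffleDependency (wide). A small illustrative sketch (the printed output shapes are approximate):

    val pairs = sc.parallelize(1 to 10).map(i => (i % 3, i)) // narrow: each parent partition feeds one child partition
    val reduced = pairs.reduceByKey(_ + _)                   // wide: requires a shuffle

    println(pairs.dependencies)   // e.g. List(org.apache.spark.OneToOneDependency@...)
    println(reduced.dependencies) // e.g. List(org.apache.spark.ShuffleDependency@...)

Looking for ShuffleDependency in the lineage is exactly how the DAGScheduler finds Stage boundaries.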

The DAGScheduler Scheduling Queue

Having walked through the creation and startup of the Executors, we continue reading the SparkContext constructor:

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  ......
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Create the driver-side environment via SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // createSparkEnv is called here; it returns a SparkEnv object holding many
  // important properties, the most important being the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // Create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // Start the TaskScheduler
  taskScheduler.start()
  ......
}

The constructor also creates a DAGScheduler object, whose job is to divide the work into Stages. Its own constructor initializes private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this).
DAGSchedulerEventProcessLoop is an event bus object responsible for dispatching events. eventProcessLoop.start() is called in the constructor; start() is defined in the parent class EventLoop:

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

start() calls eventThread.start(), launching a daemon thread:

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

The run method keeps taking events off the LinkedBlockingDeque blocking queue and calls onReceive(event), which is implemented by the subclass DAGSchedulerEventProcessLoop:

  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      // Delegate the job submission to the dagScheduler
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

onReceive pattern-matches on the type of the incoming event and runs the corresponding handler. From this point on, the DAGScheduler's event loop stays alive, continuously polling the queue for new events.

The DAG Task Submission Flow

After an RDD goes through a series of Transformation methods, an Action must eventually be invoked to run anything. In a WordCount program, for example, it is the final collect() call that submits the job for execution; only then is the work actually carried out. A typical driver looks like the sketch below.
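This is an illustrative usage sketch, not code from any particular program; the input path is hypothetical. Every line except the last merely builds up the lineage:

    val lines = sc.textFile("hdfs://master:9000/input")  // transformations are lazy
    val counts = lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)                                // wide dependency: a Stage boundary
    val result = counts.collect()                        // action: triggers sc.runJob

collect() itself is implemented as follows: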

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

Here sc is the SparkContext object. The call passes through several overloaded runJob methods and eventually lands in the one below, which in turn calls dagScheduler.runJob:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    // The dagScheduler enters the picture here; it is what divides the job into Stages
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

dagScheduler.runJob is the part we care about:

  def runJob[T, U: ClassTag](
      ......) {
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format(
          waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format(
          waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

What we mainly care about here is submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties), which submits the job:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ......
  // Put the job onto the event queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}

The key step here is eventProcessLoop.post, which places a JobSubmitted event on the queue. eventProcessLoop is the event bus object initialized in the constructor; its internal thread keeps polling the queue for events.
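post itself does nothing more than enqueue the event. In EventLoop it is roughly the following one-liner (quoted from memory, so treat it as a sketch):

    def post(event: E): Unit = {
      eventQueue.put(event)
    }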

Once the thread polls the event off the queue, onReceive matches on its type; the event we submitted is a JobSubmitted:

    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      // Delegate the job submission to the dagScheduler
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

This calls handleJobSubmitted; let's look at that method next:

private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // The final stage
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  ......
  submitStage(finalStage)
}

In the code above, newStage is called to divide the work into Stages; it is the core method for Stage division. The division starts from the final RDD's dependencies and recurses backwards through the lineage, using each wide dependency as a cut point between Stages. The splitting process itself is not elaborated in detail here, but a simplified sketch of the idea follows.
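The sketch below is a heavily simplified rendering of the getMissingParentStages-style recursion, not the actual Spark source: a ShuffleDependency marks the start of a parent Stage, while narrow dependencies are folded into the current Stage and the walk continues:

    import org.apache.spark.ShuffleDependency
    import org.apache.spark.rdd.RDD

    // Simplified sketch: walk an RDD's lineage and collect the RDDs at which
    // a new (parent) stage begins, i.e. the targets of shuffle dependencies.
    def shuffleBoundaries(rdd: RDD[_]): Seq[RDD[_]] = {
      val boundaries = scala.collection.mutable.ArrayBuffer[RDD[_]]()
      val visited = scala.collection.mutable.HashSet[RDD[_]]()

      def visit(r: RDD[_]): Unit = {
        if (!visited(r)) {
          visited += r
          r.dependencies.foreach {
            case shuffle: ShuffleDependency[_, _, _] =>
              boundaries += shuffle.rdd // wide dependency: a parent stage starts here
            case narrow =>
              visit(narrow.rdd)         // narrow dependency: same stage, keep walking
          }
        }
      }

      visit(rdd)
      boundaries
    }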
Once the Stages have been divided, execution reaches submitStage(finalStage), which submits the Stages recursively:

// Submit stages recursively
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Submit the tasks for this stage
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}

submitMissingTasks(stage, jobId.get) is called for each Stage together with the jobId:

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    ......
    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      // Hand the tasks over to the taskScheduler
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }
  }

The line to focus on here is `taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))`.

It creates a TaskSet object that packages all the task information: the task list, the stageId, a new attempt id, the jobId, and the scheduling properties.
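For reference, TaskSet itself is little more than a value holder. In the Spark 1.x sources it looks roughly like this (reproduced from memory, so treat the exact fields as an approximation):

    private[spark] class TaskSet(
        val tasks: Array[Task[_]],
        val stageId: Int,
        val attempt: Int,
        val priority: Int, // the jobId is passed in as the scheduling priority
        val properties: Properties) {
      val id: String = stageId + "." + attempt
    }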

Task Scheduling

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager that holds the taskSet's task list
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    // Add the task set to the scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  // Revive offers so the tasks can be launched
  backend.reviveOffers()
}

This method is important: it adds the task set to the scheduling pool and finally calls backend.reviveOffers(). The backend here is a CoarseGrainedSchedulerBackend, the object that schedules tasks onto Executors:

  override def reviveOffers() {
    // The driver actor sends a message to itself
    driverActor ! ReviveOffers
  }

The internal DriverActor sends a ReviveOffers message to itself. Next, look at how this message is handled in the receive method:

      case ReviveOffers =>
        makeOffers()
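In isolation, this actor self-send pattern (the ! operator is Akka's asynchronous send) looks roughly like the standalone sketch below. The names here are hypothetical, not Spark source, and the API matches the Akka 2.3 line that Spark 1.x used:

    import akka.actor.{Actor, ActorSystem, Props}

    case object ReviveOffers

    // A minimal actor that reacts to its own ReviveOffers message
    class DriverLikeActor extends Actor {
      def receive = {
        case ReviveOffers => println("makeOffers() would run here")
      }
      // Sending to oneself from inside the actor is simply: self ! ReviveOffers
    }

    object SelfSendDemo extends App {
      val system = ActorSystem("demo")
      val driver = system.actorOf(Props[DriverLikeActor], "driver")
      driver ! ReviveOffers // asynchronous fire-and-forget send
      Thread.sleep(500)     // give the message time to be processed
      system.shutdown()
    }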

Back in Spark, receiving the message triggers a call to makeOffers():

    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

In makeOffers, the known Executors are wrapped into a list of WorkerOffers, scheduler.resourceOffers matches the tasks in the scheduling pool against those offers, and the resulting task descriptions are passed to launchTasks.
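WorkerOffer itself is just a small case class describing the free resources on one Executor; in the 1.x sources it is approximately:

    private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)

launchTasks then performs the actual dispatch: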

    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        ......
        // Serialize the task
        val serializedTask = ser.serialize(task)
        ......
        val executorData = executorDataMap(task.executorId)
        executorData.freeCores -= scheduler.CPUS_PER_TASK
        // Send the serialized task to the Executor
        executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
      }
    }

launchTasks iterates over the task descriptions, serializes each one, and sends a LaunchTask message to the corresponding Executor. On the Executor side the message arrives in its onReceive method:

  // The LaunchTask message the DriverActor sends to start a Task
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      val ser = env.closureSerializer.newInstance()
      // Deserialize the task
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // Launch the task
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }

When the Executor receives the LaunchTask message from the DriverActor, the real work finally begins: the received task bytes are deserialized and Executor.launchTask is called to run the task:

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    // Wrap the task description in a TaskRunner
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    // Hand the TaskRunner to the thread pool
    threadPool.execute(tr)
  }

launchTask submits the task to a thread pool to run. TaskRunner is a Runnable, and its run method executes the logic of every RDD along the lineage chain that our application built.
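Stripped to its essentials, this launch path is the classic worker-pool pattern. The following is a simplified standalone sketch with hypothetical names, not Spark source:

    import java.util.concurrent.{ConcurrentHashMap, Executors}

    object LaunchTaskSketch extends App {
      // A cached pool, analogous to the Executor's task thread pool
      val threadPool = Executors.newCachedThreadPool()
      val runningTasks = new ConcurrentHashMap[Long, Runnable]()

      // Analogous to Executor.launchTask: wrap the work in a Runnable,
      // register it, and hand it to the pool
      def launchTask(taskId: Long, work: () => Unit): Unit = {
        val runner = new Runnable {
          override def run(): Unit =
            try work() finally runningTasks.remove(taskId)
        }
        runningTasks.put(taskId, runner)
        threadPool.execute(runner) // run() executes on a pool thread
      }

      launchTask(1L, () => println("running the RDD chain logic for task 1"))
      threadPool.shutdown()
    }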
