Task执行成功时的结果处理

在上一节中,给出了Task在Executor上的运行代码演示,我们知道代码的最终运行通过的是TaskRunner方法

class TaskRunner(execBackend: ExecutorBackend,val taskId: Long,val attemptNumber: Int,taskName: String,serializedTask: ByteBuffer)extends Runnable {//其它无关代码省略//向Driver端发状态更新execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)//其它非关键代码省略//执行完成后,通知Driver端进行状态更新execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)} catch {//出错时,通知Driver端的状态更新//代码省略}

状态更新时,先调用的是CoarseGrainedExecutorBackend中的statusUpdate方法

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {val msg = StatusUpdate(executorId, taskId, state, data)driver match {//将Driver端发送StatusUpdate消息case Some(driverRef) => driverRef.send(msg)case None => logWarning(s"Drop $msg because has not yet connected to driver")}}
}

DriverEndpoint中的receive方法接收并处理发送过来的StatusUpdate消息,具体源码如下:

 override def receive: PartialFunction[Any, Unit] = {//接收StatusUpdate发送过来的消息case StatusUpdate(executorId, taskId, state, data) =>//调用TaskSchedulerImpl中的statusUpdate方法scheduler.statusUpdate(taskId, state, data.value)//if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId)case None =>// Ignoring the update since we don't know about the executor.logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}case ReviveOffers =>makeOffers()case KillTask(taskId, executorId, interruptThread) =>executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))case None =>// Ignoring the task kill since the executor is not registered.logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")}}

TaskSchedulerImpl中的statusUpdate方法源码如下:

 def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {var failedExecutor: Option[String] = Nonesynchronized {try {if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { // We lost this entire executor, so remember that it's goneval execId = taskIdToExecutorId(tid)if (activeExecutorIds.contains(execId)) {removeExecutor(execId)failedExecutor = Some(execId)}}taskIdToTaskSetManager.get(tid) match {case Some(taskSet) =>if (TaskState.isFinished(state)) {taskIdToTaskSetManager.remove(tid)taskIdToExecutorId.remove(tid)} //任务执行成功时的处理if (state == TaskState.FINISHED) {taskSet.removeRunningTask(tid) //taskResultGetter为线程池,处理执行成功的情况taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) //任务执行不成功,包括任务执行失败、任务丢失及任务被杀死} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {taskSet.removeRunningTask(tid) //处理任务执行失败的情况taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)}case None =>logError(("Ignoring update with state %s for TID %s because its task set is gone (this is " +"likely the result of receiving duplicate task finished status updates)").format(state, tid))}} catch {case e: Exception => logError("Exception in statusUpdate", e)}} // Update the DAGScheduler without holding a lock on this, since that can deadlockif (failedExecutor.isDefined) {dagScheduler.executorLost(failedExecutor.get)backend.reviveOffers()}}

对于Task执行成功的情况,它会调用TaskResultGetter的enqueueSuccessfulTask方法进行处理:

 def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {getTaskResultExecutor.execute(new Runnable {override def run(): Unit = Utils.logUncaughtExceptions {try {val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {//结果为最终的计算结果case directResult: DirectTaskResult[_] =>if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {return}// deserialize "value" without holding any lock so that it won't block other threads.// We should call it here, so that when it's called again in// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.directResult.value()(directResult, serializedData.limit())//结果保存在远程Worker节点的BlockManager当中case IndirectTaskResult(blockId, size) =>if (!taskSetManager.canFetchMoreResults(size)) {// dropped by executor if size is larger than maxResultSizesparkEnv.blockManager.master.removeBlock(blockId)return}logDebug("Fetching indirect task result for TID %s".format(tid))scheduler.handleTaskGettingResult(taskSetManager, tid)//从远程Worker获取结果val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)if (!serializedTaskResult.isDefined) {/* We won't be able to get the task result if the machine that ran the task failed* between when the task ended and when we tried to fetch the result, or if the* block manager had to flush the result. *///获取结果时,如果远程Eexecutor对应的机器出现故障或其它错误时,可能导致结果获取失败scheduler.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost)return}//反序列化远程获取的结果val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](serializedTaskResult.get)//删除远程结果sparkEnv.blockManager.master.removeBlock(blockId)(deserializedResult, size)}result.metrics.setResultSize(size)//TaskSchedulerImpl处理获取到的结果scheduler.handleSuccessfulTask(taskSetManager, tid, result)} catch {case cnf: ClassNotFoundException =>val loader = Thread.currentThread.getContextClassLoadertaskSetManager.abort("ClassNotFound with classloader: " + loader)// Matching NonFatal so we don't catch the ControlThrowable from the "return" above.case NonFatal(ex) =>logError("Exception while getting task result", ex)taskSetManager.abort("Exception while getting task result: %s".format(ex))}}})}

TaskSchedulerImpl中的handleSuccessfulTask方法将最终对计算结果进行处理,具有源码如下:

def handleSuccessfulTask(taskSetManager: TaskSetManager,tid: Long,taskResult: DirectTaskResult[_]): Unit = synchronized {//调用TaskSetManager.handleSuccessfulTask方法进行处理taskSetManager.handleSuccessfulTask(tid, taskResult)}

TaskSetManager.handleSuccessfulTask方法源码如下:

/*** Marks the task as successful and notifies the DAGScheduler that a task has ended.*/def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {val info = taskInfos(tid)val index = info.indexinfo.markSuccessful()removeRunningTask(tid)// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not// "deserialize" the value when holding a lock to avoid blocking other threads. So we call// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.// Note: "result.value()" only deserializes the value when it's called at the first time, so// here "result.value()" just returns the value and won't block other threads.//调用DagScheduler的taskEnded方法sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)if (!successful(index)) {tasksSuccessful += 1logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))// Mark successful and stop if all the tasks have succeeded.successful(index) = trueif (tasksSuccessful == numTasks) {isZombie = true}} else {logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +" because task " + index + " has already completed successfully")}failedExecutors.remove(index)maybeFinishTaskSet()}

进入DAGScheduler的taskEnded方法

//DAGScheduler中的taskEnded方法
/*** Called by the TaskSetManager to report task completions or failures.*/def taskEnded(task: Task[_],reason: TaskEndReason,result: Any,accumUpdates: Map[Long, Any],taskInfo: TaskInfo,taskMetrics: TaskMetrics): Unit = {//调用DAGSchedulerEventProcessLoop的post方法将CompletionEvent提交到事件队列中,交由eventThread进行处理,onReceive方法将处理该事件eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))}

跳转到onReceive方法当中,可以看到其调用的是onReceive

//DAGSchedulerEventProcessLoop中的onReceive方法
/*** The main event loop of the DAG scheduler.*/override def onReceive(event: DAGSchedulerEvent): Unit = {val timerContext = timer.time()try {doOnReceive(event)} finally {timerContext.stop()}}

跳转到doOnReceive方法到当中,可以看到

//DAGSchedulerEventProcessLoop中的doOnReceive方法
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)case StageCancelled(stageId) =>dagScheduler.handleStageCancellation(stageId)case JobCancelled(jobId) =>dagScheduler.handleJobCancellation(jobId)case JobGroupCancelled(groupId) =>dagScheduler.handleJobGroupCancelled(groupId)case AllJobsCancelled =>dagScheduler.doCancelAllJobs()case ExecutorAdded(execId, host) =>dagScheduler.handleExecutorAdded(execId, host)case ExecutorLost(execId) =>dagScheduler.handleExecutorLost(execId, fetchFailed = false)case BeginEvent(task, taskInfo) =>dagScheduler.handleBeginEvent(task, taskInfo)case GettingResultEvent(taskInfo) =>dagScheduler.handleGetTaskResult(taskInfo)//处理CompletionEvent事件case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>//交由DAGScheduler.handleTaskCompletion方法处理dagScheduler.handleTaskCompletion(completion)case TaskSetFailed(taskSet, reason, exception) =>dagScheduler.handleTaskSetFailed(taskSet, reason, exception)case ResubmitFailedStages =>dagScheduler.resubmitFailedStages()}

DAGScheduler.handleTaskCompletion方法完成计算结果的处理

/*** Responds to a task finishing. This is called inside the event loop so it assumes that it can* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.*/private[scheduler] def handleTaskCompletion(event: CompletionEvent) {val task = event.taskval stageId = task.stageIdval taskType = Utils.getFormattedClassName(task)outputCommitCoordinator.taskCompleted(stageId, task.partitionId,event.taskInfo.attempt, event.reason)// The success case is dealt with separately below, since we need to compute accumulator// updates before posting.if (event.reason != Success) {val attemptId = task.stageAttemptIdlistenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,event.taskInfo, event.taskMetrics))}if (!stageIdToStage.contains(task.stageId)) {// Skip all the actions if the stage has been cancelled.return}val stage = stageIdToStage(task.stageId)event.reason match {case Success =>listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,event.reason, event.taskInfo, event.taskMetrics))stage.pendingTasks -= tasktask match {//处理ResultTaskcase rt: ResultTask[_, _] =>// Cast to ResultStage here because it's part of the ResultTask// TODO Refactor this out to a function that accepts a ResultStageval resultStage = stage.asInstanceOf[ResultStage]resultStage.resultOfJob match {case Some(job) =>if (!job.finished(rt.outputId)) {updateAccumulators(event)job.finished(rt.outputId) = truejob.numFinished += 1// If the whole job has finished, remove it//判断job是否已处理完毕,即所有Task是否处理完毕if (job.numFinished == job.numPartitions) {markStageAsFinished(resultStage)cleanupStateForJobAndIndependentStages(job)listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))}// taskSucceeded runs some user code that might throw an exception. Make sure// we are resilient against that.//通知JobWaiter,job处理完毕try {job.listener.taskSucceeded(rt.outputId, event.result)} catch {case e: Exception =>// TODO: Perhaps we want to mark the resultStage as failed?job.listener.jobFailed(new SparkDriverExecutionException(e))}}case None =>logInfo("Ignoring result from " + rt + " because its job has finished")}//处理ShuffleMapTaskcase smt: ShuffleMapTask =>val shuffleStage = stage.asInstanceOf[ShuffleMapStage]updateAccumulators(event)val status = event.result.asInstanceOf[MapStatus]val execId = status.location.executorIdlogDebug("ShuffleMapTask finished on " + execId)if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")} else {//结果保存到ShuffleMapStageshuffleStage.addOutputLoc(smt.partitionId, status)}if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {markStageAsFinished(shuffleStage)logInfo("looking for newly runnable stages")logInfo("running: " + runningStages)logInfo("waiting: " + waitingStages)logInfo("failed: " + failedStages)// We supply true to increment the epoch number here in case this is a// recomputation of the map outputs. In that case, some nodes may have cached// locations with holes (from when we detected the error) and will need the// epoch incremented to refetch them.// TODO: Only increment the epoch number if this is not the first time//       we registered these map outputs.mapOutputTracker.registerMapOutputs(shuffleStage.shuffleDep.shuffleId,shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head),changeEpoch = true)clearCacheLocs()//处理部分Task失败的情况if (shuffleStage.outputLocs.contains(Nil)) {// Some tasks had failed; let's resubmit this shuffleStage// TODO: Lower-level scheduler should also deal with thislogInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +") because some of its tasks had failed: " +shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty).map(_._2).mkString(", "))//重新提交submitStage(shuffleStage)} else {//处理其它未提交的Stageval newlyRunnable = new ArrayBuffer[Stage]for (shuffleStage <- waitingStages) {logInfo("Missing parents for " + shuffleStage + ": " +getMissingParentStages(shuffleStage))}for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty){newlyRunnable += shuffleStage}waitingStages --= newlyRunnablerunningStages ++= newlyRunnablefor {shuffleStage <- newlyRunnable.sortBy(_.id)jobId <- activeJobForStage(shuffleStage)} {logInfo("Submitting " + shuffleStage + " (" +shuffleStage.rdd + "), which is now runnable")submitMissingTasks(shuffleStage, jobId)}}}}//其它代码省略}

执行流程:
1. org.apache.spark.executor.TaskRunner.statusUpdate方法
2. org.apache.spark.executor.CoarseGrainedExecutorBackend.statusUpdate方法
3. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#DriverEndpoint.recieve方法,DriverEndPoint是内部类
4. org.apache.spark.scheduler.TaskSchedulerImpl中的statusUpdate方法
5. org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask方法
6. org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion方法

Spark修炼之道(高级篇)——Spark源码阅读:第九节 Task执行成功时的结果处理...相关推荐

  1. Linux驱动修炼之道-SPI驱动框架源码分析(上)

    Linux驱动修炼之道-SPI驱动框架源码分析(上)   SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设备可工作于m/s模式.主设备发起数据帧,允许多个从设 ...

  2. 命令构建gradle项目_【Android 修炼手册】Gradle 篇 -- Gradle 源码分析

    预备知识 理解 gradle 的基本开发 了解 gradle task 和 plugin 使用及开发 了解 android gradle plugin 的使用 看完本文可以达到什么程度 了解 grad ...

  3. 【Android 修炼手册】Gradle 篇 -- Gradle 源码分析

    预备知识 理解 gradle 的基本开发 了解 gradle task 和 plugin 使用及开发 了解 android gradle plugin 的使用 看完本文可以达到什么程度 了解 grad ...

  4. 【源码篇】源码阅读集合

    怎么读达到什么目的 Spring源码 Redis源码 JDK源码 集合源码

  5. ClickHouse源码阅读(0000 0110) —— 使用ReplicatedMergeTree引擎时的副本选择问题

    在使用ReplicatedMergeTree引擎和Distributed引擎的时候,对于同一张表,服务器上存在多个副本,在查询数据的时候,是如何在这些副本之间进行选择的呢?结合源码来试着分析一下... ...

  6. springmvc源码阅读3--dispatcherServlet reqeust的执行流程

    一.前言 太多的时候总是会遇到群里面问,报404怎么肥事呀,前台的参数怎么后台收不到呀--,上次就在群里面遇到过,围绕这一个点:input的name值是不是错了呀,人家妹子都截了好几次图说没有问题,然 ...

  7. Linux驱动修炼之道-SPI驱动框架源码分析(中)

    来自:http://blog.csdn.NET/woshixingaaa/article/details/6574220 这篇来分析spi子系统的建立过程. 嵌入式微处理器访问SPI设备有两种方式:使 ...

  8. 大神手把手教源码阅读的方法、误区以及三种境界

    丁威 中间件兴趣圈 读完需要 1 分钟 速读仅需 1 分钟 在技术职场中普遍存在如下几种现象: 对待工作中所使用的技术不需要阅读源码,只需在开发过程中能够熟练运用就行 看源码太费时间,而且容易忘记,如 ...

  9. Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行

    在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRun ...

最新文章

  1. 第八周项目实践2 建立连串算法库
  2. C++下关于XML开源库的使用
  3. 学习编程的25个“坑”,你踩到了吗?
  4. QTP自动化测试框架的基础知识
  5. ASP.NET入门五步详解
  6. Android布局怎么画图形,Android开发者的图形化布局
  7. python文件操作以及相对路径和绝对路径问题
  8. 计算机启用时间 查找方式,电脑实用知识技巧 篇六:不需要第三方软件,这种方法查看系统启动时间...
  9. 基于javaweb的公交查询系统的设计与实现(含源文件)
  10. 面试题 04.06. 后继者
  11. 可以扦插的花有哪些?
  12. 2020下半年软考中级网工答案
  13. 谷歌企业邮箱:应用专用密码
  14. Badboy安装教程(含下载地址)
  15. XXL-JOB任务调度
  16. Android 扫码盒子全局接收付款码
  17. 学习《可复制的领导力》有感
  18. 【分享】请回答1988(一)
  19. Android读写日历,android – 读写日历
  20. 注册苹果开发者,登录后提示Need assistance with accessing your developer account?解决过程

热门文章

  1. 如何简单利用git_stats脚本统计项目的代码量(以及win平台使用时的错误排除)...
  2. ORACLE sid,pid,spid总结
  3. 运维工程师必须掌握的技巧
  4. 浏览器中打开IOS应用并传参
  5. 企业级备份方案系列PART3:SCDPM 2012备份/恢复Exchange2010
  6. 当网络安全遇上大数据分析(6)
  7. TypeScript--函数
  8. jmeter压力性能测试-多台机器并发请求
  9. spring security reactive获取security context
  10. iOS 分组索引和索引分区