Spark task status management

A Spark task's status is updated frequently. When a task finishes, how does the driver actually get its result back? Let's follow the code flow below.

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  // update the task's state
  synchronized {
    try {
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        if (executorIdToTaskCount.contains(execId)) {
          // remove everything related to this executor
          removeExecutor(execId,
            SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetManager.remove(tid)
            taskIdToExecutorId.remove(tid).foreach { execId =>
              if (executorIdToTaskCount.contains(execId)) {
                executorIdToTaskCount(execId) -= 1
              }
            }
          }
          if (state == TaskState.FINISHED) {
            taskSet.removeRunningTask(tid)
            // the task succeeded, hand it to the result getter
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
            taskSet.removeRunningTask(tid)
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          }

From the status update above you can see that when the state is LOST, the executor's information is removed right away. When a task finishes, the flow is as follows:

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  // the task has finished; deserialize its result on the result-getter thread pool
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        // deserialize the result data
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] =>
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              // the accumulated result size exceeds the limit, give up
              return
            }
            directResult.value()
            (directResult, serializedData.limit())
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // mark the task as fetching its result
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // fetch the remote block that holds the result
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              // the fetch failed
              scheduler.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            // BlockManagerMasterEndpoint on the master tells the slaves to drop this block
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }
        result.metrics.setResultSize(size)
        // the result is ready, hand it back to the scheduler
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      }

As the code above shows, the result always comes back as a serialized object: a small result is returned directly as a DirectTaskResult, while a larger one is stored in a block on the executor and fetched asynchronously as an IndirectTaskResult. The sending side is covered in the earlier post 《spark 业务执行进程》.
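For reference, the choice between the two forms is made on the executor before it reports FINISHED. The sketch below paraphrases that logic; maxDirectResultSize stands in for the RPC message size limit, whose exact name and computation differ across Spark versions, so treat the identifiers as illustrative rather than verbatim source.

// sketch of the executor-side choice between DirectTaskResult and IndirectTaskResult
val resultSize = serializedDirectResult.limit()
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    // bigger than spark.driver.maxResultSize: drop the value, only report an IndirectTaskResult
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    // too big to ship inside the RPC message: park it in the block manager
    // and send back only the block id plus the size
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    // small enough: send the serialized DirectTaskResult inline
    serializedDirectResult
  }
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)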
For the indirect (large) case, the driver first calls scheduler.handleTaskGettingResult, which eventually reaches DAGScheduler.taskGettingResult to mark the task as fetching its result:

def taskGettingResult(taskInfo: TaskInfo) {
  // post an event so the event loop knows the result is being fetched
  eventProcessLoop.post(GettingResultEvent(taskInfo))
}
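Inside the event loop this only marks the TaskInfo and notifies the listener bus (for the UI); a paraphrase of DAGScheduler.handleGetTaskResult:

// paraphrased: handling of GettingResultEvent in the DAGScheduler event loop
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
  // record that the driver has started fetching this task's result
  taskInfo.markGettingResult()
  // notify listeners (e.g. the web UI) that the result is being pulled
  listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}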

The actual transfer is done by sparkEnv.blockManager.getRemoteBytes(blockId), which fetches the block from a remote node:

// fetch remote block data
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  // ask the driver where this block is stored
  val locations = Random.shuffle(master.getLocations(blockId))
  var numFetchFailures = 0
  for (loc <- locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      // fetch the data from the node that holds this block
      blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        numFetchFailures += 1
        if (numFetchFailures == locations.size) {
          // An exception is thrown while fetching this block from all locations
          throw new BlockFetchException(s"Failed to fetch block from" +
            s" ${locations.size} locations. Most recent failure cause:", e)
        } else {
          // This location failed, so we retry fetch from a different one by returning null here
          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $numFetchFailures)", e)
          null
        }
    }

As you can see, it first asks the master which nodes hold this blockId and then tries those locations one by one. The locations are resolved by sending a message to an endpoint running in the Spark driver, as follows:

/** Get locations of the blockId from the driver */
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // ask the driver endpoint where the block lives
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
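On the driver side, BlockManagerMasterEndpoint answers GetLocations from the block-location registry it maintains for all slaves. A paraphrased sketch (field names as in the 1.6-era source; details may differ by version):

// paraphrased from BlockManagerMasterEndpoint: answering GetLocations
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))
  // ... other messages omitted
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // blockLocations maps each BlockId to the set of BlockManagers that reported it
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}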

That GetLocations message is handled by BlockManagerMasterEndpoint, which runs in the driver (master) and tracks the block information reported by every slave. With the locations known, the transfer itself goes through NettyBlockTransferService.
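The getRemoteBytes path above calls fetchBlockSync, a small blocking wrapper defined on the BlockTransferService base class. A paraphrased sketch (the exact await utility differs between versions):

// paraphrased: fetchBlockSync wraps the asynchronous fetchBlocks in a Promise and blocks on it
def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
  val result = Promise[ManagedBuffer]()
  fetchBlocks(host, port, execId, Array(blockId),
    new BlockFetchingListener {
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
        result.failure(exception)
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        // copy the buffer, since the underlying one may be released after this callback
        val ret = ByteBuffer.allocate(data.size.toInt)
        ret.put(data.nioByteBuffer())
        ret.flip()
        result.success(new NioManagedBuffer(ret))
      }
    })
  // block the calling thread until the single block arrives or the fetch fails
  Await.result(result.future, Duration.Inf)
}

fetchBlockSync in turn delegates to the asynchronous fetchBlocks shown next: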

override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  // fetch the block files through netty
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        // create a netty client here and start pulling the data
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }
    // if retries are configured, wrap the starter in a retrying fetcher
    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
}

That brings the result data back to the driver. Once the bytes are fetched and deserialized, the scheduler calls

scheduler.handleSuccessfulTask(taskSetManager, tid, result)

to process the task's result.
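On TaskSchedulerImpl this is just a synchronized delegation to the TaskSetManager that owns the task (paraphrased):

// paraphrased from TaskSchedulerImpl: delegate to the owning TaskSetManager
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

The TaskSetManager then does the real bookkeeping: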

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  // this task has finished
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // notify the DAGScheduler that the task ended successfully
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      // every task in this TaskSet has now completed
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}

taskEnded then notifies the DAGScheduler, which processes the information as a CompletionEvent.
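taskEnded itself just posts the event onto the DAGScheduler's event loop (paraphrased; the exact parameter list varies slightly between versions):

// paraphrased from DAGScheduler: turn the scheduler callback into a CompletionEvent
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics): Unit = {
  eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}

The event loop then dispatches it to handleTaskCompletion: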

/**
 * Responds to a task finishing. This is called inside the event loop so it assumes that it can
 * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
 */
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)
  // the task is done
  outputCommitCoordinator.taskCompleted(
    stageId,
    task.partitionId,
    event.taskInfo.attemptNumber, // this is a task attempt number
    event.reason)

  // The success case is dealt with separately below, since we need to compute accumulator
  // updates before posting.
  if (event.reason != Success) {
    val attemptId = task.stageAttemptId
    listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
      event.taskInfo, event.taskMetrics))
  }

  if (!stageIdToStage.contains(task.stageId)) {
    // Skip all the actions if the stage has been cancelled.
    return
  }

  // handle the completion according to the task's end reason
  val stage = stageIdToStage(task.stageId)
  event.reason match {
    case Success =>
      listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
        event.reason, event.taskInfo, event.taskMetrics))
      stage.pendingPartitions -= task.partitionId
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                if (job.numFinished == job.numPartitions) {
                  // every partition of the job is done
                  markStageAsFinished(resultStage)
                  // clean up the related dependencies and cached state
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }
                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  // hand the result to the job listener
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          updateAccumulators(event)
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            // the completion came from an executor that is already marked as failed
            logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
          } else {
            // record the map output location of this partition
            shuffleStage.addOutputLoc(smt.partitionId, status)
          }
          if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
            markStageAsFinished(shuffleStage)
            mapOutputTracker.registerMapOutputs(
              shuffleStage.shuffleDep.shuffleId,
              shuffleStage.outputLocInMapOutputTrackerFormat(),
              changeEpoch = true)
            clearCacheLocs()
            if (!shuffleStage.isAvailable) {
              // Some tasks had failed; let's resubmit this shuffleStage
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.findMissingPartitions().mkString(", "))
              submitStage(shuffleStage)
            } else {
              // Mark any map-stage jobs waiting on this stage as finished
              if (shuffleStage.mapStageJobs.nonEmpty) {
                val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                for (job <- shuffleStage.mapStageJobs) {
                  markMapStageJobAsFinished(job, stats)
                }
              }
            }
            // Note: newly runnable stages will be submitted below when we submit waiting stages
          }
      }

    case Resubmitted =>
      logInfo("Resubmitted " + task + ", so marking it as still running")
      stage.pendingPartitions += task.partitionId

    case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
      val failedStage = stageIdToStage(task.stageId)
      val mapStage = shuffleToMapStage(shuffleId)
      if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
        logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
          s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
          s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
      } else {
        // It is likely that we receive multiple FetchFailed for a single stage (because we have
        // multiple tasks running concurrently on different executors). In that case, it is
        // possible the fetch failure has already been handled by the scheduler.
        if (runningStages.contains(failedStage)) {
          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
            s"due to a fetch failure from $mapStage (${mapStage.name})")
          markStageAsFinished(failedStage, Some(failureMessage))
        } else {
          logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
            s"longer running")
        }
        failedStages += failedStage
        failedStages += mapStage
        // Mark the map whose fetch failed as broken in the map stage
        if (mapId != -1) {
          mapStage.removeOutputLoc(mapId, bmAddress)
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }
        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
        }
      }

    case commitDenied: TaskCommitDenied =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

    case exceptionFailure: ExceptionFailure =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

    case TaskResultLost =>
      // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

    case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
      // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
      // will abort the job.
  }
  submitWaitingStages()
}
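For a ResultTask, job.listener is typically the JobWaiter created when the job was submitted; this is where the value finally reaches the user-supplied result handler. A paraphrased sketch of the 1.6-era JobWaiter.taskSucceeded:

// paraphrased from JobWaiter: deliver each task's result to the user's resultHandler
// and wake up the thread blocked in awaitResult()
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  resultHandler(index, result.asInstanceOf[T])
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}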

That covers how a task's final state is handled: success, resubmission, fetch failure, lost result, and so on.

To summarize the Spark task status update flow:

  1. statusUpdate receives the status reported by each task (see the sketch after this list for where that call comes from)
  2. If the task is in the FINISHED state, its result is deserialized
  3. If the deserialized TaskResult is too large to be sent inline, the driver asks BlockManagerMasterEndpoint which worker processes hold the result block
  4. The result block is then fetched back asynchronously over netty
  5. Once the result is back, a CompletionEvent is posted to the dagScheduler
  6. Finally the result is delivered to the job (ResultTask) or recorded via addOutputLoc (ShuffleMapTask)
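For completeness, statusUpdate is invoked from the driver's RPC endpoint when an executor reports a state change; in the coarse-grained backend it looks roughly like this (paraphrased from CoarseGrainedSchedulerBackend.DriverEndpoint, 1.6-era field names):

// paraphrased: the driver endpoint receives StatusUpdate messages from executors
// and hands them to TaskSchedulerImpl.statusUpdate, the entry point of this article
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // the finished task frees its CPU cores; offer the freed resources to pending tasks
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
  // ... other messages omitted
}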
