Spark task status management
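A Spark task's status is updated frequently while it runs. Once a task finishes, how does the driver get hold of its result? Let's follow the code, starting with TaskSchedulerImpl.statusUpdate: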
```scala
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  // Update the task's state
  synchronized {
    try {
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        if (executorIdToTaskCount.contains(execId)) {
          // Remove everything tracked for this executor
          removeExecutor(execId,
            SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetManager.remove(tid)
            taskIdToExecutorId.remove(tid).foreach { execId =>
              if (executorIdToTaskCount.contains(execId)) {
                executorIdToTaskCount(execId) -= 1
              }
            }
          }
          if (state == TaskState.FINISHED) {
            taskSet.removeRunningTask(tid)
            // The task succeeded: fetch and process its result
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
            taskSet.removeRunningTask(tid)
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          }
          // ... (rest of the method omitted in the original excerpt)
```
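Without Spark's types, the dispatch in statusUpdate is some map bookkeeping plus a match on the new state. The following is a hypothetical plain-Scala model of it, not Spark code. The TaskState objects, SchedulerModel, launch, and the succeeded/failed buffers (standing in for enqueueSuccessfulTask and enqueueFailedTask) are all invented for illustration.

```scala
import scala.collection.mutable

// Toy model of TaskSchedulerImpl.statusUpdate's dispatch.
// Hypothetical stand-ins only: none of these are Spark classes.
object StatusUpdateSketch {
  sealed trait TaskState
  case object RUNNING extends TaskState
  case object FINISHED extends TaskState
  case object FAILED extends TaskState
  case object KILLED extends TaskState
  case object LOST extends TaskState

  // Mirrors TaskState.isFinished: every terminal state counts as "finished"
  val finishedStates: Set[TaskState] = Set(FINISHED, FAILED, KILLED, LOST)

  class SchedulerModel {
    val taskIdToExecutorId = mutable.Map[Long, String]()
    val executorIdToTaskCount = mutable.Map[String, Int]()
    val removedExecutors = mutable.ListBuffer[String]()
    val succeeded = mutable.ListBuffer[Long]()           // stands in for enqueueSuccessfulTask
    val failed = mutable.ListBuffer[(Long, TaskState)]() // stands in for enqueueFailedTask

    // Hypothetical helper: record that a task was launched on an executor
    def launch(tid: Long, execId: String): Unit = synchronized {
      taskIdToExecutorId(tid) = execId
      executorIdToTaskCount(execId) = executorIdToTaskCount.getOrElse(execId, 0) + 1
    }

    def statusUpdate(tid: Long, state: TaskState): Unit = synchronized {
      // A LOST task is taken to mean its whole executor is gone
      if (state == LOST) {
        taskIdToExecutorId.get(tid).foreach { execId =>
          if (executorIdToTaskCount.remove(execId).isDefined) removedExecutors += execId
        }
      }
      // Terminal state: forget the task and decrement its executor's running count
      if (finishedStates.contains(state)) {
        taskIdToExecutorId.remove(tid).foreach { execId =>
          executorIdToTaskCount.get(execId).foreach(n => executorIdToTaskCount(execId) = n - 1)
        }
      }
      state match {
        case FINISHED               => succeeded += tid
        case FAILED | KILLED | LOST => failed += ((tid, state))
        case RUNNING                => () // non-terminal update: nothing to hand off
      }
    }
  }
}
```

The model mirrors one detail of the real code. Once a LOST task has removed its executor, the later decrement of executorIdToTaskCount finds no entry to update.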
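The status update shows that a LOST task is taken to mean its whole executor is lost, so the executor and everything tracked for it are removed. Once a task reaches a terminal state, its bookkeeping is cleared. A FINISHED task is handed to taskResultGetter.enqueueSuccessfulTask, while FAILED, KILLED, and LOST tasks go to enqueueFailedTask. The success path looks like this: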
```scala
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  // The task has finished; process its result on a separate thread pool
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        // Deserialize the result
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] =>
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              // Total size of fetched results exceeds the driver's limit: drop this one
              return
            }
            directResult.value()
            (directResult, serializedData.limit())
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // Mark the task as fetching its result
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the result block from a remote BlockManager
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              // The fetch failed: report the task as failed with TaskResultLost
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            // Ask BlockManagerMasterEndpoint on the driver to remove this block
            // from every executor that holds it
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }
        result.metrics.setResultSize(size)
        // Hand the finished task's result to the scheduler
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      }
      // ... (catch clauses omitted in the original excerpt)
```
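The code above shows that every task result comes back as a serialized object. A small result is sent back directly as a DirectTaskResult. A larger one is stored in a block on the executor (IndirectTaskResult), and the driver fetches it afterwards on a separate thread pool (getTaskResultExecutor), so the fetch does not block the scheduler. In both cases taskSetManager.canFetchMoreResults first checks that the total size of fetched results stays within the driver's limit. The sending side is described in my earlier post 《spark 业务执行进程》 ("the Spark executor process").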
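The size-based choice can be sketched in plain Scala. This is a hypothetical model with the following assumptions:

- A per-result threshold (maxDirectResultSize) decides which results travel inline.
- A total limit (maxResultSize) plays the role of spark.driver.maxResultSize.
- The block ID format "taskresult_<id>" and every name here are illustrative, not Spark's API.

A result over maxResultSize is reported indirectly but never stored, matching the "dropped by executor" comment above. On the driver side, ResultSizeGuard keeps a running total the way canFetchMoreResults does. As far as I know, Spark also aborts the TaskSet when the limit is exceeded, which the sketch leaves out.

```scala
import scala.collection.mutable

// Hypothetical model of how a task result travels back to the driver (sizes in bytes).
object ResultPathSketch {
  sealed trait TaskResult
  final case class DirectResult(bytes: Array[Byte]) extends TaskResult
  final case class IndirectResult(blockId: String, size: Long) extends TaskResult

  // Executor side: small results go inline, large ones go through the block store
  def packResult(taskId: Long, bytes: Array[Byte], maxDirectResultSize: Long,
                 maxResultSize: Long, blockStore: mutable.Map[String, Array[Byte]]): TaskResult = {
    val size = bytes.length.toLong
    val blockId = s"taskresult_$taskId" // assumed naming, for illustration only
    if (maxResultSize > 0 && size > maxResultSize) {
      IndirectResult(blockId, size)     // too big: reported, but never stored
    } else if (size > maxDirectResultSize) {
      blockStore(blockId) = bytes       // stored; the driver must fetch it
      IndirectResult(blockId, size)
    } else {
      DirectResult(bytes)
    }
  }

  // Driver side: running total across results, like canFetchMoreResults
  final class ResultSizeGuard(maxResultSize: Long) {
    private var totalResultSize = 0L
    def canFetchMoreResults(size: Long): Boolean = synchronized {
      totalResultSize += size
      maxResultSize <= 0 || totalResultSize <= maxResultSize
    }
  }
}
```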
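For a large result, the driver first calls scheduler.handleTaskGettingResult. This marks the task as being in the getting-result phase and, through the DAGScheduler, posts an event to the scheduler's event loop: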
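```scala
def taskGettingResult(taskInfo: TaskInfo) {
  // Post a GettingResultEvent to the DAGScheduler's event loop
  // (this notifies listeners; the fetch itself is done by getRemoteBytes)
  eventProcessLoop.post(GettingResultEvent(taskInfo))
}
```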
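Posting to eventProcessLoop only enqueues the event. A single dedicated thread later takes events off the queue and handles them one at a time. That is why handlers such as handleTaskCompletion can assume they may modify the scheduler's internal state.

The following is a minimal sketch of that pattern. It assumes a LinkedBlockingQueue and uses invented event types (GettingResult, Completion, Stop). Spark's own EventLoop also deals with errors and interruption, which this leaves out.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical event types, loosely modeled on DAGScheduler events
sealed trait SchedulerEvent
final case class GettingResult(taskId: Long) extends SchedulerEvent
final case class Completion(taskId: Long) extends SchedulerEvent
case object Stop extends SchedulerEvent

// Single-threaded event loop: post() only enqueues, one thread runs every handler
final class MiniEventLoop(onReceive: SchedulerEvent => Unit) {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()
  private val thread = new Thread(() => {
    var running = true
    while (running) {
      queue.take() match {        // blocks until an event arrives
        case Stop => running = false
        case e    => onReceive(e) // handlers never run concurrently with each other
      }
    }
  })
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: SchedulerEvent): Unit = queue.put(event)
  def stop(): Unit = { queue.put(Stop); thread.join() } // drains earlier events first
}
```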
Next, enqueueSuccessfulTask calls sparkEnv.blockManager.getRemoteBytes(blockId) to pull the block from a remote node. getRemoteBytes delegates to doGetRemote:
```scala
// Fetch a block from a remote BlockManager
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  // Ask the driver which BlockManagers hold this block
  val locations = Random.shuffle(master.getLocations(blockId))
  var numFetchFailures = 0
  for (loc <- locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      // Fetch the data from the node that holds the block
      blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        numFetchFailures += 1
        if (numFetchFailures == locations.size) {
          // An exception is thrown while fetching this block from all locations
          throw new BlockFetchException(s"Failed to fetch block from" +
            s" ${locations.size} locations. Most recent failure cause:", e)
        } else {
          // This location failed, so we retry fetch from a different one by returning null here
          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $numFetchFailures)", e)
          null
        }
    }
    // ... (rest of the loop omitted in the original excerpt)
```
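The BlockManager first asks the master which nodes store this block ID, shuffles that list, and then tries the nodes one at a time. fetchBlockSync is a blocking call, so the locations are tried sequentially, not in parallel. A failed location is logged and skipped, and a BlockFetchException is thrown only when every location has failed. The locations themselves are obtained by sending a message to the endpoint running in the Spark driver: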
```scala
/** Get locations of the blockId from the driver */
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // Ask the driver where the block is stored
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
```
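The lookup-then-fetch loop in doGetRemote can be sketched independently of Spark. In this hypothetical version, getLocations and fetch are function parameters standing in for master.getLocations and fetchBlockSync. BlockFetchException is a local class, not Spark's. The excerpt above is truncated, so returning the first successful fetch is an assumption based on its "retry fetch from a different one" comment.

```scala
import scala.util.Random
import scala.util.control.NonFatal

// Local stand-in for Spark's exception of the same name
final class BlockFetchException(msg: String, cause: Throwable) extends Exception(msg, cause)

object RemoteFetchSketch {
  // L is a location type, B is the fetched data; both are left abstract on purpose
  def getRemote[L, B](blockId: String,
                      getLocations: String => Seq[L],
                      fetch: (L, String) => B,
                      rng: Random = new Random()): Option[B] = {
    // Ask where the block lives, then randomize the order to spread the load
    val locations = rng.shuffle(getLocations(blockId))
    val it = locations.iterator
    var result: Option[B] = None
    var failures = 0
    // Try one location at a time and stop at the first successful fetch
    while (result.isEmpty && it.hasNext) {
      val loc = it.next()
      try {
        result = Some(fetch(loc, blockId))
      } catch {
        case NonFatal(e) =>
          failures += 1
          // Give up only once every location has failed
          if (failures == locations.size) {
            throw new BlockFetchException(
              s"Failed to fetch $blockId from ${locations.size} locations", e)
          }
      }
    }
    result // None when no location holds the block
  }
}
```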
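The GetLocations message is handled by BlockManagerMasterEndpoint, which runs on the driver (the "master" here) and keeps track of the BlockManagers on all executors. The block itself is then transferred by NettyBlockTransferService. fetchBlockSync is a blocking wrapper around its fetchBlocks method: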
```scala
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  // Fetch the blocks over Netty
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        // Create a Netty client and start fetching
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }
    // If retries are enabled, wrap the starter in a RetryingBlockFetcher
    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } // ... (catch clause omitted in the original excerpt)
}
```
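RetryingBlockFetcher wraps the starter so that a broken connection does not fail the whole fetch. The sketch is a simplified, synchronous take on that idea under my own assumptions:

- Only an IOException triggers a retry.
- Only blocks that have not arrived yet are requested again.
- The code waits retryWaitMs between attempts.

fetchOnce and Attempt are hypothetical names. The real class is asynchronous and callback-driven, and I believe its limits come from spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait.

```scala
import java.io.IOException
import scala.collection.mutable

object RetrySketch {
  // One round trip: the blocks that arrived, plus the IO error that cut it short (if any)
  final case class Attempt(fetched: Map[String, Array[Byte]], error: Option[IOException])

  def fetchWithRetries(blockIds: Seq[String], maxRetries: Int, retryWaitMs: Long,
                       fetchOnce: Seq[String] => Attempt): Map[String, Array[Byte]] = {
    val fetched = mutable.Map[String, Array[Byte]]()
    var outstanding = blockIds
    var retries = 0
    while (outstanding.nonEmpty) {
      val attempt = fetchOnce(outstanding)
      fetched ++= attempt.fetched
      outstanding = outstanding.filterNot(fetched.contains)
      attempt.error match {
        case Some(e) if outstanding.nonEmpty =>
          if (retries >= maxRetries) throw e // out of retries: surface the IO failure
          retries += 1
          Thread.sleep(retryWaitMs)          // wait, then re-request only the missing blocks
        case _ =>
          outstanding = Seq.empty            // no IO error: stop, since only IO errors are retried
      }
    }
    fetched.toMap
  }
}
```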
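That is how the result data gets back to the driver. Once it arrives, it is deserialized, and scheduler.handleSuccessfulTask(taskSetManager, tid, result) is called to process the task's result. TaskSchedulerImpl forwards the call to TaskSetManager.handleSuccessfulTask: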
```scala
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  // The task has finished
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // Tell the DAGScheduler that the task has ended
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      // Every task in the TaskSet managed here has completed
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}
```
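The success bookkeeping in handleSuccessfulTask boils down to a per-index flag and a counter. Here is a toy model using the hypothetical names TaskSetProgress and onSuccess. A second success for the same index, such as one from a speculative copy, is ignored, and the set becomes a zombie once every index has succeeded.

Note that in the code above, dagScheduler.taskEnded is called before the duplicate check, so the DAGScheduler sees duplicates too. Its job.finished(rt.outputId) check filters them out.

```scala
// Toy model of TaskSetManager's success bookkeeping (hypothetical names).
// `index` is the task's slot in the set; several attempts may report the same index.
final class TaskSetProgress(numTasks: Int) {
  private val successful = Array.fill(numTasks)(false)
  private var tasksSuccessful = 0
  var isZombie = false // no more tasks will be launched from this set

  /** Returns true only the first time an index is reported successful. */
  def onSuccess(index: Int): Boolean = {
    if (successful(index)) {
      false // duplicate attempt: Spark just logs and ignores it
    } else {
      successful(index) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numTasks) isZombie = true
      true
    }
  }

  def finishedCount: Int = tasksSuccessful
}
```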
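The TaskSetManager then reports the finished task to the DAGScheduler. dagScheduler.taskEnded posts a CompletionEvent, which the DAGScheduler's event loop handles in handleTaskCompletion: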
```scala
/**
 * Responds to a task finishing. This is called inside the event loop so it assumes that it can
 * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
 */
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)
  // Tell the output commit coordinator that this task attempt has finished
  outputCommitCoordinator.taskCompleted(
    stageId,
    task.partitionId,
    event.taskInfo.attemptNumber, // this is a task attempt number
    event.reason)

  // The success case is dealt with separately below, since we need to compute accumulator
  // updates before posting.
  if (event.reason != Success) {
    val attemptId = task.stageAttemptId
    listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
      event.taskInfo, event.taskMetrics))
  }

  if (!stageIdToStage.contains(task.stageId)) {
    // Skip all the actions if the stage has been cancelled.
    return
  }

  // Dispatch on how the task ended
  val stage = stageIdToStage(task.stageId)
  event.reason match {
    case Success =>
      listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
        event.reason, event.taskInfo, event.taskMetrics))
      stage.pendingPartitions -= task.partitionId
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                if (job.numFinished == job.numPartitions) {
                  // Every partition of the job has finished
                  markStageAsFinished(resultStage)
                  // Clean up state for this job and for stages no other job depends on
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  // Pass the task's result to the job's listener
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          updateAccumulators(event)
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            // The executor has since failed, so this output may be gone: ignore it
            logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
          } else {
            // Record where this map output lives
            shuffleStage.addOutputLoc(smt.partitionId, status)
          }

          if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
            markStageAsFinished(shuffleStage)
            mapOutputTracker.registerMapOutputs(
              shuffleStage.shuffleDep.shuffleId,
              shuffleStage.outputLocInMapOutputTrackerFormat(),
              changeEpoch = true)

            clearCacheLocs()

            if (!shuffleStage.isAvailable) {
              // Some tasks had failed; let's resubmit this shuffleStage
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.findMissingPartitions().mkString(", "))
              submitStage(shuffleStage)
            } else {
              // Mark any map-stage jobs waiting on this stage as finished
              if (shuffleStage.mapStageJobs.nonEmpty) {
                val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                for (job <- shuffleStage.mapStageJobs) {
                  markMapStageJobAsFinished(job, stats)
                }
              }
            }

            // Note: newly runnable stages will be submitted below when we submit waiting stages
          }
      }

    case Resubmitted =>
      logInfo("Resubmitted " + task + ", so marking it as still running")
      stage.pendingPartitions += task.partitionId

    case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
      val failedStage = stageIdToStage(task.stageId)
      val mapStage = shuffleToMapStage(shuffleId)

      if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
        logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
          s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
          s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
      } else {
        // It is likely that we receive multiple FetchFailed for a single stage (because we have
        // multiple tasks running concurrently on different executors). In that case, it is
        // possible the fetch failure has already been handled by the scheduler.
        if (runningStages.contains(failedStage)) {
          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
            s"due to a fetch failure from $mapStage (${mapStage.name})")
          markStageAsFinished(failedStage, Some(failureMessage))
        } else {
          logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
            s"longer running")
        }

        failedStages += failedStage
        failedStages += mapStage
        // Mark the map whose fetch failed as broken in the map stage
        if (mapId != -1) {
          mapStage.removeOutputLoc(mapId, bmAddress)
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }

        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
        }
      }

    case commitDenied: TaskCommitDenied =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

    case exceptionFailure: ExceptionFailure =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

    case TaskResultLost =>
      // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

    case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
      // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
      // will abort the job.
  }
  submitWaitingStages()
}
```
That completes the handling of a task's final status: success, resubmission, fetch failure, a lost result, and so on.
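The Success branch is mostly bookkeeping, which can be modeled without Spark. In this hypothetical sketch, ActiveJob, ShuffleStage, and both functions are invented stand-ins.

- A ResultTask marks its output as finished only once, and the job completes when every partition is done.
- A ShuffleMapTask's output is registered unless its executor failed at or after the task's epoch. This is the failedEpoch check in the code above.

```scala
import scala.collection.mutable

// Toy model of the Success branch in DAGScheduler.handleTaskCompletion (hypothetical types).
object CompletionSketch {
  final class ActiveJob(val numPartitions: Int) {
    val finished: Array[Boolean] = Array.fill(numPartitions)(false)
    var numFinished = 0
    val results = mutable.Map[Int, Any]() // stands in for job.listener.taskSucceeded
  }

  final class ShuffleStage(val numPartitions: Int) {
    val pendingPartitions: mutable.Set[Int] = mutable.Set[Int]() ++= (0 until numPartitions)
    val outputLocs = mutable.Map[Int, String]() // partition -> executor holding its map output
    def isAvailable: Boolean = outputLocs.size == numPartitions
  }

  /** A ResultTask succeeded; returns true when this completes the whole job. */
  def resultTaskSucceeded(job: ActiveJob, outputId: Int, result: Any): Boolean = {
    if (job.finished(outputId)) {
      false // duplicate (e.g. speculative) completion: already counted
    } else {
      job.finished(outputId) = true
      job.numFinished += 1
      job.results(outputId) = result
      job.numFinished == job.numPartitions
    }
  }

  /** A ShuffleMapTask succeeded; returns true when the stage has no pending partitions. */
  def shuffleMapTaskSucceeded(stage: ShuffleStage, partition: Int, execId: String,
                              taskEpoch: Long, failedEpoch: collection.Map[String, Long]): Boolean = {
    stage.pendingPartitions -= partition
    // Same test as the failedEpoch check: if the executor failed at or after this
    // task's epoch, its output may be gone, so it is not registered
    val bogus = failedEpoch.get(execId).exists(failed => taskEpoch <= failed)
    if (!bogus) stage.outputLocs(partition) = execId
    stage.pendingPartitions.isEmpty
  }
}
```

A stage can end up with no pending partitions and still not be available, because an ignored output leaves a gap. In that case the real code resubmits the shuffle stage.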
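Summary of how Spark updates task status
- statusUpdate receives the status reported by each task.
- If the task is FINISHED, its serialized result is deserialized.
- If the result was too large to send back directly, the driver asks BlockManagerMasterEndpoint which worker (executor) processes hold the block.
- The block is then fetched over Netty, on a thread pool separate from the scheduler.
- Once the result is back, a CompletionEvent is posted to the DAGScheduler.
- Finally, a ResultTask's result is passed to the job's listener, while a ShuffleMapTask's output location is recorded with addOutputLoc.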