Background

I was leisurely typing away at some code when colleagues on the business side reported a flood of alert SMS messages from Spark Streaming jobs running in production. The application's web UI showed that the Spark application had already exited. The first priority was to bring the production job back up, and only then to track down the root cause. The code discussed in this article is based on Spark 1.6.1.

Problem Diagnosis

Logging in to the production machines and checking the error logs, the application kept reporting "Cannot call methods on a stopped SparkContext.". The full log entry is shown below:

[ERROR][JobScheduler][2017-03-08+15:56:00.067][org.apache.spark.streaming.scheduler.JobScheduler]Error running job streaming job 1488959760000 ms.0
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. This stopped SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
com.xxxx.xxxx.MainApp$.createStreamingContext(MainApp.scala:46)
com.xxxx.xxxx.MainApp$$anonfun$15.apply(MainApp.scala:126)
com.xxxx.xxxx.MainApp$$anonfun$15.apply(MainApp.scala:126)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
com.xxxx.xxxx.MainApp$.main(MainApp.scala:125)
com.xxxx.xxxx.MainApp.main(MainApp.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

At this point the symptom is clear: the SparkContext had already been stopped. The next step is to work out what stopped it, starting from the shutdown log. Reading the SparkContext code shows that, once the shutdown completes, it prints a confirmation line:

logInfo("Successfully stopped SparkContext")

Using grep to locate that line in the logs gives the following:

[INFO][dag-scheduler-event-loop][2017-03-03+22:16:30.841][org.apache.spark.SparkContext]Successfully stopped SparkContext

The log shows it was the dag-scheduler-event-loop thread that stopped the SparkContext. Looking at that thread's log output, we find:

java.lang.IllegalStateException: more than one active taskSet for stage 4571114: 4571114.2,4571114.1
    at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:173)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1052)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

So one stage had two active TaskSetManagers at the same time. The check that raised the exception lives in TaskSchedulerImpl.submitTasks:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    .........
}
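To make the condition concrete, here is a minimal, self-contained sketch of the same check (purely illustrative toy code, not Spark source; MiniTaskSet and ConflictCheckSketch are made-up names): a second attempt for a stage is rejected only while an earlier attempt of that stage is still non-zombie.

import scala.collection.mutable

object ConflictCheckSketch {
  case class MiniTaskSet(stageId: Int, attempt: Int, var isZombie: Boolean = false) {
    def id: String = s"$stageId.$attempt"
  }

  // stageId -> (stageAttemptId -> task set), playing the role of taskSetsByStageIdAndAttempt
  private val byStage = mutable.HashMap.empty[Int, mutable.HashMap[Int, MiniTaskSet]]

  def submit(ts: MiniTaskSet): Unit = {
    val attempts = byStage.getOrElseUpdate(ts.stageId, mutable.HashMap.empty)
    attempts(ts.attempt) = ts
    // a new attempt conflicts only with an earlier attempt that is still NOT a zombie
    val conflicting = attempts.values.exists(other => (other ne ts) && !other.isZombie)
    if (conflicting) {
      throw new IllegalStateException(
        s"more than one active taskSet for stage ${ts.stageId}: ${attempts.values.map(_.id).mkString(",")}")
    }
  }

  def main(args: Array[String]): Unit = {
    val attempt1 = MiniTaskSet(4571114, 1)
    submit(attempt1)                         // accepted: only one attempt exists
    // attempt1.isZombie = true              // had this run first, the resubmission below would be accepted
    try submit(MiniTaskSet(4571114, 2))      // rejected: attempt1 is still active
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}

In other words, a resubmitted attempt is only legal once the previous attempt has been marked zombie; that is exactly the invariant that was violated in production.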

That came as a surprise: how could there be two? Going further back in the logs, it turns out stage 4571114 had been resubmitted:

[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.547][org.apache.spark.scheduler.DAGScheduler]Resubmitting ShuffleMapStage 4571114 (map at MainApp.scala:73) because some of its tasks had failed: 0
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.547][org.apache.spark.scheduler.DAGScheduler]Submitting ShuffleMapStage 4571114 (MapPartitionsRDD[3719544] at map at MainApp.scala:73), which has no missing parents

Let's look at the stage-resubmission code. The following is excerpted from DAGScheduler.handleTaskCompletion:

case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  updateAccumulators(event)
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }
  if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    markStageAsFinished(shuffleStage)
    logInfo("looking for newly runnable stages")
    logInfo("running: " + runningStages)
    logInfo("waiting: " + waitingStages)
    logInfo("failed: " + failedStages)

    // We supply true to increment the epoch number here in case this is a
    // recomputation of the map outputs. In that case, some nodes may have cached
    // locations with holes (from when we detected the error) and will need the
    // epoch incremented to refetch them.
    // TODO: Only increment the epoch number if this is not the first time
    // we registered these map outputs.
    mapOutputTracker.registerMapOutputs(
      shuffleStage.shuffleDep.shuffleId,
      shuffleStage.outputLocInMapOutputTrackerFormat(),
      changeEpoch = true)

    clearCacheLocs()

    if (!shuffleStage.isAvailable) {
      // Some tasks had failed; let's resubmit this shuffleStage
      // TODO: Lower-level scheduler should also deal with this
      logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
        ") because some of its tasks had failed: " +
        shuffleStage.findMissingPartitions().mkString(", "))
      submitStage(shuffleStage)
    } else {
      // Mark any map-stage jobs waiting on this stage as finished
      if (shuffleStage.mapStageJobs.nonEmpty) {
        val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
        for (job <- shuffleStage.mapStageJobs) {
          markMapStageJobAsFinished(job, stats)
        }
      }
    }

As the code shows, a resubmit is triggered only when shuffleStage.pendingPartitions is empty and shuffleStage.isAvailable is false at the same time. Let's look at how these two values are maintained. pendingPartitions holds the partitions whose tasks are still running; a partition is removed from it when its task finishes:

val stage = stageIdToStage(task.stageId)
event.reason match {
  case Success =>
    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
      event.reason, event.taskInfo, event.taskMetrics))
    // remove the partition from the set of partitions still being processed
    stage.pendingPartitions -= task.partitionId

isAvailable checks whether the number of partitions whose shuffle output locations have been reported to the driver equals the total number of partitions:
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
This count is also updated when a ShuffleMapTask finishes, but note that it is updated only if the executor holding the shuffle output is still alive. If the executor that ran the ShuffleMapTask has already died, its on-disk shuffle data can no longer be fetched from it, so the output location is not registered:

case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  updateAccumulators(event)
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }
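Putting the two pieces together: if the task itself finished (so its partition left pendingPartitions) but its completion was treated as "possibly bogus" because the executor was already marked failed (so no output location was registered), the stage ends up with an empty pendingPartitions and isAvailable == false, which is exactly the resubmit condition. The following toy sketch (illustrative names only, not Spark code) walks through that combination for a one-partition stage.

import scala.collection.mutable

object ResubmitConditionSketch {
  class MiniShuffleStage(val numPartitions: Int) {
    val pendingPartitions: mutable.HashSet[Int] = mutable.HashSet(0 until numPartitions: _*)
    private val outputLocs = mutable.HashSet.empty[Int]
    def addOutputLoc(partition: Int): Unit = outputLocs += partition
    def isAvailable: Boolean = outputLocs.size == numPartitions
  }

  def main(args: Array[String]): Unit = {
    val stage = new MiniShuffleStage(numPartitions = 1)

    // The ShuffleMapTask for partition 0 completed, so it is removed from pendingPartitions ...
    stage.pendingPartitions -= 0

    // ... but the executor it ran on was already in failedEpoch, so the completion was
    // treated as "possibly bogus" and addOutputLoc was never called for partition 0.
    val executorAlreadyFailed = true
    if (!executorAlreadyFailed) stage.addOutputLoc(0)

    // Empty pendingPartitions plus !isAvailable is exactly the resubmit condition.
    val resubmit = stage.pendingPartitions.isEmpty && !stage.isAvailable
    println(s"resubmit = $resubmit") // prints: resubmit = true
  }
}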

This is the only place that could trigger the rescheduling, so let's search the logs for that key message:

[INFO][dag-scheduler-event-loop][2017-03-03+22:16:27.427][org.apache.spark.scheduler.DAGScheduler]Ignoring possibly bogus ShuffleMapTask(4571114, 0) completion from executor 4

But even if the executor that had just finished the ShuffleMapTask died right at that moment and caused the stage to be rescheduled, that alone should not produce a TaskSetManager conflict, because by that time taskSet.isZombie should already have been set to true: TaskSetManager.handleSuccessfulTask runs before DAGScheduler.handleTaskCompletion.
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
  ts.taskSet != taskSet && !ts.isZombie
}

TaskSetManager.handleSuccessfulTask:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  // This ends up posting a CompletionEvent to the DAGScheduler's event queue for processing.
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))

    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}

Some readers may already have spotted the problem. To make it clearer, here is a sequence diagram of a successful task execution.
(Sequence diagram: successful task execution)

Combining the sequence diagram with the code, we can see that DAGScheduler.handleTaskCompletion ran before isZombie was set to true inside TaskSetManager.handleSuccessfulTask. handleSuccessfulTask runs on a task-result-getter thread, so isZombie was still false when the DAGScheduler triggered the stage resubmission, which ultimately produced the TaskSetManager conflict.
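The ordering issue can be reproduced in isolation with a small sketch (toy code only, under the assumption that a single-threaded executor stands in for the dag-scheduler-event-loop thread; none of this is Spark source): because the completion event is posted before isZombie is flipped, the "scheduler" thread can observe a still-active attempt.

import java.util.concurrent.{Executors, TimeUnit}

object IsZombieRaceSketch {
  @volatile private var isZombie = false
  // stands in for the dag-scheduler-event-loop thread
  private val dagEventLoop = Executors.newSingleThreadExecutor()

  def handleSuccessfulTaskLikeSpark161(): Unit = {
    // 1. the CompletionEvent is posted first (this mirrors the Spark 1.6.1 ordering) ...
    dagEventLoop.submit(new Runnable {
      override def run(): Unit =
        println(s"scheduler thread sees isZombie = $isZombie") // typically false here
    })
    Thread.sleep(10) // give the scheduler thread a head start, as happened in production
    // 2. ... and only afterwards is the task set marked as zombie
    isZombie = true
  }

  def main(args: Array[String]): Unit = {
    handleSuccessfulTaskLikeSpark161()
    dagEventLoop.shutdown()
    dagEventLoop.awaitTermination(1, TimeUnit.SECONDS)
  }
}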
The log lines below show the resubmission time and the time handleSuccessfulTask finished. Since setting isZombie to true does not log a timestamp of its own, this is only indirect evidence, but it does show that the resubmission happened before handleSuccessfulTask completed.

handleSuccessfulTask finish time:
[INFO][task-result-getter-2][2017-03-03+22:16:29.999][org.apache.spark.scheduler.TaskSchedulerImpl]Removed TaskSet 4571114.1, whose tasks have all completed, from pool

Stage resubmission time:
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.549][org.apache.spark.scheduler.TaskSchedulerImpl]Adding task set 4571114.2 with 1 tasks

Timeline of the events:
(Event timeline diagram)

The Fix

The fix itself is simple: change TaskSetManager.handleSuccessfulTask so that the CompletionEvent is sent only after isZombie has been set to true. The patched method looks like this:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))

    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
  // Post the CompletionEvent only after isZombie may have been set above, so the
  // DAGScheduler can never see a still-active attempt when it resubmits the stage.
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
}
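For contrast, applying the same reordering to the toy race sketch from earlier (again purely illustrative, not Spark source): once isZombie is set before the event is posted, the scheduler thread can only ever observe true, so a concurrent resubmission no longer finds a non-zombie attempt to conflict with.

import java.util.concurrent.{Executors, TimeUnit}

object IsZombieFixedOrderSketch {
  @volatile private var isZombie = false
  private val dagEventLoop = Executors.newSingleThreadExecutor()

  def main(args: Array[String]): Unit = {
    // 1. mark the attempt as zombie first ...
    isZombie = true
    // 2. ... then post the completion event; the scheduler thread now always sees true
    dagEventLoop.submit(new Runnable {
      override def run(): Unit = println(s"scheduler thread sees isZombie = $isZombie")
    })
    dagEventLoop.shutdown()
    dagEventLoop.awaitTermination(1, TimeUnit.SECONDS)
  }
}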
