In this post we look at how a Task is executed on an Executor once the Executor has started, and at the follow-up processing that comes afterwards.

Executing a Task

We mentioned in 《深入理解Spark 2.1 Core (三):任务调度器的原理与源码分析》 that, once task scheduling has completed, CoarseGrainedSchedulerBackend.DriverEndpoint calls launchTasks to send a LaunchTask message carrying the serializedTask to CoarseGrainedExecutorBackend. Next, let's look at how CoarseGrainedExecutorBackend executes the Task after receiving that LaunchTask message.
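To see the shape of this handoff in isolation, here is a self-contained toy sketch of the pattern (names such as FakeTaskDescription and FakeLaunchTask are made up for illustration; this is not Spark's actual RPC code): the driver side serializes a task descriptor into a ByteBuffer, wraps it in a LaunchTask-style message, and the receiving side deserializes it before launching the task.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Hypothetical stand-ins for TaskDescription and the LaunchTask message.
    case class FakeTaskDescription(taskId: Long, name: String) extends Serializable
    case class FakeLaunchTask(data: ByteBuffer)

    object LaunchTaskRoundTrip {
      // Serialize the descriptor into a ByteBuffer, as the driver side would.
      def serialize(desc: FakeTaskDescription): ByteBuffer = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(desc)
        out.close()
        ByteBuffer.wrap(bytes.toByteArray)
      }

      // Deserialize it again, as the executor backend does on receive.
      def deserialize(buf: ByteBuffer): FakeTaskDescription = {
        val arr = new Array[Byte](buf.remaining())
        buf.get(arr)
        new ObjectInputStream(new ByteArrayInputStream(arr))
          .readObject().asInstanceOf[FakeTaskDescription]
      }

      def main(args: Array[String]): Unit = {
        val msg = FakeLaunchTask(serialize(FakeTaskDescription(42L, "task 0.0 in stage 1.0")))
        val desc = deserialize(msg.data)
        println(s"Got assigned task ${desc.taskId}: ${desc.name}")
      }
    }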

The call stack is as follows:

  • CoarseGrainedExecutorBackend.receive

    • Executor.launchTask

      • Executor.TaskRunner.run

        • Executor.updateDependencies
        • Task.run
          • ShuffleMapTask.runTask
          • ResultTask.runTask

CoarseGrainedExecutorBackend.receive

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // Deserialize the task description
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Call executor.launchTask
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

Executor.launchTask

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    // Create a TaskRunner
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    // Register the task ID and its TaskRunner
    // in the ConcurrentHashMap[Long, TaskRunner]
    runningTasks.put(taskId, tr)
    // Execute the TaskRunner on the thread pool
    threadPool.execute(tr)
  }
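The launch pattern itself is straightforward: register the runner in a concurrent map keyed by task ID, then hand it to a thread pool (Executor uses a cached daemon pool for this). Below is a minimal self-contained sketch of that pattern, not Spark's code; FakeTaskRunner and the sample task IDs are made up.

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

    object LaunchPatternSketch {
      // Hypothetical runner: does some work, then removes itself from the registry.
      class FakeTaskRunner(taskId: Long, registry: ConcurrentHashMap[Long, FakeTaskRunner])
          extends Runnable {
        override def run(): Unit = {
          try {
            println(s"running task $taskId on ${Thread.currentThread().getName}")
          } finally {
            registry.remove(taskId) // mirrors runningTasks.remove(taskId) in TaskRunner's finally block
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val runningTasks = new ConcurrentHashMap[Long, FakeTaskRunner]()
        val threadPool = Executors.newCachedThreadPool()
        for (taskId <- 1L to 3L) {
          val tr = new FakeTaskRunner(taskId, runningTasks)
          runningTasks.put(taskId, tr) // register before executing
          threadPool.execute(tr)       // run asynchronously on the pool
        }
        threadPool.shutdown()
        threadPool.awaitTermination(10, TimeUnit.SECONDS)
      }
    }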

Executor.TaskRunner.run

    override def run(): Unit = {
      val threadMXBean = ManagementFactory.getThreadMXBean
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      // Record the time when deserialization starts
      val deserializeStartTime = System.currentTimeMillis()
      // Record the CPU time when deserialization starts
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStart: Long = 0
      var taskStartCpu: Long = 0
      // GC time at the start
      startGCTime = computeTotalGcTime()
      try {
        // Deserialize the task information
        val (taskFiles, taskJars, taskProps, taskBytes) =
          Task.deserializeWithDependencies(serializedTask)
        // Set the executor-side properties from taskProps
        Executor.taskDeserializationProps.set(taskProps)
        // Based on taskFiles and taskJars, download the files the task needs
        // and load the required jars
        updateDependencies(taskFiles, taskJars)
        // Build the task from taskBytes
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
        // Set the task properties
        task.localProperties = taskProps
        // Set the task memory manager
        task.setTaskMemoryManager(taskMemoryManager)
        // If the task was killed before it could even be deserialized,
        // throw an exception
        if (killed) {
          throw new TaskKilledException
        }
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        // Update the mapOutputTracker epoch to the task epoch
        env.mapOutputTracker.updateEpoch(task.epoch)
        // Record the task start time
        taskStart = System.currentTimeMillis()
        // Record the CPU time at task start
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        val value = try {
          // Run the Task
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        } finally {
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
          if (freedMemory > 0 && !threwException) {
            val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
              throw new SparkException(errMsg)
            } else {
              logWarning(errMsg)
            }
          }
          if (releasedLocks.nonEmpty && !threwException) {
            val errMsg =
              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
                releasedLocks.mkString("[", ", ", "]")
            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
              throw new SparkException(errMsg)
            } else {
              logWarning(errMsg)
            }
          }
        }
        // Record the task finish time
        val taskFinish = System.currentTimeMillis()
        // Record the CPU time at task finish
        val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        // If the task was killed while running, throw an exception
        if (task.killed) {
          throw new TaskKilledException
        }
        val resultSer = env.serializer.newInstance()
        // Record the system time when result serialization starts
        val beforeSerialization = System.currentTimeMillis()
        // Serialize the result
        val valueBytes = resultSer.serialize(value)
        // Record the system time when result serialization finishes
        val afterSerialization = System.currentTimeMillis()
        // Deserialization happens in two places:
        // 1. In this method, where the task information and the Task instance are deserialized.
        // 2. After the task starts, where Task.run deserializes the RDD and the function.
        // Compute the task's deserialization time
        task.metrics.setExecutorDeserializeTime(
          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
        // Compute the task's deserialization CPU time
        task.metrics.setExecutorDeserializeCpuTime(
          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
        // Compute the task's run time
        task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
        // Compute the task's run CPU time
        task.metrics.setExecutorCpuTime(
          (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
        // Compute the GC time
        task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
        // Compute the result serialization time
        task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
        val accumUpdates = task.collectAccumulatorUpdates()
        // There is a wart here: value is effectively serialized twice
        // (once by resultSer above, and again as part of directResult)
        val directResult = new DirectTaskResult(valueBytes, accumUpdates)
        val serializedDirectResult = ser.serialize(directResult)
        // Size of the serialized result
        val resultSize = serializedDirectResult.limit
        // The result is handled differently depending on its size:
        // 1. Larger than maxResultSize (spark.driver.maxResultSize, 1 GB by default):
        //    the result is dropped; the driver is only told its size.
        // 2. Between maxDirectResultSize and maxResultSize:
        //    the result is stored in the BlockManager under a TaskResultBlockId keyed by the
        //    task ID, and only that block ID is sent to the driver (an IndirectTaskResult);
        //    the driver fetches the bytes afterwards. maxDirectResultSize is capped by the
        //    maximum message size the RPC layer can carry.
        // 3. No larger than maxDirectResultSize: the serialized result is sent to the driver
        //    directly (a DirectTaskResult).
        val serializedResult: ByteBuffer = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize > maxDirectResultSize) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
        // Report the FINISHED state (with the result) to execBackend
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        case ffe: FetchFailedException =>
          val reason = ffe.toTaskFailedReason
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

        case _: TaskKilledException =>
          logInfo(s"Executor killed $taskName (TID $taskId)")
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

        case _: InterruptedException if task.killed =>
          logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

        case CausedBy(cDE: CommitDeniedException) =>
          val reason = cDE.toTaskFailedReason
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

        case t: Throwable =>
          logError(s"Exception in $taskName (TID $taskId)", t)
          val accums: Seq[AccumulatorV2[_, _]] =
            if (task != null) {
              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
              task.collectAccumulatorUpdates(taskFailed = true)
            } else {
              Seq.empty
            }
          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
          val serializedTaskEndReason = {
            try {
              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
            } catch {
              case _: NotSerializableException =>
                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
            }
          }
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
          if (Utils.isFatalError(t)) {
            SparkUncaughtExceptionHandler.uncaughtException(t)
          }
      } finally {
        // Remove the task from runningTasks once it is done
        runningTasks.remove(taskId)
      }
    }
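To make the three-way result routing above concrete, here is a small self-contained sketch that reproduces the branching; the concrete thresholds used (1 GiB for spark.driver.maxResultSize, 1 MiB for the direct-result limit) are illustrative assumptions, not values read from a running cluster.

    object ResultRouting {
      sealed trait Route
      case object DroppedTooLarge extends Route // only the size is reported back to the driver
      case object ViaBlockManager extends Route // IndirectTaskResult: block ID sent, bytes fetched later
      case object DirectToDriver  extends Route // DirectTaskResult sent inline in statusUpdate

      // Mirrors the if / else-if / else over resultSize in TaskRunner.run above.
      def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): Route =
        if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
        else if (resultSize > maxDirectResultSize) ViaBlockManager
        else DirectToDriver

      def main(args: Array[String]): Unit = {
        val maxResultSize = 1L << 30       // illustrative: 1 GiB (spark.driver.maxResultSize default)
        val maxDirectResultSize = 1L << 20 // illustrative: 1 MiB direct-result limit
        Seq(100L * 1024, 50L * 1024 * 1024, 2L * 1024 * 1024 * 1024).foreach { size =>
          println(s"$size bytes -> ${route(size, maxResultSize, maxDirectResultSize)}")
        }
      }
    }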

Executor.updateDependencies

Next, let's look at how the executor updates its dependencies, i.e. downloads the files the task needs and loads the required jars:

  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
    lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    synchronized {
      // Download the files required by the task
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
        currentFiles(name) = timestamp
      }
      // Fetch and load the required jars
      for ((name, timestamp) <- newJars) {
        val localName = name.split("/").last
        val currentTimeStamp = currentJars.get(name)
          .orElse(currentJars.get(localName))
          .getOrElse(-1L)
        if (currentTimeStamp < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
          currentJars(name) = timestamp
          // Add it to the class loader
          val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
          if (!urlClassLoader.getURLs().contains(url)) {
            logInfo("Adding " + url + " to class loader")
            urlClassLoader.addURL(url)
          }
        }
      }
    }
  }
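Where do newFiles and newJars come from? They are the files and jars registered on the driver (for example via SparkContext.addFile / addJar, or --files / --jars at submission time), each tracked with a timestamp. A minimal usage sketch follows; it assumes a local file /tmp/lookup.txt exists, and the app name is made up.

    import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

    object AddFileExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("add-file-example").setMaster("local[2]"))
        // Register a file on the driver; executors fetch it (via updateDependencies)
        // before running a task that was submitted after the registration.
        sc.addFile("/tmp/lookup.txt")
        val lineCounts = sc.parallelize(1 to 4, 2).mapPartitions { _ =>
          // On the executor, the fetched copy lives under SparkFiles.getRootDirectory().
          val localPath = SparkFiles.get("lookup.txt")
          Iterator(scala.io.Source.fromFile(localPath).getLines().size)
        }.collect()
        println(lineCounts.mkString(", "))
        sc.stop()
      }
    }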

Task.run

Next, we come to the core of this post: running the task.

  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem): T = {
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
    // Create the TaskContextImpl
    context = new TaskContextImpl(
      stageId,
      partitionId,
      taskAttemptId,
      attemptNumber,
      taskMemoryManager,
      localProperties,
      metricsSystem,
      metrics)
    // Install the TaskContextImpl as the current TaskContext
    TaskContext.setTaskContext(context)
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
      Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
    try {
      // Call runTask
      runTask(context)
    } catch {
      case e: Throwable =>
        try {
          context.markTaskFailed(e)
        } catch {
          case t: Throwable =>
            e.addSuppressed(t)
        }
        throw e
    } finally {
      // Mark the task as completed
      context.markTaskCompleted()
      try {
        Utils.tryLogNonFatalError {
          // Release memory
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        // Unset the TaskContext
        TaskContext.unset()
      }
    }
  }
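The TaskContextImpl installed by TaskContext.setTaskContext above is exactly what user code gets back from TaskContext.get() inside a task. A small usage sketch (the app name and sample data are made up):

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object TaskContextExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("task-context-example").setMaster("local[2]"))
        val info = sc.parallelize(1 to 8, 4).mapPartitions { iter =>
          // TaskContext.get() returns the context that Task.run installed for this thread.
          val ctx = TaskContext.get()
          // Runs when Task.run calls context.markTaskCompleted() in its finally block.
          ctx.addTaskCompletionListener { _ => println(s"partition ${ctx.partitionId()} done") }
          Iterator(s"stage=${ctx.stageId()} partition=${ctx.partitionId()} " +
            s"attempt=${ctx.attemptNumber()} records=${iter.size}")
        }.collect()
        info.foreach(println)
        sc.stop()
      }
    }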

Task has two subclasses: ShuffleMapTask, the task type for every stage except the last, and ResultTask, the task type for the last stage. Both override Task's runTask method, so we will go through each of them next.

ShuffleMapTask.runTask

A ShuffleMapTask is created for each partition of its stage. Each ShuffleMapTask writes a set of output files whose layout depends on the number of downstream partitions and on the shuffle strategy.

  override def runTask(context: TaskContext): MapStatus = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    // Record when deserialization starts
    val deserializeStartTime = System.currentTimeMillis()
    // Record the CPU time when deserialization starts
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // Deserialize the RDD and its shuffle dependency
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    // Compute the deserialization time
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    // Compute the deserialization CPU time
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Get the shuffleManager
      val manager = SparkEnv.get.shuffleManager
      // Get the writer
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // Calling writer.write starts computing the RDD;
      // this part will be covered in a later post
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      // Stop the writer and return the result
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
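A quick way to see the "one ShuffleMapTask per parent partition, one output bucket per downstream partition" relationship is a tiny job: with 4 input partitions and a reduceByKey into 2 partitions, the shuffle map stage runs 4 ShuffleMapTasks and the next stage runs 2 tasks. A minimal sketch (app name and data are made up; the task counts are easiest to confirm in the web UI):

    import org.apache.spark.{SparkConf, SparkContext}

    object ShuffleTaskCountExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("shuffle-task-count").setMaster("local[4]"))
        // 4 input partitions -> the shuffle map stage runs 4 ShuffleMapTasks.
        val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 2, "c" -> 3), numSlices = 4)
        // 2 output partitions -> each ShuffleMapTask splits its output into 2 buckets,
        // and the final stage runs 2 ResultTasks that read them (collect() is the action).
        val reduced = pairs.reduceByKey(_ + _, numPartitions = 2)
        println(s"map-side partitions: ${pairs.getNumPartitions}, reduce-side: ${reduced.getNumPartitions}")
        reduced.collect().foreach(println)
        sc.stop()
      }
    }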

ResultTask.runTask

  override def runTask(context: TaskContext): U = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    // Record when deserialization starts
    val deserializeStartTime = System.currentTimeMillis()
    // Record the CPU time when deserialization starts
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // Deserialize the RDD and the result function that is applied to it
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    // Compute the deserialization time
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    // Compute the deserialization CPU time
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    // The iterator computation will be covered in a later post
    func(context, rdd.iterator(partition, context))
  }
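The (rdd, func) pair deserialized here originates from the action on the driver side; for collect(), for instance, the per-partition function simply materializes the iterator into an array, and that function is what ResultTask.runTask applies. The sketch below uses the public SparkContext.runJob API to run a function of the same (TaskContext, Iterator[T]) => U shape (app name and data are made up):

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object ResultFuncExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("result-func-example").setMaster("local[2]"))
        val rdd = sc.parallelize(1 to 10, 3)
        // A result function of the same shape as `func` above:
        // it gets the TaskContext and the partition's iterator and returns one value per partition.
        val perPartitionSums: Array[Int] =
          sc.runJob(rdd, (ctx: TaskContext, iter: Iterator[Int]) => {
            println(s"summing partition ${ctx.partitionId()}")
            iter.sum
          })
        println(perPartitionSums.mkString(", "))
        sc.stop()
      }
    }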

Follow-up Processing

Metrics

The collection of the various timing metrics was covered above, in Executor.TaskRunner.run.

Releasing Memory

Memory cleanup was also covered above, in Task.run.

Handling the Execution Result

The execBackend.statusUpdate call in Executor.TaskRunner.run was already covered in 《深入理解Spark 2.1 Core (四):运算结果处理和容错的原理与源码分析》.
