Task Execution

In the previous section we saw that, on the Driver side, the launchTasks method of CoarseGrainedSchedulerBackend sends a launch-task command to the Executor on a Worker node. In Standalone mode the receiver of this command is CoarseGrainedExecutorBackend, whose class definition reads:

private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

As the definition shows, it extends ThreadSafeRpcEndpoint and implements that trait's receive method. The source is as follows:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)

  // Handle the LaunchTask command sent from the Driver
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      // Deserialize the task description
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // Hand the task to the Executor, which starts it running
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }

  case KillTask(taskId, _, interruptThread) =>
    if (executor == null) {
      logError("Received KillTask command but executor was null")
      System.exit(1)
    } else {
      executor.killTask(taskId, interruptThread)
    }

  case StopExecutor =>
    logInfo("Driver commanded a shutdown")
    executor.stop()
    stop()
    rpcEnv.shutdown()
}
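To see how this pattern match dispatches messages, here is a minimal, self-contained sketch of the same mechanism. The case classes below are illustrative stand-ins, not Spark's actual message types: receive is simply a PartialFunction[Any, Unit] that the RPC layer invokes once per inbound message, and ThreadSafeRpcEndpoint guarantees the invocations do not overlap.

// Hypothetical message types modeled on the cases matched above; not Spark's own classes.
sealed trait ExecutorMessage
case object RegisteredExecutor extends ExecutorMessage
final case class LaunchTask(payload: Array[Byte]) extends ExecutorMessage
final case class KillTask(taskId: Long, interruptThread: Boolean) extends ExecutorMessage

object ReceiveSketch {
  // receive is just a PartialFunction[Any, Unit]; the RPC layer calls it per message.
  val receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor      => println("registered with driver")
    case LaunchTask(payload)     => println(s"launch task: ${payload.length} bytes")
    case KillTask(id, interrupt) => println(s"kill task $id (interrupt=$interrupt)")
  }

  def main(args: Array[String]): Unit =
    Seq[Any](RegisteredExecutor, LaunchTask(Array[Byte](1, 2, 3)), "unknown")
      .foreach(msg => if (receive.isDefinedAt(msg)) receive(msg) else println(s"unhandled: $msg"))
}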

As the code above shows, executor.launchTask is what actually starts the task running on the Worker node. Its source is as follows:

// The launchTask method in the Executor class
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // Create a TaskRunner
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  // Submit the TaskRunner to the thread pool; its run method performs the actual execution
  threadPool.execute(tr)
}
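The interplay of runningTasks and threadPool can be modeled with plain JDK primitives. The sketch below is a miniature built from assumptions, not Spark's code: it registers a Runnable under its task id, hands it to a cached thread pool, and removes it when the run finishes, mirroring the bookkeeping in launchTask and TaskRunner.

import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchTaskSketch {
  // A registry of in-flight tasks plus an unbounded cached thread pool,
  // mirroring Executor's runningTasks map and threadPool.
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  private val threadPool   = Executors.newCachedThreadPool()

  def launchTask(taskId: Long): Unit = {
    val tr = new Runnable {
      override def run(): Unit =
        try println(s"task $taskId running on ${Thread.currentThread.getName}")
        finally runningTasks.remove(taskId) // the same cleanup TaskRunner does in its finally block
    }
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(launchTask)
    threadPool.shutdown()
  }
}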

TaskRunner is a Runnable executed on the thread pool; it is an inner class defined in org.apache.spark.executor.Executor. Its source is as follows:

class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable {

  /** Whether this task has been killed. */
  @volatile private var killed = false

  /** How much the JVM process has spent in GC when the task starts to run. */
  @volatile var startGCTime: Long = _

  /**
   * The task to run. This will be set in run() by deserializing the task binary coming
   * from the driver. Once it is set, it will never be changed.
   */
  @volatile var task: Task[Any] = _

  def kill(interruptThread: Boolean): Unit = {
    logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
    killed = true
    if (task != null) {
      task.kill(interruptThread)
    }
  }

  override def run(): Unit = {
    val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
    val deserializeStartTime = System.currentTimeMillis()
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    // Send a status update to the Driver
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStart: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
      updateDependencies(taskFiles, taskJars)
      task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
      task.setTaskMemoryManager(taskMemoryManager)

      // If this task has been killed before we deserialized it, let's quit now. Otherwise,
      // continue executing the task.
      if (killed) {
        // Throw an exception rather than returning, because returning within a try{} block
        // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
        // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
        // for the task.
        throw new TaskKilledException
      }

      logDebug("Task " + taskId + "'s epoch is " + task.epoch)
      env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task and measure its runtime.
      taskStart = System.currentTimeMillis()
      var threwException = true
      val (value, accumUpdates) = try {
        // Invoke the Task's run method; subclasses such as ShuffleMapTask and ResultTask
        // provide their own implementations
        val res = task.run(
          taskAttemptId = taskId,
          attemptNumber = attemptNumber,
          metricsSystem = env.metricsSystem)
        threwException = false
        res
      } finally {
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
        if (freedMemory > 0) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
            throw new SparkException(errMsg)
          } else {
            logError(errMsg)
          }
        }
      }

      val taskFinish = System.currentTimeMillis()

      // If the task has been killed, let's fail it.
      if (task.killed) {
        throw new TaskKilledException
      }

      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      for (m <- task.metrics) {
        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
        m.setExecutorDeserializeTime(
          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
        m.setJvmGCTime(computeTotalGcTime() - startGCTime)
        m.setResultSerializationTime(afterSerialization - beforeSerialization)
        m.updateAccumulators()
      }

      val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
      val serializedDirectResult = ser.serialize(directResult)
      val resultSize = serializedDirectResult.limit

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      // When the task finishes, notify the Driver with a status update
      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

    } catch {
      case ffe: FetchFailedException =>
        val reason = ffe.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case _: TaskKilledException | _: InterruptedException if task.killed =>
        logInfo(s"Executor killed $taskName (TID $taskId)")
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

      case cDE: CommitDeniedException =>
        val reason = cDE.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case t: Throwable =>
        // Attempt to exit cleanly by informing the driver of our failure.
        // If anything goes wrong (or this was a fatal exception), we will delegate to
        // the default uncaught exception handler, which will terminate the Executor.
        logError(s"Exception in $taskName (TID $taskId)", t)

        val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
          task.metrics.map { m =>
            m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
            m.setJvmGCTime(computeTotalGcTime() - startGCTime)
            m.updateAccumulators()
            m
          }
        }
        val serializedTaskEndReason = {
          try {
            ser.serialize(new ExceptionFailure(t, metrics))
          } catch {
            case _: NotSerializableException =>
              // t is not serializable so just send the stacktrace
              ser.serialize(new ExceptionFailure(t, metrics, false))
          }
        }
        // On failure, likewise send a status update so the task can be rescheduled later
        execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)

        // Don't forcibly exit unless the exception was inherently fatal, to avoid
        // stopping other tasks unnecessarily.
        if (Utils.isFatalError(t)) {
          SparkUncaughtExceptionHandler.uncaughtException(t)
        }
    } finally {
      // Remove the task from the running-task list
      runningTasks.remove(taskId)
    }
  }
}
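The most interesting part of the happy path above is the three-way decision on how to ship the result back to the Driver. The sketch below isolates that decision as a pure function; ResultRoute and the size constants are invented for illustration, with usableFrameSize standing in for akkaFrameSize - AkkaUtils.reservedSizeBytes.

sealed trait ResultRoute
case object DirectToDriver  extends ResultRoute // small enough to ship inline in the status update
case object ViaBlockManager extends ResultRoute // stored as a block; the driver fetches it
case object Dropped         extends ResultRoute // exceeds the maxResultSize cap

object ResultRoutingSketch {
  // Mirrors the if/else-if/else chain in TaskRunner.run; all sizes are in bytes.
  def route(resultSize: Long, maxResultSize: Long, usableFrameSize: Long): ResultRoute =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize >= usableFrameSize) ViaBlockManager
    else DirectToDriver

  def main(args: Array[String]): Unit = {
    val max   = 1L << 30          // assumed 1 GB cap, in the spirit of spark.driver.maxResultSize
    val frame = (10L << 20) - 200 // assumed usable RPC frame size
    println(route(16L << 10, max, frame)) // DirectToDriver
    println(route(64L << 20, max, frame)) // ViaBlockManager
    println(route(2L << 30, max, frame))  // Dropped
  }
}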

The Task.run method is responsible for the actual execution. Its source is as follows:

/**
 * Called by [[Executor]] to run this task.
 *
 * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
 * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
 * @return the result of the task along with updates of Accumulators.
 */
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): (T, AccumulatorUpdates) = {
  // Set up the task's runtime context
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    metricsSystem,
    internalAccumulators,
    runningLocally = false)
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    // Invoke runTask; subclasses such as ShuffleMapTask and ResultTask
    // implement runTask with different logic
    (runTask(context), context.collectAccumulators())
  } finally {
    context.markTaskCompleted()
    try {
      Utils.tryLogNonFatalError {
        // Release memory used by this thread for shuffles
        SparkEnv.get.shuffleMemoryManager.releaseMemoryForThisTask()
      }
      Utils.tryLogNonFatalError {
        // Release memory used by this thread for unrolling blocks
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
      }
    } finally {
      TaskContext.unset()
    }
  }
}
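Note the setTaskContext/unset pairing around the task body: the context lives in a thread-local slot so that user code and runTask can reach it without it being passed explicitly, and it is always cleared in a finally block so the pooled worker thread can be reused safely. A minimal model of that pattern, with a String standing in for TaskContextImpl:

object TaskContextSketch {
  // A per-thread slot, set before the task body runs and always cleared afterwards.
  private val current = new ThreadLocal[String]()

  def runWithContext[T](ctx: String)(body: => T): T = {
    current.set(ctx)         // corresponds to TaskContext.setTaskContext(context)
    try body
    finally current.remove() // corresponds to TaskContext.unset()
  }

  def main(args: Array[String]): Unit =
    println(runWithContext("stage 0 / partition 3") {
      s"computed under: ${current.get()}"
    })
}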

Taking ResultTask as an example, its runTask method reads as follows:

// The runTask method in ResultTask
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the RDD and the function to execute
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  // Call rdd.iterator to perform the actual computation for this partition
  func(context, rdd.iterator(partition, context))
}
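Stripped of serialization and metrics, runTask reduces to applying the job's function to one partition's iterator. A hypothetical miniature of that last line:

object ResultTaskSketch {
  // The essence of ResultTask.runTask: apply the job's function to one partition's iterator.
  def runTask[T, U](partitionData: Iterator[T])(func: Iterator[T] => U): U =
    func(partitionData)

  def main(args: Array[String]): Unit = {
    val partition = Iterator(1, 2, 3, 4)               // stands in for rdd.iterator(partition, context)
    val result    = runTask(partition)(_.sum)          // stands in for the deserialized func
    println(s"result returned to the driver: $result") // 10
  }
}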

To recap, a Task executes through the following call chain (a toy sketch of the complete flow follows the list):
1. The Driver calls launchTasks in org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.
2. On the Worker, org.apache.spark.executor.CoarseGrainedExecutorBackend receives the LaunchTask message and calls org.apache.spark.executor.Executor.launchTask.
3. The thread pool executes the run method of the org.apache.spark.executor.TaskRunner it was handed.
4. TaskRunner.run calls org.apache.spark.scheduler.Task.run.
5. Task.run calls the subclass's runTask, here org.apache.spark.scheduler.ResultTask.runTask.
6. runTask calls org.apache.spark.rdd.RDD.iterator to compute the partition.
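The toy program below strings these six steps together. Every name in it (TaskDesc, Backend, MiniExecutor) is invented for illustration; it only models the control flow of launch, run, and status update, not Spark's real classes.

import java.util.concurrent.{Executors, TimeUnit}

object TaskFlowSketch {
  final case class TaskDesc(taskId: Long, partition: Seq[Int], func: Iterator[Int] => Int)

  trait Backend { def statusUpdate(taskId: Long, state: String, result: Any): Unit }

  final class MiniExecutor(backend: Backend) {
    private val pool = Executors.newCachedThreadPool()
    def launchTask(desc: TaskDesc): Unit = pool.execute(new Runnable {
      override def run(): Unit = {
        backend.statusUpdate(desc.taskId, "RUNNING", ())     // step 3: TaskRunner.run starts
        val value = desc.func(desc.partition.iterator)       // steps 4-6: Task.run -> runTask -> iterator
        backend.statusUpdate(desc.taskId, "FINISHED", value) // result shipped back to the driver
      }
    })
    def shutdown(): Unit = { pool.shutdown(); pool.awaitTermination(5, TimeUnit.SECONDS) }
  }

  def main(args: Array[String]): Unit = {
    val backend = new Backend { // step 1: the "driver" side, receiving status updates
      def statusUpdate(id: Long, state: String, result: Any): Unit =
        println(s"task $id -> $state (result=$result)")
    }
    val exec = new MiniExecutor(backend) // step 2: the "executor backend" launching the task
    exec.launchTask(TaskDesc(0L, Seq(1, 2, 3, 4), _.sum))
    exec.shutdown()
  }
}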
