上篇文章我们讲解了master调度driver和executor(application)的过程,但是对于executor在worker上的创建过程没有讲,这里我们接着上篇文章继续讲。

首先我们从worker接收到master发送过来的LaunchExecutor事件开始,源码如下:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>//验证当前masterurl是否有效if (masterUrl != activeMasterUrl) {logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")} else {try {logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))// 创建executor工作目录val executorDir = new File(workDir, appId + "/" + execId)if (!executorDir.mkdirs()) {throw new IOException("Failed to create directory " + executorDir)}// 创建application目录,在application执行结束后由worker删除val appLocalDirs = appDirectories.getOrElse(appId, {val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)val dirs = localRootDirs.flatMap { dir =>try {val appDir = Utils.createDirectory(dir, namePrefix = "executor")Utils.chmod700(appDir)Some(appDir.getAbsolutePath())} catch {case e: IOException =>logWarning(s"${e.getMessage}. Ignoring this directory.")None}}.toSeqif (dirs.isEmpty) {throw new IOException("No subfolder can be created in " +s"${localRootDirs.mkString(",")}.")}dirs})//存储application和其目录的对应关系appDirectories(appId) = appLocalDirs//重点:创建ExecutorRunner对象并启动val manager = new ExecutorRunner(appId,execId,appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),cores_,memory_,self,workerId,webUi.scheme,host,webUi.boundPort,publicAddress,sparkHome,executorDir,workerUri,conf,appLocalDirs,ExecutorState.LAUNCHING,resources_)executors(appId + "/" + execId) = managermanager.start()//executor启动后,当前worker被使用的资源都加上executor所占用的coresUsed += cores_memoryUsed += memory_addResourcesUsed(resources_)} catch {case e: Exception =>logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)if (executors.contains(appId + "/" + execId)) {executors(appId + "/" + execId).kill()executors -= appId + "/" + execId}sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,Some(e.toString), None))}}

可以看到,executor的创建主要是创建一个ExecutorRunner对象并调用start启动,创建对象没有什么可说的,我们直接追踪其start方法,源码如下:

  private[worker] def start(): Unit = {workerThread = new Thread("ExecutorRunner for " + fullId) {//重点:通过新建一个线程专门用于创建executoroverride def run(): Unit = { fetchAndRunExecutor() }}workerThread.start()// 钩子函数处理一些收尾操作shutdownHook = ShutdownHookManager.addShutdownHook { () =>//到这一步后,Executor应该早就结束了构建发起状态,如果还是发起态,说明Executor出现了异常,所以设置其状态为失败态if (state == ExecutorState.LAUNCHING) {state = ExecutorState.FAILED}killProcess(Some("Worker shutting down")) }}

在ExecutorRunner的启动方法中,会创建一个java线程,通过该线程创建executor。接下来我们再来看看这个线程里面的操作内容,如下:

private def fetchAndRunExecutor(): Unit = {try {//准备环境val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, executorDir)// 拼接命令参数val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)val subsOpts = appDesc.command.javaOpts.map {Utils.substituteAppNExecIds(_, appId, execId.toString)}val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),memory, sparkHome.getAbsolutePath, substituteVariables)//构建命令  val command = builder.command()val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala).mkString("\"", "\" \"", "\"")logInfo(s"Launch command: $redactedCommand")//环境准备builder.directory(executorDir)builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")// Add webUI log urlsval baseUrl =if (conf.get(UI_REVERSE_PROXY)) {s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="} else {s"$webUiScheme$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="}builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")//发起命令,此时会有一个新的运行态进程,这个进程就是spark具体执行任务的executor  process = builder.start()//重点一:将启动命令输出到stderr文件流中(这里之所以标记重点,是因为这块代码不好debug查看,但是我们可以去日志中查看其具体执行的命令是什么,这样我们也可以推测出后续的处理流程)val header = "Spark Executor Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)// 重定向输出流val stdout = new File(executorDir, "stdout")stdoutAppender = FileAppender(process.getInputStream, stdout, conf)val stderr = new File(executorDir, "stderr")Files.write(header, stderr, StandardCharsets.UTF_8)stderrAppender = FileAppender(process.getErrorStream, stderr, conf)//修改Executor状态为运行态state = ExecutorState.RUNNING//通知当前worker此Executor的状态变化worker.send(ExecutorStateChanged(appId, execId, state, None, None))// 线程阻塞,等待executor的退出val exitCode = process.waitFor()// 阻塞结束,说明executor退出了state = ExecutorState.EXITEDval message = "Command exited with code " + exitCode//再次向worker通知其状态变化worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))} catch {case interrupted: InterruptedException =>logInfo("Runner thread for executor " + fullId + " interrupted")state = ExecutorState.KILLEDkillProcess(None)case e: Exception =>logError("Error running executor", e)state = ExecutorState.FAILEDkillProcess(Some(e.toString))}}

可以看到executor是通过命令专门发起的一个进程,因为debug比较困难,所以我们没法实时看到发起的细节,但是好在其会打印出执行命令,我们通过命令也可以继续追踪后续的操作。所以在日志打印那行源码我也加了个重点标识。

这一步其实还有一个比较重要的操作,就是通知当前worker此Executor的状态变化,worker接收该事件后会先通知Master,随后自己有一系列的记录和相关的处理操作,不过由于这块代码比较简单易懂,且和我们的源码阅读目的关联不大,所以这就不专门介绍。

下面我们详细看下executor的命令构建及其后续操作:

重点:打印executor构建命令

我们到stderr中查看下启动命令:

这个命令很简单,主要是调用CoarseGrainedExecutorBackend这个类,我们知道jvm调用一个类时,程序的入口就是main函数,所以我们去看下该类的main函数源码:

  def main(args: Array[String]): Unit = {//定义一个变量函数,其会创建CoarseGrainedExecutorBackend 对象val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,arguments.resourcesFileOpt, resourceProfile)}//核心逻辑run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)System.exit(0)}

main方法中需要留意的是定义一个变量函数,这个函数在调用时会创建CoarseGrainedExecutorBackend 对象,随后是调用该对象的run方法,我们继续深入看下:

def run(arguments: Arguments,backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>CoarseGrainedExecutorBackend): Unit = {Utils.initDaemon(log)//获取driver的rpc客户端SparkHadoopUtil.get.runAsSparkUser { () =>Utils.checkHost(arguments.hostname)val executorConf = new SparkConfval fetcher = RpcEnv.create("driverPropsFetcher",arguments.bindAddress,arguments.hostname,-1,executorConf,new SecurityManager(executorConf),numUsableCores = 0,clientMode = true)var driver: RpcEndpointRef = nullval nTries = 3for (i <- 0 until nTries if driver == null) {try {driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)} catch {case e: Throwable => if (i == nTries - 1) {throw e}}}//获取driver的配置和属性val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))fetcher.shutdown()// 根据我们从driver获取的信息构建SparkEnvval driverConf = new SparkConf()for ((key, value) <- props) {if (SparkConf.isExecutorStartupConf(key)) {driverConf.setIfMissing(key, value)} else {driverConf.set(key, value)}}cfg.hadoopDelegationCreds.foreach { tokens =>SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)}driverConf.set(EXECUTOR_ID, arguments.executorId)val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)//重点:创建Executor节点,而这个节点就是CoarseGrainedExecutorBackend对象env.rpcEnv.setupEndpoint("Executor",backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))//创建worker监控节点,用于worker和executor间的信息交互(具体不是很明白)arguments.workerUrl.foreach { url =>env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))}env.rpcEnv.awaitTermination()}}

run方法中主要是根据driver的配置创建SparkEnv对象,随后创建了Executor节点,这是一个RPC节点,可以接受TaskScheduler发布过来的任务。如下:

至此,关于Executor的启动过程我们已经知道了,但是我们还不知道Executor的执行本质是什么,所以我们接着看下Executor RPC节点启动的时候会有哪些操作。

  override def onStart(): Unit = {logInfo("Connecting to driver: " + driverUrl)try {//资源准备_resources = parseOrFindResources(resourcesFileOpt)} catch {case NonFatal(e) =>exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)}rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>driver = Some(ref)//重点一:向Driver发送RegisterExecutor事件ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,extractAttributes, _resources, resourceProfile.id))}(ThreadUtils.sameThread).onComplete {case Success(_) =>//重点二:向driver注册成功Executor后,向自己发送Executor节点self.send(RegisteredExecutor)case Failure(e) =>exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)}(ThreadUtils.sameThread)}

这块的代码比较简单,首先是准备资源,其次是分别向Driver和自己本身发送Executor的注册事件。下面我们来分别看下:

重点一:向Driver发送RegisterExecutor事件

如果看过我上篇文章的小伙伴看到这肯定会对上篇文章遗留的问题有所触动,那我们就来看下吧:

注意看,这次driver是CoarseGrainedSchedulerBackend对象,并不是我们再driver期间创建的三个节点之一,但是查看下继承关系可以知道,该对象被StandaloneSchedulerBackend对象继承。到这基本可以和之前的内容连上了。我们接着看下事件的大致处理流程:

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,attributes, resources, resourceProfileId) =>//如果当前executor以及被注册过,则返回异常if (executorDataMap.contains(executorId)) {context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))//如果当前executor被设置为黑名单,则同样返回异常} else if (scheduler.nodeBlacklist.contains(hostname) ||isBlacklisted(executorId, hostname)) {logInfo(s"Rejecting $executorId as it has been blacklisted.")context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))} else {//获取executor的地址信息val executorAddress = if (executorRef.address != null) {executorRef.address} else {context.senderAddress}logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")//记录executor地址和executorId的映射关系addressToExecutorId(executorAddress) = executorIdtotalCoreCount.addAndGet(cores)totalRegisteredExecutors.addAndGet(1)val resourcesInfo = resources.map{ case (k, v) =>(v.name,new ExecutorResourceInfo(v.name, v.addresses,taskResourceNumParts.getOrElse(v.name, 1)))}//封装executor的信息为一个对象val data = new ExecutorData(executorRef, executorAddress, hostname,0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,resourcesInfo, resourceProfileId)//记录当前executor的信息CoarseGrainedSchedulerBackend.this.synchronized {executorDataMap.put(executorId, data)if (currentExecutorIdCounter < executorId.toInt) {currentExecutorIdCounter = executorId.toInt}if (numPendingExecutors > 0) {numPendingExecutors -= 1logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")}}listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))context.reply(true)}

可以看到,StandaloneSchedulerBackend接收到Executor的注册信息后,将记录Executor的信息,不过这还不能够完全解答我们上一章的遗留问题,要想解决需要阅读task的任务提交源码才行,这个我们放到后面章节讲解,但是这里可能也是其铺垫之一,所以这块也要留心下。

重点二:向driver注册成功Executor后,向自己发送Executor节点

到目前为止Executor的启动基本已经结束,但是Executor的执行本质我们还没看到,而这是整体Executor构建流程的最后一步了,所以我们的答案就在这块,首先还是先看下源码:

case RegisteredExecutor =>logInfo("Successfully registered with driver")try {//重点:创建一个Executor对象,其内部封装一个线程池,Task任务也是提交到它的线程池运行executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)//想driver发送LaunchedExecutor时间driver.get.send(LaunchedExecutor(executorId))} catch {case NonFatal(e) =>exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)}

这块的代码只有两步,一个是创建Executor对象,一个是通知driver Executor已经启动好了,随时可以接受task任务。通知driver没什么好讲的,几乎是老套路,这里不再过多介绍,我们来看下Executor的线程池信息:

Executor有一个私有的成员属性,他就是我们常用的缓冲线程池,所以到此为止,我们本章的源码阅读目的已经达成,我们了解了executor创建的完整流程,并且知道了executor执行的本质其实是用缓冲线程池执行任务。

总结:

1、SparkContext初始化时创建了三个核心RPC节点,分别是SchedulerBackend(实现类是StandaloneSchedulerBackend)、TaskScheduler(实现类是TaskSchedulerImpl)、StandaloneAppClient,原则上这三个RPC节点都处于driver运行期间,泛称上都可以称为driver节点

2、executor内部封装有缓冲线程池,所以task任务提交到executor执行的本质是最终使用线程池进行处理。

spark源码(四)executor在worker上的创建过程,executor本质是什么,是线程池吗?相关推荐

  1. 面试官系统精讲Java源码及大厂真题 - 39 经验总结:不同场景,如何使用线程池

    39 经验总结:不同场景,如何使用线程池 人的影响短暂而微弱,书的影响则广泛而深远. --普希金 引导语 ThreadPoolExecutor 初始化时,主要有如下几个参数: public Threa ...

  2. spark源码阅读总纲

    spark使用了这么长时间,对于driver.master.worker.BlockManage.RDD.DAGScheduler.TaskScheduler这些概念或多或少都了解一些,但是对于其任务 ...

  3. spark源码(三)spark 如何进行driver、executor任务的调度,以及executor向driver的注册

    上篇文章我们介绍到,SparkContext初始化的时候,在TaskScheduler启动阶段会向Master发送RegisterApplication事件,下面我们进入Master主类,从它接收到该 ...

  4. 在Windows上编译Spark源码

    原文转自http://my.oschina.net/u/1452001/blog/344067?fromerr=1iS9kYnS 在本机(Windows 8)上安装spark玩一玩,Spark的Qui ...

  5. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  6. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...

  7. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  8. Apache Spark源码走读之6 -- 存储子系统分析

    Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互 ...

  9. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

最新文章

  1. java培训分享:java软件开发可以用哪些软件?
  2. 从零开始一起学习SLAM | 掌握g2o顶点编程套路
  3. Spring(24)——自定义BeanDefinitionRegistryPostProcessor
  4. 安卓虚拟机_安卓虚拟机(*New*)v1.1.31去广告/去推荐/Mod/精简/VIP版
  5. 十三、开多线程,咱们一起来斗图
  6. .NET Core加解密实战系列之——RSA非对称加密算法
  7. Python学习 :面向对象 -- 成员修饰符
  8. LeetCode7——Reverse Integer(将一个整数反转,注意溢出的处理)
  9. c 与java性能测试_JNI只C性能测试
  10. 使用Asp.net MVC源代码调试你的应用程序
  11. cannot import name 'StrictRedis' from 'redis'
  12. firefox插件使用感受
  13. 什么是二进制数?二进制数是如何表示计算机信息的?
  14. 深度学习中的感受野计算
  15. mysql创建索引降序_Mysql中的降序索引底层实现
  16. HTML中添加超链接、音频标签、视频标签、内嵌框架标签
  17. 180天如何突击高考2-从465到378...
  18. SSO(single sign on)模式 单点登录
  19. 我的人生只适合黑夜,第一天
  20. libevent实现http server

热门文章

  1. 面试题:js拍平多维数组
  2. 史上最详细的Android Studio系列教程(一)--下载和安装
  3. 基于VUE的电商系统的设计与实现
  4. 知识点: Java FutureTask 使用详解
  5. iOS 播放网络音乐
  6. selenium 中隐藏元素如何定位?
  7. 树脂胶的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告
  8. mysql 字段名字有空格_mysql 字段名字包含空格引发的问题
  9. 计算机网络应用ppt课件图片,川教版信息技术八下第1课《计算机网络及应用》PPT课件10.ppt...
  10. C语言项目—学生成绩管理系统(完结)