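Once resources have been divided up for the Executors, the Master launches Executors on the Workers accordingly. The scheduling step returns an array holding the number of cores assigned on each usable Worker. The Master then loops over the usable Worker nodes and allocates those cores to Executors.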

    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // Allocate this worker's resources to executors
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
    }
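The loop's filtering can be made concrete with a minimal Scala sketch. `ToyWorker` and `workersToAllocate` are hypothetical names, not Spark classes. The sketch only reproduces the `for (pos <- ... if assignedCores(pos) > 0)` pattern: it pairs each usable worker with its core count and skips workers that were assigned nothing.

```scala
// Hypothetical stand-in for Spark's WorkerInfo: only an id is modeled.
final case class ToyWorker(id: String)

// Same traversal as the Master's loop: visit workers by position and keep only
// those whose assigned core count is positive.
def workersToAllocate(usableWorkers: Array[ToyWorker],
                      assignedCores: Array[Int]): Seq[(ToyWorker, Int)] = {
  require(usableWorkers.length == assignedCores.length, "one core count per worker")
  for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0)
    yield (usableWorkers(pos), assignedCores(pos))
}
```

For example, if three workers are assigned `4`, `0` and `2` cores, only the first and third are visited.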

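From this array, the Master works out how many cores each Executor on a given Worker gets. It then goes through the Workers one by one and launches the Executors: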

    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      // If the number of cores per executor is specified, we divide the cores assigned
      // to this worker evenly among the executors with no remainder.
      // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      // Number of cores given to each executor
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)
        // Launch the executor on the worker
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
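The per-worker arithmetic reduces to two expressions. The sketch below isolates them in a hypothetical helper, `executorLayout`, which returns `(numExecutors, coresToAssign)`. In standalone mode, `coresPerExecutor` is `Some(n)` when `spark.executor.cores` is set and `None` otherwise.

```scala
// Hypothetical helper: the two expressions from allocateWorkerResourceToExecutors.
// Returns (number of executors to launch on this worker, cores given to each one).
def executorLayout(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
  // Fixed executor size: fit as many executors as the assigned cores allow.
  // No fixed size: a single executor takes every core assigned on this worker.
  val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  (numExecutors, coresToAssign)
}
```

With 8 assigned cores and `Some(2)`, this yields four 2-core executors. With `None`, it yields one executor that holds all 8 cores.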

With the resources settled, the Master sends a `LaunchExecutor` message to the Worker and notifies the Driver with `ExecutorAdded`:

    // Launch an Executor
    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      worker.addExecutor(exec)
      /**
       * Use the Worker's endpoint ref to ask it to launch an Executor
       * (with this many cores and this much memory).
       * The Worker's receive method matches the LaunchExecutor message.
       */
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
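The sketch below models only the messaging side of `launchExecutor`: one fire-and-forget send to the Worker and one to the Driver. The message classes are trimmed, hypothetical versions; Spark's `LaunchExecutor`, for instance, also carries the full `ApplicationDescription`. `ToyEndpointRef` just records messages in an in-memory inbox instead of sending them over the network.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, trimmed message types (memory in MB).
sealed trait Msg
final case class LaunchExecutor(masterUrl: String, appId: String, execId: Int,
                                cores: Int, memoryMb: Int) extends Msg
final case class ExecutorAdded(execId: Int, workerId: String, hostPort: String,
                               cores: Int, memoryMb: Int) extends Msg

// Hypothetical stand-in for RpcEndpointRef.send: one-way, no reply expected.
final class ToyEndpointRef(val name: String) {
  val inbox = ArrayBuffer.empty[Msg]
  def send(m: Msg): Unit = inbox += m
}

// The same two sends as Master.launchExecutor.
def launchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int, memoryMb: Int,
                   workerId: String, workerHostPort: String,
                   worker: ToyEndpointRef, driver: ToyEndpointRef): Unit = {
  worker.send(LaunchExecutor(masterUrl, appId, execId, cores, memoryMb)) // start it
  driver.send(ExecutorAdded(execId, workerId, workerHostPort, cores, memoryMb)) // tell driver
}
```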

The Worker's `receive` method receives and matches this message:

    // Launch an Executor
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId, {
            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
            val dirs = localRootDirs.flatMap { dir =>
              try {
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                Utils.chmod700(appDir)
                Some(appDir.getAbsolutePath())
              } catch {
                case e: IOException =>
                  logWarning(s"${e.getMessage}. Ignoring this directory.")
                  None
              }
            }.toSeq
            if (dirs.isEmpty) {
              throw new IOException("No subfolder can be created in " +
                s"${localRootDirs.mkString(",")}.")
            }
            dirs
          })
          appDirectories(appId) = appLocalDirs
          // Create the ExecutorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            /**
             * appDesc holds Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ...);
             * its first argument is the Executor backend class to run.
             */
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          /**
           * Start the ExecutorRunner;
           * what it starts is the CoarseGrainedExecutorBackend class.
           */
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
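Stripped of directory handling, the Worker branch above follows a simple pattern:

- reject launches from a stale Master;
- register the runner, then start it;
- only after a successful start, charge its cores and memory;
- on any exception, undo the registration and report `FAILED`.

The hypothetical `ToyWorkerNode` below sketches that bookkeeping. A `startRunner` function stands in for `ExecutorRunner.start()`, and `ExecState` is a cut-down, made-up version of Spark's `ExecutorState`.

```scala
import scala.collection.mutable

// Hypothetical outcome reported back to the Master.
sealed trait ExecState
case object Running extends ExecState
final case class Failed(reason: String) extends ExecState

// Hypothetical model of the Worker's LaunchExecutor branch; `startRunner` may throw
// to simulate a failed launch.
final class ToyWorkerNode(activeMasterUrl: String, startRunner: String => Unit) {
  var coresUsed = 0
  var memoryUsed = 0
  var ignoredLaunches = 0
  val executors = mutable.Map.empty[String, Int]                      // "appId/execId" -> cores
  val reportsToMaster = mutable.ArrayBuffer.empty[(String, ExecState)]

  def onLaunchExecutor(masterUrl: String, appId: String, execId: Int,
                       cores: Int, memory: Int): Unit =
    if (masterUrl != activeMasterUrl) {
      ignoredLaunches += 1                                            // stale or unknown Master
    } else {
      val key = s"$appId/$execId"
      try {
        executors(key) = cores                                        // register before starting
        startRunner(key)                                              // stands in for manager.start()
        coresUsed += cores                                            // charged only after a successful start
        memoryUsed += memory
        reportsToMaster += (key -> Running)
      } catch {
        case e: Exception =>
          executors -= key                                            // undo the registration
          reportsToMaster += (key -> Failed(e.toString))
      }
    }
}
```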

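The Worker creates an `ExecutorRunner` and calls `start()`. The runner takes the `Command` object out of `appDesc`; that command wraps `CoarseGrainedExecutorBackend` as its main class. The runner launches the command in a separate JVM process, which executes `CoarseGrainedExecutorBackend`'s `main` method. `main` parses its arguments and calls `run`: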

    // the run method
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
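`ExecutorRunner` does not call `main` directly. Instead, it builds a command line and starts a new process. In Spark 2.x, the executor arguments prepared on the driver side contain placeholders such as `{{EXECUTOR_ID}}`, `{{HOSTNAME}}`, `{{CORES}}`, `{{APP_ID}}` and `{{WORKER_URL}}`, which the runner fills in before launching.

The sketch below imitates only that substitution step. `ToyCommand` and `buildArgv` are hypothetical. The real runner (via `CommandUtils.buildProcessBuilder`) also adds the classpath, memory settings and JVM options.

```scala
// Hypothetical stand-in for the Command carried in appDesc: a main class plus arguments.
final case class ToyCommand(mainClass: String, arguments: Seq[String])

// Replace a {{...}} placeholder with this executor's concrete value; other args pass through.
def substitute(arg: String, workerUrl: String, execId: Int, host: String,
               cores: Int, appId: String): String =
  arg match {
    case "{{WORKER_URL}}"  => workerUrl
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case "{{APP_ID}}"      => appId
    case other             => other
  }

// Simplified argv for the child JVM: "java <mainClass> <substituted args...>".
def buildArgv(cmd: ToyCommand, workerUrl: String, execId: Int, host: String,
              cores: Int, appId: String): Seq[String] =
  Seq("java", cmd.mainClass) ++
    cmd.arguments.map(substitute(_, workerUrl, execId, host, cores, appId))
```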

`run` registers the Executor's endpoint with the `rpcEnv`:

    // Register the Executor's RPC endpoint; this triggers CoarseGrainedExecutorBackend's onStart method
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
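The contract relied on here is that registering an endpoint eventually invokes its `onStart` hook before the endpoint handles any other message. The `ToyRpcEnv` below is a hypothetical, single-threaded simplification. It calls `onStart()` synchronously inside `setupEndpoint`, whereas Spark's dispatcher delivers an `OnStart` message through the endpoint's inbox.

```scala
import scala.collection.mutable

// Hypothetical endpoint interface: a start-up hook plus a message handler.
trait ToyEndpoint {
  def onStart(): Unit = ()
  def receive: PartialFunction[Any, Unit]
}

// Hypothetical, single-threaded RpcEnv.
final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  def setupEndpoint(name: String, endpoint: ToyEndpoint): String = {
    require(!endpoints.contains(name), s"endpoint $name already registered")
    endpoints(name) = endpoint
    endpoint.onStart()          // lifecycle hook runs once, before any message is handled
    name                        // stands in for the returned RpcEndpointRef
  }

  // One-way delivery; messages the endpoint does not match are dropped.
  def send(name: String, msg: Any): Unit =
    endpoints.get(name).foreach { ep =>
      if (ep.receive.isDefinedAt(msg)) ep.receive(msg)
    }
}
```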

Registering the endpoint always triggers `CoarseGrainedExecutorBackend#onStart`:

    override def onStart() {
      logInfo("Connecting to driver: " + driverUrl)
      // Look up the Driver's endpoint ref via RPC, then register this Executor back with the Driver
      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        // Keep the reference to the Driver
        driver = Some(ref)
        /**
         * Register this Executor back with the Driver. The receiver is the DriverEndpoint
         * inside CoarseGrainedSchedulerBackend seen earlier; its receiveAndReply
         * method matches RegisterExecutor.
         */
        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
      }(ThreadUtils.sameThread).onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          // Always receive `true`. Just ignore it
        case Failure(e) =>
          exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
      }(ThreadUtils.sameThread)
    }
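`onStart` chains two asynchronous steps: resolve the Driver's endpoint reference, then `ask` it to register the executor, treating any failure as fatal. The sketch below reproduces that `Future` composition with plain `scala.concurrent`.

- `lookupDriver`, the function-typed "ask" and the string results are hypothetical stand-ins.
- The sketch takes an implicit `ExecutionContext`, whereas Spark passes `ThreadUtils.sameThread`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical, trimmed registration message.
final case class RegisterExecutor(executorId: String, hostname: String, cores: Int)

// `lookupDriver` plays asyncSetupEndpointRefByURI: it resolves a URL to an "ask" function
// (playing RpcEndpointRef.ask). The string result stands in for the onComplete branches.
def registerWithDriver(
    lookupDriver: String => Future[RegisterExecutor => Future[Boolean]],
    driverUrl: String,
    msg: RegisterExecutor)(implicit ec: ExecutionContext): Future[String] =
  lookupDriver(driverUrl)
    .flatMap(ask => ask(msg))              // 1) find the driver, 2) ask it to register us
    .map(_ => "registered")                // the reply is always true, so it is ignored
    .recover { case e: Exception =>        // real code calls exitExecutor(1, ...) here
      s"exit(1): Cannot register with driver: $driverUrl: ${e.getMessage}"
    }
```

A lookup that fails, for example because the connection is refused, surfaces as the `exit(1)` result. This corresponds to the `Failure(e)` case in the original.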

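Using the Driver's URL, the backend obtains a reference to the Driver and sends it a `RegisterExecutor` message; in other words, the Executor registers itself back with the Driver. The Driver-side endpoint that receives it was set up in `CoarseGrainedSchedulerBackend` with: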

    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))  // -> new DriverEndpoint(...)

The `receiveAndReply` method of `DriverEndpoint` receives the message:

    // An Executor registering back with the Driver
    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
      if (executorDataMap.contains(executorId)) {
        executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        context.reply(true)
      } else if (scheduler.nodeBlacklist != null &&
        scheduler.nodeBlacklist.contains(hostname)) {
        // If the cluster manager gives us an executor on a blacklisted node (because it
        // already started allocating those resources before we informed it of our blacklist,
        // or if it ignored our blacklist), then we reject that executor immediately.
        logInfo(s"Rejecting $executorId as it has been blacklisted.")
        executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
        context.reply(true)
      } else {
        // If the executor's rpc env is not listening for incoming connections, `hostPort`
        // will be null, and the client connection should be used to contact the executor.
        val executorAddress = if (executorRef.address != null) {
            executorRef.address
          } else {
            context.senderAddress
          }
        logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
        addressToExecutorId(executorAddress) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val data = new ExecutorData(executorRef, executorAddress, hostname,
          cores, cores, logUrls)
        // This must be synchronized because variables mutated
        // in this block are read when requesting executors
        CoarseGrainedSchedulerBackend.this.synchronized {
          executorDataMap.put(executorId, data)
          if (currentExecutorIdCounter < executorId.toInt) {
            currentExecutorIdCounter = executorId.toInt
          }
          if (numPendingExecutors > 0) {
            numPendingExecutors -= 1
            logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
          }
        }
        /**
         * Use the Executor's endpoint ref to tell it that it has been registered.
         * The receive method in CoarseGrainedExecutorBackend listens for this message
         * and, once it matches, creates the Executor.
         */
        executorRef.send(RegisteredExecutor)
        // Note: some tests expect the reply to come after we put the executor in the map
        context.reply(true)
        listenerBus.post(
          SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
        makeOffers()
      }
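The decision logic in this branch can be condensed into a small registry. It rejects duplicates, rejects blacklisted hosts, and otherwise records the executor and counts its cores.

The hypothetical `ToyDriverEndpoint` below returns the message the driver would send back, rather than calling `executorRef.send`. It omits address tracking, listener events and `makeOffers()`. `ToyExecutorData` is a cut-down, made-up stand-in for Spark's `ExecutorData`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical replies sent back to the executor's endpoint.
sealed trait RegistrationReply
final case class RegisterExecutorFailed(reason: String) extends RegistrationReply
case object RegisteredExecutor extends RegistrationReply

// Hypothetical per-executor bookkeeping.
final case class ToyExecutorData(hostname: String, totalCores: Int, freeCores: Int)

final class ToyDriverEndpoint(nodeBlacklist: Set[String]) {
  val executorDataMap = mutable.HashMap.empty[String, ToyExecutorData]
  val totalCoreCount = new AtomicInteger(0)
  val totalRegisteredExecutors = new AtomicInteger(0)

  def registerExecutor(executorId: String, hostname: String, cores: Int): RegistrationReply =
    if (executorDataMap.contains(executorId)) {
      RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else if (nodeBlacklist.contains(hostname)) {
      RegisterExecutorFailed(s"Executor is blacklisted: $executorId")
    } else {
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      synchronized {                           // guarded, as in the original, for concurrent readers
        executorDataMap.put(executorId, ToyExecutorData(hostname, cores, cores))
      }
      RegisteredExecutor
    }
}
```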

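Once the executor has been recorded (an `ExecutorData` entry plus updated core and executor counts), the Driver sends `RegisteredExecutor` back to the Executor. The Executor's endpoint is `CoarseGrainedExecutorBackend`, and its `receive` method handles the message: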

    // Matches the Driver's reply: registration was accepted, so now create the Executor
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        // Create the actual Executor; it holds the thread pool that runs tasks (see line 89 of Executor)
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

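When `CoarseGrainedExecutorBackend` receives this message, it creates a new `Executor`. That `Executor` owns a thread pool (`threadPool`) on which tasks are run.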
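The essential shape of an `Executor`'s task side is a pool of daemon threads plus a table of running tasks. The sketch below is a hypothetical `ToyExecutor`, not Spark's class. It uses `java.util.concurrent.Executors.newCachedThreadPool` with a daemon thread factory, registers each task before submitting it, and removes the task when it finishes. The thread-name prefix imitates Spark's task threads, but treat it as illustrative.

```scala
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical miniature of an Executor's task-running side.
final class ToyExecutor {
  private val threadCount = new AtomicInteger(0)
  // Cached pool: threads are created on demand and reused; daemon so they never block JVM exit.
  private val threadPool: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"Executor task launch worker-${threadCount.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  })
  // task id -> runner; an entry exists while the task is queued or running
  val runningTasks = new ConcurrentHashMap[java.lang.Long, Runnable]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val key = java.lang.Long.valueOf(taskId)
    val runner: Runnable = () => try body() finally runningTasks.remove(key)
    runningTasks.put(key, runner)            // register first, so a fast task cannot finish unseen
    threadPool.execute(runner)
  }

  def stop(): Unit = {
    threadPool.shutdown()                    // no new tasks; let running ones finish
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Registering a task before calling `execute` avoids a race in which a very short task finishes and removes itself before it has been added to the table.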
