In the previous posts we covered how the Master and Worker are started and how resources are reclaimed, but we have not yet covered how the AppClient is started; its startup is closely tied to how resources are scheduled. In this post we look at starting the AppClient, and at both the logical and the physical side of resource scheduling.

Starting the AppClient

The call stack is as follows:

  • StandaloneSchedulerBackend.start
    • StandaloneAppClient.start
      • StandaloneAppClient.ClientEndpoint.onStart
        • StandaloneAppClient.registerWithMaster
          • StandaloneAppClient.tryRegisterAllMasters
  • Master.receive
    • Master.createApplication
    • Master.registerApplication
  • StandaloneAppClient.ClientEndpoint.receive

StandaloneSchedulerBackend.start

In Standalone mode, the backend held by SparkContext is StandaloneSchedulerBackend. In StandaloneSchedulerBackend.start we can see:

    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // Create the AppClient
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    // Start the AppClient
    client.start()

StandaloneAppClient.start

    def start() {
      // A ClientEndpoint is created here, which triggers its onStart
      endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
    }

StandaloneAppClient.ClientEndpoint.onStart

It calls registerWithMaster:

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

StandaloneAppClient.registerWithMaster

    private def registerWithMaster(nthRetry: Int) {
      // Register the current app with all Masters.
      // Once one master is successfully connected, the other attempts are cancelled.
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            // If the maximum number of attempts (3 by default) is reached, mark the client dead
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }

StandaloneAppClient.tryRegisterAllMasters

It sends a RegisterApplication message to each Master:

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

Master.receive

Master.receive receives and handles the RegisterApplication message:

    case RegisterApplication(description, driver) =>
      if (state == RecoveryState.STANDBY) {
        // A standby master ignores registration requests
      } else {
        logInfo("Registering app " + description.name)
        // Create the app
        val app = createApplication(description, driver)
        // Register the app
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist it
        persistenceEngine.addApplication(app)
        // Reply with a RegisteredApplication message
        driver.send(RegisteredApplication(app.id, self))
        // Schedule resources
        schedule()
      }

Let's take a closer look at how the Master registers the app.

Master.createApplication

First, the app is created:

    private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
        ApplicationInfo = {
      val now = System.currentTimeMillis()
      val date = new Date(now)
      // Generate the appId from the date
      val appId = newApplicationId(date)
      // Build the app info from the timestamp, appId, description, date, driver and default cores
      new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
    }

Master.registerApplication

Then the app is registered:

    private def registerApplication(app: ApplicationInfo): Unit = {
      // If an app with this driver address already exists, return
      val appAddress = app.driver.address
      if (addressToApp.contains(appAddress)) {
        logInfo("Attempted to re-register application at same address: " + appAddress)
        return
      }
      // Register the appSource with applicationMetricsSystem
      applicationMetricsSystem.registerSource(app.appSource)
      // Add the app to the set: HashSet[ApplicationInfo]
      apps += app
      // Update id -> app: HashMap[String, ApplicationInfo]
      idToApp(app.id) = app
      // Update endpoint -> app: HashMap[RpcEndpointRef, ApplicationInfo]
      endpointToApp(app.driver) = app
      // Update address -> app: HashMap[RpcAddress, ApplicationInfo]
      addressToApp(appAddress) = app
      // Add the app to the waiting queue: ArrayBuffer[ApplicationInfo]
      waitingApps += app
      if (reverseProxy) {
        webUi.addProxyTargets(app.id, app.desc.appUiUrl)
      }
    }

StandaloneAppClient.ClientEndpoint.receive

    case RegisteredApplication(appId_, masterRef) =>
      // The code here has two known issues:
      // 1. A master may receive multiple registration requests and send back
      //    multiple RegisteredApplication messages due to an unstable network.
      // 2. If the master is changing, multiple RegisteredApplication messages
      //    may be received from different masters.
      // Record the appId
      appId.set(appId_)
      // Mark the client as registered
      registered.set(true)
      // Remember the master reference
      master = Some(masterRef)
      // Notify the listener that we are connected
      listener.connected(appId.get)

Logical resource scheduling

As we saw in the previous section, the last line of Master.receive's handling of the RegisterApplication message is:

    // Schedule resources
    schedule()

Let's now look at resource scheduling.

The call stack is as follows:

  • Master.schedule
    • Master.startExecutorsOnWorkers
      • Master.scheduleExecutorsOnWorkers
      • Master.allocateWorkerResourceToExecutors

Master.schedule

This method schedules resources among the waiting apps. It is called every time a new app joins or the available resources change:

    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Get the alive workers and shuffle them
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      // Number of alive workers
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      // Allocate resources to drivers using a FIFO policy:
      // drivers that arrived earlier get their resource requirements satisfied first
      for (driver <- waitingDrivers.toList) {
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      // Start executors on the workers
      startExecutorsOnWorkers()
    }

Master.startExecutorsOnWorkers

Next, let's look at how executors are started:

    private def startExecutorsOnWorkers(): Unit = {
      // Apps are still scheduled in FIFO order
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that do not have enough resources to launch an executor,
        // then sort them by free cores in descending order
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        // Schedule executors on the workers:
        // decide how many cores each worker contributes to this app
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
        // Allocate the resources
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }

Master.scheduleExecutorsOnWorkers

Now we come to the core part: scheduling executor resources on the workers. Before looking at the current Spark code, let's see how this logic was implemented before Spark 1.4:

    val numUsable = usableWorkers.length
    // Records how many cores have been assigned on each worker
    val assigned = new Array[Int](numUsable)
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    var pos = 0
    while (toAssign > 0) {
      // Iterate over the workers; if the current worker still has a free core,
      // assign one core from it. Continue until either all worker cores are
      // assigned or the app's demand is satisfied.
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable
    }

This code was changed in Spark 1.4. Let's think for a moment: what is wrong with the code above?

The problem is that cores are assigned one at a time. Imagine a cluster with 4 workers of 16 cores each. A user wants to launch 3 executors with 16 cores per executor, so they configure:

spark.cores.max = 48
spark.executor.cores = 16

Clearly the cluster has enough resources to satisfy this request. But because cores are handed out one at a time and round-robined across workers, each worker ends up with 12 assigned cores. Since 12 < 16, not a single executor can be launched.
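To make the failure concrete, here is a small self-contained simulation of that old one-core-at-a-time round robin under exactly this configuration; the object and variable names are illustrative, not the actual Master code:

    // Illustrative simulation of the pre-Spark-1.4 per-core round-robin assignment.
    object OldCoreAssignmentDemo {
      def main(args: Array[String]): Unit = {
        val coresFree = Array(16, 16, 16, 16)              // 4 workers, 16 cores each
        val assigned  = new Array[Int](coresFree.length)   // cores assigned per worker
        var toAssign  = math.min(48, coresFree.sum)        // spark.cores.max = 48
        var pos = 0
        while (toAssign > 0) {
          if (coresFree(pos) - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % coresFree.length
        }
        // Prints "12, 12, 12, 12": no worker reaches the 16 cores demanded by
        // spark.executor.cores = 16, so not a single executor can be launched.
        println(assigned.mkString(", "))
      }
    }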

With that in mind, let's go back and see how the current source implements this logic:

    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      val coresPerExecutor = app.desc.coresPerExecutor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
      val memoryPerExecutor = app.desc.memoryPerExecutorMB
      val numUsable = usableWorkers.length
      // Records how many cores have been assigned on each worker
      val assignedCores = new Array[Int](numUsable)
      // Records how many executors have been assigned on each worker
      val assignedExecutors = new Array[Int](numUsable)
      // Total cores still to assign
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      // Whether an executor can be launched on the worker at position pos
      def canLaunchExecutor(pos: Int): Boolean = {
        // omitted here, see below
      }

      // Keep only the workers on which an executor can be launched
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
      // Schedule resources until no worker can take another executor
      while (freeWorkers.nonEmpty) {
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutor(pos)) {
            // minCoresPerExecutor is the user-set spark.executor.cores
            coresToAssign -= minCoresPerExecutor
            assignedCores(pos) += minCoresPerExecutor
            // If the user did not set spark.executor.cores, oneExecutorPerWorker is true,
            // i.e. all cores in assignedCores(pos) belong to a single executor.
            // If spark.executor.cores is set, this worker's assignedExecutors is incremented.
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }
            // There are two allocation strategies:
            // 1. Spread out: distribute the app across as many nodes as possible,
            //    which makes better use of the cluster. In this case spreadOutApps
            //    is true, so after assigning one executor on this worker we leave
            //    the inner loop and move on to the next worker.
            // 2. Consolidate: pack the app onto as few nodes as possible, which
            //    suits CPU-intensive apps with modest memory use. In this case
            //    spreadOutApps is false, so we keep assigning executors on this worker.
            if (spreadOutApps) {
              keepScheduling = false
            }
          }
        }
        freeWorkers = freeWorkers.filter(canLaunchExecutor)
      }
      assignedCores
    }

Next, look at its inner function canLaunchExecutor:

    def canLaunchExecutor(pos: Int): Boolean = {
      // Condition 1: the cluster still has at least spark.executor.cores unassigned cores
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      // Condition 2: this worker still has at least spark.executor.cores free cores
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
      // Condition 3: spark.executor.cores is set, or this worker has no executor assigned yet
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        // Condition 4: the worker's remaining memory is at least spark.executor.memory
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        // Condition 5: after launching this executor, the total number of
        // executors stays within the app's executor limit
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        // If condition 3 holds, return true when conditions 1, 2, 4 and 5 all hold
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // If condition 3 does not hold (one executor per worker, already assigned),
        // return true when conditions 1 and 2 hold.
        // In this case assignedExecutors is not incremented afterwards;
        // the existing executor simply receives more cores.
        keepScheduling && enoughCores
      }
    }

From the source above it is clear that the new logic introduced in Spark 1.4 simply changes the allocation unit from a single core to a whole executor (i.e. spark.executor.cores). When a worker hosts only one executor for the app (i.e. spark.executor.cores is not set), the behaviour degenerates to the old per-core logic.
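For comparison, here is a simplified, self-contained sketch of the post-1.4 behaviour for the same 4 x 16-core scenario. It keeps only the core-related checks and deliberately leaves out the memory and executor-limit conditions of the real scheduleExecutorsOnWorkers:

    // Simplified sketch: hand out cores in units of spark.executor.cores
    // instead of one core at a time (illustrative only, not the Master code).
    object NewCoreAssignmentDemo {
      def main(args: Array[String]): Unit = {
        val coresFree = Array(16, 16, 16, 16)                 // 4 workers, 16 cores each
        val assignedCores = new Array[Int](coresFree.length)  // cores assigned per worker
        val minCoresPerExecutor = 16                          // spark.executor.cores
        var coresToAssign = math.min(48, coresFree.sum)       // spark.cores.max = 48

        // Core-only version of canLaunchExecutor: enough cores left globally
        // and enough free cores on this worker.
        def canLaunchExecutor(pos: Int): Boolean =
          coresToAssign >= minCoresPerExecutor &&
            coresFree(pos) - assignedCores(pos) >= minCoresPerExecutor

        var freeWorkers = coresFree.indices.filter(canLaunchExecutor)
        while (freeWorkers.nonEmpty) {
          freeWorkers.foreach { pos =>
            // spreadOut behaviour: at most one executor per worker per pass
            if (canLaunchExecutor(pos)) {
              coresToAssign -= minCoresPerExecutor
              assignedCores(pos) += minCoresPerExecutor
            }
          }
          freeWorkers = freeWorkers.filter(canLaunchExecutor)
        }
        // Prints "16, 16, 16, 0": three full 16-core executors can now be launched.
        println(assignedCores.mkString(", "))
      }
    }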

One thing worth noting is the loop condition:

    // Schedule until no worker can take another executor
    while (freeWorkers.nonEmpty)

An app will try to consume as much of the cluster's resources as it can, so setting the spark.cores.max parameter is essential!
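For example, the cap can be set when building the SparkConf; the master URL and the numbers below are placeholders, not recommendations:

    // Illustrative only: cap this app's total cores so one app cannot
    // monopolize the standalone cluster.
    import org.apache.spark.{SparkConf, SparkContext}

    object CappedApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master:7077")     // placeholder master URL
          .setAppName("capped-app")
          .set("spark.cores.max", "48")         // total cores across the cluster
          .set("spark.executor.cores", "16")    // cores per executor
        val sc = new SparkContext(conf)
        // ... job code ...
        sc.stop()
      }
    }

The same properties can of course also be passed on the command line with --conf when submitting the application.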

Master.allocateWorkerResourceToExecutors

Now let's return to Master.startExecutorsOnWorkers mentioned above and dive into allocateWorkerResourceToExecutors:

    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      // Number of executors on this worker;
      // 1 if spark.executor.cores is not set
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      // Cores given to each executor;
      // all cores assigned on this worker if spark.executor.cores is not set
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        // Create the executor description, add it to the app info and return it
        val exec = app.addExecutor(worker, coresToAssign)
        // Launch it
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }

Note that the line

app.state = ApplicationState.RUNNING

does not remove the app from the waitingApps queue. If this scheduling round did not give the app enough executors, then the next time the cluster's resources change schedule() runs again, finds the app still in waitingApps with coresLeft > 0, and tries again:

for (app <- waitingApps if app.coresLeft > 0)
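For reference, coresLeft in ApplicationInfo boils down to "requested cores minus cores already granted"; the following is only a simplified sketch of that idea, not the actual class:

    // Simplified sketch (not the actual ApplicationInfo): an app that received
    // fewer cores than it requested keeps a positive coresLeft and therefore
    // stays eligible in waitingApps for the next schedule() pass.
    case class AppCoresSketch(requestedCores: Int, coresGranted: Int) {
      def coresLeft: Int = requestedCores - coresGranted
    }

    // e.g. requested 24 cores, granted 18 so far => coresLeft = 6 > 0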

Let's run a small experiment here:

  • Our test cluster has 4 workers with 8 cores each (32 cores in total).

  • For the first app we request 4 cores in total with 4 cores per executor, i.e. a single 4-core executor:

    spark-shell --master spark://cdh03:7077 --total-executor-cores 4 --executor-cores 4

    (Screenshots of the cluster resources and app1's executor omitted.)

  • For the second app we request 24 cores in total with 6 cores per executor, i.e. four 6-core executors:

    spark-shell --master spark://cdh03:7077 --total-executor-cores 24 --executor-cores 6

    Looking at the cluster resources and app2's executors, we can see that Spark allocated only 3 executors to app2: the worker hosting app1's executor has only 4 free cores left, which is not enough for a 6-core executor, so only the other three workers qualify.

  • Now we exit app1.

    The cluster state changes, and app2's executor list gains a new "worker-20170102151129" executor: the freed worker has 8 free cores again, schedule() runs, and app2, which is still sitting in waitingApps with coresLeft > 0, finally gets its fourth 6-core executor.

In fact, as long as an app in the cluster has not finished, it stays in waitingApps; only when the app ends is it removed from waitingApps:

    def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
      ...
      waitingApps -= app
      ...
    }

Physical resource scheduling and launching Executors

Now that logical resource scheduling is done, let's look at the physical side: actually launching the Executors.

The call stack is as follows:

  • Master.launchExecutor
  • Worker.receive
    • ExecutorRunner.start
      • ExecutorRunner.fetchAndRunExecutor
  • CoarseGrainedExecutorBackend.main
    • CoarseGrainedExecutorBackend.run
      • CoarseGrainedExecutorBackend.onStart
  • CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
  • CoarseGrainedExecutorBackend.receive

Master.launchExecutor

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      // Add the executor info to the worker info
      worker.addExecutor(exec)
      // Send a LaunchExecutor message to the worker
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      // Send an ExecutorAdded message to the driver
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }

Worker.receive

The worker handles the LaunchExecutor message as follows:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          // Create the executor's working directory.
          // Shuffle output is persisted under this directory, so ideally the node's
          // disks are of similar size, and configuring SPARK_WORKER_DIR on each disk
          // can improve IO performance.
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }
          // Create local dirs for the app.
          // These are deleted when the app finishes.
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
          // Create the ExecutorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          // Start the ExecutorRunner
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          // Send an ExecutorStateChanged message to the Master
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

ExecutorRunner.start

Next, let's look inside ExecutorRunner:

    private[worker] def start() {
      // Create the worker thread
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() }
      }
      // Start the worker thread
      workerThread.start()
      // Register a shutdown hook that kills the executor when the worker shuts down
      shutdownHook = ShutdownHookManager.addShutdownHook { () =>
        if (state == ExecutorState.RUNNING) {
          state = ExecutorState.FAILED
        }
        killProcess(Some("Worker shutting down"))
      }
    }

ExecutorRunner.fetchAndRunExecutor

The workerThread's run method simply calls fetchAndRunExecutor; let's look at that method:

    private def fetchAndRunExecutor() {
      try {
        // Create the process builder
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        val command = builder.command()
        val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
        logInfo(s"Launch command: $formattedCommand")
        // Set the process builder's working directory
        builder.directory(executorDir)
        // Set environment variables for the process builder
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
        val baseUrl =
          if (conf.getBoolean("spark.ui.reverseProxy", false)) {
            s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
          } else {
            s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
          }
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
        // Start the process builder, creating the executor process
        process = builder.start()
        val header = "Spark Executor Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
        // Redirect its stdout and stderr to files
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, StandardCharsets.UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
        // Wait for the process to exit. When the driver tells this process to exit,
        // the executor exits with a zero or non-zero exit code.
        val exitCode = process.waitFor()
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode
        // Send an ExecutorStateChanged message to the Worker
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
      } catch {
        case interrupted: InterruptedException =>
          logInfo("Runner thread for executor " + fullId + " interrupted")
          state = ExecutorState.KILLED
          killProcess(None)
        case e: Exception =>
          logError("Error running executor", e)
          state = ExecutorState.FAILED
          killProcess(Some(e.toString))
      }
    }

CoarseGrainedExecutorBackend.main

The process started by the builder is a CoarseGrainedExecutorBackend instance; let's look at its main function:

    def main(args: Array[String]) {
      var driverUrl: String = null
      var executorId: String = null
      var hostname: String = null
      var cores: Int = 0
      var appId: String = null
      var workerUrl: Option[String] = None
      val userClassPath = new mutable.ListBuffer[URL]()
      // Parse the arguments
      var argv = args.toList
      while (!argv.isEmpty) {
        argv match {
          case ("--driver-url") :: value :: tail =>
            driverUrl = value
            argv = tail
          case ("--executor-id") :: value :: tail =>
            executorId = value
            argv = tail
          case ("--hostname") :: value :: tail =>
            hostname = value
            argv = tail
          case ("--cores") :: value :: tail =>
            cores = value.toInt
            argv = tail
          case ("--app-id") :: value :: tail =>
            appId = value
            argv = tail
          case ("--worker-url") :: value :: tail =>
            workerUrl = Some(value)
            argv = tail
          case ("--user-class-path") :: value :: tail =>
            userClassPath += new URL(value)
            argv = tail
          case Nil =>
          case tail =>
            System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
            printUsageAndExit()
        }
      }
      if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
        appId == null) {
        printUsageAndExit()
      }
      // Call run
      run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
      System.exit(0)
    }

CoarseGrainedExecutorBackend.run

    private def run(
        driverUrl: String,
        executorId: String,
        hostname: String,
        cores: Int,
        appId: String,
        workerUrl: Option[String],
        userClassPath: Seq[URL]) {
      Utils.initDaemon(log)
      SparkHadoopUtil.get.runAsSparkUser { () =>
        Utils.checkHost(hostname)
        val executorConf = new SparkConf
        val port = executorConf.getInt("spark.executor.port", 0)
        val fetcher = RpcEnv.create(
          "driverPropsFetcher",
          hostname,
          port,
          executorConf,
          new SecurityManager(executorConf),
          clientMode = true)
        val driver = fetcher.setupEndpointRefByURI(driverUrl)
        // Send a RetrieveSparkAppConfig message to the driver
        // and build the properties from the reply
        val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
        val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
        fetcher.shutdown()
        // Create the SparkEnv from these properties
        val driverConf = new SparkConf()
        for ((key, value) <- props) {
          if (SparkConf.isExecutorStartupConf(key)) {
            driverConf.setIfMissing(key, value)
          } else {
            driverConf.set(key, value)
          }
        }
        if (driverConf.contains("spark.yarn.credentials.file")) {
          logInfo("Will periodically update credentials from: " +
            driverConf.get("spark.yarn.credentials.file"))
          SparkHadoopUtil.get.startCredentialUpdater(driverConf)
        }
        val env = SparkEnv.createExecutorEnv(
          driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
        // Create the CoarseGrainedExecutorBackend endpoint
        env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
          env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
        // Create the WorkerWatcher endpoint, which watches the worker process
        // and shuts this executor process down if the connection to the worker is lost
        workerUrl.foreach { url =>
          env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
        }
        env.rpcEnv.awaitTermination()
        SparkHadoopUtil.get.stopCredentialUpdater()
      }
    }

CoarseGrainedExecutorBackend.onStart

Setting up the new CoarseGrainedExecutorBackend endpoint triggers CoarseGrainedExecutorBackend.onStart:

    override def onStart() {
      logInfo("Connecting to driver: " + driverUrl)
      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        // Send a RegisterExecutor message to the driver
        driver = Some(ref)
        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
      }(ThreadUtils.sameThread).onComplete {
        case Success(msg) =>
        case Failure(e) =>
          exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
      }(ThreadUtils.sameThread)
    }

CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply

    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
      if (executorDataMap.contains(executorId)) {
        executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        context.reply(true)
      } else {
        // Record the executor's information
        val executorAddress = if (executorRef.address != null) {
          executorRef.address
        } else {
          context.senderAddress
        }
        logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
        addressToExecutorId(executorAddress) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val data = new ExecutorData(executorRef, executorRef.address, hostname,
          cores, cores, logUrls)
        CoarseGrainedSchedulerBackend.this.synchronized {
          executorDataMap.put(executorId, data)
          if (currentExecutorIdCounter < executorId.toInt) {
            currentExecutorIdCounter = executorId.toInt
          }
          if (numPendingExecutors > 0) {
            numPendingExecutors -= 1
            logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
          }
        }
        // Send a RegisteredExecutor message back to the executor
        executorRef.send(RegisteredExecutor)
        context.reply(true)
        listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
        makeOffers()
      }

The logic behind makeOffers() was already covered in 《深入理解Spark 2.1 Core (三):任务调度器的原理与源码分析》: it mainly schedules tasks and sends them to the executors.

CoarseGrainedExecutorBackend.receive

When CoarseGrainedExecutorBackend receives the RegisteredExecutor message from the driver:

    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        // Create the executor
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

At this point, the Executor has been successfully started!
