This blog series distills and shares cases drawn from real commercial environments, and provides Spark source-code walkthroughs together with hands-on guidance. Please stay tuned. Copyright notice: this Spark source-code and commercial-practice series belongs to its author (Qin Kaixin). Reproduction is prohibited; you are welcome to study it.

  • Spark in Commercial Environments: Spark's built-in RPC framework and the RpcEnv infrastructure
  • Spark in Commercial Environments: Spark event-listener bus flow analysis
  • Spark in Commercial Environments: Spark storage subsystem architecture deep dive
  • Spark in Commercial Environments: execution flow of Spark's underlying MessageLoop threads
  • Spark in Commercial Environments: Spark first-level resource scheduling (schedule) and SpreadOut mode source deep dive
  • Spark in Commercial Environments: Spark second-level scheduling, Stage division algorithm and task-scheduling details
  • Spark in Commercial Environments: Spark delay scheduling and the Pool scheduling-pool architecture
  • Spark in Commercial Environments: Task-level cached aggregation/sorting structure AppendOnlyMap in detail
  • Spark in Commercial Environments: ExternalSorter design in the Spark Shuffle process
  • Spark in Commercial Environments: ShuffleExternalSorter design in the Spark Shuffle process
  • Spark in Commercial Environments: Spark ShuffleManager in-memory buffer SortShuffleWriter design
  • Spark in Commercial Environments: Spark ShuffleManager in-memory buffer UnsafeShuffleWriter design
  • Spark in Commercial Environments: Spark ShuffleManager in-memory buffer BypassMergeSortShuffleWriter design
  • Spark in Commercial Environments: Spark Shuffle core component BlockStoreShuffleReader internals
  • Spark in Commercial Environments: Spark Shuffle manager SortShuffleManager internals
  • Spark in Commercial Environments: StreamingContext startup flow and DStream template source analysis
  • Spark in Commercial Environments: ReceiverTracker startup and receiver RDD task-submission mechanism
  • Spark in Commercial Environments: Spark Streaming Batch-to-Block periodic conversion source deep dive
  • Spark in Commercial Environments: Spark Streaming JobGenerator periodic data-processing logic source deep dive
  • [Spark in Commercial Environments: Spark Streaming Graph processing-chain iteration source deep dive]
  • [Spark in Commercial Environments: JobGenerator data-cleanup flow source deep dive]
  • [Spark in Commercial Environments: Spark Streaming fault-tolerance mechanism source deep dive]
  • [Spark in Commercial Environments: Spark Streaming no-receiver Kafka direct-consumption internals]
  • [Spark in Commercial Environments: Spark Streaming back-pressure rate-control internals]

1 Where does schedule() live, and what does it do?

schedule() lives in the Master, so what exactly is the Master responsible for? As the list below shows, whenever any of the following events occurs, schedule() is run again:

  • RegisterWorker
  • RegisterApplication
  • ExecutorStateChanged
  • RequestSubmitDriver
  • completeRecovery
  • relaunchDriver
  • removeApplication
  • handleRequestExecutors
  • handleKillExecutors
  • removeDriver
It follows that the so-called first-level resource scheduling in the Local-cluster and Standalone deploy modes is simply resource scheduling implemented inside the Master — more precisely, resource scheduling for Drivers and for Applications (the number of Executors requested via submission parameters).
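To make the pattern behind that event list concrete, here is a minimal, self-contained sketch (MiniMaster and MasterEvent are hypothetical stand-ins, not the actual Master source): every handler that changes the pool of workers, drivers or executors finishes by calling schedule() again.

    // A minimal sketch of the "state change => re-run schedule()" pattern.
    // Types and handler bodies are made up purely for illustration.
    sealed trait MasterEvent
    case object RegisterWorker extends MasterEvent
    case object RegisterApplication extends MasterEvent
    case object ExecutorStateChanged extends MasterEvent

    class MiniMaster {
      private def schedule(): Unit =
        println("schedule(): match waiting drivers/apps against free workers")

      def receive(event: MasterEvent): Unit = event match {
        case RegisterWorker        => /* record the new worker's resources */ schedule()
        case RegisterApplication   => /* queue the app in waitingApps      */ schedule()
        case ExecutorStateChanged  => /* reclaim or account executor cores */ schedule()
      }
    }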

2 The Master's imperial mandate (leaving aside Yarn resource management and K8s container orchestration)

The Master is the core management component in the Local-cluster and Standalone deploy modes. It directly determines the availability and fault tolerance of the whole cluster, which puts it in the most important, most central position of a Spark cluster. Its responsibilities are as follows:

  • Managing Workers
  • Managing Applications
  • Managing Drivers
  • Centrally managing and allocating cluster resources (such as memory and CPU)
  • Receiving registrations, state updates and heartbeats from each Worker
  • Handling Driver and Application registration

3 Where does the Driver come from? What is it? How is it managed?

  • A Driver is born when the Master receives a RequestSubmitDriver request. So where does RequestSubmitDriver come from? That story starts with the SparkSubmit class. Code first — look at STANDALONE_CLUSTER_SUBMIT_CLASS, and let's start from there:

    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.yarn.YarnClusterApplication"
    private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
    private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
    private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
  • Here SparkSubmit packages the parameters passed to spark-submit; in standalone cluster mode, our attention turns to ClientEndpoint, which is an RPC endpoint:

    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
    // All Spark parameters are expected to be passed to the client through system properties.
    if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS   // <= the key point: ClientApp
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }
  • ClientApp (ClientEndpoint) then sends an asynchronous RequestSubmitDriver request to the Master. In other words: one spark-submit invocation sends one RequestSubmitDriver request, which in turn produces one resource-requesting Driver.

    val driverDescription = new DriverDescription(
      driverArgs.jarUrl,
      driverArgs.memory,
      driverArgs.cores,
      driverArgs.supervise,
      command)
    asyncSendToMasterAndForwardReply[SubmitDriverResponse](
      RequestSubmitDriver(driverDescription))
  • The Master receives the submitted resource request and adds a Driver to its drivers member variable — that is, one resource-requesting driver per job submission.

    case RequestSubmitDriver(description) =>
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()
        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }
  • Wrapping the resource request into a DriverInfo entity

    private def createDriver(desc: DriverDescription): DriverInfo = {
      val now = System.currentTimeMillis()
      val date = new Date(now)
      new DriverInfo(now, newDriverId(date), desc, date)
    }
Summary: the Master's receiveAndReply method receives the RequestSubmitDriver message sent by ClientEndpoint and registers the received Driver into waitingDrivers. Only on that basis does the Driver-oriented first-level resource scheduling that follows become possible.
  For more detail on RequestSubmitDriver, see this post, which covers it more thoroughly than mine: https://blog.csdn.net/u011564172/article/details/68496848

4 Where does the Application come from? How does it relate to the Driver? How is it managed?

  • This one nearly drove me mad: an Application and a Driver are not the same concept at all. The Driver is born during the spark-submit phase, while the Application is born during DAG-scheduling setup, i.e. when the SparkContext is instantiated. This distinction has to be made crystal clear.

  • Based on the Application's resource request, the Master eventually puts the appDesc into its apps collection and schedules resources for the Application. The call chain is summarized in the snippet below, and a minimal user-side example follows it.

    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)

    // Call chain:
    // SparkContext -> StandaloneSchedulerBackend -> StandaloneAppClient.start()
    //   -> registerWithMaster -> masterRef.send(RegisterApplication(appDescription, self))
    // Master: apps += app -> schedule()
    //   [after the Driver is up: startExecutorsOnWorkers() -> allocateWorkerResourceToExecutors]
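To pin down the trigger point, here is a minimal, hypothetical user program (the master URL, app name and resource values are made up for illustration): simply constructing the SparkContext below is what makes StandaloneSchedulerBackend and StandaloneAppClient send RegisterApplication to the Master — user code never registers the Application explicitly.

    import org.apache.spark.{SparkConf, SparkContext}

    object RegisterAppDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical standalone master URL and submission parameters.
        val conf = new SparkConf()
          .setMaster("spark://master-host:7077")
          .setAppName("register-app-demo")
          .set("spark.executor.memory", "1g")
          .set("spark.cores.max", "4")
        // Instantiating the SparkContext is the moment RegisterApplication is sent to the Master.
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 100).count()
        sc.stop()
      }
    }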
  • The Master endpoint's registerApplication

    private def registerApplication(app: ApplicationInfo): Unit = {
      val appAddress = app.driver.address
      if (addressToApp.contains(appAddress)) {
        logInfo("Attempted to re-register application at same address: " + appAddress)
        return
      }

      applicationMetricsSystem.registerSource(app.appSource)
      apps += app
      idToApp(app.id) = app
      endpointToApp(app.driver) = app
      addressToApp(appAddress) = app
      waitingApps += app
    }

5 The Master's responsibilities, revisited

  • After the cluster starts, each Worker registers with the Master, carrying its identity and resource profile (ID, host, port, CPU cores, memory size). Once these resources are under the Master's management, the Master hands them out to Drivers and Applications according to its resource-scheduling policy.
  • After allocating resources to a Driver, the Master sends a launch-driver command to the chosen Worker, and the Worker starts the Driver on receipt.
  • After allocating resources to an Application, the Master sends launch-executor commands to the chosen Workers, and the Workers start the Executors on receipt.

6 Unveiling schedule()

6.1 The core idea of schedule()

  • Randomly shuffle the alive Workers and, wherever a Worker's free resources suffice, launch Drivers and then Executors on it.

  • Key call (Driver resource scheduling) ==> launchDriver(worker, driver)

  • Key call (Executor scheduling) ==> startExecutorsOnWorkers()

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     */
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)      // <= key call (Driver resource scheduling)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      startExecutorsOnWorkers()               // <= key call (Executor scheduling)
    }

6.2 Digging into launchDriver

  • Send the Worker a message to launch the driver

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      worker.addDriver(driver)
      driver.worker = Some(worker)
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
      driver.state = DriverState.RUNNING
    }
  • How the Worker handles it

    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem

6.3 Drilling into startExecutorsOnWorkers

  • coresPerExecutor: the number of cores each Executor uses, set via submission parameters; when unset it falls back to 1 here.

  • app.desc.memoryPerExecutorMB: the Executor memory set via submission parameters.

  • scheduleExecutorsOnWorkers: returns the number of cores assigned on each Worker.

  • allocateWorkerResourceToExecutors: turns each Worker's assigned cores into actual Executor launches.

    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps) {
        val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
        // If the cores left is less than the coresPerExecutor, the cores left will not be allocated
        if (app.coresLeft >= coresPerExecutor) {
          // Filter out workers that don't have enough resources to launch an executor
          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
            .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
              worker.coresFree >= coresPerExecutor)
            .sortBy(_.coresFree).reverse                                    // <= key step (checking Worker resources)
          val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // <= key call
          // Now that we've decided how many cores to allocate on each worker, let's allocate them
          for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
            allocateWorkerResourceToExecutors(
              app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) // <= key call
          }
        }
      }
    }

6.4 Taking scheduleExecutorsOnWorkers seriously

   /**
    * Schedule executors to be launched on the workers.
    * Returns an array containing number of cores assigned to each worker.
    *
    * There are two modes of launching executors. The first attempts to spread out an application's
    * executors on as many workers as possible, while the second does the opposite (i.e. launch them
    * on as few workers as possible). The former is usually better for data locality purposes and is
    * the default.
    *
    * The number of cores assigned to each executor is configurable. When this is explicitly set,
    * multiple executors from the same application may be launched on the same worker if the worker
    * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
    * worker by default, in which case only one executor per application may be launched on each
    * worker during one single schedule iteration.
    * Note that when `spark.executor.cores` is not set, we may still launch multiple executors from
    * the same application on the same worker. Consider appA and appB both have one executor running
    * on worker1, and appA.coresLeft > 0, then appB is finished and release all its cores on worker1,
    * thus for the next schedule iteration, appA launches a new executor that grabs all the free
    * cores on worker1, therefore we get multiple executors from appA running on worker1.
    *
    * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
    * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
    * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
    * allocated at a time, 12 cores from each worker would be assigned to each executor.
    * Since 12 < 16, no executors would launch [SPARK-8881].
    */
  • spreadOutApps decides whether an app's Executors are packed onto as few Workers as possible or spread across the Workers in turn (see the small simulation after the source snippet below).

  • oneExecutorPerWorker: if coresPerExecutor is not specified, each Worker runs only one Executor for this app (and that Executor grabs all the cores assigned on the Worker); otherwise a Worker may run several.

  • assignedCores is the returned array, indexed by Worker position (freeWorkers yields the indices 0, 1, 2, ...); assignedCores(pos) is the number of cores that can be allocated on that particular Worker.

  • allocateWorkerResourceToExecutors: launches Executors Worker by Worker according to those assignments; the subtlety is whether a given Worker has to start more than one Executor.

    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      val coresPerExecutor = app.desc.coresPerExecutor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
      val memoryPerExecutor = app.desc.memoryPerExecutorMB
      val numUsable = usableWorkers.length
      val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
      val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      /** Return whether the specified worker can launch an executor for this app. */
      def canLaunchExecutor(pos: Int): Boolean = {
        val keepScheduling = coresToAssign >= minCoresPerExecutor
        val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

        // If we allow multiple executors per worker, then we can always launch new executors.
        // Otherwise, if there is already an executor on this worker, just give it more cores.
        val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
        if (launchingNewExecutor) {
          val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
          val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
          val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
          keepScheduling && enoughCores && enoughMemory && underLimit
        } else {
          // We're adding cores to an existing executor, so no need
          // to check memory and executor limits
          keepScheduling && enoughCores
        }
      }

      // Keep launching executors until no more workers can accommodate any
      // more executors, or if we have reached this application's limits
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
      while (freeWorkers.nonEmpty) {
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutor(pos)) {
            coresToAssign -= minCoresPerExecutor
            assignedCores(pos) += minCoresPerExecutor

            // If we are launching one executor per worker, then every iteration assigns 1 core
            // to the executor. Otherwise, every iteration assigns cores to a new executor.
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }

            // Spreading out an application means spreading out its executors across as
            // many workers as possible. If we are not spreading out, then we should keep
            // scheduling executors on this worker until we use all of its resources.
            // Otherwise, just move on to the next worker.
            if (spreadOutApps) {
              keepScheduling = false
            }
          }
        }
        freeWorkers = freeWorkers.filter(canLaunchExecutor)
      }
      assignedCores
    }
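To see what spreadOutApps changes in practice, here is a self-contained, simplified simulation of the assignment loop above (memory checks, the executor limit and WorkerInfo bookkeeping are stripped out; the worker core counts and the requested cores are made-up numbers, purely for illustration):

    // Simplified simulation of the core-assignment loop, NOT the actual Master source.
    object SpreadOutDemo {
      def assign(workerCores: Array[Int], coresWanted: Int,
                 coresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
        val assigned = new Array[Int](workerCores.length)
        var toAssign = math.min(coresWanted, workerCores.sum)
        // A worker can take another executor's worth of cores if both the app and the worker have room.
        def canLaunch(pos: Int): Boolean =
          toAssign >= coresPerExecutor &&
            workerCores(pos) - assigned(pos) >= coresPerExecutor
        var free = workerCores.indices.filter(canLaunch)
        while (free.nonEmpty) {
          free.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunch(pos)) {
              toAssign -= coresPerExecutor          // always one executor's worth at a time
              assigned(pos) += coresPerExecutor
              if (spreadOut) keepScheduling = false // spread out: move on to the next worker
            }
          }
          free = free.filter(canLaunch)
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        val workers = Array(8, 8, 8) // three hypothetical workers with 8 free cores each
        println(assign(workers, 6, 2, spreadOut = true).mkString(","))  // prints 2,2,2
        println(assign(workers, 6, 2, spreadOut = false).mkString(",")) // prints 6,0,0
      }
    }

Run as-is it prints 2,2,2 and then 6,0,0: with spread-out enabled the same request lands on as many Workers as possible, without it the first Worker is filled up first. The sketch also assigns coresPerExecutor at a time, which is exactly the point the [SPARK-8881] paragraph in the comment above is making.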

6.5 Getting to the bottom of allocateWorkerResourceToExecutors

  • Tell the Worker to start Executors according to the Application's requirements, i.e. what was requested when the application was submitted (a quick worked example of the core division follows the snippet below).

    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      // If the number of cores per executor is specified, we divide the cores assigned
      // to this worker evenly among the executors with no remainder.
      // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        val exec = app.addExecutor(worker, coresToAssign)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
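As a quick sanity check of the division above, with made-up numbers (a Worker that was assigned 8 cores and spark.executor.cores = 2):

    // Hypothetical numbers, just to illustrate how numExecutors and coresToAssign fall out.
    val assignedCores = 8
    val coresPerExecutor: Option[Int] = Some(2)
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) // = 4 executors
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)              // = 2 cores each
    // With coresPerExecutor = None, a single executor would grab all 8 assigned cores instead.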
  • The Master endpoint sends LaunchExecutor

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      worker.addExecutor(exec)
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }

7 Summary

With that, the anatomy of the first-level resource scheduling mechanism, schedule(), is complete — dissected down to the bone. One picture to close with, and then it's time to rest: it is already 0:18.

Qin Kaixin, in Shenzhen. A panorama of Victoria Peak, Hong Kong. Where there's a will, there's a way.

Farewell, 2018-11-12
