master的receive方法接收到RegisterApplication类型的消息,就要给application划分资源了。

    //Driver 端提交过来的要注册Applicationcase RegisterApplication(description, driver) =>// TODO Prevent repeated registrations from some driver//如果Master状态是standby 忽略不提交任务if (state == RecoveryState.STANDBY) {// ignore, don't send response} else {logInfo("Registering app " + description.name)//这里封装application信息,注意,在这里可以跟进去看到默认一个application使用的core的个数就是 Int.MaxValueval app:ApplicationInfo = createApplication(description, driver)//注册app ,这里面有向 waitingApps中加入当前applicationregisterApplication(app)logInfo("Registered app " + description.name + " with ID " + app.id)persistenceEngine.addApplication(app)driver.send(RegisteredApplication(app.id, self))//最终又会执行通用方法schedule()schedule()}

判断master如果是standby,就忽略不提交任务。根据ApplicationDescription,将其封装成ApplicationInfo对象。所谓registerApplication注册,就是将刚得到的ApplicationInfo对象放到ArrayBuffer[ApplicationInfo]中去。
然后就去调度schedule(),这个 方法很熟悉,但是为Master随机挑选一台Worker,就是用的这个方法。之后会在worker节点上启动Executor。拿出要提交的application,先判断启动1个Executor需要几个core:

 //coresPerExecutor 在application中获取启动一个Executor使用几个core 。参数--executor-core可以指定,下面指明不指定就是1
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)

接着会判断是否还有足够的core来启动这个Executor

if (app.coresLeft >= coresPerExecutor)...//coresGranted 已经分配的core ,刚开始就是0个,如果不指定Executor需要的core,requestedCores 就是最大值//如果指定了,就按照我们指定的,来分配coreprivate[master] def coresLeft: Int = requestedCores - coresGranted

这段逻辑是:比如我指定了Executor总共可使用core为100,减掉已经分配了的core,就是剩余的可用core。我就看剩下的core是否够启动正义而Executor的。
然后会过滤出所有处于alive状态的worker节点,且worker节点的剩余内存可以供Executor启动、使用,并排序:

        //过滤出可用的workerval usableWorkers : Array[WorkerInfo]= workers.toArray.filter(_.state == WorkerState.ALIVE).filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&worker.coresFree >= coresPerExecutor).sortBy(_.coresFree).reverse

然后执行scheduleExecutorsOnWorkers方法(传参:applicationInfo对象、满足要求的所有可用Worker),就在Worker节点上调度Executor。它决定了Application如何给Executor划分资源。

首先明确启动1个Ececutor需要多少core,如果不指定,默认最少使用1个core:

    //启动一个Executor使用多少core,这里如果提交任务没有指定 --executor-core 5 这个值就是Noneval coresPerExecutor : Option[Int]= app.desc.coresPerExecutor//这里指定如果提交任务没有指定启动一个Executor使用几个core,默认就是1val minCoresPerExecutor = coresPerExecutor.getOrElse(1)

然后明确启动1个Executor需要多少内存。如果不指定就是默认1G:

    //默认启动一个Executor使用的内存就是1024M//若提价命令中有 --executor-memory 5*1024 就是指定的参数val memoryPerExecutor = app.desc.memoryPerExecutorMB

然后创建两个关键的数组:

    //i号Worker节点给Application分配了几个coreval assignedCores = new Array[Int](numUsable)// i号Worker节点给当前Application启动了几个Executorval assignedExecutors = new Array[Int](numUsable)

将集群中的core都拿走,如果指定了–total-executor-core,就是它。如果没指定,就将集群内所有的core都征用了:

//比如集群内所有worker节点剩余50个core,不过我们不指定Executor需要的core,就会把所有worker的所有core都用了
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

过滤出所有能启动Executor的Worker节点:

var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)//判断某台worker节点是否还可以启动Executordef canLaunchExecutor(pos: Int): Boolean = {//可分配的core是否大于启动一个Executor使用的1个coreval keepScheduling = coresToAssign >= minCoresPerExecutor//是否有足够的coreval enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor// If we allow multiple executors per worker, then we can always launch new executors.// Otherwise, if there is already an executor on this worker, just give it more cores.//assignedExecutors(pos) == 0 为true,launchingNewExecutor就是为trueval launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0//启动新的Executorif (launchingNewExecutor) {val assignedMemory = assignedExecutors(pos) * memoryPerExecutor//是否有足够的内存val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor//这里做安全判断,说的是要分配启动的Executor和当前application启动的使用的Executor总数是否在Application总的Executor限制之下val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimitkeepScheduling && enoughCores && enoughMemory && underLimit} else {// We're adding cores to an existing executor, so no need// to check memory and executor limitskeepScheduling && enoughCores}}

所有能启动Executor的Worker节点,会被过滤放到freeWorkers中。
然后就是判断每台Worker节点是否能启动Executor:

    //判断某台worker节点是否还可以启动Executordef canLaunchExecutor(pos: Int): Boolean = {//可分配的core是否大于启动一个Executor使用的1个coreval keepScheduling = coresToAssign >= minCoresPerExecutor//当前worker节点剩余的core - 当前worker已经分出去的core  是否大于启动1个Executor需要的最少的1个coreval enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor//我们没指定--executor-cores,oneExecutorPerWorker=true//assignedExecutors(pos)==0表示当前worker节点尚未启动Executor节点//assignedExecutors(pos) == 0 为true,launchingNewExecutor就是为trueval launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0//启动新的Executorif (launchingNewExecutor) {//统计当前worker节点启动的Executor需要的内存val assignedMemory = assignedExecutors(pos) * memoryPerExecutor//当前worker节点的剩余可用内存 - 已经给Executor分配了的内存  是否大于 1Gval enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor//这里做安全判断,说的是要分配启动的Executor和当前application启动的使用的Executor总数是否在Application总的Executor限制之下val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimitkeepScheduling && enoughCores && enoughMemory && underLimit} else {// We're adding cores to an existing executor, so no need// to check memory and executor limitskeepScheduling && enoughCores}}

接着要遍历每个Worker节点,开始分配资源了:

    while (freeWorkers.nonEmpty) {freeWorkers.foreach { pos =>var keepScheduling = true//这个循环是给每个Executor分配默认的1个corewhile (keepScheduling && canLaunchExecutor(pos)) {//50-1coresToAssign -= minCoresPerExecutor//i号worker上的Executor就分配1个core//给记录core的那个数组更新assignedCores(pos) += minCoresPerExecutor//如果未指定--executor-core,这里就是trueif (oneExecutorPerWorker) {assignedExecutors(pos) = 1} else {//这可以在1台Worker节点上启动多个ExecutorassignedExecutors(pos) += 1}// Spreading out an application means spreading out its executors across as// many workers as possible. If we are not spreading out, then we should keep// scheduling executors on this worker until we use all of its resources.// Otherwise, just move on to the next worker.if (spreadOutApps) {keepScheduling = false}}}//5个Executor都各自分配完1个core后,再次遍历//把剩下的可用core继续分配给ExecutorfreeWorkers = freeWorkers.filter(canLaunchExecutor)}

这个循环就是给启动Executor分配core,并且会更新那俩关键的数组对象:
表示i号Worker给当前Application启动了1个Executor、提供了1个core。一次循环完毕后,判断是否开启下一轮循环。当集群内的所有core都被要光了,worker列表空了,循环也就结束了。
内层循环会给5台Worker上各启动1个Executor,并各自分配1个core。之后外层循环再次执行canLaunchExecutor方法,将集群内的剩余可用core继续分配给Executor。可以理解为:启动Executor默认使用1G内存,但是会默认将集群内的core都要光。

总结

  • 如果提交任务时不指定参数,每个Worker会为Application启动1个Executor,默认占用1G内存、当前Worker节点上所有的core
  • 如果指定了–total-executor-core,application会申请指定的core
  • Executor启动和core、memory都有关
  • Executor在集群内,默认分散启动
  • 要想在1个Worker节点上启动多个Executor,需要指定–executor -cores:assignedExecutors(pos) += 1

Spark Application资源调度源码相关推荐

  1. 资源调度源码分析和任务调度源码分析

    1.资源调度源码分析 资源请求简单图 资源调度Master路径: 路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/M ...

  2. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  3. spark任务运行源码

    spark任务运行源码 spark是一个分布式计算引擎,任务的运行是计算引擎的核心. 一个spark任务怎么能运行起来呢? 1 spark服务启动(Master,Worker): 2 应用程序提交 3 ...

  4. Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x

    文章目录 Spark ALS recommendForAll源码解析实战 1. 软件版本: 2. 本文要解决的问题 3. 源码分析实战 3.1 Spark2.2.2 ALS recommendForA ...

  5. 《Spark商业案例与性能调优实战100课》第25课:Spark Hash Shuffle源码解读与剖析

    <Spark商业案例与性能调优实战100课>第25课:Spark Hash Shuffle源码解读与剖析

  6. 第25课 Spark Hash Shuffle源码解读与剖析

    第25课: Spark Hash Shuffle源码解读与剖析 Spark 2.1x 现在的版本已经没有Hash Shuffle的方式,那为什么我们还要讲解HashShuffle源码的内容呢?原因有3 ...

  7. spark的java源码,Spark源码包的编译

    Spark源码包的编译和部署生成,其本质只有两种:Maven和SBT (Simple Build Tool), 只不过针对不同的场景而已: Maven编译 SBT编译 IntelliJ IDEA编译( ...

  8. Spark存储机制源码剖析

    一.Shuffle结果的写入和读取 通过之前的文章Spark源码解读之Shuffle原理剖析与源码分析我们知道,一个Shuffle操作被DAGScheduler划分为两个stage,第一个stage是 ...

  9. Spark内核以及源码解析

    2019独角兽企业重金招聘Python工程师标准>>> 一:图RDD 1.上图groupBy,Join会产生shuffle,shuffle可以做性能优化. 2.stage1和stag ...

最新文章

  1. 没答好「进程间通信」,被面试官挂了....
  2. 小强升职记梗概_解读《小强升职记》——一本关于时间管理的书
  3. 第一次参加数学建模竞赛如何夺取一等奖
  4. 【Java学习笔记五】Java异常处理
  5. java中反射机制通过字节码文件对象获取字段和函数的方法
  6. 【C语言】字符数组,碎碎念
  7. 6.042 Mathematics for Computer Science
  8. linux下重命名文件
  9. 服务器上文件添加可信任,如何将服务器配置为受信任以进行委派
  10. linux中apache无法启动,Apache无法启动
  11. 大数据薪水大概多少_大数据工资一般多少
  12. pip设置默认为清华镜像
  13. H5代码正常但在IOS端出现页面卡顿
  14. 计算机金融专业美国学校排名,美国金融专业都有哪些种类?
  15. 【A Neural Algorithm of Artistic Style】 Pics
  16. 计算机组成原理 — GPU 图形处理器
  17. cadence 16.60破解方式及文件下载地址
  18. podman基础教程
  19. 详细分析大型web系统各个子系统架构图 纯干货!
  20. opj文件怎么打开?科研绘图软件Origin中文版下载安装使用教程(1)

热门文章

  1. wordpress漏洞_聊聊 WordPress 5.1.1 CSRF to RCE 漏洞
  2. c语言小程序解决生活中小问题,自己写的一个小程序 有问题帮帮忙
  3. 短视频SDK架构设计,短视频APP开发目标首选
  4. CF401D Roman and Numbers
  5. 方便的boost_python
  6. CentOS 6.4 搭建SVN服务器
  7. [裴礼文数学分析中的典型问题与方法习题参考解答]4.3.13
  8. Apache HTTP Server搭建虚拟主机
  9. Struts框架核心技术小小班
  10. Mask_RCNN训练自己的模型(练习)