1. 应用程序之间

在Standalone模式下,Master提供里资源管理调度功能。在调度过程中,Master先启动等待列表中应用程序的Driver,这个Driver尽可能分散在集群的Worker节点上,然后根据集群的内存和CPU使用情况,对等待运行的应用程序进行资源分配。默认分配规则是有条件的FIFO,先分配的应用程序会尽可能多的获取满足条件的资源,后分配的应用程序只能在剩余资源中再次筛选。如果没有合适资源的应用程序只能等待。Master.scheduler方法如下:

private def schedule(): Unit = {if (state != RecoveryState.ALIVE) {return}// 随机打乱Worker节点val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))val numWorkersAlive = shuffledAliveWorkers.sizevar curPos = 0// 这是只对Standalone下的Cluster模式才生效,client模式Driver是在客户端for (driver <- waitingDrivers.toList) { var launched = falsevar numWorkersVisited = 0while (numWorkersVisited < numWorkersAlive && !launched) {val worker = shuffledAliveWorkers(curPos)numWorkersVisited += 1if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {launchDriver(worker, driver)waitingDrivers -= driverlaunched = true}curPos = (curPos + 1) % numWorkersAlive}}// 对等待应用程序按照顺序分配运行资源startExecutorsOnWorkers()}

默认情况下,在Standalone模式下,每个应用程序可以分配到的CPU核数可以由spark.deploy.defaultCores进行设置,但是该配置默认为Int.max,也就是不限制,从而应用程序会尽可能获取CPU资源。为了限制每个应用程序使用CPU资源,用户一方面可以设置spark.core.max配置项,约束每个应用程序所能申请的最大CPU核数;另一方面可以设置spark.executor.cores配置项,用于设置在每个Executor上启动的CPU核数。

 /*** Schedule and launch executors on workers*/private def startExecutorsOnWorkers(): Unit = {// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app// in the queue, then the second app, etc.// 使用FIFO算法运行应用,即先注册的应用先运行for (app <- waitingApps if app.coresLeft > 0) {val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor// Filter out workers that don't have enough resources to launch an executorval usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&worker.coresFree >= coresPerExecutor.getOrElse(1)).sortBy(_.coresFree).reverse// 一种是spreadOutApps,就是把应用运行在尽量多的Worker上,另一种是非spreadOutAppsval assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)// Now that we've decided how many cores to allocate on each worker, let's allocate them// 给每个worker分配完application要求的cpu core之后,遍历worker启动executorfor (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))}}}

对于Worker的分配策略有两种:一种是尽量把应用程序运行可能多的Worker上,这种分配算法不仅能充分利用集群资源,还有利于数据本地性;另一种就是应用程序运行在尽量少的Worker上,这种适用于CPU密集型而内存使用较少的场景。配置项为spark.deploy.spreadOut。主要代码为:Master.scheduleExecutorsOnWorkers方法实现。

  private def scheduleExecutorsOnWorkers(app: ApplicationInfo,usableWorkers: Array[WorkerInfo],spreadOutApps: Boolean): Array[Int] = {// 应用程序中每个Executor所需要的CPU核数val coresPerExecutor = app.desc.coresPerExecutor// 每个Executor所需的最少核数,如果设置了coresPerExecutor则为该值,否则为1val minCoresPerExecutor = coresPerExecutor.getOrElse(1)// 如果没有设置coresPerExecutor,那么每个Worker上只有一个Executor,并尽可能分配资源val oneExecutorPerWorker = coresPerExecutor.isEmpty// 每个Executor需要分配多少内存val memoryPerExecutor = app.desc.memoryPerExecutorMB// 集群中可用的Worker节点的数量val numUsable = usableWorkers.length// Worker节点所能提供的CPU核数数组val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker// Worker分配Executor个数数组val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker// 需要分配的CPU核数,为应用程序所需CPU核数和可用CPU核数最小值var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)/** Return whether the specified worker can launch an executor for this app. *//*** 返回指定的Worker节点是否能够启动Executor,满足条件:*   1. 应用程序需要分配CPU核数>=每个Executor所需的最少CPU核数*   2. 是否有足够的CPU核数,判断条件为该Worker节点可用CPU核数-该Worker节点已分配的CPU核数>=每个Executor所需最少CPU核数* 如果在该Worker节点上允许启动新的Executor,需要追加以下两个条件:*   1. 判断内存是否足够启动Executor,其方法是:当前Worker节点可用内存-该Worker已分配的内存>=每个Executor分配的内存大小*   2. 已经分配给该应用程序的Executor数量+已经运行该应用程序的Executor数量<该应用程序Executor设置的最大值*/def canLaunchExecutor(pos: Int): Boolean = {val keepScheduling = coresToAssign >= minCoresPerExecutorval enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor// If we allow multiple executors per worker, then we can always launch new executors.// Otherwise, if there is already an executor on this worker, just give it more cores.// 启动新Executor条件是:该Worker节点允许启动多个Executor或者在该Worker节点上没有为该应用程序分配Executorval launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0// 如果在该Worker节点上允许启动多个Executor,那么该Executor节点满足启动条件就可以启动新Executor,// 否则只能启动一个Executor并尽可能的多分配CPU核数if (launchingNewExecutor) {val assignedMemory = assignedExecutors(pos) * memoryPerExecutorval enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutorval underLimit = assignedExecutors.sum + app.executors.size < app.executorLimitkeepScheduling && enoughCores && enoughMemory && underLimit} else {// We're adding cores to an existing executor, so no need// to check memory and executor limitskeepScheduling && enoughCores}}// Keep launching executors until no more workers can accommodate any// more executors, or if we have reached this application's limitsvar freeWorkers = (0 until numUsable).filter(canLaunchExecutor)// 在可用的Worker节点中启动Executor,在Worker节点每次分配资源时,分配给Executor所需的最少CPU核数,该过程是通过多次轮询进行,// 直到没有Worker节点满足启动Executor条件活着已经达到应用程序限制。在分配过程中Worker节点可能多次分配,// 如果该Worker节点可以启动多个Executor,则每次分配的时候启动新的Executor并赋予资源;// 如果该Worker节点只能启动一个Executor,则每次分配的时候把资源追加到该Executorwhile (freeWorkers.nonEmpty) {freeWorkers.foreach { pos =>var keepScheduling = true// 满足 keepScheduling标志为真(第一次分配或者集中运行)和该Worker节点满足// 启动Executor条件时,进行资源分配while (keepScheduling && canLaunchExecutor(pos)) {// 每次分配CPU核数为Executor所需的最少CPU核数coresToAssign -= minCoresPerExecutorassignedCores(pos) += minCoresPerExecutor// If we are launching one executor per worker, then every iteration assigns 1 core// to the executor. Otherwise, every iteration assigns cores to a new executor.// 如果设置每个Executor启动CPU核数,则该Worker只能为该应用程序启动1个Executor,// 否则在每次分配中启动1个新的Executorif (oneExecutorPerWorker) {assignedExecutors(pos) = 1} else {assignedExecutors(pos) += 1}// Spreading out an application means spreading out its executors across as// many workers as possible. If we are not spreading out, then we should keep// scheduling executors on this worker until we use all of its resources.// Otherwise, just move on to the next worker.// 如果是分散运行,则在某一Worker节点上做完资源分配立即移到下一个Worker节点,// 如果是集中运行,则持续在某一Worker节点上做资源分配,知道使用完该Worker节点所有资源。// 传入的Worker节点列表是按照CPU核数倒序排列,在集中运行时,会尽可能少的使用Worker节点if (spreadOutApps) {keepScheduling = false}}}// 继续从上一次分配完的可用Worker节点列表获取满足启动Executor的Worker节点列表freeWorkers = freeWorkers.filter(canLaunchExecutor)}// 返回每个Worker节点分配的CPU核数assignedCores}

tips:
关于branch-2.0,这个算法是在Spark 1.4.2的版本中优化的。在以前,Worker节点中,只能为某应用程序启动一个Executor。轮询分配资源时,Worker节点每次分配1个CPU核数,这样有可能会造成某个Worker节点最终分配CPU核数小于每个Executor所需CPU核数,那么该节点将不启动该Executor。例如:
在集群中有4个Worker节点,每个节点拥有16个CPU核数,其中设置了spark.cores.max=48和spark.executor.cores=16,由于每个Worker只启动一个Executor,按照每次分配一个CPU核数,则每个Worker节点的Executor将分配到12个CPU核数,每个由于12<16, 所以没有Executor能启动。现在改进的算法是,如果设置了spark.executor.cores,那么每次分配的时候就分配这个指定的CPU核数

2. 作业以及调度阶段之间

Spark应用程序提交执行时,会根据RDD依赖关系形成有向无环图(DAG),然后交给DAGScheduler进行划分作业和调度阶段。这些作业之间没有任何依赖关系,对于多个作业之间的调度,Spark目前提供了两种不同的调度策略,一种是FIFO模式,这也是目前默认的模式;还有一种是FAIR模式,该模式的调度可以通过两个参数来决定Job执行的优先模式,两个参数分别是minShare(最小任务数)和weight(任务的权重)。

2.1 创建调度池

在TaskSchedulerImpl.initialize方法中先创建根调度池rootPool对象,然后根据系统配置的调度模式创建调度创建器,针对两种调度策略进行具体实例化FIFOSchedulableBuilder或者FairSchedulableBuilder,最终使用调度器创建buildPools方法根调度池rootPool下创建调度池。

def initialize(backend: SchedulerBackend) {this.backend = backend// temporarily set rootPool name to emptyrootPool = new Pool("", schedulingMode, 0, 0)schedulableBuilder = {schedulingMode match {case SchedulingMode.FIFO =>new FIFOSchedulableBuilder(rootPool)case SchedulingMode.FAIR =>new FairSchedulableBuilder(rootPool, conf)case _ =>throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")}}schedulableBuilder.buildPools()}

2. 调度池中加入调度内容

在TaskSchedulerImpl.submitTask方法中,先把调度阶段拆分为任务集,然后把这些任务集交给管理器TaskSetManager进行管理,最后把该任务集的管理器加入到调度池中,等待分配执行。在FIFO中,由于创建的buildPools方法为空,所以在根调度器rootPool中并没有下级调度池,而是直接包含了一组TaskSetManager;而在Fair调度器中,根调度池rootPool中包含了下级调度池Pool,在这些下级调度池Pool包含一组TaskSetManager。

 override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {// 为每一个taskSet创建一个taskSetManager// taskSetManager在后面负责,TaskSet的任务执行状况的监视和管理val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])// 把manager加入内存缓存中stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}// 将该任务集的管理器加入到系统调度池中,由系统统一调配,该调度器属于应用级别// 支持FIFO和FAIR两种,默认FIFOschedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

3. 提供已排序的任务集管理器

在TaskSchedulerImpl.resourceOffers方法中进行资源分配时,会从根调度池rootPool获取以及排序的任务管理器,排序算法由两种调度策略提供。

// 获取按照调度策略排序好的TaskSetManager// 从rootPool中取出排序了的TaskSetManager// 在创建完TaskScheduler StandaloneSchedulerBackend之后,会执行initialize()方法,其实会创建一个调度池// 这里就是所有提交的TaskSetManager,首先会放入这个调度池中,然后再执行task分配算法的时候,会从这个调度池中,取出排好队的TaskSetManagerval sortedTaskSets = rootPool.getSortedTaskSetQueue

在FIFO调度策略中,由于根调度池rootPool直接包含了多个作业的任务管理器,在比较时,首先需要比较作业的优先级(根据作业编号判断,作业编号越小优先级越高),如果是同一个作业,则会比较调度阶段优先级(根据调度阶段编号判断,调度阶段编号越小优先级越高)

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {//获取作业优先级,实际上是作业编号val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)//如果是同一个作业,再比较调度阶段优先级if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}res < 0}
}

在FAIR调度策略中包含了两层调度,第一层调度池rootPool中包含了下级调度池Pool,第二层为下级调度池Pool包含多个TaskSetManager。具体配置参见$SPARK_HOME/conf/fairscheduler.xml文件,在该文件中包含多个下级调度池Pool配置项,其中minShare(最小任务数)和weight(任务的权重)用来设置第一级调度算法,而SchedulingMode参数是用来设置第二层调度算法。

在FAIR算法中,先获取两个调度器的饥饿程度,饥饿程度为正在运行的任务是否小于最小任务,如果是,则表示该调度处于饥饿程度。获取饥饿程度后进行如下比较:

  • 如果某个调度处于解状态,另一个处于非饥饿状态,则先满足处于饥饿状态的调度
  • 如果两个调度都处于饥饿状态,则比较资源比,先们在这资源比较少的调度。
  • 如果两个调度处于非饥饿状态,则比较权重比,先满足权重比少的调度。
  • 以上情况均相同的情况,根据调度的名称进行排序。
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {//最小任务数val minShare1 = s1.minShareval minShare2 = s2.minShare//正在运行的任务数val runningTasks1 = s1.runningTasksval runningTasks2 = s2.runningTasks//饥饿程序,判断标准为正在运行的任务数量是否小于最小任务数量val s1Needy = runningTasks1 < minShare1val s2Needy = runningTasks2 < minShare2//资源比,正在运行的任务数量/最小任务数量val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)//权重比,正在运行的任务数/任务的权重val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDoubleval taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble//判断执行var compare = 0if (s1Needy && !s2Needy) {return true} else if (!s1Needy && s2Needy) {return false} else if (s1Needy && s2Needy) {compare = minShareRatio1.compareTo(minShareRatio2)} else {compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)}if (compare < 0) {true} else if (compare > 0) {false} else {s1.name < s2.name}}
}

3. 任务之间

我们先了解数据本地型和延迟两个概念。

3.1 数据本地型

数据的计算尽可能在数据所在的节点上进行,这样可以减少数据在网络上的传输,毕竟移动计算比移动数据代价来得小些。进一步看,数据如果在运行的节点的内存中,就能够进一步减少磁盘I/O的传输。在Spark中数据本地型优先级从高到低为PROCESS_LOCAL>NODE_LOCAL>NO_PREF>PACK_LOCAL>ANY,即最好是任务运行的节点内存中存在数据,次好是同一个Node(同一个机器)上,再次是同机架,最后是同一个为。其中任务数据本地型通过以下情况来确定:

  • 如果任务处于作业的开始的调度阶段,这些任务对于的RDD分区都有首选运行位置,该位置也是任务运行的首选位置,数据本地性为NODE_LOCAL.
  • 如果任务处于非作业开头的调度阶段,可以根据父调度阶段运行的位置得到任务的首选位置,这种情况下,如果Executor处于活跃状态,则数据本地性为PROCESS_LOCAL;如果Executor不处于活动状态,但存在父调度阶段运行结果,则数据本地性为NODE_LOCAL
  • 如果没有首选位置,则数据本地型为NO_PREF

3.2 延迟执行

在任务分配运行节点时先判断最佳运行节点是否空闲,如果该节点没有足够的资源运行该任务,在这种情况下任务会等待一定的时间;如果在等待时间内该节点释放足够的资源,则任务在该节点运行,如果还不足会找出次佳的节点运行。通过这样的方式进行能够让任务运行在更高级别的数据本地性节点,从而减少磁盘I/O和网络传输。一般来说PROCESS_LOCAL和NODE_LOCAL两个数据本地性级别进行等待,系统默认延迟时间为3s。

Spakr任务分配的原则就是让任务运行在数据本地性优先级别更高的节点上,甚至可以为此等待一定的时间。该任务分配有TaskSchedulerImpl.resourceOffers方法实现,在该方法中先对应于程序获取的资源(Worker节点)进行混洗,以使任务能够更加均衡的分散在集群中运行。然后对任务集对应的TaskManager根据设置地调度算法进行排序,最后对TaskSetManager中的任务按照数据本地性分配任务运行节点,在分配时先根据任务集的本地性从优先级高到低进行分配任务,在分配的过程中动态地判断集群中节点运行的请,通过延迟执行等待数据本地性更高的节点运行。

Spark详解(六):Spark集群资源调度算法原理相关推荐

  1. Kafka详解(包括kafka集群搭建)

    目录 一.Kafka简介 Kafka是什么 消息系统简介 点对点消息传递模式 发布-订阅消息传递模式 kafka简单理解 Kafka的优点特点 学Kafka的意义何在 二.常用Message Queu ...

  2. Tomcat原理详解和各种集群的实现

    注意:本篇博文涉及的知识内容如下,实验中所用的系统环境为RHEL6.4. 1.Java基础知识讲解 2.Tomcat的安装和实现 3.通过apache的mod_proxy模块代理并实现Tomcat负载 ...

  3. Tomcat原理详解和各种集群的实现(转自:http://harisxiong.blog.51cto.com/7513022/1304746)

    标签:apache Java Tomca mod_proxy 原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://harisxiong ...

  4. Spark详解(十):SparkShuffle机制原理分析

    1. Spark Shuffle简介 在Hadoop的MapReduce框架中Shuffle是连接Map和Reduce的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节.由于Shu ...

  5. oracle crsctl详解,Oracle RAC 集群 crsctl 常用命令大全

    1.启停集群: (需要root用户) $GRID_HOME/bin/crsctl stop crs $GRID_HOME/bin/crsctl start crs 2.开启/关闭集群自动启动 $GRI ...

  6. ssh无密登录配置详解(hadoop集群搭建)

    ssh无密登录原理 Hadoop集群搭建时,配置ssh是给yarn用的,最好先清空.ssh目录下的文件,再生成密钥,在节点中含有resourcemanager的服务器(有多个resourcemanag ...

  7. Spark集群资源如何分配

    前言 新来的实习小伙,在公司的一个小集群(14台服务器,用于处理一些数据量较小的月数据)上面提交了一个spark application,然后其他人提交的application都在排队中,一个个的在抱 ...

  8. spark提交到yarn_详细总结spark基于standalone、yarn集群提交作业流程

    最近总结了一些关于spark core的内容,今天先来和大家分享一下spark的运行模式. spark运行模式 (1)local:在本地eclipse.IDEA中写spark代码运行程序,一般用于测试 ...

  9. Spark详解(十二):Spark Streaming原理和实现

    1 简介 SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字 ...

最新文章

  1. WPF入门教程系列九——布局之DockPanel与ViewBox(四)
  2. BZOJ1975[Sdoi2010]魔法猪学院——可持久化可并堆+最短路树
  3. EIGRP的路由汇总与认证
  4. Freemarker 页面静态化技术使用入门案例
  5. PyTorch基础-交叉熵函数mnist数据集识别-04
  6. 被娱乐在线报道的“唐骏造假门事件”
  7. java版 modbus crc16校验 (已测试成功)_java版 ModBus CRC16校验 (已测试成功)
  8. Python框架篇之Django(Models数据表的创建、数据库配置)
  9. 基于JSP的蛋糕销售系统设计与实现答辩ppt模板
  10. win7 卸载虚拟机重装提示请您确认有足够的权限安装....
  11. GreenSock GSAP 3.0 最新版 所有内容创建于2020年4月4日
  12. 一种使用Python计算可达矩阵的简单方法
  13. fifo的rdata_同步Fifo和异步fifo
  14. OpenGL 颜色混合、图元的反走样(五)
  15. Hash函数经典用法
  16. IDEA 添加类注释、方法注释(快捷键 /** + Enter)
  17. Pet Peeve 是什么?
  18. JavaWeb: Tomcat优化
  19. 怎么通过iTools解决闪退,应用无法安装激活的办法
  20. N76E003模拟EEPROM读取和保存应用配置

热门文章

  1. 设计模式学习笔记——命令模式(Command)
  2. 在js对象上绑定js数组原生方法
  3. wxPython 笔记(3)基本结构
  4. 黑马程序员Linux系统开发视频之线程共享资源与非共享资源
  5. Postman教程大全
  6. Python使用you-get批量下载bilibili网站视频
  7. 关于__VA_ARGS__的说明
  8. c语言解逻辑问题的一般步骤,C语言面试题---逻辑短路问题
  9. activemenu怎么拼 vue_Vue-el-menu使用,点击按钮跳转指定页面
  10. java中extends ,implements的顺序问题