Spark Task Scheduling

TaskScheduler scheduling entry points:

(1) When CoarseGrainedSchedulerBackend starts, it creates a DriverEndpoint. The DriverEndpoint runs a periodic task that triggers a scheduling round at a fixed interval (spark.scheduler.revive.interval, default 1s) by sending itself a ReviveOffers message, which in turn calls makeOffers. The code is shown below:

override def onStart() {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}

(2) When a task's state changes on an Executor, CoarseGrainedExecutorBackend sends a StatusUpdate message to the Driver (DriverEndpoint). When the Driver receives the message for a finished task, it calls makeOffers(executorId) to schedule new work onto that Executor.

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}

When the Driver receives a StatusUpdate message, it triggers scheduling (makeOffers) to assign new tasks to the Executor that just completed one.

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}

The makeOffers method calls TaskSchedulerImpl.resourceOffers, which assigns suitable tasks to Executors according to the configured scheduling policy. The code is as follows:

a. Offer tasks on all executors

// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

b. Offer tasks on a single executor

// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
  // Filter out executors under killing
  if (!executorsPendingToRemove.contains(executorId)) {
    val executorData = executorDataMap(executorId)
    val workOffers = Seq(
      new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    launchTasks(scheduler.resourceOffers(workOffers))
  }
}

After tasks are assigned, the Driver sends a LaunchTask message to each chosen Executor, which starts the task and runs the user code:

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}


Spark Task Scheduling Policies

FIFO

Under FIFO (first-in, first-out) scheduling, Jobs are ordered by submission time and each Job is split into multiple Stages. The first Job gets priority on all available resources; the second Job then gets whatever resources remain, and so on. (Each Stage corresponds to one TaskSetManager.)

FAIR

Under FAIR scheduling, Spark assigns resources to tasks across Jobs in a round-robin fashion, so all jobs hold roughly equal priority and share the cluster's resources.

The scheduling policies are explained below from a source-code perspective.

When scheduling is triggered, TaskSchedulerImpl.resourceOffers is called. It selects the TaskSet to run according to the scheduling policy, then picks suitable tasks (taking data locality into account) and hands them to Executors for execution. The code is as follows:

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}


As the analysis shows, the TaskSets in the queue are sorted via rootPool.getSortedTaskSetQueue, whose implementation is as follows:

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}

As this code shows, a scheduling algorithm serves as the comparator for sorting the TaskSets. There are two such algorithms, FIFO and FAIR, introduced below.

FIFO

Priority: when the DAGScheduler creates a TaskSet, it uses the JobId as the priority value.

The FIFO scheduling algorithm is implemented as follows:

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}


As the source shows, FIFO picks the smaller JobId first, because jobs submitted earlier have smaller JobIds.

Within a single Job, Stages generated earlier have smaller StageIds. For Stages with dependencies, the DAGScheduler controls when each Stage is submitted to the scheduling queue (a Stage is not submitted until all Stages it depends on have finished), which is what guarantees the scheduling order. If a Job has two Stages with no incoming dependencies, the one with the smaller StageId is scheduled first.
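To make the ordering concrete, here is a minimal standalone sketch (not Spark code; Sched is a hypothetical stand-in for Schedulable) showing that FIFO ordering amounts to sorting by (priority, stageId) ascending:

case class Sched(priority: Int, stageId: Int) // hypothetical stand-in for Schedulable

val queue = Seq(Sched(2, 5), Sched(1, 3), Sched(1, 2))
// Sort by JobId (priority) first, then by StageId, both ascending.
val sorted = queue.sortBy(s => (s.priority, s.stageId))
// Result: Sched(1,2), Sched(1,3), Sched(2,5) -- the earlier Job runs first,
// and within the same Job the smaller StageId runs first.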

FAIR

FAIR scheduling is more complex than FIFO: there can be multiple scheduling pools, organized as a tree (currently Spark's FAIR scheduling supports only a two-level tree). A user can place jobs into a pool with sc.setLocalProperty("spark.scheduler.pool", "poolName"); by default, jobs go into the default pool (built by buildDefaultPool). Each pool can also specify its own internal scheduling policy, and FAIR pools carry some special properties:

schedulingMode: the scheduling mode used inside the pool, either FIFO or FAIR; the default is FIFO.

minShare: the minimum guaranteed amount of resources. A pool whose minimum share is not yet met takes priority over its sibling pools when acquiring resources.

weight: controls a pool's share of resources relative to other pools. By default resources are allocated to pools in a fair round-robin manner; weight skews that balance. For example, a pool configured with weight 2 receives twice the resources of a pool with weight 1.

These properties are configured per pool in the conf/fairscheduler.xml file.
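As a usage sketch (the pool name "production" is hypothetical and is assumed to be defined, with the schedulingMode/minShare/weight properties described above, in conf/fairscheduler.xml):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR") // switch the top-level scheduler from FIFO to FAIR
  .set("spark.scheduler.allocation.file", "conf/fairscheduler.xml") // pool definitions
val sc = new SparkContext(conf)

// Jobs triggered from this thread are submitted to the "production" pool.
sc.setLocalProperty("spark.scheduler.pool", "production")
// ... run actions here ...

// Clear the property so later jobs from this thread fall back to the default pool.
sc.setLocalProperty("spark.scheduler.pool", null)

Note that spark.scheduler.mode selects the algorithm used by the root pool, while each pool's internal mode comes from the XML file.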

The FAIR scheduling algorithm is implemented as follows:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}


As the source shows, a pool or TaskSet that has not reached its minShare runs first. If both candidates are below their minShare, the one with the smaller minShare ratio (runningTasks / minShare) is scheduled first. If neither is below its minShare, the one with the smaller task-to-weight ratio (runningTasks / weight) is scheduled first. If those ratios are also equal, the comparison falls back to the name; for a TaskSet the name embeds the StageId (name = "TaskSet_" + taskSet.stageId.toString), so the smaller StageId wins.
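As a worked example, here is a standalone sketch (PoolStats is a hypothetical stand-in for Schedulable) that condenses the comparison above and runs it on concrete numbers:

case class PoolStats(name: String, runningTasks: Int, minShare: Int, weight: Int)

// Returns true if p1 should be scheduled before p2, mirroring the logic above.
def fairLessThan(p1: PoolStats, p2: PoolStats): Boolean = {
  val p1Needy = p1.runningTasks < p1.minShare
  val p2Needy = p2.runningTasks < p2.minShare
  if (p1Needy && !p2Needy) return true
  if (!p1Needy && p2Needy) return false
  val (r1, r2) =
    if (p1Needy && p2Needy)
      (p1.runningTasks.toDouble / math.max(p1.minShare, 1),
       p2.runningTasks.toDouble / math.max(p2.minShare, 1))
    else
      (p1.runningTasks.toDouble / p1.weight,
       p2.runningTasks.toDouble / p2.weight)
  if (r1 != r2) r1 < r2 else p1.name < p2.name
}

// Pool "a" is still below its minShare, so it beats "b" even though "b" has fewer tasks.
fairLessThan(PoolStats("a", 1, 2, 1), PoolStats("b", 0, 0, 1)) // true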

All TaskSets are sorted by this criterion, and the highest-priority one is scheduled first.

Spark Task Scheduling: Task Locality

Once a TaskSet has been selected, tasks suited to each Executor are picked from it based on locality and executed on that Executor.

(For details, see the relevant section of http://www.cnblogs.com/barrenlake/p/4550800.html.)

Article URL: http://www.cnblogs.com/barrenlake/p/4891589.html

  
