TaskScheduler是一个接口,DAGScheduler在提交TaskSet给底层调度器的时候是面向接口TaskScheduler。

TaskSchduler的核心任务是提交Taskset到集群运算并汇报结果

# 为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息

# 遇到Straggle任务会放到其他的节点进行重试

# 向DAGScheduler汇报执行情况,包括在Shuffle输出lost的时候报告fetch failed错误等信息

在Standalone模式下StandaloneSchedulerBackend在启动的时候构造AppClient实例并在该实例start的时候启动了ClientEndpoint这个消息循环体。ClientEndpoint在启动的时候会向Master注册当前程序。而StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndpoint(这就是我们程序运行时候的经典对象的Driver)的消息循环体,StandaloneSchedulerBackend专门负责收集Worker上的资源信息,当ExecutorBackend启动的时候会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task

一 核心属性

Int maxTaskFailures: task最多失败次数

Boolean isLocal: 是否本地运行

AtomicLong nextTaskId:递增的task id

SPECULATION_INTERVAL_MS : 多久检查一次推测任务

CPUS_PER_TASK: 每一个任务需要的cpu核数

HashMap[Long, TaskSetManager]taskIdToTaskSetManager: 为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息

HashMap[Long, String] taskIdToExecutorId: 维护的taskId和executorId的映射

HashMap[String, HashSet[Long]]executorIdToRunningTaskIds:每一个execuotor上运行的task集合的映射

HashMap[String, HashSet[String]] hostToExecutors: 主机名和executors之间的映射

HashMap[String, HashSet[String]] hostsByRack:机架和主机名的映射

HashMap[String, String] executorIdToHost: executorID和主机名映射

DAGScheduler dagScheduler:

SchedulerBackend backend:调度器的通信终端

SchedulableBuilder schedulableBuilder:调度模式,比如FIFO或者Fair

schedulingModeConf:所配置的调度模式,默认FIFO

Pool rootPool: 用于调度TaskManager

TaskResultGetter taskResultGetter: Task结果获取器

二 重要方法

2.1 初始化和启动方法

我们知道,在SparkContext初始化的时候,就会初始化TaskScheduler以及SchedulerBackend,并且会初始化和启动TaskScheduler。

definitialize(backend:SchedulerBackend) {
  // 初始化SchedulerBackend
 
this.backend= backend
  // 创建一个Pool用于调度TasksetManager
 
rootPool = new Pool("",schedulingMode, 0, 0)
  // 通过配置的调度模式,构建SchedulableBuilder
 
schedulableBuilder= {
    schedulingModematch{
      case SchedulingMode.FIFO=>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR=>
        new FairSchedulableBuilder(rootPool,conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupportedspark.scheduler.mode:$schedulingMode")
    }
  }
  // 开始构建pool
  schedulableBuilder
.buildPools()
}

override def start() {// 启动SchedulerBackend的start方法,StandaloneSchedulerBackend在// 启动的时候构造AppClient实例并在该实例start的时候启动了ClientEndpoint// 这个消息循环体。ClientEndpoint在启动的时候会向Master注册当前程序。backend.start()// 如果非本地执行,则检查是否需要推测if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread")// 如果可以推测则调用speculationSchedule定时调度checkSpeculatableTasks方法speculationScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryOrStopSparkContext(sc) {checkSpeculatableTasks()}}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)}
}

2.2 submitTasks 提交task

override def submitTasks(taskSet: TaskSet) {// 获取task集合,TaskSet是对Task的封装val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {// 创建TaskSetManager,用于跟踪每一个Task,task失败进行重试等val manager = createTaskSetManager(taskSet, maxTaskFailures)// 获取该TaskSet所对应的stageIdval stage = taskSet.stageId// 构建一个<stageId,HashMap<stageAttemptId,TaskSetManager>映射val stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])// 将这个创建TaskSetManager放入到映射中stageTaskSets(taskSet.stageAttemptId) = manager// 如果有冲突的TaskSet,则抛异常val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}// 申请任务调度,有FIFO和FAIR两种策略。根据executor的空闲资源状态// 及locality策略将task分配给executor。调度的数据结构封装为Pool类,// 对于FIFO,Pool就是TaskSetManager的队列;对于Fair,则是TaskSetManager// 组成的树。Pool维护TaskSet的优先级,等待executor接受资源offer(resourceOffer)// 的时候出列并提交executor计算schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)// 不是本地且没有接收task,启动一个timer定时调度,如果一直没有task就警告,直到有taskif (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)}hasReceivedTask = true}// SchedulerBackend向driver发送ReviveOffers消息backend.reviveOffers()
}

2.3 DriverEndPoint主要用于接受消息

# receive

override def receive: PartialFunction[Any, Unit] = {// 如果接收StatusUpdate消息,用于状态更新case StatusUpdate(executorId, taskId, state, data) =>// 调用TaskSchedulerImpl#statusUpdate进行更新scheduler.statusUpdate(taskId, state, data.value)// 如果Task处于完成状态if (TaskState.isFinished(state)) {// 通过executor id获取ExecutorDataexecutorDataMap.get(executorId) match {// 如果存在数据case Some(executorInfo) =>// 则更新executor的cpu核数executorInfo.freeCores += scheduler.CPUS_PER_TASK// 获取集群中可用的executor列表,发起taskmakeOffers(executorId)case None =>logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}// 如果发送ReviveOffers消息case ReviveOffers =>// 获取集群中可用的executor列表,发起taskmakeOffers()// 如果是KillTask消息,表示kill掉这个taskcase KillTask(taskId, executorId, interruptThread) =>executorDataMap.get(executorId) match {// 向Executor发送KillTask的消息case Some(executorInfo) =>executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))case None =>// Ignoring the task kill since the executor is not registered.logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")}
}

# receiveAndReply

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {// 接收RegisterExecutor表示向Executor注册case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>// 如果已经注册过,则会返回RegisterExecutorFailed向executor注册失败的消息if (executorDataMap.contains(executorId)) {executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))context.reply(true)} else {// 获取executor的地址val executorAddress = if (executorRef.address != null) {executorRef.address} else {context.senderAddress}logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")// 更新<address,executorId>集合addressToExecutorId(executorAddress) = executorId// 重新计算现在的总的CPU核数totalCoreCount.addAndGet(cores)// 计算现在已经注册executor数量totalRegisteredExecutors.addAndGet(1)// 构建一个Executor数据val data = new ExecutorData(executorRef, executorRef.address, hostname,cores, cores, logUrls)// 然后开始注册executorCoarseGrainedSchedulerBackend.this.synchronized {executorDataMap.put(executorId, data)if (currentExecutorIdCounter < executorId.toInt) {currentExecutorIdCounter = executorId.toInt}if (numPendingExecutors > 0) {numPendingExecutors -= 1logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")}}// 然后返回消息RegisteredExecutorexecutorRef.send(RegisteredExecutor)context.reply(true)listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))// 获取有效的executor,开始发起任务makeOffers()}// 接收StopDriver消息,表示停止Drivercase StopDriver =>context.reply(true)stop()// 接收StopExecutors消息,表示停止Executorcase StopExecutors =>logInfo("Asking each executor to shut down")// 遍历注册所有的executor,然后向Executor终端发送StopExecutor消息for ((_, executorData) <- executorDataMap) {executorData.executorEndpoint.send(StopExecutor)}context.reply(true)// 接收RemoveExecutor消息,表示删除Executorcase RemoveExecutor(executorId, reason) =>executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))removeExecutor(executorId, reason)context.reply(true)// 接收RetrieveSparkAppConfig消息,表示获取application相关的配置信息case RetrieveSparkAppConfig =>val reply = SparkAppConfig(sparkProperties,SparkEnv.get.securityManager.getIOEncryptionKey())context.reply(reply)
}

# makeOffers获取有效的executor,开始发起任务

private def makeOffers() {val activeExecutors = executorDataMap.filterKeys(executorIsAlive)val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeqlaunchTasks(scheduler.resourceOffers(workOffers))
}
private def makeOffers(executorId: String) {// 获取集群中可用的executor列表if (executorIsAlive(executorId)) {val executorData = executorDataMap(executorId)// 创建WorkerOffer,只是表示executor上有可用的空闲资源val workOffers = IndexedSeq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))// 发起tasklaunchTasks(scheduler.resourceOffers(workOffers))}
}

# launchTasks

发起task,会把任务一个个发送到worker节点上的CoarseGrainedExecutorBackend,由其内部的executor来执行

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {// 将每一个task序列化val serializedTask = ser.serialize(task)// 检查task序列化之后是否超过所允许的rpc消息的最大值if (serializedTask.limit >= maxRpcMessageSize) {scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +"spark.rpc.message.maxSize (%d bytes). Consider increasing " +"spark.rpc.message.maxSize or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {// 获取对应的ExecutorData数据val executorData = executorDataMap(task.executorId)// Executor的剩余核数就需要减少一个task需要的cpu核数executorData.freeCores -= scheduler.CPUS_PER_TASKlogDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 然后向Executor终端发送LaunchTask,发起taskexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}

# removeExecutor

private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {logDebug(s"Asked to remove executor $executorId with reason $reason")// 获取对应的ExecutorDataexecutorDataMap.get(executorId) match {case Some(executorInfo) =>// 从相关集合或者列表移除该executorIdval killed = CoarseGrainedSchedulerBackend.this.synchronized {addressToExecutorId -= executorInfo.executorAddressexecutorDataMap -= executorIdexecutorsPendingLossReason -= executorIdexecutorsPendingToRemove.remove(executorId).getOrElse(false)}// 重新计算CPU核数totalCoreCount.addAndGet(-executorInfo.totalCores)totalRegisteredExecutors.addAndGet(-1)scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))case None =>scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)logInfo(s"Asked to remove non-existent executor $executorId")}
}

2.4 resourceOffers 为executor分配task

计算每一个TaskSetMangaer的本地化级别(locality_level);并且对task set尝试使用最小的本地化级别(locality_level), 将task set的task在executor上启动;如果启动不了,放大本地化级别,以此类推直到某种本地化级别尝试成功

defresourceOffers(offers:IndexedSeq[WorkerOffer]):Seq[Seq[TaskDescription]] = synchronized {
  // 标记每一个slave是可用的且记住主机名
 
var newExecAvail= false
  // 遍历有可用资源的Executor
 
for (o <- offers) {
    // 如果没有包含了这个executor的host,初始化一个集合,存放host
   
if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) =new HashSet[String]()
    }
    // 如果<executorId,taskId集合>不包含这个executorId
   
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host)+= o.executorId
      // 通知DAGScheduler添加executor
     
executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) =HashSet[Long]()
      newExecAvail = true
    }
    // 遍历主机所在机架
   
for (rack <- getRackForHost(o.host)) {
      // 更新hosts和机架的映射
      hostsByRack
.getOrElseUpdate(rack,new HashSet[String]())+= o.host
    }
  }

// 将WorkerOffer打乱,做到负载均衡
 
val shuffledOffers= Random.shuffle(offers)
  // 构建一个task列表,然后分配给每一个worker
 
val tasks= shuffledOffers.map(o=> new ArrayBuffer[TaskDescription](o.cores))
  // 有效可用的CPU核数
 
val availableCpus= shuffledOffers.map(o=> o.cores).toArray
  // 从调度池获取按照调度策略排序好的TaskSetManager
 
val sortedTaskSets= rootPool.getSortedTaskSetQueue
  // 如果有新加入的executor,需要重新计算数据本地性
 
for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // 为排好序的TaskSetManager列表分配资源,分配原则是就近原则,按照顺序为
  // PROCESS_LOCAL, NODE_LOCAL, NO_PREF,RACK_LOCAL, ANY
 
for (taskSet <- sortedTaskSets) {
    var launchedAnyTask= false
    var launchedTaskAtCurrentMaxLocality=false
    // 计算每一个TaskSetMangaer的本地化级别(locality_level),
    // 并且对task set尝试使用最小的本地化级别(locality_level),将task set的task在executor上启动
    // 如果启动不了,放大本地化级别,以此类推直到某种本地化级别尝试成功
   
for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTaskAtCurrentMaxLocality= resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers,availableCpus, tasks)
        launchedAnyTask|= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    // 如果这个task在任何本地化级别都启动不了,有可能在黑名单
   
if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }
  if (tasks.size> 0) {
    hasLaunchedTask= true
  }
  return tasks
}

2.5 resourceOfferSingleTaskSet 分配单个TaskSet里的task到executor

调用resourceOffer方法找到在executor上,哪些TaskSet的task可以通过当前本地化级别启动;遍历在该executor上当前本地化级别可以运行的task

private defresourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  // 默认发起task为false
 
var launchedTask= false
  // 遍历所有executor
 
for (i <- 0 until shuffledOffers.size) {
    // 获取executorId和host
   
val execId= shuffledOffers(i).executorId
    val host= shuffledOffers(i).host
    // 必须要有每一个task可供分配的的CPU核数,否则直接返回
   
if (availableCpus(i) >=CPUS_PER_TASK) {
      try {
        // 调用resourceOffer方法找到在executor上,哪些TaskSet的task可以通过当前本地化级别启动
        // 遍历在该executor上当前本地化级别可以运行的task
       
for (task <- taskSet.resourceOffer(execId,host, maxLocality)) {
          // 如果存在,则把每一个task放入要在当前executor运行的task数组里面
          // 即指定executor要运行的task
         
tasks(i) += task
          // 将相应的分配信息加入内存缓存
         
val tid= task.taskId
          taskIdToTaskSetManager(tid) =taskSet
          taskIdToExecutorId(tid) =execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert
(availableCpus(i) >=0)
          launchedTask = true
        }
      } catch {
        case e:TaskNotSerializableException =>
          logError(s"Resource offer failed, task set${taskSet.name} was not serializable")
          // Do notoffer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
         
return launchedTask
      }
    }
  }
  return launchedTask
}

Spark源码分析之SchedulerBackend分析相关推荐

  1. Spark源码性能优化案例分析

    本篇文章枚举了几例常见的问题并给出了优化方案,推荐了两套测试性能优化工具 问题: Spark 任务文件初始化调优 资源分析,发现第一个 stage 时间特别长,耗时长达 14s , CPU 和网络通信 ...

  2. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  3. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

  4. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  5. Spark源码分析:多种部署方式之间的区别与联系

    作者:过往记忆 从官方的文档我们可以知道, Spark 的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来 ...

  6. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  7. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  8. Apache Spark源码走读之6 -- 存储子系统分析

    Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互 ...

  9. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

  10. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

最新文章

  1. Java自学笔记(13):【面向对象】方法覆盖,final关键字,对象转型
  2. python2好还是python3好-新手入门选择Python2还是Python3
  3. ABP(现代ASP.NET样板开发框架)系列之2、ABP入门教程
  4. 大牛书单 | 春节特辑
  5. c语言的程序灵魂是什么,C语言 第二章 程序的灵魂--算法
  6. 德国大学:如何改变一个民族和整个世界的命运
  7. 在SSP中查询某个用户Profile信息的SQL语句
  8. pandas重置索引的几种方法探究
  9. vue 使用百度地图api_高水准 Vue 百度地图组件Vue-BaiduMap
  10. 2021 年百度之星·程序设计大赛 - 初赛三
  11. 梦想CAD控件关于批注问题
  12. 仓库温度湿度控制措施_一般仓库的温湿度控制范围是多少合适?
  13. 信息安全等级保护等级划分及适用行业
  14. KY261 Jugs
  15. laragon 更换php的版本
  16. KeyShot 实时光线追踪三维渲染软件
  17. 基于OpenStack的云计算平台搭建
  18. matlab中Cci,CCI指标实战操作中使用技巧
  19. JAVA学习-JDK8环境的安装与卸载
  20. 跬智信息(Kyligence)荣获浦东新区人工智能创新应用大赛一等奖

热门文章

  1. directui 3d界面引擎_美术设计师浅谈AR/VR中3D建模设计的工具、挑战与区别
  2. 5-11attention网络结构
  3. 3-28Pytorch与autograd导数
  4. Python数据结构:汉诺塔问题
  5. 物体抓取位姿估計算法綜述_3D视觉技术在机器人抓取作业中的应用
  6. 动态规划算法分析与探究
  7. 分数加减乘除混合运算带答案_分数分数加减乘除混合运算练习题及答案_0.doc
  8. matplotlib 子图超过4个_Python数据分析:用Matplotlib可视化创建套图
  9. 解决sodu echo写入文件是权限不足-bash: test.txt: Permission denied
  10. 注意力机制学习(一)——通道注意力与pytorch案例