Task提交

在上一节中的 Stage提交中我们提到,最终stage被封装成TaskSet,使用taskScheduler.submitTasks提交,具体代码如下:

taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

Stage由一系列的tasks组成,这些task被封装成TaskSet,TaskSet类定义如下:

/*** A set of tasks submitted together to the low-level TaskScheduler, usually representing* missing partitions of a particular stage.*/
private[spark] class TaskSet(val tasks: Array[Task[_]],val stageId: Int,val stageAttemptId: Int,val priority: Int,val properties: Properties) {val id: String = stageId + "." + stageAttemptIdoverride def toString: String = "TaskSet " + id
}

submitTasks方法定义在TaskScheduler Trait当中,目前TaskScheduler 只有一个子类TaskSchedulerImpl,其submitTasks方法源码如下:

//TaskSchedulerImpl类中的submitTasks方法
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {//创建TaskSetManager,TaskSetManager用于对TaskSet中的Task进行调度,包括跟踪Task的运行、Task失败重试等val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}//schedulableBuilder中添加TaskSetManager,用于完成所有TaskSet的调度,即整个Spark程序生成的DAG图对应Stage的TaskSet调度schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)if (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)}hasReceivedTask = true}//为Task分配运行资源backend.reviveOffers()}

SchedulerBackend有多种实现,如下图所示:

我们以SparkDeploySchedulerBackend为例进行说明,SparkDeploySchedulerBackend继承自CoarseGrainedSchedulerBackend中的reviveOffers方法,具有代码如下:

//CoarseGrainedSchedulerBackend中定义的reviveOffers方法override def reviveOffers() {//driverEndpoint发送ReviveOffers消息,由DriverEndPoint接受处理driverEndpoint.send(ReviveOffers)}

driverEndpoint的类型是RpcEndpointRef

//CoarseGrainedSchedulerBackend中的成员变量driverEndpoint
var driverEndpoint: RpcEndpointRef = null

它具有如下定义形式:

//RpcEndpointRef是远程RpcEndpoint的引用,它是一个抽象类,有一个子类AkkaRpcEndpointRef
/*** A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.*/
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)extends Serializable with Logging //在底层采用的是Akka进行实现
private[akka] class AkkaRpcEndpointRef(@transient defaultAddress: RpcAddress,@transient _actorRef: => ActorRef,@transient conf: SparkConf,@transient initInConstructor: Boolean = true)extends RpcEndpointRef(conf) with Logging {lazy val actorRef = _actorRefoverride lazy val address: RpcAddress = {val akkaAddress = actorRef.path.addressRpcAddress(akkaAddress.host.getOrElse(defaultAddress.host),akkaAddress.port.getOrElse(defaultAddress.port))}override lazy val name: String = actorRef.path.nameprivate[akka] def init(): Unit = {// Initialize the lazy valsactorRefaddressname}if (initInConstructor) {init()}override def send(message: Any): Unit = {actorRef ! AkkaMessage(message, false)}
//其它代码省略

DriverEndpoint中的receive方法接收driverEndpoint.send(ReviveOffers)发来的消息,DriverEndpoint继承了ThreadSafeRpcEndpoint trait,具体如下:

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])extends ThreadSafeRpcEndpoint with Logging

ThreadSafeRpcEndpoint 继承 RpcEndpoint trait,RpcEndpoint对receive方法进行了描述,具体如下:

/*** Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a* unmatched message, [[SparkException]] will be thrown and sent to `onError`.*/def receive: PartialFunction[Any, Unit] = {case _ => throw new SparkException(self + " does not implement 'receive'")}

DriverEndpoint 中的对其receive方法进行了重写,具体实现如下:

 override def receive: PartialFunction[Any, Unit] = {case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId)case None =>// Ignoring the update since we don't know about the executor.logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}//重要!处理发送来的ReviveOffers消息case ReviveOffers =>makeOffers()case KillTask(taskId, executorId, interruptThread) =>executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))case None =>// Ignoring the task kill since the executor is not registered.logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")}}

从上面的代码可以看到,处理ReviveOffers消息时,调用的是makeOffers方法

  // Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killing//所有可用的Executorval activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))//WorkOffer表示Executor上可用的资源,val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq//先调用TaskSchedulerImpl的resourceOffers方法,为Task的运行分配资源//再调用CoarseGrainedSchedulerBackend中的launchTasks方法启动Task的运行,最终Task被提交到Worker节点上的Executor上运行launchTasks(scheduler.resourceOffers(workOffers))}

上面的代码逻辑全部是在Driver端进行的,调用完launchTasks方法后,Task的执行便在Worker节点上运行了,至此完成Task的提交。
关于resourceOffers方法及launchTasks方法的具体内容,在后续章节中将进行进一步的解析。

Spark修炼之道(高级篇)——Spark源码阅读:第六节 Task提交相关推荐

  1. Spark修炼之道(高级篇)——Spark源码阅读:第九节 Task执行成功时的结果处理...

    Task执行成功时的结果处理 在上一节中,给出了Task在Executor上的运行代码演示,我们知道代码的最终运行通过的是TaskRunner方法 class TaskRunner(execBacke ...

  2. Linux驱动修炼之道-SPI驱动框架源码分析(上)

    Linux驱动修炼之道-SPI驱动框架源码分析(上)   SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设备可工作于m/s模式.主设备发起数据帧,允许多个从设 ...

  3. Soul 网关源码阅读(六)Sofa请求处理概览

    Soul 网关源码阅读(六)Sofa请求处理概览 简介     今天来探索一下Sofa请求处理流程,看看和前面的HTTP.Dubbo有什么异同 Sofa示例运行 PS:如果请求加上参数运行不成功,请更 ...

  4. Soul网关源码阅读(六)请求类型探索

    Soul网关源码阅读(六)请求类型探索 简介     在上几篇文章中分析了请求的处理流程,HTTP和RPC请求处理是互斥的,通过请求类型来判断,这篇文章来探索下请求类型的前世今生 源码分析     通 ...

  5. WINVNC源码阅读(六)

    VNC客户端源码 Windows版本的VNC客户端源码阅读笔记. while (!hosts.empty()) { char* hostinfo = hosts.front(); Thread* co ...

  6. Lidar_imu自动标定源码阅读(六)——run部分

    源码阅读,能力有限,如有某处理解错误,请指出,谢谢. run_lidar2imu.cpp:输入文件所在路径,开始标定. #include <Eigen/Core> #include < ...

  7. wrappers.php,PHP源码阅读笔记六:stream_get_wrappers函数

    PHP源码阅读笔记stream_get_wrappers函数 stream_get_wrappers (PHP 5) stream_get_wrappers - 返回注册的数据流列表 Descript ...

  8. 命令构建gradle项目_【Android 修炼手册】Gradle 篇 -- Gradle 源码分析

    预备知识 理解 gradle 的基本开发 了解 gradle task 和 plugin 使用及开发 了解 android gradle plugin 的使用 看完本文可以达到什么程度 了解 grad ...

  9. 【Android 修炼手册】Gradle 篇 -- Gradle 源码分析

    预备知识 理解 gradle 的基本开发 了解 gradle task 和 plugin 使用及开发 了解 android gradle plugin 的使用 看完本文可以达到什么程度 了解 grad ...

最新文章

  1. 紧急提醒!售价3980,成本价80,你被坑过吗?
  2. Learning hard C#学习笔记 C#简介
  3. 手把手教你用Python进行SSH暴力破解
  4. UE4学习-设置地图,创建材质
  5. 牛客题霸 [滑动窗口的最大值] C++题解/答案
  6. LeetCode (合集)合并链表和数组
  7. 云图说|读请求太多怎么办?一键读写分离来帮忙
  8. 辽宁工业大学有没有计算机专业,辽宁工业大学(专业学位)计算机技术考研难吗...
  9. python arp_用Python构造ARP请求、扫描、欺骗
  10. 英特尔发布CPU新架构,突破性采用3D堆栈法
  11. 重启windows资源管理器命令
  12. 简单好用的mac版Mysql可视化工具 - Sequel Pro
  13. python实现星号打印出金字塔
  14. 深入理解 OC/C++ 闭包
  15. 安装darknet报libQt5Core.so.5: undefined reference
  16. python中set什么意思_set在python里是什么意思
  17. 打破985校史!她以独作身份投中顶刊,曾因换方向重读博士7年,科研之路也“坎坎坷坷”……...
  18. 靶机渗透练习07-HackMyVm Area51 (Log4j2复现)
  19. 软件测试工程师和技术支持工程师哪个更有钱途,以后会发展更好?
  20. matlab解比例导引法方程,[转载]比例导引法在三维制导中应用的程序详解与过程图解...

热门文章

  1. 三维重建9:点云图像的滤波方法小结
  2. java注释模板_Java注释模板设置
  3. linux 再多的running也挡不住锁
  4. 2018第三届中国数字化零售创新国际峰会9月即将震撼来袭
  5. AD 脚本kixtart运用之五(用户电脑屏保设置)
  6. 够快云库, 加速企业信息化建设
  7. Office 365 Outlook Web App 移动设备体验
  8. CXF与Web项目集成---without Spring
  9. 在CentOS Linux上安装oracle11g之二 安装oracle11g
  10. SQL Server 2008连载之存储结构——基本系统视图