Spark修炼之道(高级篇)——Spark源码阅读:第六节 Task提交
Task提交
在上一节中的 Stage提交中我们提到,最终stage被封装成TaskSet,使用taskScheduler.submitTasks提交,具体代码如下:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
Stage由一系列的tasks组成,这些task被封装成TaskSet,TaskSet类定义如下:
/*** A set of tasks submitted together to the low-level TaskScheduler, usually representing* missing partitions of a particular stage.*/
private[spark] class TaskSet(val tasks: Array[Task[_]],val stageId: Int,val stageAttemptId: Int,val priority: Int,val properties: Properties) {val id: String = stageId + "." + stageAttemptIdoverride def toString: String = "TaskSet " + id
}
submitTasks方法定义在TaskScheduler Trait当中,目前TaskScheduler 只有一个子类TaskSchedulerImpl,其submitTasks方法源码如下:
//TaskSchedulerImpl类中的submitTasks方法
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {//创建TaskSetManager,TaskSetManager用于对TaskSet中的Task进行调度,包括跟踪Task的运行、Task失败重试等val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}//schedulableBuilder中添加TaskSetManager,用于完成所有TaskSet的调度,即整个Spark程序生成的DAG图对应Stage的TaskSet调度schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)if (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)}hasReceivedTask = true}//为Task分配运行资源backend.reviveOffers()}
SchedulerBackend有多种实现,如下图所示:
我们以SparkDeploySchedulerBackend为例进行说明,SparkDeploySchedulerBackend继承自CoarseGrainedSchedulerBackend中的reviveOffers方法,具有代码如下:
//CoarseGrainedSchedulerBackend中定义的reviveOffers方法override def reviveOffers() {//driverEndpoint发送ReviveOffers消息,由DriverEndPoint接受处理driverEndpoint.send(ReviveOffers)}
driverEndpoint的类型是RpcEndpointRef
//CoarseGrainedSchedulerBackend中的成员变量driverEndpoint
var driverEndpoint: RpcEndpointRef = null
它具有如下定义形式:
//RpcEndpointRef是远程RpcEndpoint的引用,它是一个抽象类,有一个子类AkkaRpcEndpointRef
/*** A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.*/
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)extends Serializable with Logging //在底层采用的是Akka进行实现
private[akka] class AkkaRpcEndpointRef(@transient defaultAddress: RpcAddress,@transient _actorRef: => ActorRef,@transient conf: SparkConf,@transient initInConstructor: Boolean = true)extends RpcEndpointRef(conf) with Logging {lazy val actorRef = _actorRefoverride lazy val address: RpcAddress = {val akkaAddress = actorRef.path.addressRpcAddress(akkaAddress.host.getOrElse(defaultAddress.host),akkaAddress.port.getOrElse(defaultAddress.port))}override lazy val name: String = actorRef.path.nameprivate[akka] def init(): Unit = {// Initialize the lazy valsactorRefaddressname}if (initInConstructor) {init()}override def send(message: Any): Unit = {actorRef ! AkkaMessage(message, false)}
//其它代码省略
DriverEndpoint中的receive方法接收driverEndpoint.send(ReviveOffers)发来的消息,DriverEndpoint继承了ThreadSafeRpcEndpoint trait,具体如下:
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])extends ThreadSafeRpcEndpoint with Logging
ThreadSafeRpcEndpoint 继承 RpcEndpoint trait,RpcEndpoint对receive方法进行了描述,具体如下:
/*** Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a* unmatched message, [[SparkException]] will be thrown and sent to `onError`.*/def receive: PartialFunction[Any, Unit] = {case _ => throw new SparkException(self + " does not implement 'receive'")}
DriverEndpoint 中的对其receive方法进行了重写,具体实现如下:
override def receive: PartialFunction[Any, Unit] = {case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId)case None =>// Ignoring the update since we don't know about the executor.logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}//重要!处理发送来的ReviveOffers消息case ReviveOffers =>makeOffers()case KillTask(taskId, executorId, interruptThread) =>executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))case None =>// Ignoring the task kill since the executor is not registered.logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")}}
从上面的代码可以看到,处理ReviveOffers消息时,调用的是makeOffers方法
// Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killing//所有可用的Executorval activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))//WorkOffer表示Executor上可用的资源,val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq//先调用TaskSchedulerImpl的resourceOffers方法,为Task的运行分配资源//再调用CoarseGrainedSchedulerBackend中的launchTasks方法启动Task的运行,最终Task被提交到Worker节点上的Executor上运行launchTasks(scheduler.resourceOffers(workOffers))}
上面的代码逻辑全部是在Driver端进行的,调用完launchTasks方法后,Task的执行便在Worker节点上运行了,至此完成Task的提交。
关于resourceOffers方法及launchTasks方法的具体内容,在后续章节中将进行进一步的解析。
Spark修炼之道(高级篇)——Spark源码阅读:第六节 Task提交相关推荐
- Spark修炼之道(高级篇)——Spark源码阅读:第九节 Task执行成功时的结果处理...
Task执行成功时的结果处理 在上一节中,给出了Task在Executor上的运行代码演示,我们知道代码的最终运行通过的是TaskRunner方法 class TaskRunner(execBacke ...
- Linux驱动修炼之道-SPI驱动框架源码分析(上)
Linux驱动修炼之道-SPI驱动框架源码分析(上) SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设备可工作于m/s模式.主设备发起数据帧,允许多个从设 ...
- Soul 网关源码阅读(六)Sofa请求处理概览
Soul 网关源码阅读(六)Sofa请求处理概览 简介 今天来探索一下Sofa请求处理流程,看看和前面的HTTP.Dubbo有什么异同 Sofa示例运行 PS:如果请求加上参数运行不成功,请更 ...
- Soul网关源码阅读(六)请求类型探索
Soul网关源码阅读(六)请求类型探索 简介 在上几篇文章中分析了请求的处理流程,HTTP和RPC请求处理是互斥的,通过请求类型来判断,这篇文章来探索下请求类型的前世今生 源码分析 通 ...
- WINVNC源码阅读(六)
VNC客户端源码 Windows版本的VNC客户端源码阅读笔记. while (!hosts.empty()) { char* hostinfo = hosts.front(); Thread* co ...
- Lidar_imu自动标定源码阅读(六)——run部分
源码阅读,能力有限,如有某处理解错误,请指出,谢谢. run_lidar2imu.cpp:输入文件所在路径,开始标定. #include <Eigen/Core> #include < ...
- wrappers.php,PHP源码阅读笔记六:stream_get_wrappers函数
PHP源码阅读笔记stream_get_wrappers函数 stream_get_wrappers (PHP 5) stream_get_wrappers - 返回注册的数据流列表 Descript ...
- 命令构建gradle项目_【Android 修炼手册】Gradle 篇 -- Gradle 源码分析
预备知识 理解 gradle 的基本开发 了解 gradle task 和 plugin 使用及开发 了解 android gradle plugin 的使用 看完本文可以达到什么程度 了解 grad ...
- 【Android 修炼手册】Gradle 篇 -- Gradle 源码分析
预备知识 理解 gradle 的基本开发 了解 gradle task 和 plugin 使用及开发 了解 android gradle plugin 的使用 看完本文可以达到什么程度 了解 grad ...
最新文章
- 紧急提醒!售价3980,成本价80,你被坑过吗?
- Learning hard C#学习笔记 C#简介
- 手把手教你用Python进行SSH暴力破解
- UE4学习-设置地图,创建材质
- 牛客题霸 [滑动窗口的最大值] C++题解/答案
- LeetCode (合集)合并链表和数组
- 云图说|读请求太多怎么办?一键读写分离来帮忙
- 辽宁工业大学有没有计算机专业,辽宁工业大学(专业学位)计算机技术考研难吗...
- python arp_用Python构造ARP请求、扫描、欺骗
- 英特尔发布CPU新架构,突破性采用3D堆栈法
- 重启windows资源管理器命令
- 简单好用的mac版Mysql可视化工具 - Sequel Pro
- python实现星号打印出金字塔
- 深入理解 OC/C++ 闭包
- 安装darknet报libQt5Core.so.5: undefined reference
- python中set什么意思_set在python里是什么意思
- 打破985校史!她以独作身份投中顶刊,曾因换方向重读博士7年,科研之路也“坎坎坷坷”……...
- 靶机渗透练习07-HackMyVm Area51 (Log4j2复现)
- 软件测试工程师和技术支持工程师哪个更有钱途,以后会发展更好?
- matlab解比例导引法方程,[转载]比例导引法在三维制导中应用的程序详解与过程图解...