spark on yarn yarn-client模式实现源码走读
Spark版本2.4.0
在SparkContext的初始化过程中,将会根据配置的启动模式来选择不同的任务调度器TaskScheduler,而这个不同模式的实现也是在这里根据选择的TaskScheduler类型进行区分并实现。
case masterUrl =>val cm = getClusterManager(masterUrl) match {case Some(clusterMgr) => clusterMgrcase None => throw new SparkException("Could not parse Master URL: '" + master + "'")}try {val scheduler = cm.createTaskScheduler(sc, masterUrl)val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)cm.initialize(scheduler, backend)(backend, scheduler)} catch {case se: SparkException => throw secase NonFatal(e) =>throw new SparkException("External scheduler cannot be instantiated", e)}
上方式SparkContext的createTaskScheduler()方法,在这里当选择了yarn模式,将会在这里加载相应的ClusterManager来进行创建TaskScheduler,在标题所提到的yarn-client模式下,这里会分别创建一个YarnScheduler和YarnClinetSchedulerBackend作为spark任务运行的调度者。
YarnScheduler实现只是简单的继承了local模型下会选择的TaskSchedulerImpl,因为在yarn-client模式下和local一样,Driver端运行在本地,所以YarnScheduler的实现并没有什么特殊的地方。
但是相应的,由于backend实现了和yarn的交互,自然实现存在比较大的差异。
当TaskScheduler正式开始启动的时候,在YarnClinetSchedulerBackend的start()方法中,也会开始初始化一个yarn客户端,并在这里完成向yarn的ResourceManager注册提交应用的流程。
override def start() {val driverHost = conf.get("spark.driver.host")val driverPort = conf.get("spark.driver.port")val hostport = driverHost + ":" + driverPortsc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }val argsArrayBuf = new ArrayBuffer[String]()argsArrayBuf += ("--arg", hostport)logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))val args = new ClientArguments(argsArrayBuf.toArray)totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)client = new Client(args, conf)bindToYarn(client.submitApplication(), None)// SPARK-8687: Ensure all necessary properties have already been set before// we initialize our driver scheduler backend, which serves these properties// to the executorssuper.start()waitForApplication()monitorThread = asyncMonitorApplication()monitorThread.start()
}
上方是YarnClinetSchedulerBackend的start()方法,可以看到在这里核心两个步骤,构建Client,Client封装了与yarn的连接与操作,而后便是通过初始化完毕的Client通过submitApplication()方法提交应用。
重点来看Client的submitApplication()方法。
yarnClient.init(hadoopConf)
yarnClient.start()
首先根据工程中的配置完成yarnClient的初始化,之后相关操作都是通过yarnClient来进行完成。
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
之后先通过createApplication()方法向yarn申请一个新的Application,在这里得到的newAppResponse不仅包含yarn的相关配置部署信息及限制,更重要的是在这里返回了所申请应用在接下来的appId,在yarn模式下appid是yarn所提供的。
val containerContext = createContainerLaunchContext(newAppResponse)
接下来是重要的一步,根据createContainerLauchContext()方法来构建yarn中的重要属性Container的上下文containerContext。
对应在yarn中构建Container中所需的相关重要属性,都会在createContainerLauchContext()方法中得到。
val appId = newAppResponse.getApplicationId
val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
在这里,根据配置的hdfs属性,使用用户,以及刚刚得到的appid创建了之后相关jar包和资源将会上传的hdfs路径。
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
相应的得到了这个路径,将会在这里准备将将要上传至hdfs的本地资源准备上传到hdfs对应的路径上去。
val javaOpts = ListBuffer[String]()// Set the environment variable through a command prefix
// to append to the existing value of the variable
var prefixEnv: Option[String] = None// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
// hence if there are multiple containers in same node, Spark GC affects all other containers'
// performance (which can be that of other Spark containers)
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {// In our expts, using (default) throughput collector has severe perf ramifications in// multi-tenant machinesjavaOpts += "-XX:+UseConcMarkSweepGC"javaOpts += "-XX:MaxTenuringThreshold=31"javaOpts += "-XX:SurvivorRatio=8"javaOpts += "-XX:+CMSIncrementalMode"javaOpts += "-XX:+CMSIncrementalPacing"javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
显然上方一部分是在yarn上将要启动的一部分java命令行参数的构建,该部分代码只是对应功能的一部分实现,该部分涉及到的参数很多,代码也很长。
val amClass =if (isClusterMode) {Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName} else {Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName}
值得一提的是,yarn-client模式将要提交给yarn实现的ApplicationMaster将是ExecutorLauncher。
上述提到的都将作为一部分供在yarn上进行任务创建的时候使用。
val appContext = createApplicationSubmissionContext(newApp, containerContext)
在完成containerContext的创建,将会通过createApplicationSubmissionContext()方法创建appContext,这个app上下文将会直接被用在向yan提交app上。
yarnClient.submitApplication(appContext)
createApplicationSubmissionContext()方法中,进一步根据yarn的要求进行提交app的封装,之前提到的containerContext也会作为一部分被封装,最后通过yarnClient提交app宣告app的提交完毕。
到这里,yarn-client向yarn的ResourceManager提交ApplicationMaster的步骤完成。
提交到yarn上后,首先会在一个NodeManager上启动一个ExecutorLauncher来与先前的spark端进行通信,由于是yarn-client模式,将根据运行在本地的Driver端的调度来在yarn中进行task的创建。
object ExecutorLauncher {def main(args: Array[String]): Unit = {ApplicationMaster.main(args)}}
ExecutorLauncher的实现其实还是和yarn-cluster一样通过ApplicationMaster实现,但是将会在ApplicationMaster具体的实现逻辑中进行相应的区分。
在yarn-client模式中,ApplicationMaster的主要逻辑实现在了runExecutorLauncher()方法中。
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort),YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf)
在runExecutorLauncher()方法中,首先会直接构造与Driver端的通信连接,并构造一个yarnAllocator准备通过和yarn申请资源来执行task。
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case r: RequestExecutors =>Option(allocator) match {case Some(a) =>if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {resetAllocatorInterval()}context.reply(true)case None =>logWarning("Container allocator is not ready to request executors yet.")context.reply(false)}case KillExecutors(executorIds) =>logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")Option(allocator) match {case Some(a) => executorIds.foreach(a.killExecutor)case None => logWarning("Container allocator is not ready to kill executors yet.")}context.reply(true)case GetExecutorLossReason(eid) =>Option(allocator) match {case Some(a) =>a.enqueueGetLossReasonRequest(eid, context)resetAllocatorInterval()case None =>logWarning("Container allocator is not ready to find executor loss reasons yet.")}
}
在于Driver端的通信中,将会持续监听Driver端的task下发,并根据此向yarn申请资源执行task。
override def onDisconnected(remoteAddress: RpcAddress): Unit = {// In cluster mode, do not rely on the disassociated event to exit// This avoids potentially reporting incorrect exit codes if the driver failsif (!isClusterMode) {logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)}
}
Yarn-client模式下本地Driver关闭会导致整个应用关闭也在此实现,当与Driver端的连接关闭的时候,将会结束在yarn上的运行。
最后回到Driver端,上文YarnClinetSchedulerBackend继承自YarnSchedulerBackend,当任务在调度执行环节时,将task下发至yarn上的ApplicationMaster,便是在YarnSchedulerBackend中实现的。
/*** Request executors from the ApplicationMaster by specifying the total number desired.* This includes executors already pending or running.*/
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}/*** Request that the ApplicationMaster kill the specified executors.*/
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}
最后executor的下发都在这里通过网络通信下发到yarn上的ApplicationMaster,来进行远程调度。
以上便是spark on yarn中yarn-client的源码走读。
spark on yarn yarn-client模式实现源码走读相关推荐
- Spark源码走读概述
Spark代码量 --Spark:20000loc --Hadoop 1.0:90000loc --Hadoop 2.0:220000loc Spark生态系统代码量 Spark生态系统 概述 --构 ...
- Java的三种代理模式完整源码分析
Java的三种代理模式&完整源码分析 Java的三种代理模式&完整源码分析 参考资料: 博客园-Java的三种代理模式 简书-JDK动态代理-超详细源码分析 [博客园-WeakCach ...
- Apache Spark源码走读之16 -- spark repl实现详解
欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码 ...
- 装饰者模式在源码中的应用
装饰器模式在源码中也应用得非常多,在JDK 中体现最明显的类就是IO 相关的类,如BufferedReader.InputStream.OutputStream,看一下常用的InputStream 的 ...
- 【深入设计模式】责任链模式—责任链模式及责任链模式在源码中的应用
文章目录 1. 责任链模式 1.1 责任链模式简介 1.2 责任链模式结构 1.3 责任链模式示例 2. 责任链模式在源码中的应用 2.1 Servlet 中的责任链模式 2.2 Spring 中的责 ...
- ConcurrentHashMap源码走读
ConcurrentHashMap源码走读 文章目录 ConcurrentHashMap源码走读 简介 放入数据 容器元素总数更新 容器扩容 协助扩容 遍历 简介 在从JDK8开始,为了提高并发度,C ...
- JAVA进阶之路-CountDownLatch源码走读
前言 本章用到了之前谈到的AQS,就是在该FIFO阻塞框架的基础上改造的,不理解的,可以去看JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读 用途 CountD ...
- 深度增强学习DDPG(Deep Deterministic Policy Gradient)算法源码走读
原文链接:https://blog.csdn.net/jinzhuojun/article/details/82556127 本文是基于OpenAI推出deep reinforcement learn ...
- [redis 源码走读] 主从数据复制(上)
阅读源码前,先了解 redis 主从复制的基本知识. 详细源码分析,请参考下一章:[redis 源码走读] 主从数据复制(下)
最新文章
- vsftpd服务与客户机的传输和下载
- 交流充电桩电路图_直流充电桩和交流充电桩给电动汽车充电过程中是如何工作的?...
- Android AutoCompleteTextView自动提示文本框
- python的前端个web的前端有什么区别_用Python 操作Web 前端 基础
- 九章基础算法01:链表
- eplan2.7在win10安装教程
- 监狱电视系统设计原则及应用场景
- 如何从uboot中推算路由器flash烧写地址
- PGP加密并签名邮件 实验
- 【c语言】打印出100以内奇数
- 将远程linux中的文件拷贝过来,Linux之间远程拷贝文件
- 2021-06-03TunePat Amazon Video Downloader使用教程:如何下载电影和电视节目
- 在n*n方阵里填入1,2,...n*n,要求填成蛇形
- 图片扩展名如何修改,转换图片格式轻松搞定
- linux去除内容重复行,Linux删除文本中的重复行 - 米扑博客
- c 语言编译程序的首要工作,2017年计算机基础试题选择题「附答案」
- DIH增量、定时导入并检索数据--转载
- 加密流量分类-论文6:Learning to Classify A Flow-Based Relation Network for Encrypted Traffic Classification
- 485 CAN 单总线 SPI I2C 的总结
- datatime模块之timedelta
热门文章
- java中的泛型(E)
- spring——autowire自动注入
- 更高效地刷OJ——Java中常用的排序方法,Array.sort(),Arrays.parallelSort(), Collections.sort()
- ES基础命令(参照mysql)
- matlab中结构体的定义,matlab中怎么定义结构体啊 !!!
- Linux初级运维(十七)——Linux内核编译与系统裁减
- Yii2语言国际化配置Twig翻译解决方案
- 内存泄露部分检测工具
- 系统补丁自动批量安装
- Angular 的概念模型