Spark版本2.4.0

在SparkContext的初始化过程中,将会根据配置的启动模式来选择不同的任务调度器TaskScheduler,而这个不同模式的实现也是在这里根据选择的TaskScheduler类型进行区分并实现。

case masterUrl =>val cm = getClusterManager(masterUrl) match {case Some(clusterMgr) => clusterMgrcase None => throw new SparkException("Could not parse Master URL: '" + master + "'")}try {val scheduler = cm.createTaskScheduler(sc, masterUrl)val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)cm.initialize(scheduler, backend)(backend, scheduler)} catch {case se: SparkException => throw secase NonFatal(e) =>throw new SparkException("External scheduler cannot be instantiated", e)}

上方式SparkContext的createTaskScheduler()方法,在这里当选择了yarn模式,将会在这里加载相应的ClusterManager来进行创建TaskScheduler,在标题所提到的yarn-client模式下,这里会分别创建一个YarnScheduler和YarnClinetSchedulerBackend作为spark任务运行的调度者。

YarnScheduler实现只是简单的继承了local模型下会选择的TaskSchedulerImpl,因为在yarn-client模式下和local一样,Driver端运行在本地,所以YarnScheduler的实现并没有什么特殊的地方。

但是相应的,由于backend实现了和yarn的交互,自然实现存在比较大的差异。

当TaskScheduler正式开始启动的时候,在YarnClinetSchedulerBackend的start()方法中,也会开始初始化一个yarn客户端,并在这里完成向yarn的ResourceManager注册提交应用的流程。

override def start() {val driverHost = conf.get("spark.driver.host")val driverPort = conf.get("spark.driver.port")val hostport = driverHost + ":" + driverPortsc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }val argsArrayBuf = new ArrayBuffer[String]()argsArrayBuf += ("--arg", hostport)logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))val args = new ClientArguments(argsArrayBuf.toArray)totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)client = new Client(args, conf)bindToYarn(client.submitApplication(), None)// SPARK-8687: Ensure all necessary properties have already been set before// we initialize our driver scheduler backend, which serves these properties// to the executorssuper.start()waitForApplication()monitorThread = asyncMonitorApplication()monitorThread.start()
}

上方是YarnClinetSchedulerBackend的start()方法,可以看到在这里核心两个步骤,构建Client,Client封装了与yarn的连接与操作,而后便是通过初始化完毕的Client通过submitApplication()方法提交应用。

重点来看Client的submitApplication()方法。

yarnClient.init(hadoopConf)
yarnClient.start()

首先根据工程中的配置完成yarnClient的初始化,之后相关操作都是通过yarnClient来进行完成。

val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

之后先通过createApplication()方法向yarn申请一个新的Application,在这里得到的newAppResponse不仅包含yarn的相关配置部署信息及限制,更重要的是在这里返回了所申请应用在接下来的appId,在yarn模式下appid是yarn所提供的。

val containerContext = createContainerLaunchContext(newAppResponse)

接下来是重要的一步,根据createContainerLauchContext()方法来构建yarn中的重要属性Container的上下文containerContext。

对应在yarn中构建Container中所需的相关重要属性,都会在createContainerLauchContext()方法中得到。

val appId = newAppResponse.getApplicationId
val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

在这里,根据配置的hdfs属性,使用用户,以及刚刚得到的appid创建了之后相关jar包和资源将会上传的hdfs路径。

val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)

相应的得到了这个路径,将会在这里准备将将要上传至hdfs的本地资源准备上传到hdfs对应的路径上去。

val javaOpts = ListBuffer[String]()// Set the environment variable through a command prefix
// to append to the existing value of the variable
var prefixEnv: Option[String] = None// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
// hence if there are multiple containers in same node, Spark GC affects all other containers'
// performance (which can be that of other Spark containers)
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {// In our expts, using (default) throughput collector has severe perf ramifications in// multi-tenant machinesjavaOpts += "-XX:+UseConcMarkSweepGC"javaOpts += "-XX:MaxTenuringThreshold=31"javaOpts += "-XX:SurvivorRatio=8"javaOpts += "-XX:+CMSIncrementalMode"javaOpts += "-XX:+CMSIncrementalPacing"javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}

显然上方一部分是在yarn上将要启动的一部分java命令行参数的构建,该部分代码只是对应功能的一部分实现,该部分涉及到的参数很多,代码也很长。

val amClass =if (isClusterMode) {Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName} else {Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName}

值得一提的是,yarn-client模式将要提交给yarn实现的ApplicationMaster将是ExecutorLauncher。

上述提到的都将作为一部分供在yarn上进行任务创建的时候使用。

val appContext = createApplicationSubmissionContext(newApp, containerContext)

在完成containerContext的创建,将会通过createApplicationSubmissionContext()方法创建appContext,这个app上下文将会直接被用在向yan提交app上。

yarnClient.submitApplication(appContext)

createApplicationSubmissionContext()方法中,进一步根据yarn的要求进行提交app的封装,之前提到的containerContext也会作为一部分被封装,最后通过yarnClient提交app宣告app的提交完毕。

到这里,yarn-client向yarn的ResourceManager提交ApplicationMaster的步骤完成。

提交到yarn上后,首先会在一个NodeManager上启动一个ExecutorLauncher来与先前的spark端进行通信,由于是yarn-client模式,将根据运行在本地的Driver端的调度来在yarn中进行task的创建。

object ExecutorLauncher {def main(args: Array[String]): Unit = {ApplicationMaster.main(args)}}

ExecutorLauncher的实现其实还是和yarn-cluster一样通过ApplicationMaster实现,但是将会在ApplicationMaster具体的实现逻辑中进行相应的区分。

在yarn-client模式中,ApplicationMaster的主要逻辑实现在了runExecutorLauncher()方法中。

val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort),YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf)

在runExecutorLauncher()方法中,首先会直接构造与Driver端的通信连接,并构造一个yarnAllocator准备通过和yarn申请资源来执行task。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case r: RequestExecutors =>Option(allocator) match {case Some(a) =>if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {resetAllocatorInterval()}context.reply(true)case None =>logWarning("Container allocator is not ready to request executors yet.")context.reply(false)}case KillExecutors(executorIds) =>logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")Option(allocator) match {case Some(a) => executorIds.foreach(a.killExecutor)case None => logWarning("Container allocator is not ready to kill executors yet.")}context.reply(true)case GetExecutorLossReason(eid) =>Option(allocator) match {case Some(a) =>a.enqueueGetLossReasonRequest(eid, context)resetAllocatorInterval()case None =>logWarning("Container allocator is not ready to find executor loss reasons yet.")}
}

在于Driver端的通信中,将会持续监听Driver端的task下发,并根据此向yarn申请资源执行task。

override def onDisconnected(remoteAddress: RpcAddress): Unit = {// In cluster mode, do not rely on the disassociated event to exit// This avoids potentially reporting incorrect exit codes if the driver failsif (!isClusterMode) {logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)}
}

Yarn-client模式下本地Driver关闭会导致整个应用关闭也在此实现,当与Driver端的连接关闭的时候,将会结束在yarn上的运行。

最后回到Driver端,上文YarnClinetSchedulerBackend继承自YarnSchedulerBackend,当任务在调度执行环节时,将task下发至yarn上的ApplicationMaster,便是在YarnSchedulerBackend中实现的。

/*** Request executors from the ApplicationMaster by specifying the total number desired.* This includes executors already pending or running.*/
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}/*** Request that the ApplicationMaster kill the specified executors.*/
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}

最后executor的下发都在这里通过网络通信下发到yarn上的ApplicationMaster,来进行远程调度。

以上便是spark on yarn中yarn-client的源码走读。

spark on yarn yarn-client模式实现源码走读相关推荐

  1. Spark源码走读概述

    Spark代码量 --Spark:20000loc --Hadoop 1.0:90000loc --Hadoop 2.0:220000loc Spark生态系统代码量 Spark生态系统 概述 --构 ...

  2. Java的三种代理模式完整源码分析

    Java的三种代理模式&完整源码分析 Java的三种代理模式&完整源码分析 参考资料: 博客园-Java的三种代理模式 简书-JDK动态代理-超详细源码分析 [博客园-WeakCach ...

  3. Apache Spark源码走读之16 -- spark repl实现详解

    欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码 ...

  4. 装饰者模式在源码中的应用

    装饰器模式在源码中也应用得非常多,在JDK 中体现最明显的类就是IO 相关的类,如BufferedReader.InputStream.OutputStream,看一下常用的InputStream 的 ...

  5. 【深入设计模式】责任链模式—责任链模式及责任链模式在源码中的应用

    文章目录 1. 责任链模式 1.1 责任链模式简介 1.2 责任链模式结构 1.3 责任链模式示例 2. 责任链模式在源码中的应用 2.1 Servlet 中的责任链模式 2.2 Spring 中的责 ...

  6. ConcurrentHashMap源码走读

    ConcurrentHashMap源码走读 文章目录 ConcurrentHashMap源码走读 简介 放入数据 容器元素总数更新 容器扩容 协助扩容 遍历 简介 在从JDK8开始,为了提高并发度,C ...

  7. JAVA进阶之路-CountDownLatch源码走读

    前言 本章用到了之前谈到的AQS,就是在该FIFO阻塞框架的基础上改造的,不理解的,可以去看JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读 用途 CountD ...

  8. 深度增强学习DDPG(Deep Deterministic Policy Gradient)算法源码走读

    原文链接:https://blog.csdn.net/jinzhuojun/article/details/82556127 本文是基于OpenAI推出deep reinforcement learn ...

  9. [redis 源码走读] 主从数据复制(上)

    阅读源码前,先了解 redis 主从复制的基本知识. 详细源码分析,请参考下一章:[redis 源码走读] 主从数据复制(下)

最新文章

  1. vsftpd服务与客户机的传输和下载
  2. 交流充电桩电路图_直流充电桩和交流充电桩给电动汽车充电过程中是如何工作的?...
  3. Android AutoCompleteTextView自动提示文本框
  4. python的前端个web的前端有什么区别_用Python 操作Web 前端 基础
  5. 九章基础算法01:链表
  6. eplan2.7在win10安装教程
  7. 监狱电视系统设计原则及应用场景
  8. 如何从uboot中推算路由器flash烧写地址
  9. PGP加密并签名邮件 实验
  10. 【c语言】打印出100以内奇数
  11. 将远程linux中的文件拷贝过来,Linux之间远程拷贝文件
  12. 2021-06-03TunePat Amazon Video Downloader使用教程:如何下载电影和电视节目
  13. 在n*n方阵里填入1,2,...n*n,要求填成蛇形
  14. 图片扩展名如何修改,转换图片格式轻松搞定
  15. linux去除内容重复行,Linux删除文本中的重复行 - 米扑博客
  16. c 语言编译程序的首要工作,2017年计算机基础试题选择题「附答案」
  17. DIH增量、定时导入并检索数据--转载
  18. 加密流量分类-论文6:Learning to Classify A Flow-Based Relation Network for Encrypted Traffic Classification
  19. 485 CAN 单总线 SPI I2C 的总结
  20. datatime模块之timedelta

热门文章

  1. java中的泛型(E)
  2. spring——autowire自动注入
  3. 更高效地刷OJ——Java中常用的排序方法,Array.sort(),Arrays.parallelSort(), Collections.sort()
  4. ES基础命令(参照mysql)
  5. matlab中结构体的定义,matlab中怎么定义结构体啊 !!!
  6. Linux初级运维(十七)——Linux内核编译与系统裁减
  7. Yii2语言国际化配置Twig翻译解决方案
  8. 内存泄露部分检测工具
  9. 系统补丁自动批量安装
  10. Angular 的概念模型