首先区分下AppMaster和Driver,任何一个yarn上运行的任务都必须有一个AppMaster,而任何一个Spark任务都会有一个Driver,Driver就是运行SparkContext(它会构建TaskScheduler和DAGScheduler)的进程,当然在Driver上你也可以做很多非Spark的事情,这些事情只会在Driver上面执行,而由SparkContext上牵引出来的代码则会由DAGScheduler分析,并形成Job和Stage交由TaskScheduler,再由TaskScheduler交由各Executor分布式执行。

所以Driver和AppMaster是两个完全不同的东西,Driver是控制Spark计算和任务资源的,而AppMaster是控制yarn app运行和任务资源的,只不过在Spark on Yarn上,这两者就出现了交叉,而在standalone模式下,资源则由Driver管理。在Spark on Yarn上,Driver会和AppMaster通信,资源的申请由AppMaster来完成,而任务的调度和执行则由Driver完成,Driver会通过与AppMaster通信来让Executor的执行具体的任务。

client与cluster的区别

对于yarn-client和yarn-cluster的唯一区别在于,yarn-client的Driver运行在本地,而AppMaster运行在yarn的一个节点上,他们之间进行远程通信,AppMaster只负责资源申请和释放(当然还有DelegationToken的刷新),然后等待Driver的完成;而yarn-cluster的Driver则运行在AppMaster所在的container里,Driver和AppMaster是同一个进程的两个不同线程,它们之间也会进行通信,AppMaster同样等待Driver的完成,从而释放资源。

Spark里AppMaster的实现:org.apache.spark.deploy.yarn.ApplicationMaster Yarn里MapReduce的AppMaster实现:org.apache.hadoop.mapreduce.v2.app.MRAppMaster

在yarn-client模式里,优先运行的是Driver(我们写的应用代码就是入口),然后在初始化SparkContext的时候,会作为client端向yarn申请AppMaster资源,当AppMaster运行后,它会向yarn注册自己并申请Executor资源,之后由本地Driver与其通信控制任务运行,而AppMaster则时刻监控Driver的运行情况,如果Driver完成或意外退出,AppMaster会释放资源并注销自己。所以在该模式下,如果运行spark-submit的程序退出了,整个任务也就退出了

在yarn-cluster模式里,本地进程则仅仅只是一个client,它会优先向yarn申请AppMaster资源运行AppMaster,在运行AppMaster的时候通过反射启动Driver(我们的应用代码),在SparkContext初始化成功后,再向yarn注册自己并申请Executor资源,此时Driver与AppMaster运行在同一个container里,是两个不同的线程,当Driver运行完毕,AppMaster会释放资源并注销自己。所以在该模式下,本地进程仅仅是一个client,如果结束了该进程,整个Spark任务也不会退出,因为Driver是在远程运行的

下面从源码的角度看看SparkSubmit的代码调用(基于Spark2.0.0):

代码公共部分

SparkSubmit#main =>

  1. val appArgs = new SparkSubmitArguments(args)
  2. appArgs.action match {
  3. // normal spark-submit
  4. case SparkSubmitAction.SUBMIT => submit(appArgs)
  5. // use --kill specified
  6. case SparkSubmitAction.KILL => kill(appArgs)
  7. // use --status specified
  8. case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  9. }

SparkSubmit的main方法是在用户使用spark-submit脚本提交Spark app的时候调用的,可以看到正常情况下,它会调用SparkSubmit#submit方法

SparkSubmit#submit =>

  1. val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  2. // 此处省略掉代理账户,异常处理,提交失败的重提交逻辑,只看主干代码
  3. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

在submit方法内部,会先进行提交环境相关的处理,调用的是SparkSubmit#prepareSubmitEnvironment方法,之后利用拿到的mainClass等信息,再调用SparkSubmit#runMain方法来执行对于主函数

SparkSubmit#prepareSubmitEnvironment =>

主干相关的代码如下:

  1. // yarn client mode
  2. if (deployMode == CLIENT) {
  3. // client 模式下,运行的是 --class 后指定的mainClass,也即我们的代码
  4. childMainClass = args.mainClass
  5. if (isUserJar(args.primaryResource)) {
  6. childClasspath += args.primaryResource
  7. }
  8. if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  9. if (args.childArgs != null) { childArgs ++= args.childArgs }
  10. }
  11. // yarn cluster mode
  12. val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
  13. if (isYarnCluster) {
  14. // cluster 模式下,运行的是Client类
  15. childMainClass = "org.apache.spark.deploy.yarn.Client"
  16. if (args.isPython) {
  17. childArgs += ("--primary-py-file", args.primaryResource)
  18. childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  19. } else if (args.isR) {
  20. val mainFile = new Path(args.primaryResource).getName
  21. childArgs += ("--primary-r-file", mainFile)
  22. childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  23. } else {
  24. if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
  25. childArgs += ("--jar", args.primaryResource)
  26. }
  27. // 这里 --class 指定的是AppMaster里启动的Driver,也即我们的代码
  28. childArgs += ("--class", args.mainClass)
  29. }
  30. if (args.childArgs != null) {
  31. args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  32. }
  33. }

在 prepareSubmitEnvironment 里,主要负责解析用户参数,设置环境变量env,处理python/R等依赖,然后针对不同的部署模式,匹配不同的运行主类,比如: yarn-client>args.mainClass,yarn-cluster>o.a.s.deploy.yarn.Client

SparkSubmit#runMain =>

骨干代码如下

  1. try {
  2. mainClass = Utils.classForName(childMainClass)
  3. } catch {
  4. // ...
  5. }
  6. val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  7. try {
  8. // childArgs就是用户自己传给Spark应用代码的参数
  9. mainMethod.invoke(null, childArgs.toArray)
  10. } catch {
  11. // ...
  12. }

在runMain方法里,会设置ClassLoader,根据用户代码优先的设置(spark.driver.userClassPathFirst)来加载对应的类,然后反射调用prepareSubmitEnvironment方法返回的主类,并调用其main方法

从所反射的不同主类,我们来看看具体调用方式的不同:

对于yarn-cluster

o.a.s.deploy.yarn.Client#main =>

  1. val sparkConf = new SparkConf
  2. val args = new ClientArguments(argStrings)
  3. new Client(args, sparkConf).run()

在Client伴生对象里构建了Client类的对象,然后调用了Client#run方法

o.a.s.deploy.yarn.Client#run =>

  1. this.appId = submitApplication()
  2. // report application ...

run方法核心的就是提交任务到yarn,其调用了Client#submitApplication方法,拿到提交完的appID后,监控app的状态

o.a.s.deploy.yarn.Client#submitApplication =>

  1. try {
  2. // 获取提交用户的Credentials,用于后面获取delegationToken
  3. setupCredentials()
  4. yarnClient.init(yarnConf)
  5. yarnClient.start()
  6. // Get a new application from our RM
  7. val newApp = yarnClient.createApplication()
  8. val newAppResponse = newApp.getNewApplicationResponse()
  9. // 拿到appID
  10. appId = newAppResponse.getApplicationId()
  11. // 报告状态
  12. reportLauncherState(SparkAppHandle.State.SUBMITTED)
  13. launcherBackend.setAppId(appId.toString)
  14. // Verify whether the cluster has enough resources for our AM
  15. verifyClusterResources(newAppResponse)
  16. // 创建AppMaster运行的context,为其准备运行环境,java options,以及需要运行的java命令,AppMaster通过该命令在yarn节点上启动
  17. val containerContext = createContainerLaunchContext(newAppResponse)
  18. val appContext = createApplicationSubmissionContext(newApp, containerContext)
  19. // Finally, submit and monitor the application
  20. logInfo(s"Submitting application $appId to ResourceManager")
  21. yarnClient.submitApplication(appContext)
  22. appId
  23. } catch {
  24. case e: Throwable =>
  25. if (appId != null) {
  26. cleanupStagingDir(appId)
  27. }
  28. throw e
  29. }

在 submitApplication 里完成了app的申请,AppMaster context的创建,最后完成了任务的提交,对于cluster模式而言,任务提交后本地进程就只是一个client而已,Driver就运行在与AppMaster同一container里,对于client模式而言,执行 submitApplication 方法时,Driver已经在本地运行,这一步就只是提交任务到yarn而已

o.a.s.deploy.yarn.Client#createContainerLaunchContext

  1. val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
  2. // 非pySpark时,pySparkArchives为Nil
  3. val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
  4. // 这一步会进行delegationtoken的获取,存于Credentials,在AppMasterContainer构建完的最后将其存入到context里
  5. val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
  6. val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  7. // 设置AppMaster container运行的资源和环境
  8. amContainer.setLocalResources(localResources.asJava)
  9. amContainer.setEnvironment(launchEnv.asJava)
  10. // 设置JVM参数
  11. val javaOpts = ListBuffer[String]()
  12. javaOpts += "-Djava.io.tmpdir=" + tmpDir
  13. // other java opts setting...
  14. // 对于cluster模式,通过 --class 指定AppMaster运行我们的Driver端,对于client模式则纯作为资源申请和分配的工具
  15. val userClass =
  16. if (isClusterMode) {
  17. Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
  18. } else {
  19. Nil
  20. }
  21. // 设置AppMaster运行的主类
  22. val amClass =
  23. if (isClusterMode) {
  24. Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  25. } else {
  26. // ExecutorLauncher只是ApplicationMaster的一个warpper
  27. Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  28. }
  29. val amArgs =
  30. Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
  31. userArgs ++ Seq(
  32. "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
  33. LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
  34. // Command for the ApplicationMaster
  35. val commands = prefixEnv ++ Seq(
  36. YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
  37. ) ++
  38. javaOpts ++ amArgs ++
  39. Seq(
  40. "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
  41. "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
  42. val printableCommands = commands.map(s => if (s == null) "null" else s).toList
  43. // 设置需运行的命令
  44. amContainer.setCommands(printableCommands.asJava)
  45. val securityManager = new SecurityManager(sparkConf)
  46. // 设置应用权限
  47. amContainer.setApplicationACLs(
  48. YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
  49. // 设置delegationToken
  50. setupSecurityToken(amContainer)

对于yarn-client

args.mainClass =>

在我们的Spark代码里,需要创建一个SparkContext来执行Spark任务,而在其构造器里创建TaskScheduler的时候,对于client模式就会向yarn申请资源提交任务,如下

  1. // 调用createTaskScheduler方法,对于yarn模式,master=="yarn"
  2. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  3. _schedulerBackend = sched
  4. _taskScheduler = ts
  5. // 创建DAGScheduler
  6. _dagScheduler = new DAGScheduler(this)

SparkContext#createTaskScheduler =>

这里会根据master匹配不同模式,比如local/standalone/yarn,在yarn模式下会利用ServiceLoader装载YarnClusterManager,然后由它创建TaskScheduler和SchedulerBackend,如下:

  1. // 当为yarn模式的时候
  2. case masterUrl =>
  3. // 利用当前loader装载YarnClusterManager,masterUrl为"yarn"
  4. val cm = getClusterManager(masterUrl) match {
  5. case Some(clusterMgr) => clusterMgr
  6. case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  7. }
  8. try {
  9. // 创建TaskScheduler,这里masterUrl并没有用到
  10. val scheduler = cm.createTaskScheduler(sc, masterUrl)
  11. // 创建SchedulerBackend,对于client模式,这一步会向yarn申请AppMaster,提交任务
  12. val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
  13. cm.initialize(scheduler, backend)
  14. (backend, scheduler)
  15. } catch {
  16. case se: SparkException => throw se
  17. case NonFatal(e) =>
  18. throw new SparkException("External scheduler cannot be instantiated", e)
  19. }

YarnClusterManager#createSchedulerBackend

  1. sc.deployMode match {
  2. case "cluster" =>
  3. new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
  4. case "client" =>
  5. new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
  6. case  _ =>
  7. throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  8. }

可以看到yarn下的SchedulerBackend实现对于client和cluster模式是不同的,yarn-client模式为YarnClientSchedulerBackend,yarn-cluster模式为 YarnClusterSchedulerBackend,之所以不同,是因为在client模式下,YarnClientSchedulerBackend 相当于 yarn application 的client,它会调用o.a.s.deploy.yarn.Client#submitApplication 来准备环境,申请资源并提交yarn任务,如下:

  1. val driverHost = conf.get("spark.driver.host")
  2. val driverPort = conf.get("spark.driver.port")
  3. val hostport = driverHost + ":" + driverPort
  4. sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) }
  5. val argsArrayBuf = new ArrayBuffer[String]()
  6. argsArrayBuf += ("--arg", hostport)
  7. val args = new ClientArguments(argsArrayBuf.toArray)
  8. totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
  9. // 创建o.a.s.deploy.yarn.Client对象
  10. client = new Client(args, conf)
  11. // 调用submitApplication准备环境,申请资源,提交任务,并把appID保存下来
  12. // 对于submitApplication,前文有详细的分析,这里与前面是一致的
  13. bindToYarn(client.submitApplication(), None)

而在 YarnClusterSchedulerBackend 里,由于 AppMaster 已经运行起来了,所以它并不需要再做申请资源等等工作,只需要保存appID和attemptID并启动SchedulerBackend即可.

本文作者:佚名

来源:51CTO

从源码角度看Spark on yarn client cluster模式的本质区别相关推荐

  1. 从JDK源码角度看Long

    概况 Java的Long类主要的作用就是对基本类型long进行封装,提供了一些处理long类型的方法,比如long到String类型的转换方法或String类型到long类型的转换方法,当然也包含与其 ...

  2. 从源码角度看Android系统Launcher在开机时的启动过程

    Launcher是Android所有应用的入口,用来显示系统中已经安装的应用程序图标. Launcher本身也是一个App,一个提供桌面显示的App,但它与普通App有如下不同: Launcher是所 ...

  3. 从源码角度看Android系统SystemServer进程启动过程

    SystemServer进程是由Zygote进程fork生成,进程名为system_server,主要用于创建系统服务. 备注:本文将结合Android8.0的源码看SystemServer进程的启动 ...

  4. 从源码角度看Android系统Zygote进程启动过程

    在Android系统中,DVM.ART.应用程序进程和SystemServer进程都是由Zygote进程创建的,因此Zygote又称为"孵化器".它是通过fork的形式来创建应用程 ...

  5. 从JDK源码角度看Short

    概况 Java的Short类主要的作用就是对基本类型short进行封装,提供了一些处理short类型的方法,比如short到String类型的转换方法或String类型到short类型的转换方法,当然 ...

  6. 从源码角度看CPU相关日志

    简介 (本文原地址在我的博客CheapTalks, 欢迎大家来看看~) 安卓系统中,普通开发者常常遇到的是ANR(Application Not Responding)问题,即应用主线程没有相应.根本 ...

  7. 从template到DOM(Vue.js源码角度看内部运行机制)

    写在前面 这篇文章算是对最近写的一系列Vue.js源码的文章(github.com/answershuto-)的总结吧,在阅读源码的过程中也确实受益匪浅,希望自己的这些产出也会对同样想要学习Vue.j ...

  8. Spark源码走读10——Spark On Yarn

    首先需要修改配置文件spark-env.sh.在这个文件中需要添加两个属性: Export HADOOP_HOME=/../hadoop.. ExportHADOOP_CONF_DIR=/../had ...

  9. hotspot源码角度看OOP之类属性的底层实现(一)

    hello,大家好,我是江湖人送外号[道格牙]的子牙老师. 最近看hotspo源码有点入迷.hotspot就像一座宝库,等你探索的东西太多了.每次达到一个新的Level回头细看,都有不同的感触.入迷归 ...

最新文章

  1. 使用 yolov3训练 voc2012
  2. 【C++多线程系列】【四】将类的成员函数作为线程启动函数
  3. Database之SQLSever:SQL命令实现查询之多表查询、嵌套查询、分页复杂查询,删除表内重复记录数据、连接(join、left join和right join简介及其区别)等案例之详细攻略
  4. ThreadLocal的使用
  5. 2020 年微服务项目活跃度报告
  6. Android后台杀死系列之二:ActivityManagerService与App现场恢复机制
  7. 关于大数据学习,实战型的书籍
  8. d3.js 实现烟花鲜果
  9. 清除浮动造成的影响的解决方案总结
  10. 免费体验,阿里云智能LOGO帮你解决设计难题
  11. python变量区变量保存与加载_python – Flask:使用全局变量将数据文件加载到内存中...
  12. ubuntu动态截图(GIF动画)
  13. 聊一聊使用airtest-selenium做Web自动化的常见问题
  14. 【图神经网络】从源头探讨 GCN 的行文思路
  15. Active Directory Get User's groups using LDAP
  16. 汉宁窗+matlab,m汉宁窗hanning汉明窗hamming矩形窗-read.ppt
  17. Connection: Keep-Alive
  18. Windows10 Windows许可证即将过期
  19. 一些免费在线杀毒网址
  20. Ubuntu18.04和Win10共享文件夹

热门文章

  1. WordPress作品设计素材图片站资讯文章教程uigreat主题
  2. wordpress删除网址中的category前缀
  3. PHP在线小说txt生成器源码
  4. codeigniter mysql查询_php – CodeIgniter MySQL查询不起作用
  5. linux mysql general_利用mysql general log 写shell 可行性简要分析
  6. sessionStorage、localStorage存储api
  7. HTML高仿哔哩哔哩(B站)视频网站整站模板
  8. 移通好闹钟微信小程序全套源码
  9. EduSoho网络课堂通用版
  10. sql复制表定义及复制数据行