spark提交应用的全流程分析

@(SPARK)[spark]

本文分析一下spark的应用通过spark-submit后,如何提交到集群中并开始运行。

先介绍一下spark从提交到运行的全流程,下面再详细分析。

  • 1、用户通过spark-submit脚本提交应用。
  • 2、spark-submit根据用户代码及配置确定使用哪个资源管理器,以及在合适的位置启动driver。
  • 3、driver与集群管理器(如YARN)通信,申请资源以启动executor。
  • 4、集群管理器启动executor。
  • 5、driver进程执行用户的代码,根据程序中定义的transformation和action,进行stage的划分,然后以task的形式发送到executor。(通过DAGScheduler划分stage,通过TaskScheduler和TaskSchedulerBackend来真正申请资源运行task)
  • 6、task在executor中进行计算并保存结果。
  • 7、如果driver中的main()方法执行完成退出,或者调用了SparkContext#stop(),driver会终止executor进程,并且通过集群管理器释放资源。

一、提交前准备

(一)脚本调用

1、spark-submit

spark通过spark-submit脚本来向集群提交应用,举个例子:

/home/hadoop/spark/bin/spark-submit --master yarn-client --num-executors 10 --class com.lujinhong.spark.ml.TrainModel myusml-0.0.1-SNAPSHOT.jar args1 args2 args3

我们看看spark-submit脚本,很简单,只有3行:

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

其实就是调用spark-class这个脚本。

2、spark-class

spark-class完成了配置的加载:

. "$SPARK_HOME"/bin/load-spark-env.sh

以及调用上面说的SparkSubmit类

3、load-spark-env

有兴趣的可以看看如何加载配置,主要是spark-evn.sh文件,以及scala的版本等。

(二)SparkSubmit

1、main函数

很简单,appArgs解释命令行中的参数,然后判断action是什么,并执行相应的操作。

  def main(args: Array[String]): Unit = {val appArgs = new SparkSubmitArguments(args)if (appArgs.verbose) {// scalastyle:off printlnprintStream.println(appArgs)// scalastyle:on println}appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs)case SparkSubmitAction.KILL => kill(appArgs)case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)}}

verbose是一个布尔值,用于确定是否打印一些JVM的信息,默认为false。
action的默认值是submit,我们这里也只分析submit的过程,因此下面将进入submit函数看目的地。

2、submit(args: SparkSubmitArguments): Unit

submit函数先是将参数转化为一个4元组的形式:

val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

然后就使用这些参数调用runMain函数了:

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

其它内容分别考虑了使用proxy以及standalone的情形。下一步:runMain函数。

3、runMain函数

runMain函数开始执行Client类中的main函数了。

首先是一大堆的环境变量及参数的加载,判断类是否存在等,最后的目的是执行Client类中的main函数。

找到主类:

 mainClass = Utils.classForName(childMainClass)

然后是主函数:

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

最后调用main方法:

  mainMethod.invoke(null, childArgs.toArray)

那mainClass是哪个类呢?对于yarn-cluster来说,是:

if (isYarnCluster) childMainClass = "org.apache.spark.deploy.yarn.Client"

如果是yarn-client,是:

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) childMainClass = args.mainClass

即,client类就是用户定义的主类,直接开始运行主类即可。

二、提交应用

(一)yarn-cluster方式

我们先看一下yarn-cluster方式,由上面的分析可知,yarn-cluster使用的是org.apache.spark.deploy.yarn.Client这个类进行任务提交,先看一下流程图:

图片来自于spark技术内幕P84,下同。
先说一下总体的流程步骤:
======================================
步骤一:Client类提交应用到YARN ResourceManager,向RM申请资源作为AM
步骤二:在申请到的机器中启动driver,注册成为AM,并调用用户代码,然后创建SparkContext。(driver是一个逻辑概念,并不实际存在,通过抽象出driver这一层,所有的运行模式都可以说是在driver中调用用户代码了)
步骤三:SparkContext中创建DAGScheduler与YarnClusterScheduler与YarnClusterSchedulerBackend。当在用户代码中遇到action时,即会调用DAGScheduler的runJob,任务开始调度执行。
步骤四:YarnClusterSchedulerBackend在NodeManager上启动Executor
步骤五:Executor启动Task,开始执行任务
======================================
简单的说就是:
向RM申请资源建立driver——->在driver中执行用户代码,并创建AM——->遇到action时调用runJob——->开始调度、执行的过程了
因此3个比较复杂的流程分别为:
* 1、如何向YARN中申请资源,这涉及YARN的源码
* 2、如何调度,涉及DAGScheduler、YarnClusterScheduler与YarnClusterSchedulerBackend
* 3、如何执行任务,涉及Executor与Task。这3个部分会有专门的章节来讨论,我们这里先把整个流程理顺。

下面按按被调用的类来详细分析一下:

1、Client

Client类作为向YARN提交应用的客户端

步骤一:Client类提交应用到YARN ResourceManager,向RM申请资源作为 AM

(1)main函数
我们从main函数开始入手:

  def main(argStrings: Array[String]) {.....new Client(args, sparkConf).run()}

将不关键代码去掉后,就剩下一行,它调用run方法,继续看run方法

(2)run方法
好吧,它的主要内容也只是一行:

  def run(): Unit = {val appId = submitApplication().......}

它调用了submitApplication方法。

(3)submitApplication方法

  def submitApplication(): ApplicationId = {var appId: ApplicationId = nulltry {// Setup the credentials before doing anything else,// so we have don't have issues at any point.setupCredentials()yarnClient.init(yarnConf)yarnClient.start()// Get a new application from our RMval newApp = yarnClient.createApplication()val newAppResponse = newApp.getNewApplicationResponse()appId = newAppResponse.getApplicationId()// Verify whether the cluster has enough resources for our AMverifyClusterResources(newAppResponse)// Set up the appropriate contexts to launch our AMval containerContext = createContainerLaunchContext(newAppResponse)val appContext = createApplicationSubmissionContext(newApp, containerContext)// Finally, submit and monitor the applicationyarnClient.submitApplication(appContext)appId}

在submitApplication方法中,先对yarnClient进行了初始化,并从RM中申请到一个application,设置合适的AM(见下一点),最后就向RM提交应用了,并返回应用的ID。

(4)createContainerLaunchContext方法
上面在启动一个应用前,调用了createContainerLaunchContext方法,用于指定的appContext使用哪个AM:

val amClass =if (isClusterMode) {Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName} else {Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName}

上面代码中指定了当yarn-cluster模式和yarn-client时,分别使用哪个类作为AM。

当向RM提交应用后,RM就会开始启动AM。YARN中启动AM的源码分析以后再补充。

步骤二:在申请到的机器中启动driver,注册成为AM,并调用用户代码,然后创建SparkContext。

2、ApplicationMaster

(1)main函数
当RM启动AM后,AM就开始执行main函数了

  def main(args: Array[String]): Unit = {val amArgs = new ApplicationMasterArguments(args)SparkHadoopUtil.get.runAsSparkUser { () =>master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))System.exit(master.run())}}

关键是调用了run方法,我们继续看run方法。

(2)run方法
先是设置了一些参数,并加载yarn的配置文件。然后设置了一些钩子
最后关键是执行了这2个方法:

if (isClusterMode) {runDriver(securityMgr)} else {runExecutorLauncher(securityMgr)}

分别对应yarn-cluster模式和yarn-client模式。

(3)runDriver方法
定义了如何启动driver,这也是yarn-cluster和yarn-client最大的区别,前者在yarn分配一台机器启动driver,并注册成为AM,而后者在本地上启动driver,再注册成为AM。

  private def runDriver(securityMgr: SecurityManager): Unit = {addAmIpFilter()userClassThread = startUserApplication()// This a bit hacky, but we need to wait until the spark.driver.port property has// been set by the Thread executing the user class.val sc = waitForSparkContextInitialized()// If there is no SparkContext at this point, just fail the app.if (sc == null) {finish(FinalApplicationStatus.FAILED,ApplicationMaster.EXIT_SC_NOT_INITED,"Timed out waiting for SparkContext.")} else {rpcEnv = sc.env.rpcEnvval driverRef = runAMEndpoint(sc.getConf.get("spark.driver.host"),sc.getConf.get("spark.driver.port"),isClusterMode = true)registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)userClassThread.join()}}

startUserApplication主要执行了调用用户的代码,以及创建了一个spark driver的进程。
Start the user class, which contains the spark driver, in a separate Thread.

registerAM向RM中正式注册AM。有了AM以后,用户代码就可以执行了,开始将任务切分、调度、执行。我们继续往下看。

然后,用户代码中的action会调用SparkContext的runJob,SparkContext中有很多个runJob,但最后都是调用DAGScheduler的runJob

步骤三:SparkContext中创建DAGScheduler与YarnClusterScheduler与YarnClusterSchedulerBackend

val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)

然后调用DAGScheduler的runJob:
* Run a function on a given set of partitions in an RDD and pass the results to the given handler function. This is the main entry point for all actions in Spark.*

      def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {if (stopped.get()) {throw new IllegalStateException("SparkContext has been shutdown")}val callSite = getCallSiteval cleanedFunc = clean(func)logInfo("Starting job: " + callSite.shortForm)if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)progressBar.foreach(_.finishAll())rdd.doCheckpoint()}

至此,应用就正式提交到集群准备运行了。

然后就开始DAGScheduler调用YarnClusterScheduler,YarnClusterScheduler调用YarnClusterSchedulerBackend,Executor启动Task开始执行任务的具体流程了。* 这些内容在之后的专题中详细分析。*

步骤四:YarnClusterSchedulerBackend在NodeManager上启动Executor

步骤五:Executor启动Task

(二)yarn-cluster方式

yarn-client的流程与yarn-cluster类似,主要区别在于它在本地运行driver,而cluster是在AM上运行driver。
先看一下流程图:

1、区别一:主类入口不同

如果是yarn-client,是:

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) childMainClass = args.mainClass

即,client类就是用户定义的主类,直接开始运行主类即可。
cluster是在专门的Client类中开始执行的,而yarn-client是在用户代码中开始执行的。

2、启动driver的方式不一样

client模式将在本机启动进程,并注册成为AM。

if (isClusterMode) {runDriver(securityMgr)} else {runExecutorLauncher(securityMgr)}private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {val port = sparkConf.getInt("spark.yarn.am.port", 0)rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)val driverRef = waitForSparkDriver()addAmIpFilter()registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)// In client mode the actor will stop the reporter thread.reporterThread.join()}

spark提交应用的全流程分析相关推荐

  1. 项目提交到GitHub(全流程)

    项目提交到GitHub(全流程) 持续输出学习心得干货,欢迎关注收藏 1.准备工作 1.1创建Github账号 账号肯定是必须要有滴~~ 官网跳转 1.2安装Git工具 下载链接 选择需要的版本进行下 ...

  2. git和SVN提交上库全流程

    git和SVN提交上库全流程 1.背景 我们经常需要将自己修改或添加的代码合入到库里面,完成代码库的更新.我们的代码库一般分为git库和SVN库,两种库的提交上库流程存在区别,本文档简要介绍两种提交上 ...

  3. 关于视频直播系统源码所开发的直播平台全流程分析

    直播全流程探索 近年来,直播兴起,QQ音乐也接入了直播能力,支持演唱会的直播和主播.明星直播,根据互动方式的不同,我们可以分为互动直播和推流直播,本人有幸参与了直播从无到有的过程:对直播这一块有了一个 ...

  4. 干货满满!MES生产制造管理全流程分析

    阅读本文您将了解:1.什么是MES生产管理流程:2.MES生产管理流程具体步骤:3.实施MES生产管理流程优势:4.MES生产管理流程中可能会遇见的问题. 一.什么是MES生产管理流程 MES生产管理 ...

  5. c语言先调用load函数,透过源码全流程分析+load函数初始化

    网络上讲解+load函数的文章很多很多,但我总觉得缺少点什么,主要表现在不够系统化,割裂的看待问题.本文只是谈一下个人的理解,主要涉及以下四个方面: 苹果开发文档对+load方法的介绍 dyld是如何 ...

  6. qq音乐sign算法还原源码放送及jsvmp全流程分析

    1.声明 本次分析过程仅限于学习使用,请勿用于非法用途,若读者用于非法用途其造成的一切后果与本人无关,若本文章侵犯了贵公司的权益请添加本人微信YotaGit联系删除 博客所写的所有算法还原均已开源在G ...

  7. RNA-seq全流程分析

    RNA-seq 数据处理记录(2) 原始数据的处理 去除adapter 找到接头序列 可以通过建库的试剂盒在Illumina官网查找,也可以通过trim_galore自动找到接头并去除. conda ...

  8. 支付宝怎么提交html表单提交,支付宝支付全流程

    支付宝沙箱环境 蚂蚁沙箱环境(Beta)是协助开发者进行接口功能开发及主要功能联调的辅助环境.沙箱环境模拟了开放平台部分产品的主要功能和主要逻辑(当前沙箱支持产品请参考"沙箱支持产品列表&q ...

  9. GEO数据挖掘全流程分析

    声明:以下学习资料根据"生信技能树"网络系列免费教学材料整理而成,代码来自"生信技能树"校长jimmy的github.GEO数据库挖掘系列知识分享课程,于201 ...

最新文章

  1. 全国大学生智能汽车竞赛英飞凌AURIXTM培训--应用篇 : 3月30日直播
  2. 基于物化视图优化_「PostgreSQL技巧」PostgreSQL中的物化视图与汇总表比较
  3. 在计算机系统中使用防病毒软件的作用,防病毒软件的作用是 江苏省网络与信息安全技能竞赛题库(5)...
  4. 7、MySQL设置日志输出方式
  5. Coding Contest HDU - 5988
  6. c语言几种排序方法的比较,基于C语言的几种排序方法比较.doc
  7. fft之后求模值和相位_如何利用相位噪声测量表征时钟抖动来加速设计验证过程...
  8. 【人工智能中“预测”的知识点】
  9. E: Could not get lock /var/lib/dpkg/lock(无法获得锁)
  10. jqprint 分页打印_jQuery打印Html页面自动分页
  11. VBScript教程
  12. C++Comb Sort梳排序的实现算法(附完整源码)
  13. Android网络编程入门解析
  14. System/360 大型机差点毁了 IBM !
  15. 关于cv2.cvtColor(im, cv2.COLOR_RGB2BGR)的一点细节
  16. Ubuntu 下使用MTK FLASH TOOLS
  17. 洛谷 P2404 自然数的拆分问题
  18. PC:各大主板开机启动项快捷键
  19. org.xml.sax.SAXParseException; lineNumber: 9; columnNumber: 105; cvc-elt.1: 找不到元素 'beans' 的声明。
  20. NTSTATUS类型返回值及含义

热门文章

  1. jQuery的DOM操作之取值/赋值(1)
  2. 【双100%提交】剑指 Offer 09. 用两个栈实现队列
  3. 21行代码AC——习题3-7 DNA序列(UVa-1368)_解题报告
  4. (*长期更新)软考网络工程师学习笔记——Section 14 Linux服务器配置
  5. python输入多组测试数据_python ddt数据驱动实例代码分享
  6. Linux账号和权限管理详解(超详细示例操作)!
  7. 高并发环境下的Nginx该如何优化,让用户再也不会说卡
  8. Xcode搭建真机调试环境 图文实例
  9. android 无appid分享_App ID 和Bundle ID 有什么不同?ios面试攻克篇(六)
  10. js大屏导出图片_整理了30个实用可视化大屏模板,附源文件+工具