深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析
概述
前几篇博文都在介绍Spark的调度,这篇博文我们从更加宏观的调度看Spark,讲讲Spark的部署模式。Spark部署模式分以下几种:
- local 模式
- local-cluster 模式
- Standalone 模式
- YARN 模式
- Mesos 模式
我们先来简单介绍下YARN模式,然后深入讲解Standalone模式。
YARN 模式介绍
YARN介绍
YARN是一个资源管理、任务调度的框架,主要包含三大模块:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。
其中,ResourceManager负责所有资源的监控、分配和管理;ApplicationMaster负责每一个具体应用程序的调度和协调;NodeManager负责每一个节点的维护。
对于所有的applications,RM拥有绝对的控制权和对资源的分配权。而每个AM则会和RM协商资源,同时和NodeManager通信来执行和监控task。几个模块之间的关系如图所示。
Yarn Cluster 模式
Spark的Yarn Cluster 模式流程如下:
- 本地用YARN Client 提交App 到 Yarn Resource Manager
- Yarn Resource Manager 选个 YARN Node Manager,用它来
- 创建个ApplicationMaster,SparkContext相当于是这个ApplicationMaster管的APP,生成YarnClusterScheduler与YarnClusterSchedulerBackend
- 选择集群中的容器启动CoarseCrainedExecutorBackend,用来启动spark.executor。
- ApplicationMaster与CoarseCrainedExecutorBackend会有远程调用。
Yarn Client 模式
Spark的Yarn Client 模式流程如下:
- 本地启动SparkContext,生成YarnClientClusterScheduler 和 YarnClientClusterSchedulerBackend
- YarnClientClusterSchedulerBackend启动yarn.Client,用它提交App 到 Yarn Resource Manager
- Yarn Resource Manager 选个 YARN Node Manager,用它来选择集群中的容器启动CoarseCrainedExecutorBackend,用来启动spark.executor
- YarnClientClusterSchedulerBackend与CoarseCrainedExecutorBackend会有远程调用。
Standalone 模式介绍
- 启动app,在SparkContxt启动过程中,先初始化DAGScheduler 和 TaskScheduler,并初始化 SparkDeploySchedulerBackend,并在其内部启动DriverEndpoint和ClientEndpoint。
- ClientEndpoint想Master注册app,Master收到注册信息后把该app加入到等待运行app列表中,等待由Master分配给该app worker。
- app获取到worker后,Master通知Worker的WorkerEndpont创建CoarseGrainedExecutorBackend进程,在该进程中创建执行容器executor
- executor创建完毕后发送信息给Master和DriverEndpoint,告知Executor创建完毕,在SparkContext注册,后等待DriverEndpoint发送执行任务的消息。
- SparkContext分配TaskSet给CoarseGrainedExecutorBackend,按一定调度策略在executor执行。详见:《深入理解Spark 2.1 Core (二):DAG调度器的实现与源码分析 》与《深入理解Spark 2.1 Core (三):任务调度器的实现与源码分析 》
- CoarseGrainedExecutorBackend在Task处理的过程中,把处理Task的状态发送给DriverEndpoint,Spark根据不同的执行结果来处理。若处理完毕,则继续发送其他TaskSet。详见:《深入理解Spark 2.1 Core (四):运算结果处理和容错的实现与源码分析 》
- app运行完成后,SparkContext会进行资源回收,销毁Worker的CoarseGrainedExecutorBackend进程,然后注销自己。
Standalone 启动集群
启动Master
master.Master
我们先来看下Master对象的main函数做了什么:
private[deploy] object Master extends Logging {val SYSTEM_NAME = "sparkMaster"val ENDPOINT_NAME = "Master"def main(argStrings: Array[String]) {Utils.initDaemon(log)//创建SparkConfval conf = new SparkConf//解析SparkConf参数val args = new MasterArguments(argStrings, conf)val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)rpcEnv.awaitTermination()}def startRpcEnvAndEndpoint(host: String,port: Int,webUiPort: Int,conf: SparkConf): (RpcEnv, Int, Option[Int]) = {val securityMgr = new SecurityManager(conf)val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)//创建Masterval masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)//返回 Master RpcEnv,//web UI 端口,//其他服务的端口(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)}
}
- 1
master.MasterArguments
接下来我们看看master是如何解析参数的:
private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {//默认配置var host = Utils.localHostName()var port = 7077var webUiPort = 8080//Spark属性文件 //默认为 spark-default.confvar propertiesFile: String = null// 检查环境变量if (System.getenv("SPARK_MASTER_IP") != null) {logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")host = System.getenv("SPARK_MASTER_IP")}if (System.getenv("SPARK_MASTER_HOST") != null) {host = System.getenv("SPARK_MASTER_HOST")}if (System.getenv("SPARK_MASTER_PORT") != null) {port = System.getenv("SPARK_MASTER_PORT").toInt}if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt}parse(args.toList)// 转变SparkConfpropertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)//环境变量的SPARK_MASTER_WEBUI_PORT//会被Spark属性spark.master.ui.port所覆盖if (conf.contains("spark.master.ui.port")) {webUiPort = conf.get("spark.master.ui.port").toInt}//解析命令行参数//命令行参数会把环境变量和Spark属性都覆盖@tailrecprivate def parse(args: List[String]): Unit = args match {case ("--ip" | "-i") :: value :: tail =>Utils.checkHost(value, "ip no longer supported, please use hostname " + value)host = valueparse(tail)case ("--host" | "-h") :: value :: tail =>Utils.checkHost(value, "Please use hostname " + value)host = valueparse(tail)case ("--port" | "-p") :: IntParam(value) :: tail =>port = valueparse(tail)case "--webui-port" :: IntParam(value) :: tail =>webUiPort = valueparse(tail)case ("--properties-file") :: value :: tail =>propertiesFile = valueparse(tail)case ("--help") :: tail =>printUsageAndExit(0)case Nil => case _ =>printUsageAndExit(1)}private def printUsageAndExit(exitCode: Int) {System.err.println("Usage: Master [options]\n" +"\n" +"Options:\n" +" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +" -h HOST, --host HOST Hostname to listen on\n" +" -p PORT, --port PORT Port to listen on (default: 7077)\n" +" --webui-port PORT Port for web UI (default: 8080)\n" +" --properties-file FILE Path to a custom Spark properties file.\n" +" Default is conf/spark-defaults.conf.")System.exit(exitCode)}
}
- 1
- 37
我们可以看到上述参数设置的优先级别为:
系统环境变量<spark−default.conf中的属性<命令行参数<应用级代码中的参数设置
启动Worker
worker.Worker
我们先来看下Worker对象的main函数做了什么:
private[deploy] object Worker extends Logging {val SYSTEM_NAME = "sparkWorker"val ENDPOINT_NAME = "Worker"def main(argStrings: Array[String]) {Utils.initDaemon(log)//创建SparkConfval conf = new SparkConf//解析SparkConf参数val args = new WorkerArguments(argStrings, conf)val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,args.memory, args.masters, args.workDir, conf = conf)rpcEnv.awaitTermination()}def startRpcEnvAndEndpoint(host: String,port: Int,webUiPort: Int,cores: Int,memory: Int,masterUrls: Array[String],workDir: String,workerNumber: Option[Int] = None,conf: SparkConf = new SparkConf): RpcEnv = {val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")val securityMgr = new SecurityManager(conf)val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))//创建WorkerrpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))rpcEnv}***
worker.WorkerArguments
worker.WorkerArguments与master.MasterArguments类似:
private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {var host = Utils.localHostName()var port = 0var webUiPort = 8081var cores = inferDefaultCores()var memory = inferDefaultMemory()var masters: Array[String] = nullvar workDir: String = nullvar propertiesFile: String = null// 检查环境变量if (System.getenv("SPARK_WORKER_PORT") != null) {port = System.getenv("SPARK_WORKER_PORT").toInt}if (System.getenv("SPARK_WORKER_CORES") != null) {cores = System.getenv("SPARK_WORKER_CORES").toInt}if (conf.getenv("SPARK_WORKER_MEMORY") != null) {memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))}if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt}if (System.getenv("SPARK_WORKER_DIR") != null) {workDir = System.getenv("SPARK_WORKER_DIR")}parse(args.toList)// 转变SparkConfpropertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)if (conf.contains("spark.worker.ui.port")) {webUiPort = conf.get("spark.worker.ui.port").toInt}checkWorkerMemory()@tailrecprivate def parse(args: List[String]): Unit = args match {case ("--ip" | "-i") :: value :: tail =>Utils.checkHost(value, "ip no longer supported, please use hostname " + value)host = valueparse(tail)case ("--host" | "-h") :: value :: tail =>Utils.checkHost(value, "Please use hostname " + value)host = valueparse(tail)case ("--port" | "-p") :: IntParam(value) :: tail =>port = valueparse(tail)case ("--cores" | "-c") :: IntParam(value) :: tail =>cores = valueparse(tail)case ("--memory" | "-m") :: MemoryParam(value) :: tail =>memory = valueparse(tail)//工作目录case ("--work-dir" | "-d") :: value :: tail =>workDir = valueparse(tail)case "--webui-port" :: IntParam(value) :: tail =>webUiPort = valueparse(tail)case ("--properties-file") :: value :: tail =>propertiesFile = valueparse(tail)case ("--help") :: tail =>printUsageAndExit(0)case value :: tail =>if (masters != null) { // Two positional arguments were givenprintUsageAndExit(1)}masters = Utils.parseStandaloneMasterUrls(value)parse(tail)case Nil =>if (masters == null) { // No positional argument was givenprintUsageAndExit(1)}case _ =>printUsageAndExit(1)}***
资源回收
我们在概述中提到了“ app运行完成后,SparkContext会进行资源回收,销毁Worker的CoarseGrainedExecutorBackend进程,然后注销自己。”接下来我们就来讲解下Master和Executor是如何感知到Application的退出的。
调用栈如下:
- SparkContext.stop
- DAGScheduler.stop
- TaskSchedulerImpl.stop
- CoarseGrainedSchedulerBackend.stop
- CoarseGrainedSchedulerBackend.stopExecutors
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedExecutorBackend.receive
- Executor.stop
- CoarseGrainedExecutorBackend.receive
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedSchedulerBackend.stopExecutors
- CoarseGrainedSchedulerBackend.stop
- TaskSchedulerImpl.stop
- DAGScheduler.stop
SparkContext.stop
SparkContext.stop会调用DAGScheduler.stop
***if (_dagScheduler != null) {Utils.tryLogNonFatalError {_dagScheduler.stop()}_dagScheduler = null}***
DAGScheduler.stop
DAGScheduler.stop会调用TaskSchedulerImpl.stop
def stop() {//停止消息调度messageScheduler.shutdownNow()//停止事件处理循环eventProcessLoop.stop()//调用TaskSchedulerImpl.stoptaskScheduler.stop()}
TaskSchedulerImpl.stop
TaskSchedulerImpl.stop会调用CoarseGrainedSchedulerBackend.stop
override def stop() {//停止推断speculationScheduler.shutdown()//调用CoarseGrainedSchedulerBackend.stopif (backend != null) {backend.stop()}//停止结果获取if (taskResultGetter != null) {taskResultGetter.stop()}starvationTimer.cancel()}
CoarseGrainedSchedulerBackend.stop
override def stop() {//调用stopExecutors()stopExecutors()try {if (driverEndpoint != null) {//发送StopDriver信号driverEndpoint.askWithRetry[Boolean](StopDriver)}} catch {case e: Exception =>throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)}}
CoarseGrainedSchedulerBackend.stopExecutors
我们先来看下CoarseGrainedSchedulerBackend.stopExecutors
def stopExecutors() {try {if (driverEndpoint != null) {logInfo("Shutting down all executors")//发送StopExecutors信号driverEndpoint.askWithRetry[Boolean](StopExecutors)}} catch {case e: Exception =>throw new SparkException("Error asking standalone scheduler to shut down executors", e)}}
- 1
- 12
CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
DriverEndpoint接收并回应该信号:
case StopExecutors =>logInfo("Asking each executor to shut down")for ((_, executorData) <- executorDataMap) {//给CoarseGrainedExecutorBackend发送StopExecutor信号executorData.executorEndpoint.send(StopExecutor)}context.reply(true)
- 1
CoarseGrainedExecutorBackend.receive
CoarseGrainedExecutorBackend接收该信号:
case StopExecutor =>stopping.set(true)logInfo("Driver commanded a shutdown")//这里并没有直接关闭Executor,//因为Executor必须先返回确认帧给CoarseGrainedSchedulerBackend//所以,这的策略是给自己再发一个Shutdown信号,然后处理self.send(Shutdown)case Shutdown =>stopping.set(true)new Thread("CoarseGrainedExecutorBackend-stop-executor") {override def run(): Unit = {// executor.stop() 会调用 `SparkEnv.stop()` // 直到 RpcEnv 彻底结束 // 但是, 如果 `executor.stop()` 运行在和RpcEnv相同的线程里面, // RpcEnv 会等到`executor.stop()`结束后才能结束,// 这就产生了死锁// 因此,我们需要新建一个线程executor.stop()}
- 1
Executor.stop
def stop(): Unit = {env.metricsSystem.report()//关闭心跳heartbeater.shutdown()heartbeater.awaitTermination(10, TimeUnit.SECONDS)//关闭线程池threadPool.shutdown()if (!isLocal) {//停止SparkEnvenv.stop()}}
CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
我们回过头来看CoarseGrainedSchedulerBackend.stop,调用stopExecutors()结束后,会给 driverEndpoint发送StopDriver信号。CoarseGrainedSchedulerBackend.DriverEndpoint.接收信号并回复:
case StopDriver =>context.reply(true)//停止driverEndpointstop()
深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析相关推荐
- 深入理解Spark 2.1 Core (六):Standalone模式运行的原理与源码分析
我们讲到了如何启动Master和Worker,还讲到了如何回收资源.但是,我们没有将AppClient是如何启动的,其实它们的启动也涉及到了资源是如何调度的.这篇博文,我们就来讲一下AppClient ...
- 深入理解Spark 2.1 Core (十四):securityManager 类源码分析
securityManager主要用于权限设置,比如在使用yarn作为资源调度框架时,用于生成secret key进行登录.该类默认只用一个实例,所以的app使用同一个实例,下面是该类的所有源代码: ...
- 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
第五.第六.第七篇博文,我们讲解了Standalone模式集群是如何启动的,一个App起来了后,集群是如何分配资源,Worker启动Executor的,Task来是如何执行它,执行得到的结果如何处理, ...
- 深入理解Spark 2.1 Core (七):Standalone模式任务执行的原理与源码分析
这篇博文,我们就来讲讲Executor启动后,是如何在Executor上执行Task的,以及其后续处理. 执行Task 我们在<深入理解Spark 2.1 Core (三):任务调度器的原理与源 ...
- 深入理解Spark 2.1 Core (十二):TimSort 的原理与源码分析
在博文<深入理解Spark 2.1 Core (十):Shuffle Map 端的原理与源码分析 >中我们提到了: 使用Sort等对数据进行排序,其中用到了TimSort 这篇博文我们就来 ...
- 深入理解Spark 2.1 Core (十一):Shuffle Reduce 端的原理与源码分析
我们曾经在<深入理解Spark 2.1 Core (一):RDD的原理与源码分析 >讲解过: 为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RD ...
- 深入理解Spark 2.1 Core (十):Shuffle Map 端的原理与源码分析
在上一篇<深入理解Spark 2.1 Core (九):迭代计算和Shuffle的原理与源码分析>提到经过迭代计算后, SortShuffleWriter.write中: // 根据排序方 ...
- spring源码分析第五天------springAOP核心原理及源码分析
spring源码分析第五天------springAOP核心原理及源码分析 1. 面向切面编程.可以通过预 编译方式和运行期动态代理实现在不修改源代码的情况下给程序动态统一添加功能的一种技术 切面(A ...
- 深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
概述 上一篇<深入理解Spark(一):RDD实现及源码分析 >提到: 定义RDD之后,程序员就可以在动作(注:即action操作)中使用RDD了.动作是向应用程序返回值,或向存储系统导出 ...
最新文章
- java操作字符串的工具类StringUtil
- pytorch 实现线性回归
- Android中常用到的权限
- goip技术原理图解_图解电工识图一看就懂
- tensorflow实现余弦cosine相似度
- python 功能代码是什么_Python功能代码
- 如何用计算机管理员权限,怎么打开管理员权限,电脑怎么用管理员权限
- 52周存钱挑战2.0
- 女生什么样的表现会说明她喜欢你?——男生记得都看一遍,谨记~
- 【友盟】 微博分享缺少C8998文件
- YOLOv5 Head解耦
- 如何实现伸缩 /折叠报表
- 回顾2015年发生的知识产权十大热点案件
- 个人实用java 常用语句(工作笔记)
- 【13】2016.12.13 周二--《小结2016》
- oracle出现“无法为表空间 XX 中的段创建 INITIAL 区”错误
- linux如何进u盘 命令,如何在linux下使用u盘
- 关于当代青少年梦想追逐的问卷调查
- 深度学习-误差反向传播算法
- !!! LVS项目----官方网址 !!!
热门文章
- C语言满分代码:1018 锤子剪刀布 (20分)(解题报告)
- 工程之星android版使用,安卓版工程之星软件网络1+1模式及网络cors连接操作详解...
- onclick如何调用含参函数_在 golang 中如何调用私有函数(绑定隐藏的标识符)
- iphone电池怎么保养_蓄电池在ups系统中应该怎么维护保养?
- response.setHeader各种用法 .
- 崩坏三x86架构闪退_不给X86留活路?苹果下一代M系列芯片竟然这么强
- md5会重复吗_自媒体平台视频重复审查机制,如何避免自己做的视频和别人的重复...
- php开发领域,PHP-MySQL相关领域
- python里面如何安装nltk_nltk的安装和简单使用
- python 图片地址_python解码data:image开头的图片地址