spark启动的worker节点是localhost_「Spark源码分析1」Spark standalone模式Master和Worker启动流程...
1.简述
spark 源码分析第一篇,准备从最基本的集群搭建入手,全面剖析spark。希望自己能对spark又更深入的理解。希望对读者有所帮助。言归正传,Spark standalone 模式,架构图:
这里先讨论Master和worker启动,以及之间的通讯:worker向master注册,worker向master发送heartbeat。
2.Master及启动流程
继承ThreadSafeRpcEndpoint类。启动master,会执行它自己的onStart函数。
2.1.执行start-master.sh脚本,->spark-daemon.sh "org.apache.spark.deploy.master.Master" -> spark-class start ->org.apache.spark.launcher.Main ->org.apache.spark.deploy.master.Master
2.2.调用Master的main方法,执行
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
在此方法里创建RpcEnv对象和master对象:
def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, conf: SparkConf): (RpcEnv, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest) (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) }
2.3.创建master对象,执行onstart方法。
(1).onstart函数做了如下的事情:
启动web UI
发送CheckForWorkerTimeOut消息给自己,移除超时的worker。
根据参数判断是否启动rest的接口,
注册master resources到master MetricsSystem
根据参数指定的recovery mode进行恢复。
(2) receive 函数接收了哪些消息
ElectedLeader
CompleteRecovery
RevokedLeadership 收回leader权力
RegisterWorker 注册worker :注册成功后,发送RegisteredWorker消息给worker
RegisterApplication 注册application
ExecutorStateChanged
DriverStateChanged
Heartbeat
MasterChangeAcknowledged
WorkerSchedulerStateResponse
WorkerLatestState
CheckForWorkerTimeOut
RequestSubmitDriver 请求提交driver
RequestKillDriver
RequestDriverStatus
RequestMasterState
BoundPortsRequest
3.worker及启动流程
同样继承ThreadSafeRpcEndpoint类.
3.1.执行start-slave.sh脚本 -> spark-daemon.sh start "org.apache.spark.deploy.worker.Worker" ->-> spark-class start ->org.apache.spark.launcher.Main ->org.apache.spark.deploy.worker.Worker
3.2.执行Worker的main方法
启动rpcEnv对象和Worker对象。
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir, conf = conf)
def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = { // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr)) rpcEnv }
3.3.创建worker对象,执行onstart方法。
(1).onstart函数做了如下的事情:
在SPAKR_HOME目录下创建work目录
启动外部shuffle服务
启动workerweb UI
注册到master上,根据 val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)获取masterRef。
启动MetricsSystem
3.4.接收到master注册完成的消息RegisteredWorker之后调用handleRegisterResponse方法
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { msg match { case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) => if (preferConfiguredMasterAddress) { logInfo("Successfully registered with master " + masterAddress.toSparkURL) } else { logInfo("Successfully registered with master " + masterRef.address.toSparkURL) } registered = true //设置masterRef changeMaster(masterRef, masterWebUiUrl, masterAddress) //定时发送heartbeat forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(SendHeartbeat) } }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) //是否清理workdir if (CLEANUP_ENABLED) { logInfo( s"Worker cleanup enabled; old application directories will be deleted in: $workDir") forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(WorkDirCleanup) } }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) } val execs = executors.values.map { e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state) } masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq)) case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) System.exit(1) } case MasterInStandby => // Ignore. Master not yet ready. } }
4.总结
最好是跟着上面的讲接把源码理一遍,就清晰很多了, 接下来会分析driver的启动。
spark启动的worker节点是localhost_「Spark源码分析1」Spark standalone模式Master和Worker启动流程...相关推荐
- 【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
承接[[深度挖掘 RocketMQ底层源码]「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)] pullBlockIfNotFound方法 通过该方法获取 ...
- 「从源码中学习」面试官都不知道的Vue题目答案
前言 当回答面试官问及的Vue问题,我们除了照本宣科的回答外,其实还可以根据少量的源码来秀一把,来体现出你对Vue的深度了解. 本文会陆续更新,此次涉及以下问题: "new Vue()做了什 ...
- springmvc项目在启动完成之后执行一次方法_SpringMVC源码分析
一 SpringMVC运行原理 二 SpingMVC源码分析 1 DispatcherServlet 1.1 DispatcherServlet继承结构 ServletConfig对象获取Init标签 ...
- 「Leakcanary 源码分析」看这一篇就够了
image.png 「Leakcanary 」是我们经常用于检测内存泄漏的工具,简单的使用方式,内存泄漏的可视化,是我们开发中必备的工具之一. 分析源码之前 Leakcanary 大神的 github ...
- 自动驾驶 Apollo 源码分析系列,感知篇(二):Perception 如何启动?
从 Apollo 的官方文档,我们很容易得知 Perception 是核心的组件之一,但像所有的 C++ 程序一样,每个应用都有一个 Main 函数入口,那么引出本文要探索的 2 个问题: Perce ...
- java 最少使用(lru)置换算法_「Redis源码分析」Redis中的LRU算法实现
如果对我的文章感兴趣.希望阅读完可以得到你的一个[三连],这将是对我最大的鼓励和支持. LRU是什么 LRU(least recently used)是一种缓存置换算法.即在缓存有限的情况下,如果有新 ...
- Spark 2.2 Core :TimSort 的原理与源码分析
文章目录 1.想理解TimeSort必须先理解归并排序 2. timsort 其实只是对归并排序进行了一系列的改进. 3.Timsort的核心过程 4.源代码 5.全局流程图 6.演示 1.想理解Ti ...
- 「Ceph源码分析」纠删码解码
存储系统:ceph-14.2.22 操作系统:ubuntu-server-18.04 总体框架 源码分析 ECBackend::objects_read_and_reconstruct [ 文件路径 ...
- C/C++编程打造单机麻将「附源码+说明文档」一个不错的入门项目
项目介绍 基于最新版 Cocos2d-X 3.17 与 cocostudio V3.10 开发的单机麻将游戏,麻将算法为商业级麻将服务器端算法,整个项目代码精简.注释详细. 如果你刚刚接触Cocos2 ...
- Android 9(P)之init进程启动源码分析指南之一
Android 9 之init进程启动源码分析指南之一 Android 9 (P) 系统启动及进程创建源码分析目录: Android 9 (P)之init进程启动源码分析指南之一 Andro ...
最新文章
- PowerShell批量修改邮箱配额和已删除保留期
- STM32 应用程序加密的一种设计方案
- React - S1
- mysql or_MySQL中or语句用法示例
- 链接详解--共享库命名
- delphi ,安装插件
- 中国期货交易技术的逆袭之路
- Java实现 蓝桥杯VIP 算法训练 会议中心
- Cesium 纹理贴图
- ArcGIS影像空值填充\插补
- [逆向]汇编movs,stos,rep指令讲解
- 计算机专业面试银行的自我介绍,银行面试自我介绍范文3分钟
- ① 如何优雅快乐打出漂亮LaTeX数学公式——小乐数学zzllrr Mather公式编辑器教程
- 【noip模拟题】天神下凡(贪心)
- 用pip安装pytorch
- 我的世界无限法则服务器怎么用,我的世界无限法则版
- 英文邮件模板--向论文作者索要源代码(write an email requesting for code)
- 构造方法--带参构造方法
- 字符串转换成JSON
- 基于GA优化BP神经网络的传感器故障诊断算法matlab仿真
热门文章
- iOS开发UI篇—程序启动原理和UIApplication
- ubuntu下MySQL的安装及远程连接配置(转)
- Error: could not open `C:\Program Files\Java\jre6\lib\i386\jvm.cfg')
- pureftp在企业中的应用及配置
- 今天博客园肿了吗?希望团队修复一下
- 今天写了个查看 ABAP 类层级的程序
- 跟我一起学写Makefile-Lesson 8
- Python爬虫开发【第1篇】【正则表达式】
- Canvas坐标轴中的Y轴距离是X轴的两倍
- 【并行计算-CUDA开发】CUDA shared memory bank 冲突