Spark源码分析之Master状态改变处理机制原理
一Master故障挥着宕机,可能触发新的Master选举
当重新选择Leader的时候,会进行集群的恢复,在恢复的过程中,就会向Worker和AppClient发送MasterChanged消息。
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],storedWorkers: Seq[WorkerInfo]) {// 遍历每一个存储的application,注册该application,并且发送MasterChanged请求for (app <- storedApps) {logInfo("Trying to recover app: " + app.id)try {registerApplication(app)// 将该application状态置为UNKNOWN状态app.state = ApplicationState.UNKNOWN// 然后这个app向master发送MasterChanged请求app.driver.send(MasterChanged(self, masterWebUiUrl))} catch {case e: Exception => logInfo("App " + app.id + " had exception on reconnect")}}// 遍历每一个存储的driver, 更新master所维护的driver集合for (driver <- storedDrivers) {drivers += driver}// 遍历每一个存储的wroker,然后向master注册workerfor (worker <- storedWorkers) {logInfo("Trying to recover worker: " + worker.id)try {// 注册worker,就是更新master的woker集合,和worker相关的映射列表registerWorker(worker)// 将该worker状态置为UNKNOWN状态worker.state = WorkerState.UNKNOWN// 然后改worker向master发送MasterChanged请求worker.endpoint.send(MasterChanged(self, masterWebUiUrl))} catch {case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")}} }
二Worker和AppClient会接受到来自Master的MasterChanged消息
2.1 Worker在收到MasterChanged消息
# 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册
# 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
// 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册
changeMaster(masterRef, masterWebUiUrl)
// 创建当前节点executors的简单描述对象ExecutorDescription
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
// 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作
masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
2.2 AppClient在收到MasterChanged消息
# 更新master
# 向新的master发送MasterChangeAcknowledged消息
case MasterChanged(masterRef, masterWebUiUrl) =>logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)// 更新mastermaster = Some(masterRef)alreadyDisconnected = false// 向新的master发送MasterChangeAcknowledged消息masterRef.send(MasterChangeAcknowledged(appId.get))
三 Master会接受来自Worker和AppClient的消息
3.1 Master在收到Worker的WorkerSchedulerStateResponse消息
由于这是新的master,所以worker需要重新注册,然后新的master再次把之前相关的应用程序在worker上进行恢复
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>// 根据workerId获取workeridToWorker.get(workerId) match {case Some(worker) =>logInfo("Worker has been re-registered: " + workerId)// worker状态置为aliveworker.state = WorkerState.ALIVE// 从指定的executor中过滤出哪些是有效的executorval validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)// 遍历有效的executorsfor (exec <- validExecutors) {// 获取executor所对应的appval app = idToApp.get(exec.appId).get// 为app设置executor,比如哪一个worker,多少核数等资源val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))// 将该executor添加到该woker上worker.addExecutor(execInfo)execInfo.copyState(exec)}// 将所有的driver设置为RUNNING然后加入到worker中for (driverId <- driverIds) {drivers.find(_.id == driverId).foreach { driver =>driver.worker = Some(worker)driver.state = DriverState.RUNNINGworker.drivers(driverId) = driver}}case None =>logWarning("Scheduler state from unknown worker: " + workerId)}// 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作if (canCompleteRecovery) { completeRecovery() }
3.2 Master收到AppClient的MasterChangeAcknowledged消息
# 更新application状态为WAITTING
# 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
case MasterChangeAcknowledged(appId) =>idToApp.get(appId) match {case Some(app) =>logInfo("Application has been re-registered: " + appId)app.state = ApplicationState.WAITINGcase None =>logWarning("Master change ack from unknown app: " + appId)}// 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作if (canCompleteRecovery) { completeRecovery() }
Spark源码分析之Master状态改变处理机制原理相关推荐
- Spark源码分析之Master启动和通信机制
Master主要就是用于管理集群,负责资源的调度什么的.它继承了ThreadSafeRpcEndpoint和LeaderElectable,由于继承ThreadSafeRpcEndpoint,所以Ma ...
- Spark源码分析之Master主备切换机制
Master作为Spark standalone模式的核心,如果Master出现异常,那么集群就不能正常工作.所以Spark会从Standby中选择一个节点作为Master. Spark支持以下几种策 ...
- Spark源码分析之Master资源调度算法原理
Master是通过schedule方法进行资源调度,告知worker启动executor等. 一schedule方法 1判断master状态,只有alive状态的master才可以进行资源调度,sta ...
- Spark源码分析之Master注册机制原理
一 Worker向Master注册 1.1 Worker启动,调用registerWithMaster,向Master注册 当worker启动的时候,会调用registerWithMaster方法 # ...
- Spark源码分析 – DAGScheduler
DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...
- spark 源码分析之十八 -- Spark存储体系剖析
本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...
- Spark源码分析:多种部署方式之间的区别与联系
作者:过往记忆 从官方的文档我们可以知道, Spark 的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来 ...
- Spark源码分析之七:Task运行(一)
在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...
- Spark 源码分析
2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...
最新文章
- 测试工程师的好日子来啦?Testin发布AI测试产品,提升易用性和自动化效率
- python掌握程度怎么判断-想要学习人工智能需要掌握Python到什么程度
- boxfilter 函数
- python 爬虫爬取小说信息
- DCS::TabNotebook
- 解决局域网共享时提示:你没有权限访问,请与网络管理员联系
- 哈工大2022形式语言与自动机期末
- 代码静态检测——QAC
- 箩筐火车免费wifi v4.3.0
- html 外联 变 内联,Html 内联元素、外联元素 和 可变元素
- 分享下nirsoft提供的注册表工具
- #PixelConFi | 这个教师节,以投票代替祝福
- Python之You-Get库学习
- 最新官方新浪短网址API接口分享-附代码调用演示
- HBuilderX 打包问题
- 页面5秒钟刷新一次(html,php)均可用
- Axure第一周学习日志
- 第八题、哈夫曼编码大全
- 赛尔号服务器维护时间4月27,赛尔号03月27日更新攻略汇总 瀚海界神重获新生
- mescroll.js下拉刷新
热门文章
- 亚太数学建模竞赛优秀论文_全国大学生数学建模竞赛介绍
- qimage加载bmp图片_9个最佳的优化动态gif图片大小的工具
- 微信小程序 服务器触发事件,微信小程序组件间通讯与事件
- audio标签的controls属性_HTML5 新增标签和属性
- 数字取证Linux发行版,Parrot 4.2.2 发布,数字取证Linux发行版
- onu光功率多少是正常_ONU、机顶盒、路由器常见网络问题及处理方法
- Java 算法 等差数列
- Linux zip加密压缩
- 解决Mac安装LightGBM报错LightGBM and gcc 8 in MacOS: Library not loaded
- Linux解压tar.gz、zip、tar.bz2 文件与对应的命令