一Master故障挥着宕机,可能触发新的Master选举

当重新选择Leader的时候,会进行集群的恢复,在恢复的过程中,就会向Worker和AppClient发送MasterChanged消息。

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],storedWorkers: Seq[WorkerInfo]) {// 遍历每一个存储的application,注册该application,并且发送MasterChanged请求for (app <- storedApps) {logInfo("Trying to recover app: " + app.id)try {registerApplication(app)// 将该application状态置为UNKNOWN状态app.state = ApplicationState.UNKNOWN// 然后这个app向master发送MasterChanged请求app.driver.send(MasterChanged(self, masterWebUiUrl))} catch {case e: Exception => logInfo("App " + app.id + " had exception on reconnect")}}// 遍历每一个存储的driver, 更新master所维护的driver集合for (driver <- storedDrivers) {drivers += driver}// 遍历每一个存储的wroker,然后向master注册workerfor (worker <- storedWorkers) {logInfo("Trying to recover worker: " + worker.id)try {// 注册worker,就是更新master的woker集合,和worker相关的映射列表registerWorker(worker)// 将该worker状态置为UNKNOWN状态worker.state = WorkerState.UNKNOWN// 然后改worker向master发送MasterChanged请求worker.endpoint.send(MasterChanged(self, masterWebUiUrl))} catch {case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")}}
}

二Worker和AppClient会接受到来自Master的MasterChanged消息

2.1 Worker在收到MasterChanged消息

# 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册

# 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册
 
changeMaster(masterRef, masterWebUiUrl)
  // 创建当前节点executors的简单描述对象ExecutorDescription
 
val execs = executors.values.
    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
  // 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作
 
masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

2.2 AppClient在收到MasterChanged消息

# 更新master

# 向新的master发送MasterChangeAcknowledged消息

case MasterChanged(masterRef, masterWebUiUrl) =>logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)// 更新mastermaster = Some(masterRef)alreadyDisconnected = false// 向新的master发送MasterChangeAcknowledged消息masterRef.send(MasterChangeAcknowledged(appId.get))

三 Master会接受来自Worker和AppClient的消息

3.1 Master在收到Worker的WorkerSchedulerStateResponse消息

由于这是新的master,所以worker需要重新注册,然后新的master再次把之前相关的应用程序在worker上进行恢复

case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>// 根据workerId获取workeridToWorker.get(workerId) match {case Some(worker) =>logInfo("Worker has been re-registered: " + workerId)// worker状态置为aliveworker.state = WorkerState.ALIVE// 从指定的executor中过滤出哪些是有效的executorval validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)// 遍历有效的executorsfor (exec <- validExecutors) {// 获取executor所对应的appval app = idToApp.get(exec.appId).get// 为app设置executor,比如哪一个worker,多少核数等资源val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))// 将该executor添加到该wokerworker.addExecutor(execInfo)execInfo.copyState(exec)}// 将所有的driver设置为RUNNING然后加入到worker中for (driverId <- driverIds) {drivers.find(_.id == driverId).foreach { driver =>driver.worker = Some(worker)driver.state = DriverState.RUNNINGworker.drivers(driverId) = driver}}case None =>logWarning("Scheduler state from unknown worker: " + workerId)}// 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作if (canCompleteRecovery) { completeRecovery() }

3.2 Master收到AppClient的MasterChangeAcknowledged消息

# 更新application状态为WAITTING

# 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作

case MasterChangeAcknowledged(appId) =>idToApp.get(appId) match {case Some(app) =>logInfo("Application has been re-registered: " + appId)app.state = ApplicationState.WAITINGcase None =>logWarning("Master change ack from unknown app: " + appId)}// 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作if (canCompleteRecovery) { completeRecovery() }

Spark源码分析之Master状态改变处理机制原理相关推荐

  1. Spark源码分析之Master启动和通信机制

    Master主要就是用于管理集群,负责资源的调度什么的.它继承了ThreadSafeRpcEndpoint和LeaderElectable,由于继承ThreadSafeRpcEndpoint,所以Ma ...

  2. Spark源码分析之Master主备切换机制

    Master作为Spark standalone模式的核心,如果Master出现异常,那么集群就不能正常工作.所以Spark会从Standby中选择一个节点作为Master. Spark支持以下几种策 ...

  3. Spark源码分析之Master资源调度算法原理

    Master是通过schedule方法进行资源调度,告知worker启动executor等. 一schedule方法 1判断master状态,只有alive状态的master才可以进行资源调度,sta ...

  4. Spark源码分析之Master注册机制原理

    一 Worker向Master注册 1.1 Worker启动,调用registerWithMaster,向Master注册 当worker启动的时候,会调用registerWithMaster方法 # ...

  5. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

  6. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  7. Spark源码分析:多种部署方式之间的区别与联系

    作者:过往记忆 从官方的文档我们可以知道, Spark 的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来 ...

  8. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  9. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

最新文章

  1. 测试工程师的好日子来啦?Testin发布AI测试产品,提升易用性和自动化效率
  2. python掌握程度怎么判断-想要学习人工智能需要掌握Python到什么程度
  3. boxfilter 函数
  4. python 爬虫爬取小说信息
  5. DCS::TabNotebook
  6. 解决局域网共享时提示:你没有权限访问,请与网络管理员联系
  7. 哈工大2022形式语言与自动机期末
  8. 代码静态检测——QAC
  9. 箩筐火车免费wifi v4.3.0
  10. html 外联 变 内联,Html 内联元素、外联元素 和 可变元素
  11. 分享下nirsoft提供的注册表工具
  12. #PixelConFi | 这个教师节,以投票代替祝福
  13. Python之You-Get库学习
  14. 最新官方新浪短网址API接口分享-附代码调用演示
  15. HBuilderX 打包问题
  16. 页面5秒钟刷新一次(html,php)均可用
  17. Axure第一周学习日志
  18. 第八题、哈夫曼编码大全
  19. 赛尔号服务器维护时间4月27,赛尔号03月27日更新攻略汇总 瀚海界神重获新生
  20. mescroll.js下拉刷新

热门文章

  1. 亚太数学建模竞赛优秀论文_全国大学生数学建模竞赛介绍
  2. qimage加载bmp图片_9个最佳的优化动态gif图片大小的工具
  3. 微信小程序 服务器触发事件,微信小程序组件间通讯与事件
  4. audio标签的controls属性_HTML5 新增标签和属性
  5. 数字取证Linux发行版,Parrot 4.2.2 发布,数字取证Linux发行版
  6. onu光功率多少是正常_ONU、机顶盒、路由器常见网络问题及处理方法
  7. Java 算法 等差数列
  8. Linux zip加密压缩
  9. 解决Mac安装LightGBM报错LightGBM and gcc 8 in MacOS: Library not loaded
  10. Linux解压tar.gz、zip、tar.bz2 文件与对应的命令