Spark源码阅读02-Spark核心原理之容错及HA(高可用)
容错及HA(高可用)
- 概念介绍
- Executor异常
- Worker异常
- Master异常
概念介绍
容错指的是一个系统在部分模块出现故障时还能否持续的对外提供服务,一个高可用(HA)的系统应该具有很高的容错性,系统不会因为一点小的错误导致系统性能严重下降或者造成系统瘫痪。
对于一个大的集群系统来说,机器故障、网络异常等都是很常见的,其不能因为这些原因造成集群系统不能提供服务。所以像Spark这样的大型分布式计算集群提供了很多的容错机制来提高整个系统的可用性,Spark的容错机制可以通过Executor、Worker和Master的异常处理来体现,下面分别对它们进行介绍。
Executor异常
在Spark中,Executor的作用是:负责执行任务的运行,并把任务运行状态发给Driver。下面以独立运行模式分析Executor出现异常的情况。Executor异常容错过程图如下:
其中虚线为正常运行的通信线路,实线为异常处理步骤。
流程如下:
(1)首先看Executor的启动过程
在集群中由Master给应用程序分配运行资源后,然后在Worker中启动 ExecutorRunner ,而 ExecutorRunner根据当前的运行模式启动CoarseGrainedExecutorBackend进程,当该进程会向 Driver 发送注册Executor信息。如果注册成功,则CoarseGraineaExecutorBackend在其内部启动Executor。Executor由 ExecutorRunner进行管理,当Executor出现异常时(如所运行容器CoarseGrainedExecutorBackend进程异常退出等),由 ExecutorRunner 捕获该异常并发送 ExecutorStateChanged消息给Worker。
(2) Worker 接收到ExecutorStateChanged消息
在Worker的 handleExecutorStateChanged方法中,根据Executor 状态进行信息更新,同时把 Executor状态信息转发给Master。
(3)Master 接收到 Executor状态变化消息
如果发现Executor 出现异常退出,则调用Master. schedule方法,尝试获取可用的 Worker 节点并启动Executor,而这个Worker很可能不是失败之前运行Executor 的 Worker 节点。该尝试系统会进行10次,如果超过10次,则标记该应用运行失败并移除集群中移除该应用。这种限定失败次数是为了避免提交的应用程序存在Bug而反复提交,进而挤占集群宝贵的资源。
Worker异常
以独立运行模式来讨论Worker异常。Spark的 独立运行模式采用的是Master/Slave的结构,其中Slave是由Worker来担任的其在运行的时候会发送心跳给Master,让 Master 知道Worker 的实时状态。另一方面Master也会检测注册的Worker是否连接超时,因为在集群运行过程中,可能由于机器宕机或者进程被杀死等原因造成Worker进程异常退出。Worker异常容错过程图如下:
(1)Master接收Worker的心疼来监测其实时状态,同时,Master中的onStart方法中也有检测Worker超时的线程。实现代码如下:
case Heartbeat(workerId, worker) =>idToWorker.get(workerId) match {case Some(workerInfo) =>workerInfo.lastHeartbeat = System.currentTimeMillis()case None =>if (workers.map(_.id).contains(workerId)) {logWarning(s"Got heartbeat from unregistered worker $workerId." +" Asking it to re-register.")worker.send(ReconnectWorker(masterUrl))} else {logWarning(s"Got heartbeat from unregistered worker $workerId." +" This worker was never registered, so ignoring the heartbeat.")}}
override def onStart(): Unit = {...
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {self.send(CheckForWorkerTimeOut)}}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
...
}
(2)当Worker出现超时时,Master调用timeOutDeadWorkers方法进行处理。代码实现如下:
/** Check for, and remove, any timed-out workers */private def timeOutDeadWorkers() {// Copy the workers into an array so we don't modify the hashset while iterating through itval currentTime = System.currentTimeMillis()val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArrayfor (worker <- toRemove) {if (worker.state != WorkerState.DEAD) {logWarning("Removing %s because we got no heartbeat in %d seconds".format(worker.id, WORKER_TIMEOUT_MS / 1000))removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")} else {if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it}}}}
Master异常
Master作为Spark独立运行模式中的核心,如果Master出现异常,则整个集群的运行和资源调度将无法进行处理,这会造成非常严重的问题。因此,Spark通过在集群运行的时候,启动一个Master,然后同时启动一个或者多个Standby Master来解决这个问题(高可用)。当Master出现问题的时候,Standby Master将会根据一定的规则确定其中一个来接管Master。Master异常容错过程图如下:
下面介绍几种模式(可以在spark-env.sh的配置项spark。deploy.recoveryMode进行设置,默认NONE):
- ZOOKEEPER:集群的元数据持久化到7ooKeener中,当 Master出现异常时,ZooKeeper会通过选举机制选举出新的Master,新的Master接管时需要从ZooKeeper获取持久化信息并根据这些信息恢复集群状态。具体结构如图4-13所示。
- FILESYSTEM:集群的元数据持久化到太地文件系统中,当 Master出现异常时,只要在该机器上重新启动Master,启动后新的Master 获取持久化信息并根据这些信息恢复集群状态。
- CUSTOM:自定义恢复方式,对StandaloneRecoveryModeFactory抽象类进行实现并把该类配置到系统中,当Master 出现异常时,会根据用户自定义的方式进行恢复集群状太
- NONE:不持久化集群的元数据,当Master出现异常时候,新启动的Master不进行恢复进群状态,而是直接接管集群。
Spark源码阅读02-Spark核心原理之容错及HA(高可用)相关推荐
- Spark源码阅读(五) --- Spark的支持的join方式以及join策略
版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...
- Spark源码阅读——任务提交过程
2019独角兽企业重金招聘Python工程师标准>>> Spark 源码阅读--任务提交过程 当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者 ...
- Windows + IDEA + SBT 打造Spark源码阅读环境
Spark源码阅读环境的准备 Spark源码是有Scala语言写成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发.因此,Spark源 ...
- 3000门徒内部训练绝密视频(泄密版)第2课:Scala面向对象彻底精通及Spark源码阅读
Scala面向对象彻底精通及Spark源码阅读 不用写public class中的public class Person {private var myName = "flink" ...
- 3000门徒内部训练绝密视频(泄密版)第3课:Scala中函数式编程彻底精通及Spark源码阅读
Scala中函数式编程彻底精通及Spark源码阅读 函数可以不依赖于类,函数可以作为函数的参数,函数可以作为函数的返回值 =>表明对左面的参数进行右面的加工 函数赋值给变量需要在函数名后面加空格 ...
- 3000门徒内部训练绝密视频(泄密版)第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读
彻底精通Scala隐式转换和并发编程及Spark源码阅读 Akka ,Scala内部并发 隐式转换.隐式类.隐式参数 可以手动指定某种类型的对象或类转换成其他类型的对象或类.转换的原因是假设写好接口 ...
- spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析
spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...
- FreeSWITCH 1.10 源码阅读(3)-sofia 模块原理及其呼入处理流程
文章目录 1. 前言 2. 源码分析 2.1 sofia 模块的加载 2.2 呼入的处理流程 1. 前言 SIP(Session Initiation Protocol) 是应用层的信令控制协议,有许 ...
- Spark源码阅读——DirectInputDStream
2019独角兽企业重金招聘Python工程师标准>>> Spark源码分析--DirectInputDStream 在Spark-Streaming中,对流的抽象是使用DStream ...
- [以浪为码]Spark源码阅读03 - 序列化介绍 serializer
版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/u013054888/article/details/90237348 系列文章专栏目录:小浪阅读 S ...
最新文章
- 自定义ServicesLoader来实现根据配置使用不通的SPI实现从而实现项目扩展
- POJ1149 PIGS
- Java 8的新增功能(第二部分–可能会出现什么)
- 网站页面左右_广州网站优化的技巧是什么?
- bootstrap 树形表格渲染慢_layUI之树状表格异步加载组件treetableAsync.js(基于treetable.js)...
- TIOBE 6 月编程语言排行榜:Perl 成为 Python 过分炒作的牺牲品?
- Python全栈开发——subprocess struct
- 判断能否组成三角形(水题)
- 表多个字段与其他表关系,left join on 顺序
- (附源码)springboot高校科研管理系统 毕业设计 222055
- 计算机一级在上网题中如何新建文本文件?
- 模拟器安装 xposed
- iOS-设置导航栏颜色(iOS8+)
- c语言投票程序设计,C语言课程设计-投票程序设计.doc
- 恋恋山城 Jean de Florette (1986) 男人的野心 / 弗洛莱特的若望 / 让·德·弗罗莱特 / 水源 下一部 甘泉,玛侬...
- html左右滚动div隐藏部分div,只让DIV出现横向滚动条,窗口不要有滚动条
- Πολιτική απορρήτου
- 技术实践|Redis基础知识及集群搭建(上)
- 网站如何报价 做一个普通企业网站多少钱?
- 顺丰快递电话查询比网上查询更提前更详细