项目中会经常使用到Spark和Flink这些分布式框架,使用的时候老是担心如果出现异常了会怎样,今天就来了解下Spark以及Flink的容错机制。

容错是指一个系统部分出现错误的情况还能持续的提供服务,当集群达到较大的规模以后,很可以出机器故障以及网络延迟等情况,导致某个节点不能提供服务,所以分布式框架一般都会进行高容错设计。

Spark的容错机制:

Master异常退出:

个人理解是,只有StandAlone模式下才需要额外进行Master容错配置。如果是On Yarn模式,资源是由ResourceManager管理的,RM自身有容错机制,无需外部再进行配置了。

和HBase的HMaster很像,Spark会启动多个StandBy Master,当前Master异常时,会按照一定的规格选取其中一个接管Master的工作,有如下几种配置:

1. ZOOKEEPER

将集群元数据持久化到ZK中,基于ZK进行主备切换

2. FILESYSTEM

集群元数据持久化到白嫩地文件系统中

3. CUSTOM

用户自定义恢复方式

4. NONE

不持久化集群元数据。Master出现异常时,新启动的Master不进行恢复集群状态

这些策略在spark-env.sh配置文件中配置,配置项为spark.deploy.recoveryMode,默认为NODE。个人觉得生产环境下建议使用On Yarn模式,进行资源隔离比较好。如果非得使用Standalone,HA建议配置成ZK。

总结下就是Master会进行主备切换,接着移除未响应Master切换信息的driver(application)和Worker,然后重新调度driver执行。

case ElectedLeader =>val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {RecoveryState.ALIVE} else {RecoveryState.RECOVERING}logInfo("I have been elected leader! New state: " + state)if (state == RecoveryState.RECOVERING) {// 1. 先走这个方法,通知driver Master发生变更了beginRecovery(storedApps, storedDrivers, storedWorkers)recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {self.send(CompleteRecovery)}}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)}// 2. 重新调度任务执行
// Kill off any workers and apps that didn't respond to us
// Reschedule drivers which were not claimed by any workers
case CompleteRecovery => completeRecovery()

Worker异常退出:

个人理解是,Worker是StandAlone模式下才有的概念,On Yarn模式对应的是NodeManager,Yarn自身带有NodeManager的容错机制。

Spark Worker会保持和Master的心跳,当Worker出现超时时,Master调用timeOutDeadWorkers()方法进行处理,移除超时的Worker。移除时会通知Driver Executor已经移除(Executor异常处理详见下文),然后设置运行在当前Worker上的Driver重启或者直接删除:

个人理解只有在cluster模式下启动时,才会有Driver的资源调度,如果在client模式下启动,Driver就在提交Job的机器上启动。关于Driver的重启策略这个还真不太清楚,有知道的朋友麻烦告知下...因为用的Client模式比较多,我理解driver挂了整个程序就挂了。

总结下就是Worker挂了会被Master移除,Worker上的Driver有可能会被重新调度执行或者直接移除。

Executor异常退出:

Executor是真正负责任务的执行,然后将任务的运行状态发送给Driver。

Executor发生异常时,外部的包装类ExecutorRunner会把异常信息发送给Worker,然后Worker会讲信息发送给Master。Master 接收到 Executor 状态变化消息后,如果发现 Executor 出现异常退出,则调用 Master.schedule 方法,尝试获取可用的 Worker 节点并重新启动 Executor。

重新启动一个新的Executor会尝试一定次数,如果还不成功,那么整个application就运行失败了。这样是为了保证application不会一直占用集群资源。

Spark Job Task的容错(即RDD的容错机制):

以上所讲的容错其实更多的是说Spark集群自身如何保证任务稳定运行,如果以上异常发生了,个人理解为本次Stage就运行失败了,幸好RDD有容错机制,可以恢复Job。Spark 会对运行失败的Stage进行retry,默认retry 3次。

RDD Lineage血统层容错:

  Spark RDD实现基于Lineage的容错机制,基于RDD的各项transformation构成了compute chain,在部分计算结果丢失的时候可以根据Lineage重新恢复计算。

  (1)在窄依赖中,在子RDD的分区丢失,要重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。
  (2)在宽依赖情况下,丢失一个子RDD分区,重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,整个RDD都要重新计算,这样就会产生冗余计算开销和巨大的性能浪费。所以如果调用链路比较长的话,宽依赖最好做一次Checkpoint。

Checkpoint容错:

  Lineage过长会造成容错成本过高,这时在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。

总结:

1. 个人建议Spark尽量不要使用standalone,而是使用Yarn或者K8S等模式,这样可以做到更好的资源隔离。

2. 如果使用standalone,Master建议基于ZK配置高可用

3.  Worker或者Executor异常退出没关系,Spark stage会重新执行调度。如果Spark job链路过长的话,建议在宽依赖那里执行CheckPoint,加快spark job的恢复。

参考:

https://blog.csdn.net/qq_32603475/article/details/103617089(Hadoop中Yarn的容错机制)

https://www.cnblogs.com/juncaoit/p/6542902.html(Spark容错特性)

https://www.jianshu.com/p/4e1b2d986883(Spark Master主备切换)

https://blog.csdn.net/u010886217/article/details/103289687(Spark RDD的容错机制)

Spark的容错机制相关推荐

  1. 《循序渐进学Spark》一3.5 容错机制及依赖

    本节书摘来自华章出版社<循序渐进学Spark>一书中的第3章,第3.5节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区"华章计算机"公众号查看. 3.5 容错机 ...

  2. Spark Streaming之容错机制以及事务语义

    我们知道RDD本身是一个不可变的,可重新计算的.分布式的数据集.每一个RDD都会记住确定好的操作血缘关系. 如果因为某些原因,导致某个worker节点失败,则导致RDD的某个partition数据丢失 ...

  3. 25.Flink监控\什么是Metrics\Metrics分类\Flink性能优化的方法\合理调整并行度\合理调整并行度\Flink内存管理\Spark VS Flink\时间机制\容错机制等

    25.Flink监控 25.1.什么是Metrics 25.2.Metrics分类 25.2.1.Metric Types 25.2.2.代码 25.2.3.操作 26.Flink性能优化 26.1. ...

  4. 2021年大数据Flink(二十七):Flink 容错机制 Checkpoint

    目录 Flink 容错机制 Checkpoint State Vs Checkpoint Checkpoint执行流程 简单流程 复杂流程 State状态后端/State存储介质 MemStateBa ...

  5. 通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

    本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的 ...

  6. 深入理解 Flink 容错机制

    本文作者:Paul Lin 本文链接: 2019/07/28/深入理解-Flink-容错机制/ 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议.转载请注 ...

  7. Flink核心篇,四大基石、容错机制、广播、反压、序列化、内存管理、资源管理...

    Flink基础篇,基本概念.设计理念.架构模型.编程模型.常用算子 大纲: 1.Flink的四大基石包含哪些? 2.讲一下Flink的Time概念? 3.介绍下Flink窗口,以及划分机制? 4.介绍 ...

  8. RDD的依赖关系、窄依赖、宽依赖、RDD的缓存、RDD缓存方式、DAG的生成、RDD容错机制之Checkpoint

    1.RDD的依赖关系 RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency). 1.1.窄依赖 窄依赖指的是每一 ...

  9. Flink教程(14)- Flink高级API(容错机制)

    文章目录 01 引言 02 Checkpoint 2.1 Checkpoint VS State 2.2 Checkpoint 执行流程 2.2.1 简单流程 2.2.2 复杂流程 2.3 State ...

最新文章

  1. 电子技术基础三_电子技术基础
  2. java 广义表_java 输入广义表 生成二叉树 | 学步园
  3. WCF Testing Tool(转)
  4. SpringMVC jsp界面值渲染不出来
  5. minicom与USB转串口
  6. windos 服务怎么写_我的产品或服务怎么写?
  7. 单体测试书的检查要点
  8. paip.程序不报错自动退出的解决
  9. 如何用c语言写一个简答的整人小程序
  10. 我的世界神奇宝贝服务器怎么修改6v,我的世界神奇宝贝mod修改精灵6V满努力等级图文教程...
  11. 数据流程图,业务流程图,模块结构图
  12. 2020德勤面试开始了吗_四大2020年春招时间曝光!
  13. 2022-03-25 Python作业3
  14. IRP(I/O Request Package)详解
  15. 网页开发的一些尺寸单位
  16. ONES × 中国信通院《中国企业软件研发管理白皮书》即将发布 | 预约直播
  17. 算法提高 质数的后代
  18. 【来日复制粘贴】输出匹配到的字符串
  19. oracle dba 创建视图,拥有dba权限的情况下创建视图报ORA-01031: insufficient privileg错误...
  20. 华为OD机试 - 网上商城优惠活动(Java JS Python)

热门文章

  1. 军品研制过程所需文件-进阶版
  2. win10如何让窗口固定保持在最上层
  3. 如何把HTML转换成动图,视频转gif 如何将视频制作gif动画图片
  4. Python3,掌握这20个小技巧,小菜鸡瞬间变成老码农~
  5. socket连接 error 113 /111 的解决方法
  6. word转换成excel导致身份证错乱的解决办法
  7. SNAT(源地址转换)
  8. redis数据类型及操作
  9. 浅谈 “ 站内信 ” 的实现
  10. MPLab X 配置字的设置