Spark Streaming之checkpoint机制
一 什么类型的数据需要使用checkpoint?
Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如HDFS,从而能够让他从失败中进行恢复。有两种数据需要被进行checkpoint:
1、元数据checkpoint:
# 配置信息:创建spark streaming应用程序的配置信息,比如SparkConf中的信息
# DStream的操作信息:定义了应用程序计算逻辑的DStream操作信息
# 未处理的batch信息:那些job正在排队,还没处理的batch信息
2、数据checkpoint
将实时计算过程中产生的RDD的数据保存到可靠的存储系统之中。对于一些将多个batch的数据进行聚合的,有状态的转换操作,这是很有用的,在这种操作中,生成的RDD是依赖于之前的batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长。
要避免由于依赖链条越拉越长,导致的一起变得越来越长的失败恢复时间,有状态的转换操作执行过程中产生的RDD,会定期的被checkpoint到可靠的存储系统上,比如HDFS.从而消减RDD的依赖链条,进行而缩短失败恢复时候的RDD恢复时间。
所以:元数据checkpoint主要是为了从driver失败中恢复过来;而RDD checkpoint主要是为了,使用到有状态的转换操作的时候,能够在其生产出的数据丢失时进行快速的恢复。
二 什么时候启用checkpoint机制?
2.1 使用了有状态的转换操作
比如updateStateByKey或者reduceByKeyAndWindow操作
2.2 要保证可以从driver失败中进行恢复
比如元数据的checkpoint需要启用
当然如果不是必须要从driver失败中恢复或者没有使用到转换操作,那么也就无需启用checkpoint,这样反而有助于提升性能
三 如何启用checkpoint机制?
3.1 对于有状态的转换操作,启用checkpoint机制,是比较简单的,定期将其产生的RDD数据checkpoint。可以通过配置容错文件系统,比如HDFS的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。
3.2 如果为了要从driver失败中恢复,那么启用checkpoint机制是比较复杂的。需要改写spark streaming应用程序
第一步:
当应用程序第一次启动的时候,需要创建一个StreamingContext,并且调用其start方法进行启动。当driver从失败中恢复过来时,需要从checkpoint目录记录的元数据中恢复出来一个StreamingContext。
如下代码所示:
val checkpointDir = "hdfs://hdfs-cluster/user/spark/chkdir01"
def createContext():StreamingContext = {
val conf = new SparkConf().setAppName("Driver Checkpoint").setMaster("local[*]")
val ssc = new StreamingContext(conf,Seconds(2))
val hostname = "hadoop-all-01"
val port = 9999;
val lines = ssc.socketTextStream(hostname,port)
ssc.checkpoint(checkpointDir)
ssc
}
val context = StreamingContext.getOrCreate(checkpointDir,createContext)
context.start()
context.awaitTermination()
第二步:
必须确保Driver可以在失败时,自动重启。要是能够从Driver失败中恢复过来,运行spark streaming应用程序的集群,就必须监控driver的运行的过程,并且在它失败的时候将它重启,对于standalone需要配置supervise driver,在它失败时将其重启
在spark-submit中,添加--deploy-mode参数,默认值是client,即在提交应用程序的机器上启动driver,但是要能够重启driver就必须设置为cluster,此外需要添加--supervise参数
Spark Streaming之checkpoint机制相关推荐
- spark基础之spark streaming的checkpoint机制
一 什么类型的数据需要使用checkpoint? Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如H ...
- Spark中的checkpoint机制
一.Spark Core中的checkpoint def main(args: Array[String]) {val spark = SparkSession.builder().appName(& ...
- Spark Streaming metadata checkpoint
Checkpointing 一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等).为了使这成为可能,Spark Streaming需要checkpoint足 ...
- sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制
Spark Streaming 反压机制是1.5版本推出的特性,用来解决处理速度比摄入速度慢的情况,简单来讲就是做流量控制.当批处理时间(Batch Processing Time)大于批次间隔(Ba ...
- Spark Streaming的工作机制
1. Spark Streaming的工作机制 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理. 支持从多种数据源获取数据,包括K ...
- Spark cache和checkpoint机制
2019独角兽企业重金招聘Python工程师标准>>> Spark cache和checkpoint机制 博客分类: spark 1. RDD cache缓存 当持久化某个RDD后, ...
- Spark Streaming之容错机制以及事务语义
我们知道RDD本身是一个不可变的,可重新计算的.分布式的数据集.每一个RDD都会记住确定好的操作血缘关系. 如果因为某些原因,导致某个worker节点失败,则导致RDD的某个partition数据丢失 ...
- Spark Streaming的WAL机制
WAL(Write Ahead Logs)是Spark中的一个保障HA(High Available)的机制, 在Hbase中也有应用到 抛开带着很多专业词的场景假设, 我觉得应该把技术上的事情用尽可 ...
- spark基础之checkpoint机制
一 Spark中Checkpoint是什么 假设一个应用程序特别复杂场景,从初始RDD开始到最后整个应用程序完成,有非常多的步骤,比如超过20个transformation操作,而且整个运行时间也比较 ...
最新文章
- 利用计算机软件温度补偿,基于自主传感器信号调理芯片温度补偿的软件设计
- 配置hadoop集群一
- 怎么设置ppt页面的长度和宽度_将PPT里的字弄很小,PPT就有逼格吗?
- 2.11 矩阵和实数运算不同之处
- arduino uno电压_Arduino UNO中文数据手册
- html取元素的文本,解析HTML以获取元素内的文本
- Navicat安装配置
- AlphaControls TsSkinManager 控件
- 离散数学(第二版) 第一章、第二章习题
- java输出日志_Java日志打印方法
- 2分钟搞定收货地址三级联动,数据易于维护,更新。
- 关于总线、现场总线、RS-485和modbus之间的关系
- AI插画设计,用AI制作一个只可爱的短腿柯基插画
- Map的某种创建方式
- C语言游戏: 俄罗斯方块(Tetris)@兼谈程序优化方法 [源码+exe下载]
- CSS 标签诡异添加 injected stylesheet
- 你觉得程序员最需要具备哪些软技能?
- Python技巧——解析式
- 【EMV L2】Select PSE应用选择相关的卡片数据格式
- FPGA到底是什么?
热门文章
- 剑灵电五服务器位置,选对服务器很重要 剑灵新手选服攻略
- overleaf创建表格
- 常用概率论矩阵论公式
- 10恢复出厂设置_笔记本电脑怎么恢复出厂设置
- js html 拼接,JavaScript concat() 方法
- set集合 结构体_Redis底层数据结构
- matlab矩阵中的 *、/、\、.*
- Java 设计模式之 Visitor 访问者模式
- 5数之和python_Python基本语法5:数字和列表,基础,五
- vega56刷64_Vega56刷入BIOS跑分直逼旗舰Vega64