Spark中的checkpoint机制
一.Spark Core中的checkpoint
def main(args: Array[String]) {val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]").getOrCreate()val sc = spark.sparkContextsc.setCheckpointDir("checkpoint")val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))val pairs = sc.parallelize(data, 3)pairs.cache()pairs.checkpoint()println(pairs.count)
}
二.Spark Streaming中的checkpoint
checkpoint主要保存:
- 1.metadata(一些配置)
- 2.RDD数据(保存状态)
1.无状态
def main(args: Array[String]): Unit = {val checkpointDirectory = "offset/checkpoints"def functionToCreateContext(): StreamingContext = {val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2)) ssc.checkpoint(checkpointDirectory)val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop000:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "my-spark-group-1","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("my-topic")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))words.print()ssc}val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)ssc.start()ssc.awaitTermination()
}
2.自定义有状态
用到updateStateByKey函数进行状态保存。
val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1)).updateStateByKey[Int](updateFunction _)
words.print()def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval old = runningCount.getOrElse(0)Some(newCount+old)
}
checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。
- 元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
- 所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。
UUID文件夹下就是保存状态的RDD数据。
3.窗口函数中的有状态和无状态
单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。
4.问答
Q1:streaming中checkpoint是在何时做的?
A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。
Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?
A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。
在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。
其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。
Checkpoint主要包含的信息如下:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val sparkConfPairs = ssc.conf.getAll
具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。
三.Spark Structured Streaming中的checkpoint
def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getName).config("spark.sql.shuffle.partitions", 10).getOrCreate()import spark.implicits._val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hadoop000:9092").option("subscribe", "my-topic").load().selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")).map(word => (word, 1))//.groupBy("value")//.count().writeStream.outputMode("append").format("console").option("checkpointLocation","sss/chk").start().awaitTermination()
}
内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。
{"batchWatermarkMs":0,"batchTimestampMs":1647251960388,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。
.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")
个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。
四.checkpoint比较
五.参考文章
Spark中的checkpoint机制_程研板的博客-CSDN博客_spark的checkpoint机制
Spark中的checkpoint机制相关推荐
- Spark cache和checkpoint机制
2019独角兽企业重金招聘Python工程师标准>>> Spark cache和checkpoint机制 博客分类: spark 1. RDD cache缓存 当持久化某个RDD后, ...
- Spark Streaming之checkpoint机制
一 什么类型的数据需要使用checkpoint? Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如H ...
- spark基础之spark streaming的checkpoint机制
一 什么类型的数据需要使用checkpoint? Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如H ...
- spark基础之checkpoint机制
一 Spark中Checkpoint是什么 假设一个应用程序特别复杂场景,从初始RDD开始到最后整个应用程序完成,有非常多的步骤,比如超过20个transformation操作,而且整个运行时间也比较 ...
- Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制
主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...
- Spark源码分析之Checkpoint机制
对于一个复杂的RDD,我们如果担心某些关键的,会在后面反复使用的RDD,可能会因为节点的故障,导致持久化数据的丢失,就可以针对该RDD启动checkpoint机制,实现容错和高可用. 在进行check ...
- spark中的cache()、persist()和checkpoint()的区别
首先,这三者都是做RDD持久化的,cache()和persist()是将数据默认缓存在内存中,checkpoint()是将数据做物理存储的(本地磁盘或Hdfs上),当然rdd.persist(Stor ...
- Spark中CheckPoint、Cache、Persist的用法、区别
Spark中CheckPoint.Cache.Persist 大家好,我是一拳就能打爆A柱的猛男 这几天看到一套视频<尚硅谷2021迎新版大数据Spark从入门到精通>,其中有关于检查点( ...
- Spark中cache、persist、checkpoint区别
spark中的cache.persist.checkpoint都可以将RDD保存起来,进行持久化操作,供后面重用或者容错处理.但是三者有所不同. cache 将数据临时存储在内存中进行数据重用,不够安 ...
最新文章
- 深度学习 -- TensorFlow(9)循环神经网络RNN
- matlab复数方程的根,matlab解一元三次方程,得到的都是复数根。
- APUE 学习笔记(三) 文件和目录
- 8.1.4 CSS3文字(1)( 文字阴影和描边、文字排版、自定义文字 )
- 我经常逛的技术网站,个个经典
- Protocol(协议)(二十)
- 用计算机探索规律概括,《用计算器探索规律》优秀教学设计
- 软件产品需求分析思维导图
- 小样本学习记录————MAML的改进MAML++
- 浅谈python运算符运算法则
- java标识符命名规范之驼峰命名法
- 塑造棋牌游戏文化内涵
- [1304]求圆的周长和面积(Java)
- 自旋锁与适应性自旋锁
- 根据字体的中文名 获取 字体的路径 和 英文名
- 主要有哪些具体的技术指标?
- 从ERP谈到中国企业升级
- 视频教程-实用数据分析:数据分析师从小白到精通-大数据
- android获取手机的当前OS版本。sdk版本
- 反对称矩阵乘任意矩阵满足交换性?
热门文章
- 人体模型 java代码_【人体分析-人体关键点识别】-Java示例代码
- 常规项目风险识别的规范流程和方法
- 首先的亚当和末后的亚当_亚当–实时短片动画
- Android Studio ADB5037端口被vschost.exe占用
- 海盗船电源RM1000x优势解析:这款电源到底强在哪里?
- 微服务应用性能分析实战11 资源节点树:通过 Sentinel 无侵入实现流量链生成规则
- 大学计算机高级应用考试,2016年大学计算机考试选择题及答案
- torch.unsqueeze和 torch.squeeze() 详解
- JQ实现智能校验表单
- uniapp微信公众号跳转到小程序(是点击微信页面上面的按钮/菜单跳转)