一.Spark Core中的checkpoint

def main(args: Array[String]) {val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]").getOrCreate()val sc = spark.sparkContextsc.setCheckpointDir("checkpoint")val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))val pairs = sc.parallelize(data, 3)pairs.cache()pairs.checkpoint()println(pairs.count)
}

二.Spark Streaming中的checkpoint

checkpoint主要保存:

  • 1.metadata(一些配置)
  • 2.RDD数据(保存状态)

1.无状态

def main(args: Array[String]): Unit = {val checkpointDirectory = "offset/checkpoints"def functionToCreateContext(): StreamingContext = {val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2))  ssc.checkpoint(checkpointDirectory)val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop000:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "my-spark-group-1","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("my-topic")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))words.print()ssc}val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)ssc.start()ssc.awaitTermination()
}

2.自定义有状态

用到updateStateByKey函数进行状态保存。

val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1)).updateStateByKey[Int](updateFunction _)
words.print()def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval old = runningCount.getOrElse(0)Some(newCount+old)
}

checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。

  • 元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
  • 所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。

UUID文件夹下就是保存状态的RDD数据。

3.窗口函数中的有状态和无状态

单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。

4.问答

Q1:streaming中checkpoint是在何时做的?

A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。

Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?

A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。

在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。

其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。

Checkpoint主要包含的信息如下:

val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll

具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。

三.Spark Structured Streaming中的checkpoint

def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName(this.getClass.getName).config("spark.sql.shuffle.partitions", 10).getOrCreate()import spark.implicits._val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hadoop000:9092").option("subscribe", "my-topic").load().selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")).map(word => (word, 1))//.groupBy("value")//.count().writeStream.outputMode("append").format("console").option("checkpointLocation","sss/chk").start().awaitTermination()
}

内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。

offsets文件夹下每个文件都是一个batch的偏移量。

{"batchWatermarkMs":0,"batchTimestampMs":1647251960388,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}

稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。
但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。

sss中append模式改为complete模式则会报错。

.groupBy("value")
.count()
.writeStream
.outputMode("complete")

使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。

.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")

个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。

四.checkpoint比较

五.参考文章

Spark中的checkpoint机制_程研板的博客-CSDN博客_spark的checkpoint机制

Spark中的checkpoint机制相关推荐

  1. Spark cache和checkpoint机制

    2019独角兽企业重金招聘Python工程师标准>>> Spark cache和checkpoint机制 博客分类: spark 1. RDD cache缓存 当持久化某个RDD后, ...

  2. Spark Streaming之checkpoint机制

    一 什么类型的数据需要使用checkpoint? Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如H ...

  3. spark基础之spark streaming的checkpoint机制

    一 什么类型的数据需要使用checkpoint? Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如H ...

  4. spark基础之checkpoint机制

    一 Spark中Checkpoint是什么 假设一个应用程序特别复杂场景,从初始RDD开始到最后整个应用程序完成,有非常多的步骤,比如超过20个transformation操作,而且整个运行时间也比较 ...

  5. Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制

    主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...

  6. Spark源码分析之Checkpoint机制

    对于一个复杂的RDD,我们如果担心某些关键的,会在后面反复使用的RDD,可能会因为节点的故障,导致持久化数据的丢失,就可以针对该RDD启动checkpoint机制,实现容错和高可用. 在进行check ...

  7. spark中的cache()、persist()和checkpoint()的区别

    首先,这三者都是做RDD持久化的,cache()和persist()是将数据默认缓存在内存中,checkpoint()是将数据做物理存储的(本地磁盘或Hdfs上),当然rdd.persist(Stor ...

  8. Spark中CheckPoint、Cache、Persist的用法、区别

    Spark中CheckPoint.Cache.Persist 大家好,我是一拳就能打爆A柱的猛男 这几天看到一套视频<尚硅谷2021迎新版大数据Spark从入门到精通>,其中有关于检查点( ...

  9. Spark中cache、persist、checkpoint区别

    spark中的cache.persist.checkpoint都可以将RDD保存起来,进行持久化操作,供后面重用或者容错处理.但是三者有所不同. cache 将数据临时存储在内存中进行数据重用,不够安 ...

最新文章

  1. 深度学习 -- TensorFlow(9)循环神经网络RNN
  2. matlab复数方程的根,matlab解一元三次方程,得到的都是复数根。
  3. APUE 学习笔记(三) 文件和目录
  4. 8.1.4 CSS3文字(1)( 文字阴影和描边、文字排版、自定义文字 )
  5. 我经常逛的技术网站,个个经典
  6. Protocol(协议)(二十)
  7. 用计算机探索规律概括,《用计算器探索规律》优秀教学设计
  8. 软件产品需求分析思维导图
  9. 小样本学习记录————MAML的改进MAML++
  10. 浅谈python运算符运算法则
  11. java标识符命名规范之驼峰命名法
  12. 塑造棋牌游戏文化内涵
  13. [1304]求圆的周长和面积(Java)
  14. 自旋锁与适应性自旋锁
  15. 根据字体的中文名 获取 字体的路径 和 英文名
  16. 主要有哪些具体的技术指标?
  17. 从ERP谈到中国企业升级
  18. 视频教程-实用数据分析:数据分析师从小白到精通-大数据
  19. android获取手机的当前OS版本。sdk版本
  20. 反对称矩阵乘任意矩阵满足交换性?

热门文章

  1. 人体模型 java代码_【人体分析-人体关键点识别】-Java示例代码
  2. 常规项目风险识别的规范流程和方法
  3. 首先的亚当和末后的亚当_亚当–实时短片动画
  4. Android Studio ADB5037端口被vschost.exe占用
  5. 海盗船电源RM1000x优势解析:这款电源到底强在哪里?
  6. 微服务应用性能分析实战11 资源节点树:通过 Sentinel 无侵入实现流量链生成规则
  7. 大学计算机高级应用考试,2016年大学计算机考试选择题及答案
  8. torch.unsqueeze和 torch.squeeze() 详解
  9. JQ实现智能校验表单
  10. uniapp微信公众号跳转到小程序(是点击微信页面上面的按钮/菜单跳转)