RDD持久化

1. RDD Cache 缓存

说明
RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该RDD 将会被缓存在计算节点的内存中,并供后面重用。

// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString) // 数据缓存。
wordToOneRdd.cache() // 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

案例实操

package com.atguigu.bigdata.spark.core.rdd.Persistimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Persist {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val list = List("Hello Spark","Hello Scala")val rdd = sc.makeRDD(list)val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(word => {println("@@@@@@@@@@")(word,1)})mapRDD.cache() ///持久化操作val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)reduceRDD.collect().foreach(println)println("***************************")val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()groupRDD.collect().foreach(println)sc.stop()}}

存储级别

object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)


缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD 的一系列转换,丢失的数据会被重算,由于RDD 的各个Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist 或 cache。

2. RDD CheckPoint 检查点

说明
所谓的检查点其实就是通过将RDD 中间结果写入磁盘。
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD 进行checkpoint 操作并不会马上被执行,必须执行Action 操作才能触发。

案例实操

package com.atguigu.bigdata.spark.core.rdd.Persistimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Persist {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)sc.setCheckpointDir("cp")val list = List("Hello Spark","Hello Scala")val rdd = sc.makeRDD(list)val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(word => {println("@@@@@@@@@@")(word,1)})//checkpoint需要落盘,需要指定检查点保存的路径//检查点保存的文件,作业执行完,不会删除//一般的保存路径都是在分布式存储系统中,HDFSmapRDD.checkpoint()val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)reduceRDD.collect().foreach(println)println("***************************")val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()groupRDD.collect().foreach(println)sc.stop()}}

3. 缓存和检查点区别

(1)Cache 缓存只是将数据临时保存起来进行重用,不切断血缘依赖,它会在血缘关系中添加新的依赖,一旦出现问题,它可以从头读取数据。Checkpoint 检查点切断血缘依赖,会重新建立新的血缘关系,它等同于改变的数据源。同时将数据长久的保存在磁盘文件中进行数据重用。
(2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在HDFS 等容错、高可用的文件系统,可靠性高。
(3)建议对checkpoint () 的RDD 使用Cache 缓存,这样 checkpoint 的job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次RDD。
(4)persist:将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全;如果作业执行完毕,临时保存的数据文件就会丢失。

Spark的RDD持久化相关推荐

  1. 2021年大数据Spark(十七):Spark Core的RDD持久化

    目录 RDD 持久化 引入 API 缓存/持久化函数 缓存/持久化级别 释放缓存/持久化 代码演示 总结:何时使用缓存/持久化 RDD 持久化 引入 在实际开发中某些RDD的计算或转换可能会比较耗费时 ...

  2. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

  3. Spark基础学习笔记20:RDD持久化、存储级别与缓存

    文章目录 零.本讲学习目标 一.RDD持久化 (一)引入持久化的必要性 (二)案例演示持久化操作 1.RDD的依赖关系图 2.不采用持久化操作 3.采用持久化操作 二.存储级别 (一)持久化方法的参数 ...

  4. PySpark | RDD持久化 | 共享变量 | Spark内核调度

    文章目录 一.RDD持久化 1.RDD的数据是过程数据 2.RDD缓存 2.1 RDD缓存的特点 2.2 cache()与unpersist()实战 3.RDD CheckPoint 3.1 Chec ...

  5. Spark RDD 持久化

    RDD Cache 缓存 RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中.但是并不是这两个方法被调用时立即缓存,而是触发后面的 ...

  6. 【Spark分布式内存计算框架——Spark Core】6. RDD 持久化

    3.6 RDD 持久化 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高 ...

  7. 10.Spark之RDD及编程接口

    2019独角兽企业重金招聘Python工程师标准>>> 1.起点Hello World val sc = new SparkContext("spark://...&quo ...

  8. 谈一谈RDD 持久化的三个算子:cache、persist、checkpoint

    这段伪代码的瑕疵: lines = sc.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR ...

  9. Spark之RDD理论篇

    Spark的基石RDD: RDD与MapReduce Spark的编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce的扩展和延申, ...

最新文章

  1. WPF的二维绘图(二)——几何图形Geometry
  2. 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
  3. Science发布2021年度十大科学突破榜单:除了AlphaFold2,还有哪些大丰收?
  4. JS Math.sin() 与 Math.cos()
  5. 炉石传说服务器维护有补偿吗,炉石传说官网维护补偿什么时候到 未到原因说明...
  6. 2021 考研 基本常识
  7. rfid射频前端的主要组成部分有_第4章 RFID的射频前端(simple).ppt
  8. android 微信支付,body为中文字符,签名错误
  9. Python面向对象,站在更高的角度来思考
  10. WHY数学表达式的3D可视化
  11. dubbo服务RpcException异常:Tried 3 times of the providers 或com.alibaba.dubbo.remoting.TimeoutException
  12. Python 异常处理
  13. 三星530换固态硬盘_笔记本电脑换固态硬盘+装系统
  14. 【计算机网络】网络安全 : 对称密钥分配 ( 密钥分配 | 密钥分配中心 KDC | 对称密钥分配 | 密钥分配协议 | Kerberos 协议 )
  15. 微信小程序--瀑布流
  16. android 获取路由器mac,android设备获取当前wifi下的路由器的mac和路由器的名称
  17. 160603、使用pd4ml.jar和ss_css2.jar转pdf的工具类
  18. 概率论05 - 随机变量及其分布函数
  19. 羊毛出在狗身上,猪来买单
  20. win10无法登录Microsoft账号(登录页面无法加载)

热门文章

  1. SpringMVC数据绑定与转换代码分析
  2. 并发控制:(三)MVCC 多版本并发控制
  3. 基于window-based模板的多View程序(转)
  4. 怀念 儿时课本贴图,你还记得课文名吗
  5. oracle你如何重置序列号,oracle sequence语句重置方介绍
  6. java xml 学习_java学习(四)xml
  7. C语言中 #pragma pack()
  8. matlab自适应逆控制,基于matlab仿真模块的自适应有源噪声逆控制研究
  9. pycharm 链接wsl和 wsl 配置cuda nvidia
  10. MFCC+DTW做声纹识别