Spark的RDD持久化
RDD持久化
1. RDD Cache 缓存
说明
RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该RDD 将会被缓存在计算节点的内存中,并供后面重用。
// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString) // 数据缓存。
wordToOneRdd.cache() // 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
案例实操
package com.atguigu.bigdata.spark.core.rdd.Persistimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Persist {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val list = List("Hello Spark","Hello Scala")val rdd = sc.makeRDD(list)val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(word => {println("@@@@@@@@@@")(word,1)})mapRDD.cache() ///持久化操作val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)reduceRDD.collect().foreach(println)println("***************************")val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()groupRDD.collect().foreach(println)sc.stop()}}
存储级别
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD 的一系列转换,丢失的数据会被重算,由于RDD 的各个Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist 或 cache。
2. RDD CheckPoint 检查点
说明
所谓的检查点其实就是通过将RDD 中间结果写入磁盘。
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD 进行checkpoint 操作并不会马上被执行,必须执行Action 操作才能触发。
案例实操
package com.atguigu.bigdata.spark.core.rdd.Persistimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Persist {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)sc.setCheckpointDir("cp")val list = List("Hello Spark","Hello Scala")val rdd = sc.makeRDD(list)val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(word => {println("@@@@@@@@@@")(word,1)})//checkpoint需要落盘,需要指定检查点保存的路径//检查点保存的文件,作业执行完,不会删除//一般的保存路径都是在分布式存储系统中,HDFSmapRDD.checkpoint()val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)reduceRDD.collect().foreach(println)println("***************************")val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()groupRDD.collect().foreach(println)sc.stop()}}
3. 缓存和检查点区别
(1)Cache 缓存只是将数据临时保存起来进行重用,不切断血缘依赖,它会在血缘关系中添加新的依赖,一旦出现问题,它可以从头读取数据。Checkpoint 检查点切断血缘依赖,会重新建立新的血缘关系,它等同于改变的数据源。同时将数据长久的保存在磁盘文件中进行数据重用。
(2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在HDFS 等容错、高可用的文件系统,可靠性高。
(3)建议对checkpoint () 的RDD 使用Cache 缓存,这样 checkpoint 的job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次RDD。
(4)persist:将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全;如果作业执行完毕,临时保存的数据文件就会丢失。
Spark的RDD持久化相关推荐
- 2021年大数据Spark(十七):Spark Core的RDD持久化
目录 RDD 持久化 引入 API 缓存/持久化函数 缓存/持久化级别 释放缓存/持久化 代码演示 总结:何时使用缓存/持久化 RDD 持久化 引入 在实际开发中某些RDD的计算或转换可能会比较耗费时 ...
- Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)
1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...
- Spark基础学习笔记20:RDD持久化、存储级别与缓存
文章目录 零.本讲学习目标 一.RDD持久化 (一)引入持久化的必要性 (二)案例演示持久化操作 1.RDD的依赖关系图 2.不采用持久化操作 3.采用持久化操作 二.存储级别 (一)持久化方法的参数 ...
- PySpark | RDD持久化 | 共享变量 | Spark内核调度
文章目录 一.RDD持久化 1.RDD的数据是过程数据 2.RDD缓存 2.1 RDD缓存的特点 2.2 cache()与unpersist()实战 3.RDD CheckPoint 3.1 Chec ...
- Spark RDD 持久化
RDD Cache 缓存 RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中.但是并不是这两个方法被调用时立即缓存,而是触发后面的 ...
- 【Spark分布式内存计算框架——Spark Core】6. RDD 持久化
3.6 RDD 持久化 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高 ...
- 10.Spark之RDD及编程接口
2019独角兽企业重金招聘Python工程师标准>>> 1.起点Hello World val sc = new SparkContext("spark://...&quo ...
- 谈一谈RDD 持久化的三个算子:cache、persist、checkpoint
这段伪代码的瑕疵: lines = sc.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR ...
- Spark之RDD理论篇
Spark的基石RDD: RDD与MapReduce Spark的编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce的扩展和延申, ...
最新文章
- WPF的二维绘图(二)——几何图形Geometry
- 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
- Science发布2021年度十大科学突破榜单:除了AlphaFold2,还有哪些大丰收?
- JS Math.sin() 与 Math.cos()
- 炉石传说服务器维护有补偿吗,炉石传说官网维护补偿什么时候到 未到原因说明...
- 2021 考研 基本常识
- rfid射频前端的主要组成部分有_第4章 RFID的射频前端(simple).ppt
- android 微信支付,body为中文字符,签名错误
- Python面向对象,站在更高的角度来思考
- WHY数学表达式的3D可视化
- dubbo服务RpcException异常:Tried 3 times of the providers 或com.alibaba.dubbo.remoting.TimeoutException
- Python 异常处理
- 三星530换固态硬盘_笔记本电脑换固态硬盘+装系统
- 【计算机网络】网络安全 : 对称密钥分配 ( 密钥分配 | 密钥分配中心 KDC | 对称密钥分配 | 密钥分配协议 | Kerberos 协议 )
- 微信小程序--瀑布流
- android 获取路由器mac,android设备获取当前wifi下的路由器的mac和路由器的名称
- 160603、使用pd4ml.jar和ss_css2.jar转pdf的工具类
- 概率论05 - 随机变量及其分布函数
- 羊毛出在狗身上,猪来买单
- win10无法登录Microsoft账号(登录页面无法加载)
热门文章
- SpringMVC数据绑定与转换代码分析
- 并发控制:(三)MVCC 多版本并发控制
- 基于window-based模板的多View程序(转)
- 怀念 儿时课本贴图,你还记得课文名吗
- oracle你如何重置序列号,oracle sequence语句重置方介绍
- java xml 学习_java学习(四)xml
- C语言中 #pragma pack()
- matlab自适应逆控制,基于matlab仿真模块的自适应有源噪声逆控制研究
- pycharm 链接wsl和 wsl 配置cuda nvidia
- MFCC+DTW做声纹识别