首先,这三者都是做RDD持久化的,cache()和persist()是将数据默认缓存在内存中,checkpoint()是将数据做物理存储的(本地磁盘或Hdfs上),当然rdd.persist(StorageLevel.DISK_ONLY)也可以存储在磁盘 。

其次,缓存机制里的cache和persist都是用于将一个RDD进行缓存,区别就是:cache()是persisit()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY)将数据持久化到内存中。如果需要从内存中清除缓存,那么可以使用unpersist()方法。cache () = persist()=persist(StorageLevel.Memory_Only)

另外,cache 跟 persist不会截断血缘关系,checkPoint会截断血缘关系。

cache()与persist()的区别

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()/*** Set this RDD's storage level to persist its values across operations after the first time* it is computed. This can only be used to assign a new storage level if the RDD does not* have a storage level set yet. Local checkpointing is an exception.*/
def persist(newLevel: StorageLevel): this.type = {if (isLocallyCheckpointed) {// This means the user previously called localCheckpoint(), which should have already// marked this RDD for persisting. Here we should override the old storage level with// one that is explicitly requested by the user (after adapting it to use disk).persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)} else {persist(newLevel, allowOverride = false)}
}/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

从以上源码可以看出,cache 底层调用的是 persist 方法,存储等级为: memory only,persist 的默认存储级别也是 memory only,persist 与 cache 的主要区别是 persist 可以自定义存储级别。如rdd.persist(StorageLevel.DISK_ONLY) ,就可以将数据存储在磁盘上。哪些 RDD 需要cache?会被重复使用的(但是)不能太大的RDD需要cache,cache 只使用 memory。

cache()与checkpoint()的区别

cache 和 checkpoint 之间有一个重大的区别,cache 将 RDD 以及 RDD 的血统(记录了这个RDD如何产生)缓存到内存中,当缓存的 RDD 失效的时候(如内存损坏),它们可以通过血统重新计算来进行恢复。但是 checkpoint 将 RDD 缓存到了 HDFS 中,同时忽略了它的血统(也就是RDD之前的那些依赖)。为什么要丢掉依赖?因为可以利用 HDFS 多副本特性保证容错!

/*** Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint* directory set with `SparkContext#setCheckpointDir` and all references to its parent* RDDs will be removed. This function must be called before any job has been* executed on this RDD. It is strongly recommended that this RDD is persisted in* memory, otherwise saving it on a file will require recomputation.*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {// NOTE: we use a global lock here due to complexities downstream with ensuring// children RDD partitions point to the correct parent partitions. In the future// we should revisit this consideration.if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}
}

从上面的源码中可以看出,在进行checkpoint之前要设置 sc.setCheckpointDir("...")。除此之外,在进行 checkpoint 前,要先对 RDD 进行 cache。Why??? checkpoint 会等到 job 结束后另外启动专门的 job 去完成 checkpoint,也就是说需要 checkpoint 的 RDD 会被计算两次。

persist()与checkpoint()的区别

persist(StorageLevel.DISK_Only) != checkPoint()

rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

应用场景:

cache: 对于会被重复使用,但是数据量不是太大的RDD,可以将其cache()到内存当中。cache 是每计算出一个要 cache 的 partition 就直接将其 cache 到内存中。缓存完之后,可以在任务监控界面storage里面看到缓存的数据。

checkpoint:对于computing chain 计算链过长或依赖其他 RDD 很多的 RDD,就需要进行checkpoint,将其放入到HDFS或者本地文件夹当中。需要注意的是,checkpoint 需要等到job完成了,再启动专门的job去完成checkpoint 操作,因此RDD是被计算了两次的。一般使用的时候配合rdd.cache(),这样第二次就不用重新计算RDD了,直接读取 cache 写磁盘。

注意:rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别,persist一旦程序执行结束,所有的缓存无论在内存还是磁盘都会被删掉。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver,program 使用,而 cached RDD 不能被其他 dirver program 使用。

spark的缓存级别参照【org.apache.spark.storage.StorageLevel.scala】
    在persist()中可以指定一个StorageLevel,当StorageLevel为MEMORY_ONLY时就是cache.

StorageLevel的列表可以在StorageLevel伴生单例对象中查看 :

 class StorageLevel private(private var _useDisk: Boolean,private var _useMemory: Boolean,private var _useOffHeap: Boolean,private var _deserialized: Boolean,private var _replication: Int = 1) //hdfs的副本是存3份,RDD默认的副本数是1extends Externalizablenew StorageLevel(_useDisk,_useMemory, _useOffHeap,_deserialized,_replication: Int = 1)val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  • Memory_Only:只存于内存
  • Disk_Only:只存于磁盘
  • Memory_and_disk:内存不够,存于磁盘
  • Memory_only_ser:将数据以序列化的方式存于内存

默认缓存级别:def persist(): this.type = persist(StorageLevel.MEMORY_ONLY),
    默认情况下persist() 会把数据以反序列化的形式缓存在JVM的堆(heap)空间中。 
    取消缓存,执行RDD.unpersist()

Examples:

1.缓存cache()的使用

object Test01 {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local"))val rdd1 = sc.textFile("file:///D:\\IdeaProgram\\Bigdata\\file\\Spark\\wc\\input")//在这里设置一个cache缓存,只缓存结果,建议缓存完之后执行action,再去执行转换算子+执行算子val rdd2 = rdd1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).cache()//这里如果不设置缓存,每次执行完一个action以后再去执行下一个action的时候都要从头开始进行计算//如果设置了缓存,每次执行完一个action以后再去执行下一个action的时候直接从缓存为止读取数据rdd2.collect()rdd2.foreach(println(_))rdd2.take(2)}
}

2.persist()的使用

import org.apache.spark.storage._def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("DISK_ONLY")val sc = new  SparkContext(conf);sc.setLogLevel("ERROR")val a = sc.parallelize(1 to 9, 3).persist(StorageLevel.DISK_ONLY)println(a.first()) }

一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager也会stop;被缓存到磁盘上的RDD也会被清空(整个blockManager使用的local文件夹被删除)

3.checkpoint()的使用

checkpoint检查点机制
检查点(本质就是通过将RDD写入Disk做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点问题而丢失分区,从做查点的RDD开始重做lineage,就会减少开销检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能.检查点,类似于快照,chekpoint的作⽤就是将DAG中⽐较重要的数据做⼀个检查点,将结果存储到⼀个⾼可⽤的地⽅
//检查点的使用
def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")val sc = new SparkContext(conf)//设置检查点的路径sc.setCheckpointDir("hdfs://hadoop01:8020/ck")
val rdd = sc.textFile("hdfs://hadoop01:8020/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//检查点的触发⼀定要使⽤个action算⼦rdd.checkpoint()rdd.saveAsTextFile("hdfs://hadoop01:8020/out10")println(rdd.getCheckpointFile) //查看存储的位置/**查看是否可以设置检查点 rdd.isCheckpointed 这个⽅法在shell中可以使⽤ 但是代码中不好⽤*/
}

4.实用心得:

缓存和检查点的区别
缓存把RDD计算出来的然后放在内存,但是RDD的依赖链不能丢掉,当某个exexutor宕机时,上面cache的RDD就会丢掉,需要通过依赖链放入重新计算,不同的是,checkpoint是把RDD保存在HDFS上,是多副本可靠存储,所以依赖链可以丢掉,就是斩断了依赖链,是通过复制实现的高容错cache和persist的比较
1.cache底层调用的是persist
2.cache默认持久化等级是内存且不能修改,persist可以修改持久化的等级
什么时候使用cache或checkpoint
1.某步骤计算特别耗时
2.计算链条特别长
3.发生shuffle之后
一般情况建议使用cache或是persist模式,因为不需要创建存储位置,默认存储到内存中计算速度快,而checkpoint需要手动创建存储位置和手动删除数据,若数据量非常庞大建议使用checkpointTask包括ResultTask(最后一个阶段的任务) + ShuffleMapTask(非最后一个阶段)

spark中的cache()、persist()和checkpoint()的区别相关推荐

  1. 数组 spark_结合实例理解Spark中的cache()

    cache()在Spark上的应用 前言:在上MSBD5003的时候,刚接触spark对laziness的特性已经cache()(persist())方法的理解不是很透彻.这里复现了一个问题并提供了两 ...

  2. Spark中RDD与DataFrame与DataSet的区别与联系

    1.概述 这是一个面试题 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似传统数据库中的二维表格 DataFrame与RDD的主要区别在于,前者带有schema元数据信息,既 ...

  3. Spark中RDD、DataFrame和DataSet的区别与联系

    一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...

  4. spark中的转换算子和行动算子区别(transformations and actions)

    算子(RDD Operations): 对于初学者来说,算子的概念比较抽象,算子可以直译为 "RDD的操作", 我们把它理解为RDD的方法即可 . 转换算子(transformat ...

  5. Spark中distinct、reduceByKey和groupByKey的区别与取舍

    1. 代码实例: a. val rdd = sc.makeRDD(Seq("aa", "bb", "cc", "aa", ...

  6. Spark中cache、persist、checkpoint区别

    spark中的cache.persist.checkpoint都可以将RDD保存起来,进行持久化操作,供后面重用或者容错处理.但是三者有所不同. cache 将数据临时存储在内存中进行数据重用,不够安 ...

  7. Spark中CheckPoint、Cache、Persist的用法、区别

    Spark中CheckPoint.Cache.Persist 大家好,我是一拳就能打爆A柱的猛男 这几天看到一套视频<尚硅谷2021迎新版大数据Spark从入门到精通>,其中有关于检查点( ...

  8. spark中stage的划分与宽依赖/窄依赖(转载+自己理解/整理)

    [1]宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: (个人笔记,rdd中有多个 ...

  9. Spark中foreachRDD、foreachPartition和foreach解读

    精选30+云产品,助力企业轻松上云!>>> 点击蓝色"大数据每日哔哔"关注我 加个"星标",第一时间获取大数据架构,实战经验 区别 最近有不少 ...

最新文章

  1. iframe 自动适应高和宽问题 和 其他Frame操作技巧
  2. SQL Server系统表sysobjects介绍与使用
  3. mysql io_MySQL服务器 IO 100%的分析与优化方案
  4. 对话框响应WM_KEYDOWN消息
  5. thinkphp5.0如何隐藏index.php入口文件
  6. python语言简介
  7. 使用python简单免费转换视频格式
  8. c语言常量定义的数组初始化
  9. matlab 求切平面,求二次曲面的切平面的简便方法
  10. 有趣!用太极拳讲分布式理论,真舒服!
  11. 诗画丽水 文化传承 萌娃上演宋韵国风非遗主题秀
  12. 芯片供应最难的居然是TI,交期拉长
  13. Java将查询到的List,list集合还嵌套一个list集合(把这个list集合和嵌套的list集合合并为一个list集合)
  14. Maven跳过单元测试配置
  15. 关于网卡eth0、eth1以及服务器为什么要把内网和外网卡区分开
  16. java和工程造价_(   )不是Java的开发工具。
  17. 开源聚合路由 OpenMPTCProuter 配置使用
  18. Python语言的简介(语言特点/pyc介绍/Python版本语言兼容问题(python2 VS Python3))、安装、学习路线(数据分析/机器学习/网页爬等编程案例分析)之详细攻略
  19. 简易版“美颜”来了!肝了一夜!用Python做一个高瘦脸神器!
  20. 施努卡:机器视觉公司有什么(国内机器视觉公司)

热门文章

  1. 双软企业认定(软件产品/软件企业认定)
  2. 财政部、税务总局:集成电路设计和软件企业免两年所得税
  3. 多亏了这个神器,让我斩获华为入场券
  4. H5学习之旅-H5的基本标签(2)
  5. 企业固定资产管理系统建设方案
  6. VMware精简系统Win系列|体积更小更稳定
  7. flstudio软件怎么设置中文语言切换?
  8. c语言实现动态二维数组
  9. 关于create-react-app搭建react环境并修改端口号
  10. Python正则表达式(regular expression)简介-re模块