1、持久化

RDD的持久化包括两个方面:①操作RDD的时候怎么保存结果,这个部分属于action算子的部分②在实现算法的时候要进行cache、persist,还有checkpoint进行持久化。

1.1 persist和cache

Spark稍微复杂一点的算法里面都会有persit的身影,因为spark默认情况下是放在内存中,比较适合高速的迭代,如一个Stage有步骤非常多,中间不会产生临时数据,对于高速迭代是非常好的事情,但是对于分布式文件系统风险非常高,容易出错,这个时候就涉及到容错,由于RDD有血统继承关系,后面的RDD如果数据分片出错或者RDD本身出错之后可以根据前面的依赖血统关系算出来,但是如果没有对父RDD进行persist或cache还是要从头开始做。

首先先看下StorageLevel类,里面设置了RDD的各种缓存级别,总共有12种,其实是它的多个构造参数的组合形成的。

cache源码如下:

由源码可知,cache方法实际上调用了无参数的persist方法,缓存级别为仅在内存中。

persist源码如下:

无参的persist方法,默认缓存级别为仅在内存中

其实有2种情况:①是我们之前对RDD调用了checkpoint方法,这个方法是把RDD存储到disk上,之后我们再调用persist(newLevel)方法也是不会报错的,他会做检查你是否执行过checkpoint方法(即isLocallyCheckpointed),如果是的话就会调用persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true),而这里LocalRDDCheckpointData.transformStorageLevel(newLevel)返回的缓存级别是disk级别,故不会报错②如果我们之前设置过RDD的缓存级别,现在再次调用此方法进行缓存级别设置,但是缓存级别与之前一样,程序也是不会报错的,因为里面调用了persist(newLevel, allowOverride = false)方法

这个persist方法,适用于之前我们设置过了RDD的缓存级别,现在想要修改RDD的缓存级别的情况,只需要把allowOverride设置为true

这一段程序也解释了上面第一种方法的第一个特殊情况为什么不会报错

总结:①cache方法其实是persist方法的一个特例:调用的是无参数的persist(),代表缓存级别是仅内存的情况②persist方法有三种,分为默认无参数仅内存级别的persist(),还有persist(newLevel):这个方法需要之前对RDD没有设置过缓存级别,persist(newLevel,allowOverride):这个方法适用于之前对RDD设置过缓存级别,但是想更改缓存级别的情况。③取消缓存统一使用unpersist()方法④persist是lazy级别的(前面的算子都是lazy的每执行,所以他肯定也要是lazy级别的),unpersist是eager级别的(即调用的时候会立即清除)

注意:①cache之后一定不能立即有其他算子,cache后有算子的话,它每次都会重新触发这个计算过程,cache不是一个action;②cache被gc清除的两种方式:unpersist强制销毁数据;会被后续的计算结果挤掉

缓存实现的原理:DiskStore磁盘存储和MemoryStore内存存储

DiskStore磁盘存储:spark会在磁盘上创建spark文件夹,命名为(spark-local-x年x月x日时分秒-随机数),block块都会存在这里,然后把block id映射成相应的文件路径,就可以存取文件了

MemoryStore内存存储:使用hashmap管理block就行了,block id作为key,MemoryEntry为value

1.2 做缓存的时机

(1)计算特别耗时

(2)计算链条很长,失败的时候会有很大的代价:假设900个步骤在第800个步骤缓存,801的步骤失败了就会在800个步骤开始恢复;

(3)shuffle之后:shuffle是进行分发数据,缓存之后假设后面失败就不需要重新shuffle;

(4)checkpoint之前:checkpoint是把整个数据放到分布是文件系统中或磁盘,checkpoint是在当前作业执行之后,再触发一个作业,恢复时前面的步骤就不需要计算

缓存是不一定可靠的,缓存在内存中不一定是可靠的,把数据缓存在内存中有可能会丢失,例如只缓存在内存中,而不同时放在内存和磁盘上,可能内存crash(奔溃),crash内存现在有一种办法就是用Tachyon做底层存储,但是使用checkpoint的数数据一定放在文件系统上,这个时候数据就不会丢失。假设缓存了100万个数据分片,开始缓存是成功的,由于内存的紧张在一些机器上把一些数据分片清理掉了,那这时候就需要重新计

(5)shuffle之前persist(不过框架已经默认帮我们把数据持久化到本地磁盘)

//cache实例
val cached=sc.textFile("G:\\Scala\\data\\README.md").flatMap(_.split(" ")).map(_=>(_,1)).reduceByKey(_+_,1).cache
cached.count

广播BroadCast

为什么需要广播?每个task运行,读取全局数据的时候每个task每次都要拷贝一个数据副本,他的好处就是状态一致性,不好的就是耗大量的内存,变量大就容易OOM,这种是非常严重的,必须全部放在内存上,不能一部分放磁盘上。

广播过去的全局只读不能修改的,广播到worker的executor内存中。广播变量不需要销毁,应用程序存在他就存在,sc销毁它也就销毁了。

总结:广播是由Driver发送给当前Application分配的所有Executor内存级别的全局只读变量,Executor中的线程池中的线程共享该全局变量,极大减少网络传输(否则每个task都要传输一次该变量),并极大的节省内存,减少OOM的可能,当然也隐形的提高CPU的有效工作(因为每次传CPU也很忙)

//广播实例:
val number =6
val broadcastVar = sc.broadcast(number ) //广播只能由broadcast广播
val data=sc.parallelize(1 to 10)//创建RDD,由Task使用广播变量
val bn=data.map(_*broadcastVar .value)

3 累加器Accumulator

为什么需要累加器?累加器的特征:是全局级别的,且Executor中的task只能修改(增加内容),只有driver可读,因为我们通过driver控制整个集群有必要知道整个集群的状态。(对于Executor只能修改但不可读,只对driver可读)。Executor修改一定不会彼此覆盖相当于加锁了。

因为他的特性,在记录集群状态的时候,尤其是全局唯一状态的时候至关重要,可以保存唯一的全局变量。

累加器原理:由于被driver控制,在实际task运行的时候,每次都可以保证只对driver可读获取全局唯一的状态对象

总结:①累加器是全局唯一的,每次操作只增不减②在executor中只能修改他,也就是只能增加他的值。

可以认为Broadcast是线程级别全局共享,累加器是executor全局共享

//累加器实例:
val acc = sc.accumulator(0)
val data=sc.parallelize(1 to 10)
val result=data.foreach(item=>acc +=item)
println(result)

RDD持久化、广播、累加器相关推荐

  1. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

  2. PySpark | RDD持久化 | 共享变量 | Spark内核调度

    文章目录 一.RDD持久化 1.RDD的数据是过程数据 2.RDD缓存 2.1 RDD缓存的特点 2.2 cache()与unpersist()实战 3.RDD CheckPoint 3.1 Chec ...

  3. 2021年大数据Spark(十七):Spark Core的RDD持久化

    目录 RDD 持久化 引入 API 缓存/持久化函数 缓存/持久化级别 释放缓存/持久化 代码演示 总结:何时使用缓存/持久化 RDD 持久化 引入 在实际开发中某些RDD的计算或转换可能会比较耗费时 ...

  4. Spark的RDD持久化

    RDD持久化 1. RDD Cache 缓存 说明 RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中.但是并不是这两个方法被调用时立即 ...

  5. 谈一谈RDD 持久化的三个算子:cache、persist、checkpoint

    这段伪代码的瑕疵: lines = sc.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR ...

  6. Spark基础学习笔记20:RDD持久化、存储级别与缓存

    文章目录 零.本讲学习目标 一.RDD持久化 (一)引入持久化的必要性 (二)案例演示持久化操作 1.RDD的依赖关系图 2.不采用持久化操作 3.采用持久化操作 二.存储级别 (一)持久化方法的参数 ...

  7. RDD持久化(缓存)

    Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个 RDD.如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 以及它的所有依赖.这在迭代算法中消耗格外大,因为迭代算法 ...

  8. 【Spark分布式内存计算框架——Spark Core】6. RDD 持久化

    3.6 RDD 持久化 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高 ...

  9. Spark性能优化:对RDD持久化或CheckPoint操作

    对多次使用的RDD进行持久化或Checkpoint 如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作.那么就非常有必要对其进行持久化操作,以避免对一个RDD ...

最新文章

  1. vue element ui 滚动条
  2. 路由器DHCP,单臂路由配置
  3. 文本显示变量_无代码软件开发文本字符显示设计
  4. luogu P1046 陶陶摘苹果
  5. 算法导论 思考题4-1
  6. Rust 2018临近:设法从Rust 2015过渡
  7. pandas 作图 统计_Pandas数据可视化工具——Seaborn用法整理(下)
  8. 汇编语言学习资料汇总
  9. Intel CPU 官方的温度监控软件 - Intel Power Gadget Options
  10. 类型 异常报告 消息 null 描述 服务器遇到一个意外的情况,阻止它完成请求。 例外情况 java.lang.NumberFormatException: null java.base/
  11. TZT1306标准模拟应变源
  12. Python实用案例,Python脚本实现玩转emoji,我微又“偷偷”更新这个表情!
  13. 实现自己的音乐搜索软件(一)
  14. (Python语法篇)4.序列(3)字典
  15. 网页报错404原因及解决方法
  16. iframe 、frameset 、frame的区别
  17. SQL 子查询怎么优化?写的很深的这种!
  18. javascript中change事件的用法
  19. 韩钰带你走进电商世界之淘宝网店卖家快捷短语大全!
  20. 基于Javaweb的商铺租赁管理系统/租赁管理系统

热门文章

  1. layui 在springboot2.x 时,页面展示不了layui的问题
  2. BootStrap 用法
  3. /etc/fstab 参数详解及如何设置开机自动挂载
  4. 前端模块化(二):模块化编程
  5. cf 621E. Wet Shark and Blocks
  6. SQL Server中行列转换 Pivot UnPivot
  7. 学习:Web安装项目创建桌面快捷方式及重写安装类(转)
  8. multi-line comment In file
  9. mysql3.5 所有表_mysql学习笔记3.5
  10. java面向过程编程怎么写_从面向过程到面向对象,我的编程之路