RDD Caching

Level                 Description
MEMORY_ONLY           System default; data is cached in memory as deserialized Java objects.
MEMORY_AND_DISK       Stored in memory first; partitions that do not fit in memory spill to disk.
MEMORY_ONLY_SER       Stored in memory like MEMORY_ONLY, but as serialized bytes, whereas MEMORY_ONLY stores deserialized Java objects.
MEMORY_AND_DISK_SER   Same storage pattern as MEMORY_AND_DISK, but the data is kept in serialized form.
DISK_ONLY             Data is stored on disk only.
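
To check which level an RDD currently has, getStorageLevel can be queried (a minimal sketch; the RDD and values are illustrative):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 100)
    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
    println(rdd.getStorageLevel.description) // human-readable description of the level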

persist

Source of persist():
  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
Source of persist(newLevel: StorageLevel):
  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet. Local checkpointing is an exception.
   */
  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }
Source of StorageLevel:

/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  // ...
}
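
For reference, the constructor flags above are, in order: useDisk, useMemory, useOffHeap, deserialized, plus an optional replication count (default 1). Assuming the five-argument StorageLevel.apply factory, a custom level can be built the same way (a sketch):

    import org.apache.spark.storage.StorageLevel

    // Same flags as MEMORY_AND_DISK_SER, but keeping 2 replicas of each partition
    val myLevel = StorageLevel(useDisk = true, useMemory = true, useOffHeap = false,
      deserialized = false, replication = 2)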
e.g.:
    import org.apache.spark.storage.StorageLevel

    val a = sc.parallelize(1 to 100)
    a.cache()
    a.persist()                         // same as cache(): MEMORY_ONLY
    a.persist(StorageLevel.MEMORY_ONLY)
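
One caveat worth noting (my addition, based on persist's documented behavior): repeating the same level is harmless, but once a level is assigned, requesting a different one throws:

    // `a` already has MEMORY_ONLY set above, so this line would throw
    // UnsupportedOperationException: Cannot change storage level of an RDD
    // after it was already assigned a level
    // a.persist(StorageLevel.MEMORY_AND_DISK)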

cache is essentially persist:


  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()
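
The inverse operation is unpersist(), which drops the cached blocks; later jobs recompute the RDD from its lineage (a short sketch):

    val a = sc.parallelize(1 to 100)
    a.cache()
    a.count()      // the first action materializes the cache
    a.unpersist()  // remove the blocks from memory and disk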

Fault tolerance with checkpoints (checkpoint)

Checkpoint fault tolerance supplements lineage-based fault tolerance: when a lineage grows too long, recovering through it becomes too expensive. If a partition is lost in a task downstream of the checkpoint, the lineage can be replayed from the checkpointed RDD rather than from the beginning, reducing the cost. Official advice: this function must be called before any job has been executed on this RDD, and the RDD should be kept persisted in memory.

Source:
  /**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

e.g.:

    val a = sc.parallelize(1 to 100)
    sc.setCheckpointDir("hdfs://192.168.72.2:8020/checkpoint/20190521")
    a.persist(StorageLevel.MEMORY_ONLY)
    a.checkpoint()
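
The checkpoint is only written out when a job actually runs; after that, the truncated lineage can be inspected (a minimal sketch):

    a.count()                  // the action triggers writing the checkpoint to HDFS
    println(a.isCheckpointed)  // true once the checkpoint has been written
    println(a.toDebugString)   // the lineage now starts from the checkpointed data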

Broadcast variables (broadcast)

For datasets shared across tasks, a broadcast variable caches one copy of the data per machine instead of one copy per task, reducing resource overhead. Use case: when joining a large table with a small table, distribute the small table's data to every machine.
Source:
  /**
   * Broadcast a read-only variable to the cluster, returning a
   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
   * The variable will be sent to each cluster only once.
   *
   * @param value value to broadcast to the Spark nodes
   * @return `Broadcast` object, a read-only variable cached on each machine
   */
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

e.g.:

Suppose a region has 100,000 free WiFi hotspots, 1,000,000 users, and 1 billion connection records.
The 100,000 free WiFi hotspots form table A: WIFI_ID, POS
The 1 billion connection records form table B: USER_ID, WIFI_ID, TIME, MESSAGE
The 1,000,000 users form table C: USER_ID, USER_NAME
The goal is a result table D: USER_NAME, POS, TIME, MESSAGE
    val A = sc.textFile("hdfs://192.168.72.2:8020/wifi").map(line => {
      val fields = line.split("\\|")
      val wifi = fields(0)
      val pos = fields(1)
      (wifi, pos)
    })
    val B = sc.textFile("hdfs://192.168.72.2:8020/connectionInfo").map(line => {
      val fields = line.split("\\|")
      val userID = fields(0)
      val wifiID = fields(1)
      val time = fields(2)
      val message = fields(3)
      (userID, wifiID, time, message)
    })
    val C = sc.textFile("hdfs://192.168.72.2:8020/user").map(line => {
      val fields = line.split("\\|")
      val userID = fields(0)
      val userName = fields(1)
      (userID, userName)
    })

    // Collect the two small tables into immutable maps and broadcast them to the tasks
    val wifiPos = sc.broadcast(A.collectAsMap())
    val user = sc.broadcast(C.collectAsMap())

    def mapPartitionFunc(iter: Iterator[(String, String, String, String)])
        : Iterator[(String, String, String, String)] = {
      val wifis = wifiPos.value
      val users = user.value
      // Map-side join: look each connection record up in the broadcast tables
      iter.flatMap { case (userID, wifiID, time, message) =>
        for {
          userName <- users.get(userID)
          pos <- wifis.get(wifiID)
        } yield (userName, pos, time, message)
      }
    }

    val D = B.mapPartitions(mapPartitionFunc)
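
When the lookup tables are no longer needed, their executor-side copies can be released through the Broadcast API (a short sketch using the variables above):

    wifiPos.unpersist() // drop cached copies on the executors; re-sent lazily if used again
    user.destroy()      // permanently release all resources; the variable is unusable afterwards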

Accumulators (Accumulator)

Accumulator: creates and registers an accumulator, which starts from 0 and accumulates inputs via `add`; its value is read via `value`.

Source:

  /**
   * Create and register a long accumulator, which starts with 0 and accumulates inputs by `add`.
   */
  def longAccumulator(name: String): LongAccumulator = {
    val acc = new LongAccumulator
    register(acc, name)
    acc
  }

  override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

  override def value: jl.Long = _sum
e.g.:

    val accum = sc.longAccumulator("My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    accum.value // 10
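
Beyond the built-in long/double accumulators, a custom one can be defined by extending AccumulatorV2 (a minimal sketch, not from the original post; the class name is made up). Note that updates made inside transformations may be applied more than once if a task is retried; only updates made in actions are counted exactly once.

    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable

    // Collects the distinct strings seen across all tasks
    class SetAccumulator extends AccumulatorV2[String, mutable.Set[String]] {
      private val _set = mutable.Set.empty[String]
      override def isZero: Boolean = _set.isEmpty
      override def copy(): SetAccumulator = {
        val acc = new SetAccumulator
        acc._set ++= _set
        acc
      }
      override def reset(): Unit = _set.clear()
      override def add(v: String): Unit = _set += v
      override def merge(other: AccumulatorV2[String, mutable.Set[String]]): Unit =
        _set ++= other.value
      override def value: mutable.Set[String] = _set
    }

    val distinct = new SetAccumulator
    sc.register(distinct, "distinct words")
    sc.parallelize(Seq("a", "b", "a")).foreach(w => distinct.add(w))
    distinct.value // Set("a", "b")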

In the south stands a tall tree, yet it offers no shelter; by the Han roams a maiden, yet she cannot be sought. So wide is the Han, it cannot be swum; so long is the Jiang, no raft can cross it.
High rises the tangled brushwood; I would cut its thorny branches. When that maiden marries, I would feed her horse. So wide is the Han, it cannot be swum; so long is the Jiang, no raft can cross it.
High rises the tangled brushwood; I would cut its artemisia. When that maiden marries, I would feed her colt. So wide is the Han, it cannot be swum; so long is the Jiang, no raft can cross it.
(Book of Songs, "Han Guang")
