RDD(Resilient Distributed Datasets,弹性分布式数据集)代表可并行操作元素的不可变分区集合。对于Spark的初学者来说,这个概念会十分陌生。即便是对于一些有Spark使用经验的人,要想说清楚什么是RDD,以及为什么需要RDD还是一件比较困难的事情。在《深入理解Spark RDD——为什么需要RDD?》一文解释了第二个问题,本文将开启对第一个问题的解答。

有些读者可能对本文的标题感到困惑,这是因为RDD的API非常多,所以本文首先对RDD中与调度系统息息相关的API方法进行分析,转换API、动作API及检查点API将在后续文章中进行介绍。

抽象类RDD定义了所有RDD的规范,我们从RDD的属性开始,逐步了解RDD的实现。

  • _sc:即SparkContext。_sc由@transient修饰,所以此属性不会被序列化。
  • deps:构造器参数之一,是Dependency的序列,用于存储当前RDD的依赖。RDD的子类在实现时不一定会传递此参数。由于deps由@transient修饰,所以此属性不会被序列化。
  • partitioner:当前RDD的分区计算器。partitioner由@transient修饰,所以此属性不会被序列化。
  • id:当前RDD的唯一身份标识。此属性通过调用SparkContext的nextRddId属性生成。
  • name:RDD的名称。name由@transient修饰,所以此属性不会被序列化。
  • dependencies_:与deps相同,但是可以被序列化。
  • partitions_:存储当前RDD的所有分区的数组。partitions_由@transient修饰,所以此属性不会被序列化。
  • storageLevel:当前RDD的存储级别。
  • creationSite:创建当前RDD的用户代码。creationSite由@transient修饰,所以此属性不会被序列化。
  • scope:当前RDD的操作作用域。scope由@transient修饰,所以此属性不会被序列化。
  • checkpointData:当前RDD的检查点数据。
  • checkpointAllMarkedAncestors:是否对所有标记了需要保存检查点的祖先保存检查点。
  • doCheckpointCalled:是否已经调用了doCheckpoint方法设置检查点。此属性可以阻止对RDD多次设置检查点。

RDD采用了模板方法模式的设计,抽象类RDD中定义了模板方法以及一些未实现的接口,这些接口将需要RDD的各个子类分别实现。下面先来介绍RDD中定义的接口。

  • compute:对RDD的分区进行计算。此方法的定义如下:
  @DeveloperApidef compute(split: Partition, context: TaskContext): Iterator[T]
  • getPartitions:获取当前RDD的所有分区。此方法的定义如下:
  protected def getPartitions: Array[Partition]
  • getDependencies:获取当前RDD的所有依赖。此方法的定义如下:
  protected def getDependencies: Seq[Dependency[_]] = deps
  • getPreferredLocations:获取某一分区的偏好位置。此方法的定义如下:
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

RDD中除定义了以上接口外,还实现了一些模板方法。

partitions

partitions方法(见代码清单1)用于获取RDD的分区数组。

代码清单1       partitions的实现

  final def partitions: Array[Partition] = {checkpointRDD.map(_.partitions).getOrElse {if (partitions_ == null) {partitions_ = getPartitions// 省略次要代码}partitions_}}

根据代码清单1,partitions方法查找分区数组的优先级为:从CheckPoint查找 > 读取partitions_属性 > 调用getPartitions方法获取。检查点的内容将在10.3节详细介绍。

preferredLocations

preferredLocations方法(见代码清单2)优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置,当没有保存CheckPoint时调用自身的getPreferredLocations方法获取指定分区的偏好位置。

代码清单2       preferredLocations的实现

  final def preferredLocations(split: Partition): Seq[String] = {checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {getPreferredLocations(split)}}

dependencies

dependencies方法(见代码清单3)用于获取当前RDD的所有依赖的序列。

代码清单3        dependencies的实现

  final def dependencies: Seq[Dependency[_]] = {checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {if (dependencies_ == null) {dependencies_ = getDependencies}dependencies_}}

根据代码清单3,dependencies方法的执行步骤如下。

  1. 从CheckPoint中获取RDD并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进入下一步。
  2. 如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies,最后返回dependencies_。

其他方法

除了以上的模板方法,RDD中还实现的方法如下:

(1)context

context方法(见代码清单4)实际返回了_sc(即SparkContext)。

代码清单4        context的实现

  def context: SparkContext = sc

(2)getStorageLevel

getStorageLevel方法(见代码清单5)实际返回了当前RDD的StorageLevel。

代码清单5        获取当前RDD的存储级别

  def getStorageLevel: StorageLevel = storageLevel

(3)getNarrowAncestors

getNarrowAncestors方法(见代码清单6)用于获取当前RDD的祖先依赖中属于窄依赖的RDD序列。

代码清单6        getNarrowAncestors的实现

  private[spark] def getNarrowAncestors: Seq[RDD[_]] = {val ancestors = new mutable.HashSet[RDD[_]]def visit(rdd: RDD[_]) {val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])val narrowParents = narrowDependencies.map(_.rdd)val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)narrowParentsNotVisited.foreach { parent =>ancestors.add(parent)visit(parent)}}visit(this)ancestors.filterNot(_ == this).toSeq}

如要继续了解RDD,请继续阅读《深入理解Spark RDD——RDD依赖(构建DAG的关键)》。

深入理解Spark RDD系列文章:

《深入理解Spark RDD——为什么需要RDD?》

《深入理解Spark RDD——RDD实现的初次分析》

《深入理解Spark RDD——RDD依赖(构建DAG的关键)》

《深入理解Spark RDD——RDD分区计算器Partitioner》

《深入理解Spark RDD——RDD信息对象》

深入理解Spark RDD——RDD实现的初次分析相关推荐

  1. 理解Spark中RDD(Resilient Distributed Dataset)

    文章目录 1 RDD 基础 1.1 分区 1.2 不可变 1.3 并行执行 2 RDD 结构 2.1 SparkContext.SparkConf 2.2 Partitioner 2.3 Depend ...

  2. 深入理解Spark RDD——RDD分区计算器Partitioner

    在<深入理解Spark RDD--RDD依赖(构建DAG的关键)>一文,详细描述了RDD的宽窄依赖.RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交 ...

  3. 理解Spark的核心RDD

    与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streaming,SQL,Machine Learn ...

  4. 深入理解Spark 2.1 Core (一):RDD的原理与源码分析

    摘要 本文提出了分布式内存抽象的概念--弹性分布式数据集(RDD,Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大 ...

  5. 深入理解Spark RDD——RDD信息对象

    RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下: id:RDD的id. name:RDD的名称. numPartitions:RDD的分区数量. storageLevel:RDD的存 ...

  6. Spark的RDD检查点实现分析

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/51206980 概述 在<深入理解Spark ...

  7. Spark的RDD概要DAG概述

    Spark的RDD概要&DAG概述 1.RDD: 弹性分布式数据集(相当于集合) 2.RDD对应HDFS关系图 3.DAG概要 1.RDD: 弹性分布式数据集(相当于集合) 弹性:RDD的数据 ...

  8. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  9. spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍

    参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...

最新文章

  1. 阿里首次公开量子通信技术 为20年后做准备
  2. 【剑指offer-Java版】37两个链表的第一个公共结点
  3. SDN控制器OpenDaylight简介—VeCloud
  4. Docker之docker简介及其优势
  5. 黑盒之嵌入式操作系统鲁棒性研究
  6. 脚艺人什么意思,网络上脚艺人什么梗,变身脚艺人意思
  7. 开源管理系统OSSIM设置 语言为中文简体
  8. 力扣每日一刷--验证回文串
  9. vue属性绑定加载图片不成功
  10. easyui图标对照
  11. 《21天学通C语言》总结(1)
  12. python爬虫实例(一) b站篇
  13. extremecomponents 配置
  14. python nlpir_NLPIR(ICTCLAS 2013)分词工具Python封装
  15. 敏捷项目管理术语大全
  16. Allegro静态铜皮避让问题
  17. Mac自定义触控栏 Touch Bar的显示教程
  18. CAPTCHA(验证码)的来源与作用
  19. .net5 开启Lucene的全文搜索之旅
  20. 关于Linux的介绍与安装

热门文章

  1. java高并发递增编号_java 高并发 订单编号递增(解决方案)
  2. 2021年高处作业安装拆除维护证考试题库及高处安装、维护、拆除试题解析
  3. 6款提升办公效率的超牛软件,从此告别加班
  4. npm报错Failed at the node-sass@4.14.1 postinstall script
  5. win10家庭版安装VMWare和Ubuntu 20.04 LTS
  6. JS简写技巧(来自于微信公众号-前端之巅)
  7. 联想R720安装原装内存条图文过程
  8. jQuery框架介绍-简化js
  9. kotlin常用语法扫盲及开发注意点,勿错失(持续更新)
  10. 为什么大家都喜欢买白色的汽车