深入理解Spark RDD——RDD实现的初次分析
RDD(Resilient Distributed Datasets,弹性分布式数据集)代表可并行操作元素的不可变分区集合。对于Spark的初学者来说,这个概念会十分陌生。即便是对于一些有Spark使用经验的人,要想说清楚什么是RDD,以及为什么需要RDD还是一件比较困难的事情。在《深入理解Spark RDD——为什么需要RDD?》一文解释了第二个问题,本文将开启对第一个问题的解答。
有些读者可能对本文的标题感到困惑,这是因为RDD的API非常多,所以本文首先对RDD中与调度系统息息相关的API方法进行分析,转换API、动作API及检查点API将在后续文章中进行介绍。
抽象类RDD定义了所有RDD的规范,我们从RDD的属性开始,逐步了解RDD的实现。
- _sc:即SparkContext。_sc由@transient修饰,所以此属性不会被序列化。
- deps:构造器参数之一,是Dependency的序列,用于存储当前RDD的依赖。RDD的子类在实现时不一定会传递此参数。由于deps由@transient修饰,所以此属性不会被序列化。
- partitioner:当前RDD的分区计算器。partitioner由@transient修饰,所以此属性不会被序列化。
- id:当前RDD的唯一身份标识。此属性通过调用SparkContext的nextRddId属性生成。
- name:RDD的名称。name由@transient修饰,所以此属性不会被序列化。
- dependencies_:与deps相同,但是可以被序列化。
- partitions_:存储当前RDD的所有分区的数组。partitions_由@transient修饰,所以此属性不会被序列化。
- storageLevel:当前RDD的存储级别。
- creationSite:创建当前RDD的用户代码。creationSite由@transient修饰,所以此属性不会被序列化。
- scope:当前RDD的操作作用域。scope由@transient修饰,所以此属性不会被序列化。
- checkpointData:当前RDD的检查点数据。
- checkpointAllMarkedAncestors:是否对所有标记了需要保存检查点的祖先保存检查点。
- doCheckpointCalled:是否已经调用了doCheckpoint方法设置检查点。此属性可以阻止对RDD多次设置检查点。
RDD采用了模板方法模式的设计,抽象类RDD中定义了模板方法以及一些未实现的接口,这些接口将需要RDD的各个子类分别实现。下面先来介绍RDD中定义的接口。
- compute:对RDD的分区进行计算。此方法的定义如下:
@DeveloperApidef compute(split: Partition, context: TaskContext): Iterator[T]
- getPartitions:获取当前RDD的所有分区。此方法的定义如下:
protected def getPartitions: Array[Partition]
- getDependencies:获取当前RDD的所有依赖。此方法的定义如下:
protected def getDependencies: Seq[Dependency[_]] = deps
- getPreferredLocations:获取某一分区的偏好位置。此方法的定义如下:
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
RDD中除定义了以上接口外,还实现了一些模板方法。
partitions
partitions方法(见代码清单1)用于获取RDD的分区数组。
代码清单1 partitions的实现
final def partitions: Array[Partition] = {checkpointRDD.map(_.partitions).getOrElse {if (partitions_ == null) {partitions_ = getPartitions// 省略次要代码}partitions_}}
根据代码清单1,partitions方法查找分区数组的优先级为:从CheckPoint查找 > 读取partitions_属性 > 调用getPartitions方法获取。检查点的内容将在10.3节详细介绍。
preferredLocations
preferredLocations方法(见代码清单2)优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置,当没有保存CheckPoint时调用自身的getPreferredLocations方法获取指定分区的偏好位置。
代码清单2 preferredLocations的实现
final def preferredLocations(split: Partition): Seq[String] = {checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {getPreferredLocations(split)}}
dependencies
dependencies方法(见代码清单3)用于获取当前RDD的所有依赖的序列。
代码清单3 dependencies的实现
final def dependencies: Seq[Dependency[_]] = {checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {if (dependencies_ == null) {dependencies_ = getDependencies}dependencies_}}
根据代码清单3,dependencies方法的执行步骤如下。
- 从CheckPoint中获取RDD并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进入下一步。
- 如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies,最后返回dependencies_。
其他方法
除了以上的模板方法,RDD中还实现的方法如下:
(1)context
context方法(见代码清单4)实际返回了_sc(即SparkContext)。
代码清单4 context的实现
def context: SparkContext = sc
(2)getStorageLevel
getStorageLevel方法(见代码清单5)实际返回了当前RDD的StorageLevel。
代码清单5 获取当前RDD的存储级别
def getStorageLevel: StorageLevel = storageLevel
(3)getNarrowAncestors
getNarrowAncestors方法(见代码清单6)用于获取当前RDD的祖先依赖中属于窄依赖的RDD序列。
代码清单6 getNarrowAncestors的实现
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {val ancestors = new mutable.HashSet[RDD[_]]def visit(rdd: RDD[_]) {val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])val narrowParents = narrowDependencies.map(_.rdd)val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)narrowParentsNotVisited.foreach { parent =>ancestors.add(parent)visit(parent)}}visit(this)ancestors.filterNot(_ == this).toSeq}
如要继续了解RDD,请继续阅读《深入理解Spark RDD——RDD依赖(构建DAG的关键)》。
深入理解Spark RDD系列文章:
《深入理解Spark RDD——为什么需要RDD?》
《深入理解Spark RDD——RDD实现的初次分析》
《深入理解Spark RDD——RDD依赖(构建DAG的关键)》
《深入理解Spark RDD——RDD分区计算器Partitioner》
《深入理解Spark RDD——RDD信息对象》
深入理解Spark RDD——RDD实现的初次分析相关推荐
- 理解Spark中RDD(Resilient Distributed Dataset)
文章目录 1 RDD 基础 1.1 分区 1.2 不可变 1.3 并行执行 2 RDD 结构 2.1 SparkContext.SparkConf 2.2 Partitioner 2.3 Depend ...
- 深入理解Spark RDD——RDD分区计算器Partitioner
在<深入理解Spark RDD--RDD依赖(构建DAG的关键)>一文,详细描述了RDD的宽窄依赖.RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交 ...
- 理解Spark的核心RDD
与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streaming,SQL,Machine Learn ...
- 深入理解Spark 2.1 Core (一):RDD的原理与源码分析
摘要 本文提出了分布式内存抽象的概念--弹性分布式数据集(RDD,Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大 ...
- 深入理解Spark RDD——RDD信息对象
RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下: id:RDD的id. name:RDD的名称. numPartitions:RDD的分区数量. storageLevel:RDD的存 ...
- Spark的RDD检查点实现分析
版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/51206980 概述 在<深入理解Spark ...
- Spark的RDD概要DAG概述
Spark的RDD概要&DAG概述 1.RDD: 弹性分布式数据集(相当于集合) 2.RDD对应HDFS关系图 3.DAG概要 1.RDD: 弹性分布式数据集(相当于集合) 弹性:RDD的数据 ...
- Spark:RDD编程总结(概述、算子、分区、共享变量)
目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...
- spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍
参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...
最新文章
- 阿里首次公开量子通信技术 为20年后做准备
- 【剑指offer-Java版】37两个链表的第一个公共结点
- SDN控制器OpenDaylight简介—VeCloud
- Docker之docker简介及其优势
- 黑盒之嵌入式操作系统鲁棒性研究
- 脚艺人什么意思,网络上脚艺人什么梗,变身脚艺人意思
- 开源管理系统OSSIM设置 语言为中文简体
- 力扣每日一刷--验证回文串
- vue属性绑定加载图片不成功
- easyui图标对照
- 《21天学通C语言》总结(1)
- python爬虫实例(一) b站篇
- extremecomponents 配置
- python nlpir_NLPIR(ICTCLAS 2013)分词工具Python封装
- 敏捷项目管理术语大全
- Allegro静态铜皮避让问题
- Mac自定义触控栏 Touch Bar的显示教程
- CAPTCHA(验证码)的来源与作用
- .net5 开启Lucene的全文搜索之旅
- 关于Linux的介绍与安装
热门文章
- java高并发递增编号_java 高并发 订单编号递增(解决方案)
- 2021年高处作业安装拆除维护证考试题库及高处安装、维护、拆除试题解析
- 6款提升办公效率的超牛软件,从此告别加班
- npm报错Failed at the node-sass@4.14.1 postinstall script
- win10家庭版安装VMWare和Ubuntu 20.04 LTS
- JS简写技巧(来自于微信公众号-前端之巅)
- 联想R720安装原装内存条图文过程
- jQuery框架介绍-简化js
- kotlin常用语法扫盲及开发注意点,勿错失(持续更新)
- 为什么大家都喜欢买白色的汽车