rdd的特性,DAG,Stage的理解

  • RDD
  • 结构化理解
  • RDD的数据集与Partitions
  • Partitioner
  • Dependencies与Lineage
  • NarrowDependency与ShuffleDependency
  • 为什么区分窄依赖和宽依赖?
  • Stage
  • Checkpoint
  • Iterator和Compute
  • StorageLevel
  • PreferredLocation
  • Sparkcontext
  • sparkconf
  • transformations
  • actions

RDD

RDD(Resilient Distributed Datasets,弹性分布式数据集),是Spark最为核心的概念,自然也是理解Apache Spark 工作原理的最佳入口之一。

RDD的特点:

  1. 是一个分区的只读记录的集合;
  2. 一个具有容错机制的特殊集;
  3. 只能通过在稳定的存储器或其他RDD上的确定性操作(转换)来创建;
  4. 可以分布在集群的节点上,以函数式操作集合的方式,进行各种并行操作

RDD之所以为“弹性”的特点

  1. 基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错);
  2. Task如果失败会自动进行特定次数的重试(默认4次);
  3. Stage如果失败会自动进行特定次数的重试(可以值运行计算失败的阶段),只计算失败的数据分片;
  4. 数据调度弹性:DAG TASK 和资源管理无关;
  5. checkpoint;
  6. 自动的进行内存和磁盘数据存储的切换;

结构化理解

查阅了很多资料基本都没有介绍RDD长什么样子的,什么样的结构,都说里面有依赖、有分区,但是长什么样呢?对它没有一点头绪,我想初学者一定是和我一样的。

没有结构图,怎么理解RDD?上图!(自己瞎做的图,基本借鉴这位博主,不准确的地方请指正)

RDD的数据集与Partitions

具体参考分区的具体分析+源代码分析
RDD是一个只读的有属性的数据集。属性用来描述当前数据集的状态,数据集是由数据的分区(partition)组成。
RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合,这样的每一个子集合我们将其称为分区(partitions),分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务中进行,因此并行任务的个数,也是由 RDD分区的个数决定的。

但事实上,RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,通过 RDD 编号 + 分区编号可以唯一确定该分区对应的块编号,利用底层数据存储层提供的接口,就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据。下面是Partition 类的代码:

trait Partition extends Serializable {/*** Get the partition's index within its parent RDD*/def index: Int// A better default implementation of HashCodeoverride def hashCode(): Int = index
}

Partitioner

Partitioner决定RDD的分区方式。
RDD的分区方式主要包含两种(HashPartitioner和RangePartitioner),这两种分区类型都是针对Key-Value类型的数据。如是非Key-Value类型,则分区为None。 Hash是以key作为分区条件的散列分布,分区数据不连续,极端情况也可能散列到少数几个分区上,导致数据不均等;Range按Key的排序平衡分布,分区内数据连续,大小也相对均等。

Dependencies与Lineage

图中最显眼的一定是Dependencies(依赖),它扩展出了一个箭头到前面一个块。Parents在很多面向对象的计算机语言可以知道它表示“继承”,在RDD中的Dependencies意思略有不同。看一段实际操作的Spark代码:

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.cache()// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()

这段代码可以化为如下图的流程(图和代码都是盗来的):

在每次transformations操作时,都是重新创建了一个新的RDD2,这个RDD2时基于原有的RDD1,RDD1是RDD2的Parents,也就是说这个RDD2依赖于RDD1。这些依赖描述了RDD的Lineage(血统)。

NarrowDependency与ShuffleDependency

如果父RDD的每个分区最多只能被子RDD的一个分区使用,我们称之为(narrow dependency)窄依赖;
若一个父RDD的每个分区可以被子RDD的多个分区使用,我们称之为(wide dependency)宽依赖,在源代码中方法名为ShuffleDependency,顾名思义这之中还需要Shuffle操作。

窄依赖每个child RDD 的partition的生成操作都是可以并行的,而宽依赖则需要所有的parent partition shuffle结果得到后再进行。

NarrowDependency也还有两个子类,一个是 OneToOneDependency,一个是 RangeDependency

OneToOneDependency,可以看到getParents实现很简单,就是传进一个partitionId: Int,再把partitionId放在List里面传出去,即去parent RDD 中取与该RDD 相同 partitionID的数据

RangeDependency,用于union。与上面不同的是,这里我们要算出该位置,设某个parent RDD 从 inStart 开始的partition,逐个生成了 child RDD 从outStart 开始的partition,则计算方式为: partitionId - outStart + inStart

那么为什么要把依赖分为窄依赖和宽依赖呢?

为什么区分窄依赖和宽依赖?

来源:https://www.jianshu.com/p/dd7c7243e7f9?from=singlemessage
首先,从计算过程来看,窄依赖是数据以管道方式经一系列计算操作可以运行在了一个集群节点上,如(map、filter等),宽依赖则可能需要将数据通过跨节点传递后运行(如groupByKey),有点类似于MR的shuffle过程。
其次,从失败恢复来看,窄依赖的失败恢复起来更高效,因为它只需找到父RDD的一个对应分区即可,而且可以在不同节点上并行计算做恢复;宽依赖则牵涉到父RDD的多个分区,恢复起来相对复杂些。

Stage

Stage可以简单理解为是由一组RDD组成的可进行优化的执行计划。如果RDD的衍生关系都是窄依赖,则可放在同一个Stage中运行,若RDD的依赖关系为宽依赖,则要划分到不同的Stage。这样Spark在执行作业时,会按照Stage的划分, 生成一个完整的最优的执行计划。下面引用一张比较流行的图片辅助大家理解Stage,如图RDD-A到RDD-B和RDD-F到RDD-G均属于宽依赖,所以与前面的父RDD划分到了不同的Stage中。

Checkpoint

尽管当一个RDD出现问题可以由它的依赖也就是Lineage信息可以用来故障恢复,但对于那些Lineage链较长的RDD来说,这种恢复可能很耗时。
Checkpoint是Spark提供的一种缓存机制,当需要计算的RDD过多时,为了避免重新计算之前的RDD,可以对RDD做Checkpoint处理,检查RDD是否被物化或计算,并将结果持久化到磁盘或HDFS。

  1. Checkpoint会把当前RDD保存到一个目录中。
  2. Checkpoint的时候,会把所有依赖的父级rdd信息清除掉。
  3. Checkpoint不会马上执行,要触发action操作的时候才会执行。
  4. 因为 Checkpoint会清除父级RDD的信息,所以在Checkpoint应该先做persist(持久化)操作,否则就要重新计算一遍。
  5. 一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。
  6. Checkpoint的好处显而易见,比如做1000次迭代,在第999次时做了Checkpoint,如果第1000次的时候,只要重新计算第1000即可,不用从头到尾再计算一次。
  7. 与spark提供的另一种缓存机制cache相比, cache缓存数据由executor管理,当executor消失了,被cache的数据将被清除,RDD重新计算,而checkpoint将数据保存到磁盘或HDFS,job可以从checkpoint点继续计算。

Iterator和Compute

本节参考来源:http://blog.csdn.net/liben2007/article/details/53700399

Iterator和Compute都是来表示该RDD如何通过父RDD计算得到。
Iterator用来查找当前RDD Partition与父RDD中Partition的血缘关系。并通过StorageLevel确定迭代位置,直到确定真实数据的位置。
Iterator函数实现大体是这么个流程:

  1. 若标记了有缓存,则取缓存,取不到则进行computeOrReadCheckpoint(计算或读检查点)。完了再存入缓存,以备后续使用。
  2. 若未标记有缓存,则直接进行computeOrReadCheckpoint。
  3. “computeOrReadCheckpoint”这个过程也做两个判断:有做过checkpoint,没有做过checkpoint。做过checkpoint则可以读取到检查点数据返回。无则调该RDD的实现类的compute函数计算。compute函数实现方式就是向上递归“获取父RDD分区数据进行计算”,直到遇到检查点RDD获取有缓存的RDD。

StorageLevel

用来记录RDD的存储级别,在官网中可以看到RDD的存储级别表,这里不多解释:

PreferredLocation

它是一个列表,用于存储每个Partition的优先位置的一个列表。对于每个HDFS文件来说,这个列表保存的是每个Partition所在的块的位置,也就是对这个文件的”划分点“。

Sparkcontext

SparkContext为Spark job的入口,由Spark driver创建在client端,包括集群连接,RddID,创建抽样,累加器,广播变量等信息。

sparkconf

配置信息,即sc.conf
Spark参数配置信息
提供三个位置用来配置系统:
Spark api:控制大部分的应用程序参数,可以用SparkConf对象或者Java系统属性设置
环境变量:可以通过每个节点的conf/spark-env.sh脚本设置。例如IP地址、端口等信息
日志配置:可以通过log4j.properties配置

transformations

根据原有的RDD创建一个新的RDD。
注意:
RDD的所有转换操作都是lazy模式,即Spark不会立刻计算结果,而只是简单的记住所有对数据集的转换操作。这些转换只有遇到action操作的时候才会开始计算。这样的设计使得Spark更加的高效。
例如,对一个输入数据做一次map操作后进行reduce操作,只有reduce的结果返回给driver,而不是把数据量更大的map操作后的数据集传递给driver。
*略

actions

对RDD操作后把结果返回给driver
*略

原文链接:https://blog.csdn.net/u011094454/article/details/78992293

Spark基础之:rdd的特性,DAG,Stage的理解相关推荐

  1. spark基础之RDD详解

    一 什么是RDD,有什么特点? RDD: Resilient Distributed Dataset,弹性分布式数据集. 特点: # 它是一种数据的集合 # 它可以被分区,每一个分区分布在不同的集群中 ...

  2. spark基础之RDD和DataFrame和Dataset比较

    一 SparkSQL简介 Spark SQL是一个能够利用Spark进行结构化数据的存储和操作的组件,结构化数据可以来自外部结构化数据源也可以通过RDD获取. 外部的结构化数据源包括Hive,JSON ...

  3. spark基础之RDD和DataFrame的转换方式

    一 通过定义Case Class,使用反射推断Schema 定义Case Class,在RDD的转换过程中使用Case Class可以隐式转换成SchemaRDD,然后再注册成表,然后就可以利用sql ...

  4. Spark基础 DAG

    为什么使用spark的原因是早期的编程模式MapReduce缺乏对数据共享的高效元语,会造成磁盘I/O 以及序列号等开销,spark提出了统一的编程抽象---弹性分布式数据集(RDD),该模型可以令并 ...

  5. Spark 的核心 RDD 以及 Stage 划分细节,运行模式总结

    精选30+云产品,助力企业轻松上云!>>> 阅读文本大概需要 5 分钟. 以下内容,部分参考网络资料,也有自己的理解, 图片 99% 为自己制作.如有错误,欢迎留言指出,一起交流. ...

  6. Spark中,RDD概述(五大属性,弹性介绍,5个特性)

    1 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在Spa ...

  7. Spark基础学习笔记22:Spark RDD案例分析

    文章目录 零.本讲学习目标 一.案例分析:Spark RDD实现单词计数 (一)案例概述 (二)实现步骤 1.新建Maven管理的Spark项目 2.添加Scala和Spark依赖 3.创建WordC ...

  8. Spark基础学习笔记16:创建RDD

    文章目录 零.本讲学习目标 一.RDD为何物 (一)RDD概念 (二)RDD示例 (三)RDD主要特征 二.做好准备工作 (一)准备文件 1.准备本地系统文件 2.准备HDFS系统文件 (二)启动Sp ...

  9. Spark基础学习笔记20:RDD持久化、存储级别与缓存

    文章目录 零.本讲学习目标 一.RDD持久化 (一)引入持久化的必要性 (二)案例演示持久化操作 1.RDD的依赖关系图 2.不采用持久化操作 3.采用持久化操作 二.存储级别 (一)持久化方法的参数 ...

最新文章

  1. 一种PacBio测序数据组装得到的基因组序列的纠错方法技术 (专利技术)
  2. 关于日期比较compareTo
  3. 算法每日学打卡:java语言基础题目打卡(19-21)
  4. HDU6956-Pass!(2021杭电多校一)(BSGS)
  5. zkfc 异常退出问题,报错Received stat error from Zookeeper. code:CONNECTIONLOSS
  6. 网卡添加VLAN TAG
  7. 信息系统项目管理师知识要点
  8. mysql msvcp140.dll修复_丢失msvcp140.dll怎么办
  9. 计算机考试综合模块怎么做,《综合素质》几大模块备考指导要知道!
  10. aardio 模拟键盘按键,实现msgbox对话框自动关闭
  11. ViewPager(六)让ViewPager用起来更顺滑——设置间距与添加转场动画
  12. 记录---Rosalind之problemsSolutions__0002
  13. 培训机构要不要去 适合谁去
  14. ubuntu安装软件提示snap错误has install-snap change in progress
  15. 翻译文章后再来看翻译文章
  16. Kotlin中协程理解与实战(一)
  17. 【源头活水】IEEE TIFS 2022 | 基于不确定因素感知的鲁棒虹膜识别
  18. 图示LDA主题模型(酒店评论数据演示)
  19. [需求管理-1]:IT需求管理总体流程+需求管理需要的技能
  20. Jira统计工时sql备份

热门文章

  1. 再也不用担心偶像“塌房”了!资本杀入虚拟偶像
  2. win10 wubi linux,Wubi安装Ubuntu 10.04图文教程
  3. 使用压缩软件对文件夹进行加密和解密过程
  4. c# 清除 html,C#方法清除HTML标记
  5. python-pandas学习笔记
  6. 小米note3 android 8,手机 篇五十五:红米Note8Pro拍照翻车,竟不敌两年前小米Note3!...
  7. JSON格式化工具和beyondcompare对比工具
  8. Visio 上下标 微调 建立模具
  9. 品牌控价、淘宝控价、拼多多控价,如何有效利用平台投诉
  10. 主板定制中X86主板和ARM主板的比较