spark RDD详解及源码分析

@(SPARK)[spark]

  • spark RDD详解及源码分析
  • 一基础
    • 一什么是RDD
    • 二RDD的适用范围
    • 三一些特性
    • 四RDD的创建
      • 1由一个已经存在的scala集合创建
      • 2由外部存储系统的数据创建
    • 五RDD的操作
  • 二RDD的缓存
    • 一缓存方式
    • 二缓存级别
    • 三序列化
  • 三窄依赖与宽依赖stage的划分依据
    • 一Dependency
    • 一窄依赖
      • 1NarrowDependency
      • 2OneToOneDependency
      • 3RangeDependency
    • 二宽依赖
      • 三stage的划分
  • 四RDD的源码
    • 一子类
    • 二属性
      • 1SpackContext
      • 2SeqDependency_
    • 三方法
      • 1tranformation与action
      • 2缓存

一、基础

(一)什么是RDD

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

RDD是spark最基本的抽象概念,spark中的所有数据均通过RDD的形式进行组织。RDD是弹性的,自动容错的,分区的,只读的记录集合。

(二)RDD的适用范围

RDD尤其适用于迭代式的数据处理,如机器学习等。但它不适合那些异步更新共享状态的应用,例如web爬虫。

(三)一些特性

1、 在部分分区数据丢失时,spark可以通过依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重算。
2、用户可以在创建RDD时指定RDD的分区数量,如果没有指定,那么就会采用默认值,即程序分区到的CPU core数目。对于HDFS,每个block会分配一个分区。对于由父RDD生成的子RDD,其分区数量与父RDD相同,或者在transformation中显式指定。详见spark调优那篇文章。

(四)RDD的创建

RDD有2种创建方式

1、由一个已经存在的scala集合创建

val rdd = sc.paralellize(List(1,2,3,4))

一般只在试验性代码中使用,生产环境不大可能用到。

2、由外部存储系统的数据创建

比如本地文件,HDFS, HBASE等,常用textFile方法

val rdd = sc.textFile("hdfs:///tmp/myfile.txt")

(五)RDD的操作

RDD有2种操作:transformation 与 action,详见RDD的transformation 与 action那篇文章。

二、RDD的缓存

对于一个经常被使用的RDD或者计算代价较大的RDD,将其缓存下来,会大大的提高处理速度。

(一)缓存方式

persist()是标准的缓存方法
cache()是其简化方法,当只使用内存作缓存时使用。

(二)缓存级别

MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
MEMORY_ONLY_SER Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

(三)序列化

缓存数据时可以选择是否同时进行序列化。序列化后占用的空间会减少,但有序列化/反序列化的成本。
如果确定需要使用序列化,则同时应该设置序列化的方式,默认是使用java自带的序列化机制,可以通过kyro等框架优化序列化效率。详见《spark调优》那篇文章。

即使完全无其它属性,一个java对象都要占据8个字节的内存,包括:锁标志位、经历了几次gc、类的类信息等,因此序列化可节省此部分的空间。

三、窄依赖与宽依赖&stage的划分依据

RDD根据对父RDD的依赖关系,可分为窄依赖与宽依赖2种。
主要的区分之处在于父RDD的分区被多少个子RDD分区所依赖,如果一个就为窄依赖,多个则为宽依赖。更好的定义应该是:
窄依赖的定义应该是子RDD的每一个分区都依赖于父RDD的一个或者少量几个分区(不依赖于全部分区)

与依赖相关的以下5个类:

Dependency
<--NarrowDependency<--OneToOneDependency<--RangeDependency
<--ShuffleDependency

它们全部在同一个scala文件中,Dependency是一个abstract class, NarrowDependency(abstract class)与ShuffleDependency直接继承与它,OneToOneDependency与RangeDependency继承自NarrowDependency,大致如上图所示。

因此,关于Dependency的真正实现有三个,2个窄依赖:OneToOneDependency与RangeDependency,一个宽依赖:ShuffleDependency。

(一)Dependency

Dependency是一个抽象类,所有的依赖相关的类都必须继承自它。Dependency只有一个成员变量,表示的是父RDD。

 /*** :: DeveloperApi ::* Base class for dependencies.*/@DeveloperApiabstract class Dependency[T] extends Serializable {def rdd: RDD[T]}

(一)窄依赖

1、NarrowDependency

看看代码中对NarrowDependency的说明:

Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution。
即窄依赖的定义应该是子RDD的每一个分区都依赖于父RDD的一个或者少量几个分区(不依赖于全部分区)。

/*** :: DeveloperApi ::* Base class for dependencies where each partition of the child RDD depends on a small number* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {/*** Get the parent partitions for a child partition.* @param partitionId a partition of the child RDD* @return the partitions of the parent RDD that the child partition depends upon*/def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd
}

getParents根据子RDD的分区ID返回父RDD的分区ID。

主构建函数中的rdd是父RDD,下同。

2、OneToOneDependency

一对一依赖,即每个子RDD的分区的与父RDD的分区一一对应。

    /*** :: DeveloperApi ::* Represents a one-to-one dependency between partitions of the parent and child RDDs.*/@DeveloperApiclass OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = List(partitionId)}

重写了NarrowDependency的getParents方法,返回一个List,这个List只有一个元素,且与子RDD的分区ID相同。即子分区的ID与父分区的ID一一对应且相等。

3、RangeDependency

子RDD中的每个分区依赖于父RDD的几个分区,而父RDD的每个分区仅补一个子RDD分区所依赖,即多对一的关系。它仅仅被UnionRDD所使用。

/*** :: DeveloperApi ::* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.* @param rdd the parent RDD* @param inStart the start of the range in the parent RDD* @param outStart the start of the range in the child RDD* @param length the length of the range*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = {if (partitionId >= outStart && partitionId < outStart + length) {List(partitionId - outStart + inStart)} else {Nil}}

(二)宽依赖

宽依赖只有一种:shuffleDependency,即子RDD依赖于父RDD的所有分区,父RDD的分每个区被所有子RDD的分区所依赖。

/*** :: DeveloperApi ::* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,* the RDD is transient since we don't need it on the executor side.** @param _rdd the parent RDD* @param partitioner partitioner used to partition the shuffle output* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,*                   the default serializer, as specified by `spark.serializer` config option, will*                   be used.* @param keyOrdering key ordering for RDD's shuffles* @param aggregator map/reduce-side aggregator for RDD's shuffle* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)*/
@DeveloperApi
class ShuffleDependency[K, V, C](@transient _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Option[Serializer] = None,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]] {override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]val shuffleId: Int = _rdd.context.newShuffleId()val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.size, this)_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

(三)stage的划分

DAG根据宽依赖来划分stage,每个宽依赖的处理均会一个stage的划分点。同一个stage中的多个操作会在一个task中完成。因为子RDD的分区仅依赖于父RDD的一个分区,因此这些步骤可以串行执行。

四、RDD的源码

RDD定义了所有RDD共有的一些属性和方法,下面介绍了主要的属性和方法。

(一)子类

CoGroupedRDD, EdgeRDD, EdgeRDDImpl, HadoopRDD, JdbcRDD, NewHadoopRDD, PartitionPruningRDD, ShuffledRDD, UnionRDD, VertexRDD, VertexRDDImpl

(二)属性

1、SpackContext

@transient private var _sc: SparkContext

在主构建函数中定义,表示RDD所在运行环境,可用于获取配置,清理环境等。

2、Seq[Dependency[_]]

@transient private var deps: Seq[Dependency[_]]

定义了这个RDD对父RDD的依赖关系。

(三)方法

1、tranformation与action

RDD中定义了所有RDD所共用的tranformation与action,如map, filter, reduce, first等,举个filter的例子:

  def filter(f: T => Boolean): RDD[T] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[T, T](this,(context, pid, iter) => iter.filter(cleanF),preservesPartitioning = true)}

2、缓存

包括pesist的多个实现及cache等,举个例子

  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {// TODO: Handle changes of StorageLevelif (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}// If this is the first time this RDD is marked for persisting, register it// with the SparkContext for cleanups and accounting. Do this only once.if (storageLevel == StorageLevel.NONE) {sc.cleaner.foreach(_.registerRDDForCleanup(this))sc.persistRDD(this)}storageLevel = newLevelthis}

spark RDD详解及源码分析相关推荐

  1. spark 调度模块详解及源码分析

    spark 调度模块详解及源码分析 @(SPARK)[spark] spark 调度模块详解及源码分析 一概述 一三个主要的类 1class DAGScheduler 2trait TaskSched ...

  2. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  3. SpringMVC异常处理机制详解[附带源码分析]

    SpringMVC异常处理机制详解[附带源码分析] 参考文章: (1)SpringMVC异常处理机制详解[附带源码分析] (2)https://www.cnblogs.com/fangjian0423 ...

  4. FPGA学习之路—接口(2)—I2C协议详解+Verilog源码分析

    FPGA学习之路--I2C协议详解+Verilog源码分析 定义 I2C Bus(Inter-Integrated Circuit Bus) 最早是由Philips半导体(现被NXP收购)开发的两线时 ...

  5. HashMap、ConcurretnHashMap面试题详解,源码分析

    文章目录 面试题 HashMap.LinkedHashMap和TreeMap的区别是什么? ①:为什么hashmap每次扩容大小为2的n次方? ③:jdk1.7的hashmap的扩容操作是在元素插入之 ...

  6. 安卓PopupWindow使用详解与源码分析(附项目实例)

    基本用法 首先定义弹窗的Layout文件 res/layout/popup_window.xml <?xml version="1.0" encoding="utf ...

  7. Epoll详解及源码分析

    文章来源:http://blog.csdn.net/chen19870707/article/details/42525887 Author:Echo Chen(陈斌) Email:chenb1987 ...

  8. JDK动态代理实现原理详解(源码分析)

    无论是静态代理,还是Cglib动态代理,都比较容易理解,本文就通过进入源码的方式来看看JDK动态代理的实现原理进行分析 要了解动态代理的可以参考另一篇文章,有详细介绍,这里仅仅对JDK动态代理做源码分 ...

  9. 解密android日志xlog,XLog 详解及源码分析

    一.前言 这里的 XLog 不是微信 Mars 里面的 xLog,而是elvishew的xLog.感兴趣的同学可以看看作者 elvishwe 的官文史上最强的 Android 日志库 XLog.这里先 ...

最新文章

  1. debian6更新网卡驱动
  2. Matlab实用程序--图形应用-条形图形
  3. linux ar 命令详解
  4. c语言获取按键,c语言获得键盘的按键
  5. javaweb连接不上mysql怎么办_java web应用连接mysql会突然connection连接失败
  6. linux hash 算法,识别哈希算法类型hash-identifier
  7. ARCGIS 拓扑规则阐述
  8. 前端工程化和Reactjs的模式
  9. WIFI抓包理论篇——802.11帧与EthernetII帧的差别
  10. Latex 爬过的坑(4)——Latex中插入Emoji
  11. 使用Excel处理姓名数据,转为拼音并且姓与名首字母大写
  12. 怎样复制秀米html码,秀米微信图文编辑器如何复制?
  13. PowerVR SDK 2020 Release 2发布:多处更新优化,性能更强大
  14. Qt简述如何实现不规则按钮
  15. 如何用PS(photoshop)给照片加文字
  16. Golang调用mssql存储过程
  17. 十年一梦,小米的原罪得到救赎了吗?
  18. python文件操作练习题【学生成绩.txt】
  19. IPv6:不发展才是最大的不安全
  20. SOA 架构中的ESB是更好的应用于异构系统集成整合还是用于统一服务调用/基础服务实施...

热门文章

  1. POJ 1185 炮兵阵地
  2. 【一步到胃解决】Several ports (8005, 8080, 8009) required by Tomcat v9.0 Server at localhost are already
  3. 27行代码AC_迷宫 2017年第八届蓝桥杯A组第一题(暴力、仿迷宫)
  4. 算法竞赛入门经典(第二版) | 例题5-1 大理石在哪 (普适查找)(UVa10474,Where is the Marble?)
  5. ArrayList各方法的时间复杂度
  6. kali如何取得超级用户权限_如何在 Ubuntu 上为用户授予和移除 sudo 权限 | Linux 中国...
  7. matlab 与dsp联合仿真,matlab和DSP联合开发前景很大?
  8. java写 IP十进制转变_java实现ip地址与十进制数相互转换
  9. 路由(二)——动态路由简介与RIP协议
  10. 在c语言中i10是什么意思啊,2011年计算机二级考试C语言十套上机题