For details on RDDs, see the Spark paper; here we look at the source code.
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be operated on in parallel.

* Internally, each RDD is characterized by five main properties:
*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

The RDD operations are split across the following classes:

basic(org.apache.spark.rdd.RDD): This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.

org.apache.spark.rdd.PairRDDFunctions: contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`

org.apache.spark.rdd.DoubleRDDFunctions: contains operations available only on RDDs of Doubles

org.apache.spark.rdd.SequenceFileRDDFunctions: contains operations available on RDDs that can be saved as SequenceFiles
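
These extra operations become available through implicit conversions on the basic RDD. Below is a minimal driver-program sketch (not from the original post) showing the basic, pair, and Double flavours in use; it assumes the `import org.apache.spark.SparkContext._` style implicits of this era, and the master string and app name are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // brings PairRDDFunctions / DoubleRDDFunctions into scope via implicits

object RddFunctionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "rdd-functions-demo")

    // basic operations, defined directly on org.apache.spark.rdd.RDD
    val words   = sc.parallelize(Seq("spark", "rdd", "spark"))
    val lengths = words.map(_.length).filter(_ > 3)

    // key-value operations, provided by PairRDDFunctions through an implicit conversion
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    // numeric operations, provided by DoubleRDDFunctions
    val meanLength = lengths.map(_.toDouble).mean()

    println(counts.collect().mkString(", ") + ", mean length = " + meanLength)
    sc.stop()
  }
}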

RDD is first of all a generic class: T is the type of the elements it holds, and data is always processed through Iterator[T].
It is constructed from a SparkContext and its dependencies, Seq deps.
From the interfaces RDD exposes we can roughly see what an RDD is:
1. An RDD is a chunk of data, possibly a large one, so it cannot be assumed to fit in a single machine's memory; it has to be split into partitions spread across the memory of the cluster's machines.
Hence getPartitions, partitioner (how to partition), and getPreferredLocations (how to take locality into account for each partition).

The definition of Partition is very simple: it carries only an index, no data.

trait Partition extends Serializable {
  /** Get the split's index within its parent RDD */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index
}
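
To make "only an index, no data" concrete, here is a hypothetical partition for an RDD backed by a large CSV file (class and field names are invented for illustration): the partition records where its slice of data lives, never the rows themselves; compute would read them later.

import org.apache.spark.Partition

// Hypothetical: one chunk of a big CSV file. Pure metadata: a byte range in a file, no rows.
class CsvChunkPartition(val idx: Int, val path: String, val offset: Long, val length: Long)
  extends Partition {
  override def index: Int = idx
}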

2. RDDs are related to one another: an RDD turns its parent RDD's data into its own data through its compute logic, so there is a lineage between RDDs,
and getDependencies returns all of the dependencies.

3. An RDD can be persisted; the common case is cache, i.e. StorageLevel.MEMORY_ONLY (a short usage sketch follows this list).

4. An RDD can be checkpointed to make failover more efficient; with a long RDD chain, relying purely on replay would be slow.

5. RDD.iterator produces the Iterator[T] used to iterate over the actual data.

6. Various transformations and actions can be applied to an RDD, as sketched below.
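
A short spark-shell style sketch of points 3 and 6 (assuming `sc` is an existing SparkContext; not from the original post): transformations only build new RDDs, nothing runs until an action, and cache() keeps the computed partitions in memory for the next action.

val nums   = sc.parallelize(1 to 1000000)   // an RDD, split into partitions across the cluster
val evens  = nums.filter(_ % 2 == 0)        // transformation: nothing is computed yet
val cached = evens.cache()                  // i.e. persist(StorageLevel.MEMORY_ONLY)

println(cached.count())   // action: a job runs, and partitions are cached as they are computed
println(cached.count())   // second action: answered from the cached blocks, no recomputation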

abstract class RDD[T: ClassManifest](
    @transient private var sc: SparkContext,           // @transient: does not need to be serialized
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  /** Auxiliary constructor, used to build RDDs that have a one-to-one dependency on a single
   *  parent, which is a very common case: filter, map, ...
   *
   *  Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))
    // Unlike the general case, there is only one parent here, so the parent RDD is passed in directly

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /** Implemented by subclasses to compute a given partition. */
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /** Optionally overridden by subclasses to specify placement preferences. */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  var name: String = null

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet.
   */
  def persist(newLevel: StorageLevel): RDD[T] = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    storageLevel = newLevel
    // Register the RDD with the SparkContext
    sc.persistentRdds(id) = this
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): RDD[T] = persist()

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
  def getStorageLevel = storageLevel

  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = null
  @transient private var partitions_ : Array[Partition] = null

  /** An Option holding our checkpoint RDD, if we are checkpointed.
   *  Checkpointing writes the RDD to files on disk to make failover more efficient (plain replay is the alternative).
   *  If checkpointRDD exists, the RDD's data can be read from it directly instead of being computed. */
  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)

  
  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  /** This is the core of how an RDD reaches its data: a Partition only carries an id, not the data itself.
   *  So where does the data come from? See the storage module:
   *  cacheManager.getOrCompute maps the RDD and partition id to the corresponding block and reads the data out of it. */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      // A storage level other than NONE means this RDD has been persisted, so it can be read directly
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      // Not persisted: recompute it, or read it from the checkpoint
      computeOrReadCheckpoint(split, context)
    }
  }

  // Transformations (return a new RDD)
  // ...... the various transformation interfaces: map, union, ...
  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

  // Actions (launch a job to return a value to the user program)
  // ...... the various action interfaces: count, collect, ...
  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = {
    // runJob is only called inside actions, which is why transformations are lazy
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }

  // =======================================================================
  // Other internal methods and fields
  // =======================================================================

  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassManifest] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }

  //................
}
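
To see how the subclass contract fits together, here is a hypothetical minimal source RDD (all names are invented; it follows the ClassManifest-era signatures above, and real source RDDs such as ParallelCollectionRDD are built along similar lines): getPartitions describes the slices as metadata, and compute turns one slice into an Iterator[T] on demand.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: which slice of the number range this is, metadata only
class NumberSlice(val idx: Int, val start: Long, val end: Long) extends Partition {
  override def index: Int = idx
}

// Hypothetical RDD that produces the numbers [0, total) in numSlices partitions
class NumbersRDD(sc: SparkContext, total: Long, numSlices: Int)
  extends RDD[Long](sc, Nil) {              // Nil: a source RDD, no parent dependencies

  override protected def getPartitions: Array[Partition] = {
    val step = total / numSlices
    (0 until numSlices).map { i =>
      val start = i * step
      val end   = if (i == numSlices - 1) total else start + step
      new NumberSlice(i, start, end): Partition
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Long] = {
    val s = split.asInstanceOf[NumberSlice]
    (s.start until s.end).iterator          // the data only comes into existence here
  }
}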

For now we only discuss some basic RDDs; pair RDDs will be covered separately.

FilteredRDD

One-to-one Dependency, FilteredRDD

filter wraps the current RDD in a FilteredRDD, with the current RDD as the first argument and the function f as the second; the return value is the filtered RDD.

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

In compute, the filter operation is simply applied to the parent RDD's Iterator[T].

private[spark] class FilteredRDD[T: ClassManifest](  // filter is a typical one-to-one dependency, so the auxiliary constructor is used
    prev: RDD[T],     // the parent RDD
    f: T => Boolean)  // f, the predicate
  extends RDD[T](prev) {

  // firstParent takes the first RDD out of deps, i.e. the prev RDD passed in.
  // With a one-to-one dependency, parent and child share the same partition information.
  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override val partitioner = prev.partitioner    // Since filter cannot change a partition's keys

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).filter(f)  // compute is where the RDD's data is actually produced
}
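
A quick driver-side way to observe the one-to-one relationship (a sketch assuming an existing SparkContext `sc`, e.g. in spark-shell; not from the original post): the filtered RDD reuses the parent's partitions and keeps its partitioner.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._    // implicit conversion to PairRDDFunctions, for partitionBy

val pairs    = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c")).partitionBy(new HashPartitioner(4))
val filtered = pairs.filter(_._1 > 1)

println(filtered.partitions.size)   // 4, same as pairs: the parent's partitions are reused
println(filtered.partitioner)       // Some(HashPartitioner): "filter cannot change a partition's keys"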

UnionRDD

A range dependency, which is still narrow.

First, how union is used: the second argument to UnionRDD is an array of the two RDDs, and the return value is the new RDD produced by union-ing them.

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

First, UnionPartition is defined. The characteristic of union is that it only gathers the partitions of several RDDs into one RDD; the partitions themselves do not change, so the parent partitions can be reused directly.

It takes 3 parameters:
idx: the partition id, i.e. this partition's index within the UnionRDD
rdd: the parent RDD
splitIndex: the index of the parent partition

private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
  extends Partition {

  var split: Partition = rdd.partitions(splitIndex)  // take the corresponding partition from the parent RDD and reuse it

  def iterator(context: TaskContext) = rdd.iterator(split, context)  // the Iterator can be reused as well

  def preferredLocations() = rdd.preferredLocations(split)

  override val index: Int = idx  // the partition id is new, since the indices necessarily change after merging
}

Now UnionRDD itself:

class UnionRDD[T: ClassManifest](
    sc: SparkContext,
    @transient var rdds: Seq[RDD[T]])  // the Seq of parent RDDs
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    // the number of partitions in the UnionRDD is the sum of the partition counts of all parent RDDs
    val array = new Array[Partition](rdds.map(_.partitions.size).sum)
    var pos = 0
    for (rdd <- rdds; split <- rdd.partitions) {
      array(pos) = new UnionPartition(pos, rdd, split.index)  // create all the UnionPartitions
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)  // create a RangeDependency
      pos += rdd.partitions.size  // with RangeDependency, pos advances by the size of the whole range
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] =
    s.asInstanceOf[UnionPartition[T]].iterator(context)  // Union's compute is trivial: nothing needs to be done

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()
}
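
Driver-side behaviour that follows from the code above (again a sketch assuming an existing `sc`): the union has the sum of the parents' partition counts, and since every UnionPartition just points back to a parent partition, no data is moved.

val a = sc.parallelize(1 to 10, 2)    // 2 partitions
val b = sc.parallelize(11 to 20, 3)   // 3 partitions
val u = a.union(b)

println(u.partitions.size)   // 5 = 2 + 3: the parent partitions are reused, only renumbered
println(u.count())           // 20, computed without a shuffle thanks to the narrow RangeDependency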

Reposted from: https://www.cnblogs.com/fxjwind/p/3489107.html
