RDD它是Spark基,它是最根本的数据抽象。http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 它开着RDD文件。假设英语阅读太费时,:http://shiyanjun.cn/archives/744.html

本文也是基于这篇论文和源代码,分析RDD的实现。

第一个问题,RDD是什么?Resilient Distributed Datasets(RDD,)弹性分布式数据集。RDD是仅仅读的、分区记录的集合。

RDD仅仅能基于在稳定物理存储中的数据集和其它已有的RDD上运行确定性操作来创建。这些确定性操作称之为转换。如map、filter、groupBy、join(转换不是程开发者在RDD上运行的操作)。

RDD不须要物化。RDD含有怎样从其它RDD衍生(即计算)出本RDD的相关信息(即Lineage)。据此能够从物理存储的数据计算出对应的RDD分区。

看一下内部实现对于RDD的概述:

Internally, each RDD is characterized by five main properties:

*

*  - A list of partitions

*  - A function for computing each split

*  - A list of dependencies on other RDDs

*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for

*    an HDFS file)

每一个RDD有5个基本的属性:

一组分片(partition)。即数据集的基本组成单位

一个计算每一个分片的函数

对parent RDD的依赖,这个依赖描写叙述了RDD之间的lineage

对于key-value的RDD,一个Partitioner

一个列表,存储存取每一个partition的preferred位置。对于一个HDFS文件来说。存储每一个partition所在的块的位置。

org.apache.spark.rdd.RDD是一个抽象类,定义了RDD的基本操作和属性。这些基本操作包含map,filter和persist。另外。org.apache.spark.rdd.PairRDDFunctions定义了key-value类型的RDD的操作,包含groupByKey,join,reduceByKey,countByKey,saveAsHadoopFile等。org.apache.spark.rdd.SequenceFileRDDFunctions包含了全部的RDD都适用的saveAsSequenceFile。

RDD支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。 比如,map就是一种转换。它将数据集每一个元素都传递给函数。并返回一个新的分布数据集表示结果。

还有一方面,reduce是一种动作,通过一些函数将全部的元素叠加起来,并将终于结果返回给Driver程序。(只是还有一个并行的reduceByKey,能返回一个分布式数据集)

Spark中的全部转换都是惰性的。也就是说,他们并不会直接计算结果。相反的,它们仅仅是记住应用到基础数据集(比如一个文件)上的这些转换动作。

仅仅有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。比如。我们能够实现:通过map创建的一个新数据集。并在reduce中使用。终于仅仅返回reduce的结果给driver,而不是整个大的新数据集。

默认情况下,每一个转换过的RDD都会在你在它之上运行一个动作时被又一次计算。只是,你也能够使用persist(或者cache)方法,持久化一个RDD在内存中。在这种情况下,Spark将会在集群中,保存相关元素。下次你查询这个RDD时,它将能更高速訪问。在磁盘上持久化数据集。或在集群间复制数据集也是支持的。

下表列出了Spark中的RDD转换和动作。

每一个操作都给出了标识,当中方括号表示类型參数。前面说过转换是延迟操作。用于定义新的RDD;而动作启动计算操作,并向用户程序返回值或向外部存储写数据。表1Spark中支持的RDD转换和动作转换map(f : T ) U) : RDD[T] ) RDD[U]filter(f : T ) Bool) : RDD[T] ) RDD[T]flatMap(f : T ) Seq[U]) : RDD[T] ) RDD[U]sample(fraction : Float) : RDD[T] ) RDD[T] (Deterministic sampling)groupByKey() : RDD[(K, V)] ) RDD[(K, Seq[V])]reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)]union() : (RDD[T]; RDD[T]) ) RDD[T]join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))]cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq[V], Seq[W]))]crossProduct() : (RDD[T]; RDD[U]) ) RDD[(T, U)]mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning)sort(c : Comparator[K]) : RDD[(K, V)] ) RDD[(K, V)]partitionBy(p : Partitioner[K]) : RDD[(K, V)] ) RDD[(K, V)]

动作count() : RDD[T] ) Longcollect() : RDD[T] ) Seq[T]reduce(f : (T; T) ) T) : RDD[T] ) Tlookup(k : K) : RDD[(K, V)] ) Seq[V] (On hash/range partitioned RDDs)save(path : String) : Outputs RDD to a storage system, e.g., HDFS

注意,有些操作仅仅对键值对可用,比方join。

另外。函数名与Scala及其它函数式语言中的API匹配。比如map是一对一的映射,而flatMap是将每一个输入映射为一个或多个输出(与MapReduce中的map相似)。

除了这些操作以外。用户还能够请求将RDD缓存起来。并且,用户还能够通过Partitioner类获取RDD的分区顺序。然后将还有一个RDD依照相同的方式分区。有些操作会自己主动产生一个哈希或范围分区的RDD,像groupByKey。reduceByKey和sort等。

从一个样例開始

以下的样例摘自RDD的论文,实现了处理一个HDFS日志文件里错误日志的逻辑。

lines = spark.textFile("hdfs://...") // lines is a org.apache.spark.rdd.MappedRDD

errors = lines.filter(_.startsWith("ERROR")) // errors is a org.apache.spark.rdd.FilteredRDD

errors.cache() // persist 到内存中

errors.count() // 触发action。计算errors有多少个,即ERROR的多少行

// Count errors mentioning MySQL:

errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning

// HDFS as an array (assuming time is field

// number 3 in a tab-separated format):

errors.filter(_.contains("HDFS"))

.map(_.split('\t')(3))

.collect()

spark是一个org.apache.spark.SparkContext的实例。基本上spark的应用都是以定义一个SparkContext開始的。textFile的定义例如以下:

/**

* Read a text file from HDFS, a local file system (available on all nodes), or any

* Hadoop-supported file system URI, and return it as an RDD of Strings.

*/

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {

hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

minPartitions).map(pair => pair._2.toString).setName(path)

}

hadoopFile创建了一个org.apache.spark.rdd.HadoopRDD,而在HadoopRDD上调用map则生成了一个MappedRDD:

/**

* Return a new RDD by applying a function to all elements of this RDD.

*/

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

errors.cache()并不会马上运行,它的作用是在RDD的计算完毕后,将结果cache起来。以供以后的计算使用。这种话能够加快以后运算的速度。

errors.count() 就触发了一个action,这个时候就须要向集群提交job了:

/**

* Return the number of elements in the RDD.

*/

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

提交后,SparkContext会将runJob提交到DAGScheduler。DAGScheduler会将当前的DAG划分成Stage。然后生成TaskSet后通过TaskScheduler的submitTasks提交tasks,而这又会调用SchedulerBackend。SchedulerBackend会将这些任务发送到Executor去运行。

怎样划分Stage?怎样生成tasks?接下来将得到解决。

为了明天还要上班,,早在今天休息吧。

版权声明:本文博主原创文章,博客,未经同意不得转载。

spark的rdd的含义_Spark里边:到底是什么RDD相关推荐

  1. spark如何防止内存溢出_Spark 理论基石 —— RDD

    概述 RDD,学名可伸缩的分布式数据集(Resilient Distributed Dataset).是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中执行一系列计算,而不用将中间结果落盘.而这 ...

  2. spark on yarn 完全分布式_Spark编程笔记(1)-架构基础与运行原理

    引言 根据IBM前首席执行官郭士纳的观点,IT领域每隔十五年就会迎来一 次重大变革 .当前我们正处于第三次信息浪潮(2010年前后),物联网.云计算和大数据技术突飞猛进. 信息爆炸是我们当前所需要解决 ...

  3. spark的数三角形算法_Spark任务调度概述

    本文讲述了Spark任务调度的实现框架,概要分析了Spark从Job提交到Task创建并提交给Worker的整个过程.并对Spark任务调度相关的概念进行了介绍. 任务调度总体流程 从设计层面来说,S ...

  4. spark 读取多个路径_spark

    Spark系统的概述 mapredcue 是一个简单通用和自动容错的批处理计算模型 劣势: 交互式和流式计算 使用Storm,Impala结合hadoop的不足: •重复工作 •组合问题 •使用范围的 ...

  5. spark的数三角形算法_spark graphx 图计算

    写在前面 态度决定高度!让优秀成为一种习惯! 世界上没有什么事儿是加一次班解决不了的,如果有,就加两次!(- - -茂强) 什么是一个图 一个网络 Network 一个树 Tree 一个RDBMS R ...

  6. 编程实现将rdd转换为dataframe:源文件内容如下(_大数据 什么是RDD?可以干什么?为什么要有RDD?...

    什么是RDD 弹性分布式数据集(Resilient Distributed Dataset,RDD)是 Spark 中的核心概念. RDD在抽象上来讲是一种抽象的分布式的数据集.它是被分区的,每个分区 ...

  7. RDD的依赖关系、窄依赖、宽依赖、RDD的缓存、RDD缓存方式、DAG的生成、RDD容错机制之Checkpoint

    1.RDD的依赖关系 RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency). 1.1.窄依赖 窄依赖指的是每一 ...

  8. spark Java oracle,spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库...

    课程咨询以及领取大额优惠请加微信:bigdatatang01 以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者ora ...

  9. spark和python的关系_spark submit和pyspark有什么区别?

    如果启动pyspark,然后运行以下命令:import my_script; spark = my_script.Sparker(sc); spark.collapse('./data/') 一切都很 ...

最新文章

  1. Java学习之if语句
  2. java基础:任意进制到十进制的转换
  3. MFC CEdit 自动换行功能
  4. 新手必看 | RVB2601开发板快速上手指南
  5. WTM框架使用技巧之:CI/DI(持续集成/持续部署)
  6. php 图片 3d旋转图片,html5实现图片的3D旋转效果
  7. python合并数组输出重复项_python进行数组合并的方法
  8. C语言是菜鸟和大神的分水岭
  9. 信用评分系统运行原理中篇-分箱逻辑
  10. ifconfig没有ip地址_虚拟机创建后该如何获取IP地址并访问互联网实用教程
  11. 【数据结构和算法笔记】用c语言实现栈
  12. ETF的战争从未停止【文献推荐·天风金工吴先兴团队】
  13. Unicode编码在JavaScript中的作用是什么?
  14. Java并发编程之美系列汇总
  15. 微信模拟器不显示鼠标解决办法
  16. 微信小程序实现腾讯地图定位功能修改地址功能
  17. uni-app商城源码/公众号/小程序/APP多端适配
  18. ir2104s的自举电容_有关IR2104的自举电容和NMOS选择问题 - 图文 -
  19. CSS的border属性绘制简单三角形、边框三角形
  20. eNSP配置VLAN

热门文章

  1. 常见的数据库连接字符串收集
  2. Note - Shader - 2
  3. jQuery Moblie 页面模版(基础)
  4. ubuntu 12.04安装与配置
  5. 修改node_modules的包
  6. 7-2 定义日期类 (28 分)
  7. Duplicated Numbers (10分)
  8. redis java 缓存服务器_java中对Redis的缓存进行操作
  9. 两种方法:实现输入一行字符,统计数字字符、英文字母和其他字符的个数(C语言)
  10. 申请购买计算机的报告,关于申请购买电脑的请示(最新整理)