本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行。 
根据Action算子的输出空间将Action算子进行分类:无输出、 HDFS、 Scala集合和数据类型。

无输出

foreach

对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。 

图中,foreach算子通过用户自定义函数对每个数据项进行操作。 本例中自定义函数为println,控制台打印所有数据项。

源码:

 
  1. /**

  2. * Applies a function f to all elements of this RDD.

  3. */

  4. def foreach(f: T => Unit) {

  5. val cleanF = sc.clean(f)

  6. sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))

  7. }

HDFS

(1)saveAsTextFile

函数将数据输出,存储到HDFS的指定目录。将RDD中的每个元素映射转变为(Null,x.toString),然后再将其写入HDFS。 

图中,左侧的方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS中的一个Block。

源码:

 
  1. /**

  2. * Save this RDD as a text file, using string representations of elements.

  3. */

  4. def saveAsTextFile(path: String) {

  5. // https://issues.apache.org/jira/browse/SPARK-2075

  6. //

  7. // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit

  8. // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`

  9. // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an

  10. // Ordering for `NullWritable`. That's why the compiler will generate different anonymous

  11. // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.

  12. //

  13. // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate

  14. // same bytecodes for `saveAsTextFile`.

  15. val nullWritableClassTag = implicitly[ClassTag[NullWritable]]

  16. val textClassTag = implicitly[ClassTag[Text]]

  17. val r = this.mapPartitions { iter =>

  18. val text = new Text()

  19. iter.map { x =>

  20. text.set(x.toString)

  21. (NullWritable.get(), text)

  22. }

  23. }

  24. RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)

  25. .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

  26. }

  27. /**

  28. * Save this RDD as a compressed text file, using string representations of elements.

  29. */

  30. def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {

  31. // https://issues.apache.org/jira/browse/SPARK-2075

  32. val nullWritableClassTag = implicitly[ClassTag[NullWritable]]

  33. val textClassTag = implicitly[ClassTag[Text]]

  34. val r = this.mapPartitions { iter =>

  35. val text = new Text()

  36. iter.map { x =>

  37. text.set(x.toString)

  38. (NullWritable.get(), text)

  39. }

  40. }

  41. RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)

  42. .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)

  43. }

(2)saveAsObjectFile

saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。

图中,左侧方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS上的一个Block。

源码:

 
  1. /**

  2. * Save this RDD as a SequenceFile of serialized objects.

  3. */

  4. def saveAsObjectFile(path: String) {

  5. this.mapPartitions(iter => iter.grouped(10).map(_.toArray))

  6. .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))

  7. .saveAsSequenceFile(path)

  8. }

Scala集合和数据类型

(1)collect

collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。 在这个数组上运用scala的函数式操作。

图中,左侧方框代表RDD分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式存储。

源码:

 
  1. /**

  2. * Return an array that contains all of the elements in this RDD.

  3. */

  4. def collect(): Array[T] = {

  5. val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

  6. Array.concat(results: _*)

  7. }

(2)collectAsMap

collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。 

图中,左侧方框代表RDD分区,右侧方框代表单机数组。数据通过collectAsMap函数返回给Driver程序计算结果,结果以HashMap形式存储。

源码:

 
  1. /**

  2. * Return the key-value pairs in this RDD to the master as a Map.

  3. *

  4. * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only

  5. * one value per key is preserved in the map returned)

  6. */

  7. def collectAsMap(): Map[K, V] = {

  8. val data = self.collect()

  9. val map = new mutable.HashMap[K, V]

  10. map.sizeHint(data.length)

  11. data.foreach { pair => map.put(pair._1, pair._2) }

  12. map

  13. }

(3)reduceByKeyLocally

实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

源码:

 
  1. /**

  2. * Merge the values for each key using an associative reduce function, but return the results

  3. * immediately to the master as a Map. This will also perform the merging locally on each mapper

  4. * before sending results to a reducer, similarly to a "combiner" in MapReduce.

  5. */

  6. def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {

  7. if (keyClass.isArray) {

  8. throw new SparkException("reduceByKeyLocally() does not support array keys")

  9. }

  10. val reducePartition = (iter: Iterator[(K, V)]) => {

  11. val map = new JHashMap[K, V]

  12. iter.foreach { pair =>

  13. val old = map.get(pair._1)

  14. map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))

  15. }

  16. Iterator(map)

  17. } : Iterator[JHashMap[K, V]]

  18. val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {

  19. m2.foreach { pair =>

  20. val old = m1.get(pair._1)

  21. m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))

  22. }

  23. m1

  24. } : JHashMap[K, V]

  25. self.mapPartitions(reducePartition).reduce(mergeMaps)

  26. }

(4)lookup

Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。

图中,左侧方框代表RDD分区,右侧方框代表Seq,最后结果返回到Driver所在节点的应用中。

源码:

 
  1. /**

  2. * Return the list of values in the RDD for key `key`. This operation is done efficiently if the

  3. * RDD has a known partitioner by only searching the partition that the key maps to.

  4. */

  5. def lookup(key: K): Seq[V] = {

  6. self.partitioner match {

  7. case Some(p) =>

  8. val index = p.getPartition(key)

  9. val process = (it: Iterator[(K, V)]) => {

  10. val buf = new ArrayBuffer[V]

  11. for (pair <- it if pair._1 == key) {

  12. buf += pair._2

  13. }

  14. buf

  15. } : Seq[V]

  16. val res = self.context.runJob(self, process, Array(index), false)

  17. res(0)

  18. case None =>

  19. self.filter(_._1 == key).map(_._2).collect()

  20. }

  21. }

(5)count

count返回整个RDD的元素个数。 

图中,返回数据的个数为5。一个方块代表一个RDD分区。

源码:

 
  1. /**

  2. * Return the number of elements in the RDD.

  3. */

  4. def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

(6)top

top可返回最大的k个元素。 
相近函数说明:

  • top返回最大的k个元素。
  • take返回最小的k个元素。
  • takeOrdered返回最小的k个元素, 并且在返回的数组中保持元素的顺序。
  • first相当于top( 1) 返回整个RDD中的前k个元素, 可以定义排序的方式Ordering[T]。返回的是一个含前k个元素的数组。

源码:

 
  1. /**

  2. * Returns the top k (largest) elements from this RDD as defined by the specified

  3. * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:

  4. * {{{

  5. * sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)

  6. * // returns Array(12)

  7. *

  8. * sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)

  9. * // returns Array(6, 5)

  10. * }}}

  11. *

  12. * @param num k, the number of top elements to return

  13. * @param ord the implicit ordering for T

  14. * @return an array of top elements

  15. */

  16. def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

(7)reduce

reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。 
reduceLeft先对两个元素

 
  1. /**

  2. * Reduces the elements of this RDD using the specified commutative and

  3. * associative binary operator.

  4. */

  5. def reduce(f: (T, T) => T): T = {

  6. val cleanF = sc.clean(f)

  7. val reducePartition: Iterator[T] => Option[T] = iter => {

  8. if (iter.hasNext) {

  9. Some(iter.reduceLeft(cleanF))

  10. } else {

  11. None

  12. }

  13. }

  14. var jobResult: Option[T] = None

  15. val mergeResult = (index: Int, taskResult: Option[T]) => {

  16. if (taskResult.isDefined) {

  17. jobResult = jobResult match {

  18. case Some(value) => Some(f(value, taskResult.get))

  19. case None => taskResult

  20. }

  21. }

  22. }

  23. sc.runJob(this, reducePartition, mergeResult)

  24. // Get the final result out of our Option, or throw an exception if the RDD was empty

  25. jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))

  26. }

(8)fold

fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。 

图中,通过用户自定义函数进行fold运算,图中的一个方框代表一个RDD分区。

源码:

 
  1. /**

  2. * Aggregate the elements of each partition, and then the results for all the partitions, using a

  3. * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to

  4. * modify t1 and return it as its result value to avoid object allocation; however, it should not

  5. * modify t2.

  6. */

  7. def fold(zeroValue: T)(op: (T, T) => T): T = {

  8. // Clone the zero value since we will also be serializing it as part of tasks

  9. var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

  10. val cleanOp = sc.clean(op)

  11. val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)

  12. val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)

  13. sc.runJob(this, foldPartition, mergeResult)

  14. jobResult

  15. }

(9)aggregate

aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。 
aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。

图中,通过用户自定义函数对RDD 进行aggregate的聚集操作,图中的每个方框代表一个RDD分区。

源码:

 
  1. /**

  2. * Aggregate the elements of each partition, and then the results for all the partitions, using

  3. * given combine functions and a neutral "zero value". This function can return a different result

  4. * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U

  5. * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are

  6. * allowed to modify and return their first argument instead of creating a new U to avoid memory

  7. * allocation.

  8. */

  9. def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {

  10. // Clone the zero value since we will also be serializing it as part of tasks

  11. var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

  12. val cleanSeqOp = sc.clean(seqOp)

  13. val cleanCombOp = sc.clean(combOp)

  14. val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)

  15. val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)

  16. sc.runJob(this, aggregatePartition, mergeResult)

  17. jobResult

  18. }

原文链接:http://blog.csdn.net/jasonding1354

Spark RDD使用详解5--Action算子相关推荐

  1. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  2. Spark RDD 论文详解(三)Spark 编程接口

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  3. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  4. Spark RDD 论文详解(一)摘要和介绍

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  5. Spark RDD 论文详解(七)讨论

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  6. Spark RDD 论文详解(四)表达 RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  7. Spark RDD使用详解3--Value型Transformation算子

    处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型  2)输入分区与输出分区多对一型  3)输 ...

  8. Spark RDD使用详解1--RDD原理

    RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使 ...

  9. Spark RDD使用详解--RDD原理

    RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使 ...

最新文章

  1. 【转载】sourceforge上面提供的多种后缀的压缩包有何区别?
  2. 视频压缩编码 gop(Group of Pictures)(I帧间隔)的概念、IDR、I帧(关键帧,intra picture)、P帧、B帧、帧内压缩、帧间压缩、pts(显示时间)、dts(解码时间)
  3. 软件专业人才应具备四种素质
  4. 「Jupyter」ubuntu下安装jupyterlab后jupyterlab:未找到命令
  5. Java方法 传值方式
  6. [视频演示].NET Core开发的iNeuOS物联网平台,实现从设备PLC、云平台、移动APP数据链路闭环...
  7. ACM学习历程—51NOD 1685 第K大区间2(二分 树状数组 中位数)
  8. linux 搭建FTP
  9. 2021 年百度之星·程序设计大赛 - 复赛 1001 Palindrome(结论,奇偶性)
  10. ZebraDesigner3 打印到.prn文件乱码
  11. rar怎么用计算机打开,解答电脑rar文件怎么打开
  12. 美化windows xp 完全教程
  13. ffmpeg-linux录音录像
  14. vuepress-theme-reco + Github Actions 构建静态博客,部署到第三方服务器
  15. 游戏‘微信打飞机’ 第三课
  16. 如何debug Vue源码
  17. MySQL规范及优化
  18. matlab es2函数,zjdy doing的终极地狱完整版。可以说 这个版本在ES2架构中算首屈一指的,虽然他说自 matlab 238万源代码下载- www.pudn.com...
  19. php100以内质数求和,100以内的质数_PHP质数计算三种方法 php求100以内的质数
  20. 计算机路由器工作原理,路由器工作原理

热门文章

  1. 理解OSI七层模型(了解OSI七层模型,数据如何传输,封装,解封装)
  2. Linux挂载windows中的共享目录步骤及问题解决方案(步骤清晰)
  3. Exchange Server 2013 一步步安装图解
  4. c语言while可以改为when,控制流:if、when、for、while
  5. android包内存放视频,Android性能优化:手把手教你如何让App更快、更稳、更省(含内存、布局优化等)...
  6. vst3插件_2B Played Music发布用于劲爆舞曲风格的极端失真和剪辑插件2B Clipped XT
  7. c++循环执行一个函数_20川大计算机 | 时间复杂度,你避不开的一个考点
  8. bouml 逆向分析c++_JS逆向之漫画柜
  9. python短信接口_短信接口DEMO-PYTHON
  10. oracletns中不存在名称为_oracle tnsname.ora中的SERVICE_NAME 代表实例的名称还是代表全局数据库的名称?...