一、行动算子定义

spark的算子可以分为trans action算子 以及 action算子 ,即变换/转换 算子。如果执行一个RDD算子并不触发作业的提交,仅仅只是记录作业中间处理过程,那么这就是trans action算子 ,相反如果执行这个 RDD 时会触发 Spark Context 提交 Job 作业,那么它就是 action算子及行动算子。
总结来说就是在Spark中,转换算子并不会马上进行运算的,即所谓的“惰性运算”,而是在遇到行动算子时才会执行相应的语句的,触发Spark的任务调度并开始进行计算。
我们可以将行动算子分为两类:

  • 1,数据运算类:主要用于触发RDD计算,并得到计算结果返回给Spark程序或Shell界面;
  • 2,数据存储类:用于触发RDD计算后,将结果保存到外部存储系统中,如HDFS文件系统或数据库。

二、总览

一、数据运算类:
1、reduce              将rdd中的数据进行聚合,先进行分区内聚合,在进行分区间聚合
2、collect             将rdd中的数据按分区号采集,并以数组的形式返回所有数据
3、collectAsMap        收集Key/Value型RDD中的元素,并以map的形式返回数据
4、foreach             循环遍历分区内数据,该算子执行位置是在Executor端
5、count               计算rdd中数据个数
6、first               取rdd中数据的第一个
7、take                取rdd中数据的前num个
8、takeOrdered         将rdd中的数据进行排序后取前num个
9、aggregate           类似于aggregateByKey算子,同样两个参数列表,分别传递初始值和分区内计算规则和分区间计算规则。
10、fold               简化版的aggregate,分区内计算规则和分区间计算规则一样。
11、countByKey         根据键值对中的key进行计数,返回一个map,对应了每个key在rdd中出现的次数。
12、countByValue       根据rdd中数据的数据值进行计数,注不是键值对中的value,同样返回一个map,对应每个数据出现的次数。
13、max                求rdd中数据的最大值
14、min                求rdd中数据的最小值
二、数据存储类:
1、saveAsTextFile      存储为文本文件
2、saveAsObjectFile    存储为二进制文件
3、saveAsSequenceFile  要求数据必须为<k,v>类型, 保存为 Sequencefile文件

注:sequenceFile文件是Hadoop用来存储二进制形式的 (Key,Value) 对而设计的一种平面文件。详细可以看这篇文章了解:链接

三、数据运算类action算子

1、reduce

通过传入的方法聚集rdd中所有的元素,先聚合分区内的数据,再聚合分区间的数据
def reduce(f: (T, T) => T): T

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val count: Int = rdd.reduce((_: Int) + (_: Int))

2、collect

数据采集,以数组Array的形式按分区顺序返回数据集中的所有元素
def collect(): Array[T]

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val ints: Array[Int] = rdd.collect()
println(ints.mkString(","))

3、collectAsMap

收集Key/Value型RDD中的元素,并以map的形式返回数据
注:只有key/value类型的RDD才有这个方法
def collectAsMap(): Map[K, V]

val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
val map: collection.Map[String, Int] = rdd2.collectAsMap()
println(map.mkString(","))

4、foreach

循环遍历分区内数据,该算子执行位置是在Executor端
def foreach(f: T => Unit): Unit

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
rdd.collect().foreach(print)
println()
println("********************")
rdd.foreach(print)

5、count

返回rdd中元素的个数,即collect返回的数组的长度
def count(): Long

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val count: Long = rdd.count()
println(count)

6、first

返回rdd中的第一个元素,即collect返回的数组的第一个元素
def first(): T

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val first: Int = rdd.first()
println(first)

7、take

返回rdd中的前n个元素,即collect返回的数组的前n个元素
def take(num: Int): Array[T]

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))

8、takeOrdered

返回rdd中排序后的前n个元素
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

val rdd: RDD[Int] = sc.makeRDD(List(4, 2, 1, 3))
val ints: Array[Int] = rdd.takeOrdered(3)
println(ints.mkString(","))

9、aggregate

与aggregateByKey类似,需要传入两个参数列表,列表元素意义也相同

  • 第一个列表,传入初始的比较值
  • 第二个参数列表传入两个函数,分别表示分区内计算规则和分区间计算规则

aggregateByKey:初始值只会参与分区内计算
aggregate:初始值既会参与分区内计算也会参与分区间计算
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: Int = rdd.aggregate(10)((_: Int) + (_: Int), (_: Int) + (_: Int))
// [1, 2] => 10 + 1 + 2 => 13
// [3, 4] => 10 + 3 + 4 => 17
// [13, 14] => 10 + 13 + 17 = 40
println(res)

10、fold

类似于foldByKey,即当aggregate的分区内和分区间计算规则相同时可以简化使用fold,只需要传入一个计算规则
def fold(zeroValue: T)(op: (T, T) => T): T

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: Int = rdd.fold(10)((_: Int) + (_: Int))
//[1, 2] => 10 + 1 + 2 => 13
//[3, 4] => 10 + 3 + 4 => 17
//[13, 14] => 10 + 13 + 17 = 40
println(res)

11、countByKey

用于统计键值对类型的数据中每个key出现的个数
def countByKey(): Map[K, Long]

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
val res: collection.Map[String, Long] = rdd.countByKey()
println(res)

12、countByValue

根据rdd中数据的数据值进行计数,注不是键值对中的value,同样返回一个map,对应每个数据出现的次数。
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
val res1: collection.Map[Int, Long] = rdd.countByValue()
val res2: collection.Map[(String, Int), Long] = rdd2.countByValue()
println(res1)
println(res2)

13、max && min

返回rdd数据集中的最大值/最小值

def max()(implicit ord: Ordering[T]): T = withScope {this.reduce(ord.max)
}
def min()(implicit ord: Ordering[T]): T = withScope {this.reduce(ord.min)
}
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
println(rdd.max())
println(rdd2.max())
println(rdd.min())
println(rdd2.min())

SparkRDD——行动算子相关推荐

  1. SparkRDD常用算子实践(附运行效果图)

    目录 1.简单算子说明 2.复杂算子说明 目录 SparkRDD算子分为两类:Transformation与Action. Transformation:即延迟加载数据,Transformation会 ...

  2. SparkRDD——转换算子

    转换算子 一.单value型转换算子(只使用1个RDD): 1.map 将数据进行转换,数据量不会增加和减少 2.mapPartitions 以分区为单位将一个分区内的数据进行批处理操作,且可以执行过 ...

  3. Spark考试题总结

    一.选择判断 1.单选 下面哪个不是 RDD 的特点 ( ) A.可分区 B.可序列化 C.可修改 D.可持久化 关于累加器,下面哪个是错误的 ( ) A.支持加法  B.支持数值类型 C.可并行  ...

  4. 第三课 大数据技术之Spark-RDD介绍和转换算子

    第三课 大数据技术之Spark-RDD介绍和转换算子 文章目录 第三课 大数据技术之Spark-RDD介绍和转换算子 第一节 RDD相关介绍 1.1 什么是 RDD 1.2 核心属性 1.3 执行原理 ...

  5. SparkRDD算子--mapPartitionsWithIndex算子

    语法 val newRdd = oldRdd.mapPartitionsWithIndex{case (num, datas) => {func}} 源码 def mapPartitionsWi ...

  6. sparkRDD算子数据处理实践

    文章目录 各算子实例 部分算子处理过程的解释(取于大作业答辩ppt的一部分): 编写词频统计程序 创建一个测试文件,RddTest,内容如下: [root@hadoop01 ~]# cd /expor ...

  7. spark笔记之RDD常用的算子操作

    Spark Rdd的所有算子操作,请见<sparkRDD函数详解.docx> 启动spark-shell 进行测试: spark-shell --master spark://node1: ...

  8. 大数据——Spark RDD常用算子总结

    Spark的核心是建立在同一的抽象弹性分布式数据集(Resilient Distributed Datasets,RDD)之上的,这使得Spark的各个组件可以无缝的进行集成,能够在同一个应用程序中完 ...

  9. 写算子单元测试Writing Unit Tests

    写算子单元测试Writing Unit Tests! 一些单元测试示例,可在tests/python/relay/test_op_level3.py中找到,用于累积总和与乘积算子. 梯度算子 梯度算子 ...

最新文章

  1. Struts2中action接收参数的三种方法及ModelDriven跟Preparable接口结合JAVA反射机制的灵活用法...
  2. python爬虫requests-Python爬虫之requests介绍
  3. Port already in use: 1099;
  4. hive报错:hive create table: Specified key was too long; max key length is 767 bytes
  5. 就计算机结构与课程的论文,关于计算机组成原理的课程论文(2)
  6. Mac 系统如何修改python的IDLE默认模块导入路径。
  7. excel制作跨职能流程图_一款小白轻松上手流程图绘制工具亿图图示
  8. 如何通过Tik Tok月入2w美金
  9. 实例079RTF文件的保存
  10. 【Android Studio】XUI框架的使用记录:源代码Demo安装+从Demo中获取捷径快速开发自己的APP
  11. java.lang.IllegalArgumentException: requirement failed: indices should be one-based and in ascending
  12. 二阶魔方高级玩法公式
  13. html水晶按钮图片,20个纯CSS3实现的彩色透明水晶按钮
  14. odi oracle to mysql_【ODI】| 数据ETL:从零开始使用Oracle ODI完成数据集成(三)
  15. Barefoot P4加速SDN
  16. 平面离散点集Delaunay三角化
  17. 基于Qt的笛卡尔心形表白程序
  18. web网页设计期末课程大作业——汉中印象旅游景点介绍网页设计与实现19页面HTML+CSS+JavaScript
  19. 10个有效管理人员的 “黄金 “法则|优思学院
  20. 【现代控制理论】传递函数建立状态空间表达式

热门文章

  1. mysql locate用法,LOCATE()函数如何与MySQL WHERE子句一起使用?
  2. C语言——结构体知识点总结
  3. 初识esp8266与在Arduino的环境配置
  4. StarGAN-VC语音音色转换
  5. matplotlib基础(4)之饼图 pie
  6. 遇到VerifyError束手无策?
  7. keil如何添加STM32系列
  8. 漫谈程序员系列:找工作的辟邪剑谱
  9. 海淘 海外购 iherb 礼券码 优惠码
  10. 2019届本科计算机工资,2019届本科毕业生平均月薪出炉!