一.概述

  • 算子

    • 英文翻译为:Operator(简称op)
    • 狭义:指从一个函数空间到另一个函数空间(或它自身)的映射。
    • 广义:指从一个空间到另一个空间的映射
    • 通俗理解:指事物(数据或函数)从一个状态到另外一个状态的过程抽象。
    • 实质就是映射,就是关系,就是变换。
  • 算子的重要作用
    • 算子越少,灵活性越低,则实现相同功能的编程复杂度越高,算子越多则反之。
    • 算子越少,表现力越差,面对复杂场景则易用性较差。算子越多的则反之。
  • MapReduce 与 Spark算子比较
    • MapReduce只有2个算子,Map和Reduce,绝大多数应用场景下,均需要复杂编码才能达到用户需求。
    • Spark有80多个算子,进行充分的组合应用后,能满足绝大多数的应用场景。

二.算子分类

1.转换算子(Transformation)

此种算子不触发作业提交,只有作业遇到action算子后才会进行提交,提交后才会真正启动转换计算

1.value型转换算子

  • 输入与输出分区一对一型

    • map
    • flatMap
    • mapPartitions
    • glom
  • 输入与输出分区多对一型
    • union
    • cartesian
  • 输入与输出分区多对多型
    • groupBy
  • 输出分区为输出分区子集型
    • filter
    • distinct
    • subtract
    • sample
    • takeSample
    • Cache
    • cache
    • persist

2.key-value型转换算子

  • 输入与输出分区一对一型

    • mapValues
  • 对单个RDD聚集
    • combinByKey
    • reduceByKey
    • partitionBy
  • 对两个RDD聚集
    • cogroup
  • 连接
    • join
    • leftOutJoin
    • rightOutJoin

2.执行算子(Action)

这种算子会触发sparkContext提交作业

1.无输出

foreach

2.HDFS

saveAsTextFile
saveAsObjectFile

3.Scala集合和数据类型

collect
collectAsMap
reduceByKeyLocally
lookup
count
top
reduce
fold
aggregate

三.常用算子分析

1.value型转换算子

map
概述
单输入单输出,将输入进行映射(就是处理一顿)后进行输出
例子

// 创建一个list
scala> var r1 = sc.parallelize(List("hello","world","antg"))
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24// 每个元素都拼接一个字符串-->123然后输出
scala> r1.map(x=>x+"-->123").collect()
res1: Array[String] = Array(hello-->123, world-->123, antg-->123)

flatMap
概述
单输入单输出,与map的功能类似,但是会在最终会将结果打平成一个一维集合
例子

//创建一个嵌套的list
scala> var r2 = sc.parallelize(List(List(1,2,3),List(4,5,6)))
r2: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[3] at parallelize at <console>:24
//输出的结果是一个一维数组
scala> r2.flatMap(x=>x).collect
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6)
//对比一下map
scala> r2.map(x=>x).collect
res4: Array[List[Int]] = Array(List(1, 2, 3), List(4, 5, 6))
1

glom
概述
以分区为单位,每个分区的值将会形成一个数组
例子

scala> var r3 = sc.parallelize(List(1,2,3,4,5,6,7),4)
r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24scala> r3.glom.collect
res5: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5), Array(6, 7))

mapPartitions
概述
以分区为单位进行计算处理,而map是以每个元素为单位进行计算处理。
当在map过程中需要频繁创建额外对象时,如文件输出流操作、jdbc操作、Socket操作等时,当用mapPartitions算子
例子

// 创建一份数据,3个分区
scala> var r4 = sc.parallelize(List(1,2,3,4,5,6),3)
r4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//可以看到mapPartitions是按照分区进行计算的
scala> r4.mapPartitions(partition=>{var init = 10;println("antg");partition.map(x=>x+init);}).collect
antg
antg
antg
res8: Array[Int] = Array(11, 12, 13, 14, 15, 16)

union
概述
将两个rdd合成一个rdd,不去重
例子

scala> val a = sc.parallelize(1 to 4,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> val b = sc.parallelize(3 to 6,2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24scala> a.union(b).collect
res0: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)scala> (a++b).collect
res1: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)scala> (a union b).collect
res2: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6)

groupBy
概述
按照条件重新分组,输入分区与输出分区多对多型
例子

scala> val c = sc.parallelize(Seq(1,2,3,4,5,6,100,101,102),3)
c: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24scala> c.groupBy(x=>if(x>=100) ">100" else "<100").collect
res5: Array[(String, Iterable[Int])] = Array((>100,CompactBuffer(100, 101, 102)), (<100,CompactBuffer(1, 2, 3, 4, 5, 6)))

filter
概述
按照一定条件进行过滤,输出分区为输入分区子集型
例子

scala> val d = sc.parallelize(1 to 20,4)
d: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> d.filter(x=>x>=10).collect
res6: Array[Int] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)scala> d.filter(x=>x>=10).glom.collect
res7: Array[Array[Int]] = Array(Array(), Array(10), Array(11, 12, 13, 14, 15), Array(16, 17, 18, 19, 20))

distinct
概述
去重,输出分区为输入分区子集型,全局去重
例子

scala> val e = sc.parallelize(1 to 4,2)
e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24scala> val f = sc.parallelize(3 to 6,2)
f: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24scala> (e++f).distinct.collect
res8: Array[Int] = Array(4, 1, 5, 6, 2, 3)scala> (e++f).distinct.glom.collect
res9: Array[Array[Int]] = Array(Array(4), Array(1, 5), Array(6, 2), Array(3))scala> (e++f).glom.collect
res10: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(3, 4), Array(5, 6))

cache
概述
cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能。
主要应用在当RDD数据反复被使用的场景下
例子

val a = sc.parallelize(1 to 4, 2)
val b = sc.parallelize(3 to 6, 2)
val c=a.union(b).cache
c.count
c.distinct().collect

2.key-value型转换算子

mapValues
概述
输入分区与输出分区一对一
针对(Key,Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。
例子

scala> val r3 = sc.parallelize(List(("tom",1),("jack",2),("blus",3),("antg",4)),2)
r3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:24scala> r3.mapValues(x=>x+1).collect
res21: Array[(String, Int)] = Array((tom,2), (jack,3), (blus,4), (antg,5))

combineByKey

  • 概述

    • def combineByKey[C](createCombiner: (V) => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(String, C)]
    • createCombiner:对每个分区内的同组元素如何聚合,形成一个累加器
    • mergeValue:将前边的累加器与新遇到的值进行合并的方法
    • mergeCombiners:每个分区都是独立处理,故同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,用方法将各个分区的结果进行合并。
  • 个人理解以上的三个参数
    • 第一个 : 初始化累加器
    • 第二个 : 开始累加value
    • 第三个 : 合并分区

例子

scala> val r1 = sc.parallelize(List(("a",1),("b",2),("c",3),("b",1),("c",2),("d",4)),2)
r1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> r1.combineByKey(List(_),(x:List[Int],y:Int)=>y::x,(x:List[Int],y:List[Int])=>x:::y).collect
res1: Array[(String, List[Int])] = Array((d,List(4)), (b,List(2, 1)), (a,List(1)), (c,List(3, 2))

reduceByKey
概述
按key聚合后对组进行归约处理,如求和、连接等操作
例子

scala> var r2 = sc.parallelize(Array("a","b","c","a","b"))
r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24scala> r2.map((_,1)).reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((a,2), (b,2), (c,1))

概述
对Key-Value结构的RDD进行按Key的join操作,最后将V部分做flatmap打平操作。
例子

scala> val r3 = sc.parallelize(List(("a",1),("b",2)),2)
r3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24scala> val r4 = sc.parallelize(List(("a",3),("b",4)),2)
r4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> r3.join(r4).collect
res3: Array[(String, (Int, Int))] = Array((b,(2,4)), (a,(1,3)))

3.执行算子

这种算子会触发sparkContext提交作业,触发RDD的DAG执行foreach
概述
无输出型,遍历每个元素
例子

scala> var r5 = sc.parallelize(1 to 5)
r5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24scala> r5.foreach(println)
3
1
2
4
5

saveAsTextFile
概述
指定输出的路径
例子

scala> val r6 = sc.parallelize(1 to 10)
r6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24scala> r6.filter(x=>x>=5).saveAsTextFile("file:///home/job018/fujunhua/data/spark/output01")[fujunhua@cluster1 spark]$ cd output01/
[fujunhua@cluster1 output01]$ ls
part-00000  part-00001  part-00002  part-00003  part-00004  part-00005  part-00006  part-00007  _SUCCESS
[fujunhua@cluster1 output01]$ cat ./*
5
6
7
8
9
10
  • collect

    • 概述
    • 相当于toArray操作,将分布式RDD返回成为一个scala array数组结果,实际是Driver所在的机器节点,再针对该结果操作
  • 例子
    • 这个就不举例子了,上面用到了很多
  • collectAsMap
    • 概述
    • 相当于toMap操作,将分布式RDD的kv对形式返回成为一个的scala map集合

例子

scala> val r7 = sc.parallelize(List(("a",1),("b",2)))
r7: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24scala> r7.collectAsMap
res7: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

lookup
概述
对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。
例子

scala> val r8 = sc.parallelize(List("小米", "华为", "华米", "大米", "苹果","米老鼠"), 2)scala> r8.map(x=>({if(x.contains("米")) "有米" else "无米"},x)).lookup("有米")
res9: Seq[String] = WrappedArray(小米, 华米, 大米, 米老鼠)

reduce
概述
先对两个元素进行reduce函数操作,然后将结果和迭代器取出的下一个元素进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。
例子

scala> val r10 = sc.parallelize(1 to 10)
r10: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24scala> r10.reduce((x,y)=>x+y)
res12: Int = 55

fold
概述
ofold算子签名:  def fold(zeroValue: T)(op: (T, T) => T): T
其实就是先对rdd分区的每一个分区进行op函数,在调用op函数过程中将zeroValue参与计算,最后在对所有分区的结果调用op函数,同理此处zeroValue再次参与计算。
例子

//和是41,公式=(1+2+3+4+5+6+10)+10
sc.parallelize(List(1, 2, 3, 4, 5, 6), 1).fold(10)(_+_)
//和是51,公式=(1+2+3+10)+(4+5+6+10)+10=51
sc.parallelize(List(1, 2, 3, 4, 5, 6), 2).fold(10)(_+_)
//和是61,公式=(1+2+10)+(3+4+10)+(5+6+10)+10=61
sc.parallelize(List(1, 2, 3, 4, 5, 6), 3).fold(10)(_+_)

spark 常用算子相关推荐

  1. Spark 常用算子详解(转换算子、行动算子、控制算子)

    Spark简介 Spark是专为大规模数据处理而设计的快速通用的计算引擎: Spark拥有Hadoop MapReduce所具有的优点,但是运行速度却比MapReduce有很大的提升,特别是在数据挖掘 ...

  2. Spark常用算子讲解一

    map map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD,RDD之间的元素是一对一关系. >>> x = sc.parallelize([1,2,3,4]) &g ...

  3. Spark常用算子讲解二

    groupByKey groupByKey([numTasks])是数据分组操作,在一个由(K, V)键值对组成的数据集上调用,返回一个(K, Seq[V])对的数据集. 注意,如果要对每个键执行聚合 ...

  4. Spark一路火花带闪电——Spark常用算子(参数及其返回值)探究

    文章目录 转化算子 行动算子 转化算子 以数据Seq(1,2,3,3)为例子 map(f:T => U):RDD[U] 映射:将函数应用于RDD内的每个元素,将其返回值构成新的RDD rdd.m ...

  5. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  6. spark常用RDD算子 汇总(java和scala版本)

    github: https://github.com/zhaikaishun/spark_tutorial  spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...

  7. 【Spark】Spark的常用算子

    Spark的常用算子 目录内容 Spark的常用算子 一.转换算子(Transformation) 二.行动算子(Action) 三.键值对算子(PairRDDFunctions) 四.文件系统算子( ...

  8. spark算子_Spark常用算子

    Spark的算子分类: 从大方向说,Spark算子大致可以分为以下两类: (1)Transformation变换/转换算子:这种变换并不触发提交作业,这种算子是延迟执行的,也就是说从一个RDD转换生成 ...

  9. Spark transformation算子案例

    Spark支持两种RDD操作:transformation和action  在本文中,将对几个常用的transformation算子进行案例演示,采用Java和Scala两种语言对代码进行编写  其中 ...

最新文章

  1. CentOS: 将虚拟机迁移到 Azure (以阿里云为例)
  2. 第三章--数据链路层
  3. 总结jenkins Android自动打包遇到的坑
  4. JdbcPagingItemReader多线程的Step
  5. Go语言web框架 gin
  6. 产生信号的代码10分类
  7. python高通滤波_图像处理之高通滤波及低通滤波
  8. 无可用源 没有为任何调用堆栈加载任何符号_面试官问我JVM类加载,我笑了
  9. 利用Python绘制图案——七色花子
  10. STM32系列BSP外设驱动使用教程
  11. 免安装连接oracle,Oracle免安装PL/SQL连接
  12. Java:轻松一刻/程序员才懂的幽默
  13. halcon C++编程 第22讲 图像镜像 tcy
  14. EventBus简介与使用
  15. 非常好用的重复文件清理软件Tidy Up Mac
  16. 二重积分的计算.01
  17. 相机计算坐标公式_相机采样点的坐标转换方法与流程
  18. C# FileSystemWatcher使用说明
  19. APP高曝光率到智能化投放,SDK猫眼信息流广告的投放策略!
  20. DameWare无法远程连接解决办法

热门文章

  1. 连通图的桥(对桥和割点的理解)
  2. CSAPP大作业程序人生
  3. 5千多条多分类谚语格言ACCESS数据库
  4. QQ群发消息的笨办法
  5. 视觉SLAM_02_旋转矩阵
  6. 技术群里装偶遇撒狗粮?手起刀落人抬走!
  7. 超级马里奥Run的精妙设计
  8. CF#320 Div.2 总结
  9. fei33423 工作 职场 格言
  10. 【转】怎样将APP或PXL转为IPA格式