文章目录

  • 一、Transformation 和 Action
    • 1、转换操作
    • 2、行动操作
  • 二、map、flatMap、mapParations、mapPartitionsWithIndex
    • 2.1 map
    • 2.2 flatMap
    • 3.3 mapPartitions
    • 3.4 mapPartitionsWithIndex
  • 三、reduce、reduceByKey
    • 3.1 reduce
    • 3.2 reduceByKey
  • 四、union,join和groupByKey
    • 4.1 union
    • 4.2 groupByKey
    • 4.3 join
  • 五、sample、cartesian
    • 5.1 sample
    • 5.2 cartesian
  • 六、filter、distinct、intersection
    • 6.1 filter
    • 6.2 distinct
    • 6.3 intersection
  • 七、coalesce、repartition、repartitionAndSortWithinPartitions
    • 7.1 coalesce
    • 7.2 replication
    • 7.3 repartitionAndSortWithinPartitions
  • 八、cogroup、sortBykey、aggregateByKey
    • 8.1 cogroup
    • 8.2 sortBykey
    • 8.3 aggregateByKey

一、Transformation 和 Action

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用的Transformation:

1、转换操作

1、def map[U: ClassTag](f: T => U): RDD[U] 将函数应用于RDD的每一元素,并返回一个新的RDD
2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD
3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。
4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。
7、def union(other: RDD[T]): RDD[T] 将两个RDD中的元素进行合并,返回一个新的RDD
8、def intersection(other: RDD[T]): RDD[T] 将两个RDD做交集,返回一个新的RDD
9、def distinct(): RDD[T] 将当前RDD进行去重后,返回一个新的RDD
10、def partitionBy(partitioner: Partitioner): RDD[(K, V)] 根据设置的分区器重新将RDD进行分区,返回新的RDD。
11、def reduceByKey(func: (V, V) => V): RDD[(K, V)] 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
12、def groupByKey(): RDD[(K, Iterable[V])] 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD
13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。
14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。
15、def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] aggregateByKey的简化操作,seqop和combop相同,
16、def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
17、def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 底层实现还是使用sortByKey,只不过使用fun生成的新key进行排序。
18、def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,但是需要注意的是,他只会返回key在两个RDD中都存在的情况。
19、def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD,注意,如果V和W的类型相同,也不放在一块,还是单独存放。
20、def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] 做两个RDD的笛卡尔积,返回对偶的RDD
21、def pipe(command: String): RDD[String] 对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD,注意,如果你是本地文件系统中,需要将脚本放置到每个节点上。
22、def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
23、def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 根据你传入的分区数重新通过网络分区所有数据,重型操作。
24、def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] 性能要比repartition要高。在给定的partitioner内部进行排序
25、def glom(): RDD[Array[T]] 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
26、def mapValues[U](f: V => U): RDD[(K, U)] 将函数应用于(k,v)结果中的v,返回新的RDD
27、def subtract(other: RDD[T]): RDD[T] 计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来。

2、行动操作

1、def takeSample( withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 抽样但是返回一个scala集合。
2、def reduce(f: (T, T) => T): T 通过func函数聚集RDD中的所有元素
3、def collect(): Array[T] 在驱动程序中,以数组的形式返回数据集的所有元素
4、def count(): Long 返回RDD中的元素个数
5、def first(): T 返回RDD中的第一个元素
6、def take(num: Int): Array[T] 返回RDD中的前n个元素
7、def takeOrdered(num: Int)(implicit ord: Ordering[T]) 返回前几个的排序
8、def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
9、def fold(zeroValue: T)(op: (T, T) => T): T 折叠操作,aggregate的简化操作,seqop和combop一样。
10、def saveAsTextFile(path: String): Unit 将RDD以文本文件的方式保存到本地或者HDFS中
11、def saveAsObjectFile(path: String): Unit 将RDD中的元素以序列化后对象形式保存到本地或者HDFS中。
12、def countByKey(): Map[K, Long] 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
13、def foreach(f: T => Unit): Unit 在数据集的每一个元素上,运行函数func进行更新。
注意:当你在RDD中使用到了class的方法或者属性的时候,该class需要继承java.io.Serializable接口,或者可以将属性赋值为本地变量来防止整个对象的传输。

二、map、flatMap、mapParations、mapPartitionsWithIndex

基本的初始化

private val conf: SparkConf = new SparkConf().setAppName("TestTransformation").setMaster("local")
private val sparkContext = new SparkContext(conf)

2.1 map

map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。

def map(): Unit ={val list = List("张无忌", "赵敏", "周芷若")val listRDD = sc.parallelize(list)val nameRDD = listRDD.map(name => "Hello " + name)nameRDD.foreach(name => println(name))}


可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。

2.2 flatMap

flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。

def flatMap(): Unit ={val list = List("张无忌 赵敏","宋青书 周芷若")val listRDD = sc.parallelize(list)val nameRDD = listRDD.flatMap(line => line.split(" ")).map(name => "Hello " + name)nameRDD.foreach(name => println(name))}


flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。

map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。

3.3 mapPartitions

def mapParations(): Unit ={val list = List(1,2,3,4,5,6)val listRDD = sc.parallelize(list,2)listRDD.mapPartitions(iterator => {val newList: ListBuffer[String] = ListBuffer()while (iterator.hasNext){newList.append("hello " + iterator.next())}newList.toIterator}).foreach(name => println(name))}

3.4 mapPartitionsWithIndex

每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥?

def mapPartitionsWithIndex(): Unit ={val list = List(1,2,3,4,5,6,7,8)sc.parallelize(list).mapPartitionsWithIndex((index,iterator) => {val listBuffer:ListBuffer[String] = new ListBufferwhile (iterator.hasNext){listBuffer.append(index+"_"+iterator.next())}listBuffer.iterator},true).foreach(println(_))}

三、reduce、reduceByKey

3.1 reduce

reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。

def reduce(): Unit ={val list = List(1,2,3,4,5,6)val listRDD = sc.parallelize(list)val result = listRDD.reduce((x,y) => x+y)println(result)}

3.2 reduceByKey

reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。

def reduceByKey(): Unit ={val list = List(("武当", 99), ("少林", 97), ("武当", 89), ("少林", 77))val mapRDD = sc.parallelize(list)val resultRDD = mapRDD.reduceByKey(_+_)resultRDD.foreach(tuple => println("门派: " + tuple._1 + "->" + tuple._2))}

四、union,join和groupByKey

4.1 union

当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。

def union(): Unit ={val list1 = List(1,2,3,4)val list2 = List(3,4,5,6)val rdd1 = sc.parallelize(list1)val rdd2 = sc.parallelize(list2)rdd1.union(rdd2).foreach(println(_))}

4.2 groupByKey

union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:

def groupByKey(): Unit ={val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))val listRDD = sc.parallelize(list)val groupByKeyRDD = listRDD.groupByKey()groupByKeyRDD.foreach(t => {val menpai = t._1val iterator = t._2.iteratorvar people = ""while (iterator.hasNext) people = people + iterator.next + " "println("门派:" + menpai + "人员:" + people)})}

4.3 join

join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合

def join(): Unit = {val list1 = List((1, "东方不败"), (2, "令狐冲"), (3, "林平之"))val list2 = List((1, 99), (2, 98), (3, 97))val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)val joinRDD = list1RDD.join(list2RDD)joinRDD.foreach(t => println("学号:" + t._1 + " 姓名:" + t._2._1 + " 成绩:" + t._2._2))}

五、sample、cartesian

5.1 sample

    /*** sample用来从RDD中抽取样本。他有三个参数* withReplacement: Boolean,*       true: 有放回的抽样*       false: 无放回抽象* fraction: Double:*      抽取样本的比例* seed: Long:*      随机种子*/
  def sample(): Unit ={val list = 1 to 100val listRDD = sc.parallelize(list)listRDD.sample(false,0.1,0).foreach(num => print(num + " "))}

5.2 cartesian

cartesian是用于求笛卡尔积的

def cartesian(): Unit ={val list1 = List("A","B")val list2 = List(1,2,3)val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)list1RDD.cartesian(list2RDD).foreach(t => println(t._1 +"->"+t._2))}

六、filter、distinct、intersection

过滤 出偶数

6.1 filter

  def filter(): Unit ={val list = List(1,2,3,4,5,6,7,8,9,10)val listRDD = sc.parallelize(list)listRDD.filter(num => num % 2 ==0).foreach(print(_))}

6.2 distinct

 def distinct(): Unit ={val list = List(1,1,2,2,3,3,4,5)sc.parallelize(list).distinct().foreach(println(_))}

6.3 intersection

def intersection(): Unit ={val list1 = List(1,2,3,4)val list2 = List(3,4,5,6)val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)list1RDD.intersection(list2RDD).foreach(println(_))}

七、coalesce、repartition、repartitionAndSortWithinPartitions

7.1 coalesce

分区数由多 -》 变少

def coalesce(): Unit = {val list = List(1,2,3,4,5,6,7,8,9)sc.parallelize(list,3).coalesce(1).foreach(println(_))}

7.2 replication

进行重分区,解决的问题:本来分区数少 -》 增加分区数

def replication(): Unit ={val list = List(1,2,3,4)val listRDD = sc.parallelize(list,1)listRDD.repartition(2).foreach(println(_))}

7.3 repartitionAndSortWithinPartitions

repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。

def repartitionAndSortWithinPartitions(): Unit ={val list = List(1, 4, 55, 66, 33, 48, 23)val listRDD = sc.parallelize(list,1)listRDD.map(num => (num,num)).repartitionAndSortWithinPartitions(new HashPartitioner(2)).mapPartitionsWithIndex((index,iterator) => {val listBuffer: ListBuffer[String] = new ListBufferwhile (iterator.hasNext) {listBuffer.append(index + "_" + iterator.next())}listBuffer.iterator},false).foreach(println(_))}


八、cogroup、sortBykey、aggregateByKey

8.1 cogroup

对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。

def cogroup(): Unit ={val list1 = List((1, "www"), (2, "bbs"))val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))val list3 = List((1, "com"), (2, "com"), (3, "good"))val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)val list3RDD = sc.parallelize(list3)list1RDD.cogroup(list2RDD,list3RDD).foreach(tuple =>println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))}

8.2 sortBykey

sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K, V)] =
{val part = new RangePartitioner(numPartitions, self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:

def sortByKey(): Unit ={val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))sc.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))}

8.3 aggregateByKey

aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。

def aggregateByKey(): Unit ={val list = List("you,jump", "i,jump")sc.parallelize(list).flatMap(_.split(",")).map((_, 1)).aggregateByKey(0)(_+_,_+_).foreach(tuple =>println(tuple._1+"->"+tuple._2))}

【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)相关推荐

  1. Spark Transformation转换算子和Action行动算子

    1.Transformation转换算子 RDD整体上分为Value类型.双Value类型和Key-Value类型 1.1,Value类型 1.1.1,map()映射 object value01_m ...

  2. spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍

    参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...

  3. Spark转换算子和执行算子

    在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种. 一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行 ...

  4. 【Spark】Spark的常用算子

    Spark的常用算子 目录内容 Spark的常用算子 一.转换算子(Transformation) 二.行动算子(Action) 三.键值对算子(PairRDDFunctions) 四.文件系统算子( ...

  5. Spark-----Spark 与 Hadoop 对比,Spark 集群搭建与示例运行,RDD算子简单入门

    目录 一.Spark 概述 1.1. Spark是什么 1.2. Spark的特点(优点) 1.3. Spark组件 1.4. Spark和Hadoop的异同 二.Spark 集群搭建 2.1. Sp ...

  6. Spark入门(五)——Spark Streaming

    Spark Streaming(流处理) Spark Streaming(流处理) 什么是流处理? 快速入门 概念介绍 初始化 StreamingContext Discretized Streams ...

  7. Spark源码阅读(五) --- Spark的支持的join方式以及join策略

    版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...

  8. 基于Spark的数据清洗与转换

    基于Spark的数据清洗与转换 一.实验目的 二.实验内容 三.实验原理 四.实验环境 五.实验步骤 5.1 启动HDFS集群.Spark集群和Zeppelin服务器 5.2 准备实验数据 5.3 数 ...

  9. Spark五之Structured-Streaming

    文章目录 Structured Streaming 简介 快速入门 Programming Model(编程模型) 1.输入表 2.结果表 3.输出方式 Kafka Source Foreach(单行 ...

  10. 大数据管理技术 | 实习五 Spark软件栈体验

    文章目录 实习五 Spark软件栈体验 Spark安装与启动 1.Spark RDD-WordCount 2.Spark SQL 3.Spark MLlib之Titanic 4.GraphX再现Pag ...

最新文章

  1. Javascript内置对象新增接口列表
  2. 今晚 8 点直播 | OpenCV 20 年,首款开源软硬一体的 OAK 套件来了!
  3. 6.Mybatis中的动态Sql和Sql片段(Mybatis的一个核心)
  4. 创业第一天,有三AI扔出了深度学习的150多篇文章和10多个专栏
  5. SQL语言学习(五)流程控制函数学习
  6. 使用border-color设置输入框边框颜色后颜色不一致(左上边自动深色)解决方案
  7. 从PHP5到PHP7自我封装MongoDB以及平滑升级
  8. 第2章 状态机思维与状态机变量
  9. 剑指offer之把二叉树打印成多行
  10. 软件工程第三次作业(最大子段和)
  11. EndNote文献输出引用格式自定义修改与编辑界面解读
  12. [现代控制理论]8.5_线性控制器设计_轨迹跟踪simulink
  13. M7贴片二极管可以与什么型号的二极管通用?
  14. Node.Js实现最最最简单的登录注册
  15. 行为识别---不同模型的帧采样策略
  16. 【风马一族_构思】时光穿梭机
  17. Spring Boot 注解原理,自动装配原理,图文并茂,万字长文!
  18. vue路由懒加载写法
  19. Excel基础学习(2013及以上版本)
  20. 空客为重庆四条地铁线路部署TETRA系统 助力实施跨线运营

热门文章

  1. 微信小程序onShareTimeline()分享朋友圈功能
  2. ElastiSearch与Solr和Lucene
  3. 明明表没锁,却报SQL 错误: ORA-04021: 等待锁定对象 时发生超时
  4. 螃蟹芯片RTL8762之修改蓝牙设备类型
  5. python给一个不多于 5 位的正整数,要求:一、求它是几位数,二、逆序打印出各位数字
  6. 2021年西式面点师(初级)及西式面点师(初级)模拟试题
  7. 学习SEO第一天[笔记不易]
  8. 电脑黑屏,只有鼠标光标
  9. 飞书文档如何添加代码块 ```
  10. Linux模块(2) - 创建设备节点