【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)
文章目录
- 一、Transformation 和 Action
- 1、转换操作
- 2、行动操作
- 二、map、flatMap、mapParations、mapPartitionsWithIndex
- 2.1 map
- 2.2 flatMap
- 3.3 mapPartitions
- 3.4 mapPartitionsWithIndex
- 三、reduce、reduceByKey
- 3.1 reduce
- 3.2 reduceByKey
- 四、union,join和groupByKey
- 4.1 union
- 4.2 groupByKey
- 4.3 join
- 五、sample、cartesian
- 5.1 sample
- 5.2 cartesian
- 六、filter、distinct、intersection
- 6.1 filter
- 6.2 distinct
- 6.3 intersection
- 七、coalesce、repartition、repartitionAndSortWithinPartitions
- 7.1 coalesce
- 7.2 replication
- 7.3 repartitionAndSortWithinPartitions
- 八、cogroup、sortBykey、aggregateByKey
- 8.1 cogroup
- 8.2 sortBykey
- 8.3 aggregateByKey
一、Transformation 和 Action
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
常用的Transformation:
1、转换操作
1、def map[U: ClassTag](f: T => U): RDD[U] 将函数应用于RDD的每一元素,并返回一个新的RDD
2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD
3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。
4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。
7、def union(other: RDD[T]): RDD[T] 将两个RDD中的元素进行合并,返回一个新的RDD
8、def intersection(other: RDD[T]): RDD[T] 将两个RDD做交集,返回一个新的RDD
9、def distinct(): RDD[T] 将当前RDD进行去重后,返回一个新的RDD
10、def partitionBy(partitioner: Partitioner): RDD[(K, V)] 根据设置的分区器重新将RDD进行分区,返回新的RDD。
11、def reduceByKey(func: (V, V) => V): RDD[(K, V)] 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
12、def groupByKey(): RDD[(K, Iterable[V])] 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD
13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。
14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。
15、def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] aggregateByKey的简化操作,seqop和combop相同,
16、def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
17、def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 底层实现还是使用sortByKey,只不过使用fun生成的新key进行排序。
18、def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,但是需要注意的是,他只会返回key在两个RDD中都存在的情况。
19、def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD,注意,如果V和W的类型相同,也不放在一块,还是单独存放。
20、def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] 做两个RDD的笛卡尔积,返回对偶的RDD
21、def pipe(command: String): RDD[String] 对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD,注意,如果你是本地文件系统中,需要将脚本放置到每个节点上。
22、def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
23、def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 根据你传入的分区数重新通过网络分区所有数据,重型操作。
24、def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] 性能要比repartition要高。在给定的partitioner内部进行排序
25、def glom(): RDD[Array[T]] 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
26、def mapValues[U](f: V => U): RDD[(K, U)] 将函数应用于(k,v)结果中的v,返回新的RDD
27、def subtract(other: RDD[T]): RDD[T] 计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来。
2、行动操作
1、def takeSample( withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 抽样但是返回一个scala集合。
2、def reduce(f: (T, T) => T): T 通过func函数聚集RDD中的所有元素
3、def collect(): Array[T] 在驱动程序中,以数组的形式返回数据集的所有元素
4、def count(): Long 返回RDD中的元素个数
5、def first(): T 返回RDD中的第一个元素
6、def take(num: Int): Array[T] 返回RDD中的前n个元素
7、def takeOrdered(num: Int)(implicit ord: Ordering[T]) 返回前几个的排序
8、def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
9、def fold(zeroValue: T)(op: (T, T) => T): T 折叠操作,aggregate的简化操作,seqop和combop一样。
10、def saveAsTextFile(path: String): Unit 将RDD以文本文件的方式保存到本地或者HDFS中
11、def saveAsObjectFile(path: String): Unit 将RDD中的元素以序列化后对象形式保存到本地或者HDFS中。
12、def countByKey(): Map[K, Long] 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
13、def foreach(f: T => Unit): Unit 在数据集的每一个元素上,运行函数func进行更新。
注意:当你在RDD中使用到了class的方法或者属性的时候,该class需要继承java.io.Serializable接口,或者可以将属性赋值为本地变量来防止整个对象的传输。
二、map、flatMap、mapParations、mapPartitionsWithIndex
基本的初始化
private val conf: SparkConf = new SparkConf().setAppName("TestTransformation").setMaster("local")
private val sparkContext = new SparkContext(conf)
2.1 map
map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。
def map(): Unit ={val list = List("张无忌", "赵敏", "周芷若")val listRDD = sc.parallelize(list)val nameRDD = listRDD.map(name => "Hello " + name)nameRDD.foreach(name => println(name))}
可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。
2.2 flatMap
flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。
def flatMap(): Unit ={val list = List("张无忌 赵敏","宋青书 周芷若")val listRDD = sc.parallelize(list)val nameRDD = listRDD.flatMap(line => line.split(" ")).map(name => "Hello " + name)nameRDD.foreach(name => println(name))}
flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。
map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。
3.3 mapPartitions
def mapParations(): Unit ={val list = List(1,2,3,4,5,6)val listRDD = sc.parallelize(list,2)listRDD.mapPartitions(iterator => {val newList: ListBuffer[String] = ListBuffer()while (iterator.hasNext){newList.append("hello " + iterator.next())}newList.toIterator}).foreach(name => println(name))}
3.4 mapPartitionsWithIndex
每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥?
def mapPartitionsWithIndex(): Unit ={val list = List(1,2,3,4,5,6,7,8)sc.parallelize(list).mapPartitionsWithIndex((index,iterator) => {val listBuffer:ListBuffer[String] = new ListBufferwhile (iterator.hasNext){listBuffer.append(index+"_"+iterator.next())}listBuffer.iterator},true).foreach(println(_))}
三、reduce、reduceByKey
3.1 reduce
reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。
def reduce(): Unit ={val list = List(1,2,3,4,5,6)val listRDD = sc.parallelize(list)val result = listRDD.reduce((x,y) => x+y)println(result)}
3.2 reduceByKey
reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。
def reduceByKey(): Unit ={val list = List(("武当", 99), ("少林", 97), ("武当", 89), ("少林", 77))val mapRDD = sc.parallelize(list)val resultRDD = mapRDD.reduceByKey(_+_)resultRDD.foreach(tuple => println("门派: " + tuple._1 + "->" + tuple._2))}
四、union,join和groupByKey
4.1 union
当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。
def union(): Unit ={val list1 = List(1,2,3,4)val list2 = List(3,4,5,6)val rdd1 = sc.parallelize(list1)val rdd2 = sc.parallelize(list2)rdd1.union(rdd2).foreach(println(_))}
4.2 groupByKey
union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:
def groupByKey(): Unit ={val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))val listRDD = sc.parallelize(list)val groupByKeyRDD = listRDD.groupByKey()groupByKeyRDD.foreach(t => {val menpai = t._1val iterator = t._2.iteratorvar people = ""while (iterator.hasNext) people = people + iterator.next + " "println("门派:" + menpai + "人员:" + people)})}
4.3 join
join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
def join(): Unit = {val list1 = List((1, "东方不败"), (2, "令狐冲"), (3, "林平之"))val list2 = List((1, 99), (2, 98), (3, 97))val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)val joinRDD = list1RDD.join(list2RDD)joinRDD.foreach(t => println("学号:" + t._1 + " 姓名:" + t._2._1 + " 成绩:" + t._2._2))}
五、sample、cartesian
5.1 sample
/*** sample用来从RDD中抽取样本。他有三个参数* withReplacement: Boolean,* true: 有放回的抽样* false: 无放回抽象* fraction: Double:* 抽取样本的比例* seed: Long:* 随机种子*/
def sample(): Unit ={val list = 1 to 100val listRDD = sc.parallelize(list)listRDD.sample(false,0.1,0).foreach(num => print(num + " "))}
5.2 cartesian
cartesian是用于求笛卡尔积的
def cartesian(): Unit ={val list1 = List("A","B")val list2 = List(1,2,3)val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)list1RDD.cartesian(list2RDD).foreach(t => println(t._1 +"->"+t._2))}
六、filter、distinct、intersection
过滤 出偶数
6.1 filter
def filter(): Unit ={val list = List(1,2,3,4,5,6,7,8,9,10)val listRDD = sc.parallelize(list)listRDD.filter(num => num % 2 ==0).foreach(print(_))}
6.2 distinct
def distinct(): Unit ={val list = List(1,1,2,2,3,3,4,5)sc.parallelize(list).distinct().foreach(println(_))}
6.3 intersection
def intersection(): Unit ={val list1 = List(1,2,3,4)val list2 = List(3,4,5,6)val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)list1RDD.intersection(list2RDD).foreach(println(_))}
七、coalesce、repartition、repartitionAndSortWithinPartitions
7.1 coalesce
分区数由多 -》 变少
def coalesce(): Unit = {val list = List(1,2,3,4,5,6,7,8,9)sc.parallelize(list,3).coalesce(1).foreach(println(_))}
7.2 replication
进行重分区,解决的问题:本来分区数少 -》 增加分区数
def replication(): Unit ={val list = List(1,2,3,4)val listRDD = sc.parallelize(list,1)listRDD.repartition(2).foreach(println(_))}
7.3 repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。
def repartitionAndSortWithinPartitions(): Unit ={val list = List(1, 4, 55, 66, 33, 48, 23)val listRDD = sc.parallelize(list,1)listRDD.map(num => (num,num)).repartitionAndSortWithinPartitions(new HashPartitioner(2)).mapPartitionsWithIndex((index,iterator) => {val listBuffer: ListBuffer[String] = new ListBufferwhile (iterator.hasNext) {listBuffer.append(index + "_" + iterator.next())}listBuffer.iterator},false).foreach(println(_))}
八、cogroup、sortBykey、aggregateByKey
8.1 cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
def cogroup(): Unit ={val list1 = List((1, "www"), (2, "bbs"))val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))val list3 = List((1, "com"), (2, "com"), (3, "good"))val list1RDD = sc.parallelize(list1)val list2RDD = sc.parallelize(list2)val list3RDD = sc.parallelize(list3)list1RDD.cogroup(list2RDD,list3RDD).foreach(tuple =>println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))}
8.2 sortBykey
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K, V)] =
{val part = new RangePartitioner(numPartitions, self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:
def sortByKey(): Unit ={val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))sc.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))}
8.3 aggregateByKey
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
def aggregateByKey(): Unit ={val list = List("you,jump", "i,jump")sc.parallelize(list).flatMap(_.split(",")).map((_, 1)).aggregateByKey(0)(_+_,_+_).foreach(tuple =>println(tuple._1+"->"+tuple._2))}
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)相关推荐
- Spark Transformation转换算子和Action行动算子
1.Transformation转换算子 RDD整体上分为Value类型.双Value类型和Key-Value类型 1.1,Value类型 1.1.1,map()映射 object value01_m ...
- spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍
参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...
- Spark转换算子和执行算子
在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种. 一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行 ...
- 【Spark】Spark的常用算子
Spark的常用算子 目录内容 Spark的常用算子 一.转换算子(Transformation) 二.行动算子(Action) 三.键值对算子(PairRDDFunctions) 四.文件系统算子( ...
- Spark-----Spark 与 Hadoop 对比,Spark 集群搭建与示例运行,RDD算子简单入门
目录 一.Spark 概述 1.1. Spark是什么 1.2. Spark的特点(优点) 1.3. Spark组件 1.4. Spark和Hadoop的异同 二.Spark 集群搭建 2.1. Sp ...
- Spark入门(五)——Spark Streaming
Spark Streaming(流处理) Spark Streaming(流处理) 什么是流处理? 快速入门 概念介绍 初始化 StreamingContext Discretized Streams ...
- Spark源码阅读(五) --- Spark的支持的join方式以及join策略
版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...
- 基于Spark的数据清洗与转换
基于Spark的数据清洗与转换 一.实验目的 二.实验内容 三.实验原理 四.实验环境 五.实验步骤 5.1 启动HDFS集群.Spark集群和Zeppelin服务器 5.2 准备实验数据 5.3 数 ...
- Spark五之Structured-Streaming
文章目录 Structured Streaming 简介 快速入门 Programming Model(编程模型) 1.输入表 2.结果表 3.输出方式 Kafka Source Foreach(单行 ...
- 大数据管理技术 | 实习五 Spark软件栈体验
文章目录 实习五 Spark软件栈体验 Spark安装与启动 1.Spark RDD-WordCount 2.Spark SQL 3.Spark MLlib之Titanic 4.GraphX再现Pag ...
最新文章
- Javascript内置对象新增接口列表
- 今晚 8 点直播 | OpenCV 20 年,首款开源软硬一体的 OAK 套件来了!
- 6.Mybatis中的动态Sql和Sql片段(Mybatis的一个核心)
- 创业第一天,有三AI扔出了深度学习的150多篇文章和10多个专栏
- SQL语言学习(五)流程控制函数学习
- 使用border-color设置输入框边框颜色后颜色不一致(左上边自动深色)解决方案
- 从PHP5到PHP7自我封装MongoDB以及平滑升级
- 第2章 状态机思维与状态机变量
- 剑指offer之把二叉树打印成多行
- 软件工程第三次作业(最大子段和)
- EndNote文献输出引用格式自定义修改与编辑界面解读
- [现代控制理论]8.5_线性控制器设计_轨迹跟踪simulink
- M7贴片二极管可以与什么型号的二极管通用?
- Node.Js实现最最最简单的登录注册
- 行为识别---不同模型的帧采样策略
- 【风马一族_构思】时光穿梭机
- Spring Boot 注解原理,自动装配原理,图文并茂,万字长文!
- vue路由懒加载写法
- Excel基础学习(2013及以上版本)
- 空客为重庆四条地铁线路部署TETRA系统 助力实施跨线运营
热门文章
- 微信小程序onShareTimeline()分享朋友圈功能
- ElastiSearch与Solr和Lucene
- 明明表没锁,却报SQL 错误: ORA-04021: 等待锁定对象 时发生超时
- 螃蟹芯片RTL8762之修改蓝牙设备类型
- python给一个不多于 5 位的正整数,要求:一、求它是几位数,二、逆序打印出各位数字
- 2021年西式面点师(初级)及西式面点师(初级)模拟试题
- 学习SEO第一天[笔记不易]
- 电脑黑屏,只有鼠标光标
- 飞书文档如何添加代码块 ```
- Linux模块(2) - 创建设备节点