spark算子详细介绍

  • 算子分区数、分区器如何确定?
  • Value 类型
    • 1. map() 改变结构就用map
    • 2. mapPartitions() 以分区为单位执行Map
      • 思考一个问题:map和mapPartitions的区别?
    • 3. mapPartitionsWithIndex() 带分区号
    • 4. flatMap() 扁平化
    • 5. glom() 分区转换数组
    • 6. groupBy() 分组
    • 7. filter() 过滤
    • 8. sample() 采样
      • 思考一个问题:sample有啥用,抽奖吗?
    • 9. distinct() 去重
      • 思考一个问题:如果不用该算子,你有什么办法实现数据去重?
    • 10. coalesce() 合并分区
    • 11. repartition() 重新分区(执行Shuffle)
      • 思考一个问题:coalesce和repartition区别?
    • 12. sortBy() 排序
    • 13. pipe() 管道,调用shell脚本
  • key-value 类型算子
    • 1. partitionBy() 按照K重新分区
    • 2. reduceByKey() 按照K聚合V
    • 3. groupByKey() 按照K重新分组
      • 思考一个问题:reduceByKey和groupByKey的区别?
      • 思考一个问题:groupBy 和 groupByKey 的区别?
    • 4. aggregateByKey() 按照K处理分区内和分区间逻辑
    • 5. foldByKey() 分区内和分区间相同的aggregateByKey()
    • 6. combineByKey() 转换结构后分区内和分区间操作
      • 思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
    • 7. sortByKey() 排序
    • 8. mapValues() 只对V进行操作
    • 9. join() 连接
      • 思考一个问题:如果key存在不相等呢?
    • 10. leftOuterJoin() 左外连接
    • 11. cogroup() 联合
  • 双Value 类型
    • 1. intersection() 交集
    • 2. union() 并集
    • 3. subtract() 差集
    • 4. zip() 拉链
      • 思考一个问题:如果两个RDD数据类型不一致怎么办?
      • 思考一个问题:如果两个RDD数据分区不一致怎么办?
      • 思考一个问题:如果两个RDD分区数据数量不一致怎么办?

算子分区数、分区器如何确定?

如何确定新的RDD的分区数?

  一、如果设置了并行度,则使用并行度为分区数;

  二、若没设置并行度,则取上游最大分区数,作为下游分区数。

  三、若上游没有分区器,且没设置并行度,则使用默认并行度作为分区数(默认机器总核数)

如何确定分区器?

  一、默认先取上游最大分区数的分区器;

    ①如果有分区器,且分区数"大于或等于"设置的并行度,则使用该分区器和分区数;

    ②若分区数"小于"设置的并行度数,则还是使用HashPartitioner,且分区数为设置的并行度数;

    ③如果有分区器,但没有设置并行度,则直接使用该分区器和分区数

  二、否则,就使用HashPartitioner,且分区数为设置的并行度数!

四种情况:

一、设置了并行度,上游没有分区器

  则直接使用HashPartitioner,分区数为并行度

二、设置了并行度,上游有分区器

  ①并行度>上游最大分区数,则使用HashPartitioner,分区数为并行度

  ②并行度<=上游最大分区数,则使用该分区器和分区数

三、没设置并行度,上游有分区器

  则直接使用上游分区数最大的 分区器和分区数

四、没设置并行度,上游没有分区器

  则使用HashPartitioner,分区数为并行度默认值(机器总核数)

总结:

  默认不设置并行度,取上游最大分区数,作为下游分区数。

  默认取上游最大分区数的分区器,如果没有,就使用HashPartitioner!

Value 类型

1. map() 改变结构就用map

函数签名:

map[U: ClassTag](f: T => U): RDD[U]
 /*** Return a new RDD by applying a function to all elements of this RDD.*/def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

函数说明

参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

将处理的数据逐条进行映射转换,这里的转换可以是值的转换,也可以是类型的转换。

val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
// 值的转换
val dataRDD1: RDD[Int] = dataRDD.map(num => {num * 2}
)
// 类型的转换
val dataRDD2: RDD[String] = dataRDD1.map(num => {"" + num}
)

图片中的说明:先把一个数据拿过来以后进行 *2 操作,例如拿1 过来后 *2 = 2 后,1这个数据就离开这块区域,然后进行第二个数据的处理…

2. mapPartitions() 以分区为单位执行Map

函数签名

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {val cleanedF = sc.clean(f)new MapPartitionsRDD(this,(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),preservesPartitioning)}

函数说明

f: Iterator[T] => Iterator[U]:f函数把每个分区的数据分别放入到迭代器中(批处理)。

preservesPartitioning: Boolean = false :是否保留RDD的分区信息。

功能:一次处理一个分区数据。

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。


每一个分区的数据会先到内存空间,然后才进行逻辑操作,整个分区操作完之后,拿到分区的数据才会释放掉。

从性能方面讲:批处理效率高,从内存方面:需要内存空间较大

val dataRDD: RDD[Int] = sc.makeRDD(1 to 4, 2)
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(datas => {datas.filter(_ == 2)}
)

小功能:获取每个数据分区的最大值

val dataRDD: RDD[Int] = sc.makeRDD(List(1,3,6,2,5),2)
// 获取每个数据分区的最大值
val rdd: RDD[Int] = dataRDD.mapPartitions(iter => {List(iter.max).iterator}
)
println(rdd.collect().mkString(","))

思考一个问题:map和mapPartitions的区别?

map算子每一次处理一条数据,而mapPartitions算子每一次将一个分区的数据当成一个整体进行数据处理。

如果一个分区的数据没有完全处理完,那么所有的数据都不会释放,即使前面已经处理完的数据也不会释放。容易出现内存溢出,所以当内存空间足够大时,为了提高效率,推荐使用mapPartitions算子

有些时候,完成比完美更重要

3. mapPartitionsWithIndex() 带分区号

函数签名

def mapPartitionsWithIndex[U: ClassTag](// Int表示分区编号f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

函数说明

f: (Int, Iterator[T]) => Iterator[U]:f函数把每个分区的数据分别放入到迭代器中(批处理)并且加上分区号

参数Int:为分区号

参数Iterator[T]:为一个迭代器,内容为一个分区中所有的数据;

函数的返回Iterator[U]:分区内每个数据经过转换以后数据形成的迭代器。

作用:比mapPartitions多一个整数参数表示分区号,在处理数据同时可以获取当前分区索引。

val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)val mapRdd = rdd.mapPartitionsWithIndex((index, items) => {items.map((index, _))}
)
// 打印修改后的RDD中数据
mapRdd.collect().foreach(println)结果:
(0,1)
(0,2)
(1,3)
(1,4)

小功能:获取第二个数据分区的数据

val dataRDD: RDD[Int] = sc.makeRDD(List(1,3,6,2,5,4),3)
// 获取的分区索引从0开始
val rdd = dataRDD.mapPartitionsWithIndex((index, iter) => {if ( index == 1 ) {iter} else {Nil.iterator}}
)println(rdd.collect().mkString(","))

4. flatMap() 扁平化

函数签名

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}

函数说明

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

本质:要求返回一个集合,会自动将集合压平(得到里面的元素)

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

val listRDD=sc.makeRDD(List(List(1,2),List(3,4)), 2)val mapRdd: RDD[Int]= listRDD.flatMap(item=>item
)
// 打印修改后的RDD中数据
mapRdd.collect().foreach(println)结果:
1
2
3
4

小功能:将List(List(1,2),3,List(4,5))进行扁平化操作

val dataRDD = sc.makeRDD( List(List(1,2),3,List(4,5)) )val rdd = dataRDD.flatMap(data => {data match {case list: List[_] =>listcase d => List(d)}}
)

5. glom() 分区转换数组

函数签名

def glom(): RDD[Array[T]]
def glom(): RDD[Array[T]] = withScope {new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))}

函数说明

作用:将同一个分区内的数据转换成数组,分区不变

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

该操作将RDD中每一个分区的数据变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致。

val rdd = sc.makeRDD(1 to 4, 2)
// 将每个分区的数据转换为数组,并去最大值
val mapRdd = rdd.glom().map(_.max)
// 打印修改后的RDD中数据
mapRdd.collect().foreach(println)结果:
2
4

6. groupBy() 分组

函数签名

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {groupBy[K](f, defaultPartitioner(this))}

函数说明

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

groupBy可以决定数据的分类,但是分类后的数据去哪个分区此算子无法决定

将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中

注意:一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

小功能:将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。

// 根据单词首写字母进行分组
val dataRDD = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2)dataRDD.groupBy(word=>{// word.substring(0,1)// word.charAt(0)// String(0) => StringOps// 隐式转换word(0)
})

小功能:WordCount。

val dataRDD = sc.makeRDD(List("Hello World", "Hello", "Hello"))println(dataRDD.flatMap(_.split(" ")).groupBy(word => word).map(kv => (kv._1, kv._2.size)).collect().mkString(","))

7. filter() 过滤

函数签名

def filter(f: T => Boolean): RDD[T]

函数说明

按照传入函数的返回值进行筛选过滤,符合规则的数据保留(保留为true的数据),不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

val rdd = sc.makeRDD(1 to 4, 2)
// 过滤出_%2==0的数据
val filterRdd = rdd.filter(_%2 == 0)filterRdd.collect().foreach(println)结果:
2
4

8. sample() 采样

函数签名

def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]

函数说明

withReplacementtrue为有放回的抽样,false为无放回的抽样。

fraction表示:以指定的随机种子随机抽样出数量为fraction的数据。

seed表示:指定随机数生成器种子。

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

思考一个问题:sample有啥用,抽奖吗?

在实际开发中,往往会出现数据倾斜的情况,那么可以从数据倾斜的分区中抽取数据,查看数据的规则,分析后,可以进行改善处理,让数据更加均匀

9. distinct() 去重

函数签名

def distinct()(implicit ord: Ordering[T] = null): RDD[T]def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
/*** Return a new RDD containing the distinct elements in this RDD.*/
def distinct(): RDD[T] = withScope {distinct(partitions.length)
}def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

函数说明

对内部的元素去重,distinct后会生成与原RDD分区个数不一致的分区数。上面的函数还可以对去重后的修改分区个数。

val distinctRdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2))distinctRdd.distinct(2).collect().foreach(println)结果
1
2
5

思考一个问题:如果不用该算子,你有什么办法实现数据去重?

可以通过reduceByKey的方式去重

// 数据1,2,1,2
dataRDD.map(x => (x, null)) // (1,null)(2,null)(1,null)(2,null)// (1,null)(1,null) => (1,null)// (2,null)(2,null) => (2,null).reduceByKey((v1, v2) => v1, numPartitions) // 只取元组的第一个元素,就是key.map(_._1)

10. coalesce() 合并分区

函数签名

def coalesce(numPartitions: Int, //重置的分区数量shuffle: Boolean = false, //是否需要打乱数据,shufflepartitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

函数说明

该算子重点在减少分区,我们在重置分区的个数的时候,参数值不要比原有分区数量多,因为该算子默认是不会打乱数据重新,没有shuffle,所以分区设置多了,多余的分区不会有数据。

我们在使用这个算子的时候,只需要传递重置的分区数量即可,其他的参数使用默认值;

如果想扩大分区,有新的算子可以实现,不过底层还是调用coalesce,只是将参数2设置为true

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4),4)val mapRdd: RDD[Int] = rdd.coalesce(2)
mapRdd.mapPartitionsWithIndex((index,values)=>values.map((index,_))
).collect().foreach(println)结果
(0,1)
(0,2)
(1,3)
(1,4)无shuffle
设置2个分区后的结果:
(0,1) (0,2) (1,3) (1,4)
设置3个分区后的结果:
(0,1) (1,2) (2,3) (2,4)
设置4个或者以上
(0,1) (1,2) (2,3) (3,4) 设置true后开启shuffle
设置1 ,2后的结果
(0,1) (0,2) (0,3) (0,4)
设置3后的结果
(0,1) (1,2) (1,3) (2,4)
设置4后的结果
(3,1) (3,2) (3,3) (3,4)
....

源码

for (i <- 0 until maxPartitions) {val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toIntval rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt(rangeStart until rangeEnd).foreach{ j => groupArr(i).partitions += prev.partitions(j) }
}解释说明:maxPartitions:传进来的新分区数prev.partitions:之前RDD的分区数分区i开始 = 分区号*前一个分区数 / 新的分区数结束 = (分区号+1)*前一个分区数 / 新的分区数

11. repartition() 重新分区(执行Shuffle)

函数签名

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

函数说明

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true

这个参数即可以扩大分区,也可以缩小分区的数量,但是我们一般用来扩大分区,缩小分区可以使用coalesce算子

无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
val mapRdd: RDD[Int] = rdd.repartition(8)mapRdd.mapPartitionsWithIndex((index,values) =>values.map((index,_))
).collect().foreach(println)结果
(6,1)
(6,3)
(6,5)
(6,7)
(7,2)
(7,4)
(7,6)
(7,8)

思考一个问题:coalesce和repartition区别?

repartition方法其实就是coalesce方法,只不过肯定使用了shuffle操作。让数据更均衡一些,可以有效防止数据倾斜问题。

如果缩减分区,一般就采用coalesce;如果扩大分区,就采用repartition

这两个算子只是决定分区数,并不能决定分区的数据如何分区,即只针对分区数,并不针对数据

12. sortBy() 排序

函数签名

def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

函数说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。

val rdd: RDD[Int] = sc.makeRDD(Array(1,5,3,2))
val mapRdd = rdd.sortBy(item=>item, false) // 默认为true为正序,false为倒序
// 打印修改后的RDD中数据
mapRdd.collect().foreach(println)结果
5
3
2
1

13. pipe() 管道,调用shell脚本

函数签名

def pipe(command: String): RDD[String]

函数说明

该方法可以让我们使用任意一种语言实现Spark作业中的部分逻辑,只要它能读写Unix标准的流就行,例如Pythonshell等脚本

管道,针对每个分区,都调用一次脚本,返回输出的RDD。

注意:脚本需要放在计算节点可以访问到的位置

实际案例

  1. 有2万个文件,每个10G,放在HDFS上了,总量200TB的数据需要分析。
  2. 分析程序本身已经写好了,程序接受一个参数:文件路径
  3. 如何用spark完成集群整个分析任务?

想到了Spark Pipe 应该可以完成:

总的来说就是Spark有一个pipe的编程接口,用的是Unix的标准输入和输出,类似于 Unix的 | 管道,例如: ls | grep ^d

第一步:创建RDD

  • 这一个步骤主要是罗列输入的任务,即,包含哪些文件。

    // 此处文件的List可以从另一个HDFS上的文件读取过来
    val data = List("hdfs://xxx/xxx/xxx/1.txt","hdfs://xxx/xxx/xxx/2.txt",...)
    val dataRDD = sc.makeRDD(data) //sc 是你的 SparkContext
    

第二步:创建一个Shell脚本启动分析任务

  • 我们已经有了RDD了,那么接下来写一个启动launch.sh脚本来启动我们的分析程序

    #!/bin/sh
    echo "Running launch.sh shell script..."
    while read LINE; doecho "启动分析任务, 待分析文件路径为: ${LINE}"bash hdfs://xxx/xxx/xx/analysis_program.sh ${LINE}
    done
    

第三步:RDD对接到启动脚本

  • 下面的步骤就是整合步骤了

    val scriptPath = "hdfs://xxx/xxx/launch.sh"
    val pipeRDD = dataRDD.pipe(scriptPath)
    pipeRDD.collect()
    

总结一下

dataRDD里面包含了我们要分析的文件列表,这个列表会被分发到spark集群,然后spark的工作节点会分别启动一个launch.sh脚本,接受文件列表作为输入参数,在launch.sh脚本的循环体用这些文件列表启动具体的分析任务

好处

  1. 既有程序analysis_program.sh 不需要任何修改,做到了重用,这是最大的好处

  2. 使用集群来做分析,速度比以前更快了(线性提升)

  3. 提高了机器的利用率(以前可能是一台机器分析)

key-value 类型算子

一个类中,隐式变量的类型只能有一种

1、Spark中有很多方法都是基于Key进行操作,所以数据格式应该为键值对(对偶元素)才能使用这些方法

2、如果数据类型是k-v类型,那么Spark会将RDD自动转换补充很多新的功能——>功能的扩展

3、那么是如果实现的?

  1. 通过隐式转换

  2. 如果数据类型为k-v类型(即RDD[k,v]),在RDD的伴生对象中,会将当前的RDD转换为PairRDDFunctions对象

  3. 所有的k-v类型的扩展方法,都来自PairRDDFunctions类中的方法

只有K-V类型的算子才有分区器

1. partitionBy() 按照K重新分区

函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {throw new SparkException("HashPartitioner cannot partition array keys.")}if (self.partitioner == Some(partitioner)) {self} else {new ShuffledRDD[K, V, V](self, partitioner)}}

函数说明

RDD[K,V]中的K按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。

思考一个问题:如果重分区的分区器和当前RDD的分区器一样怎么办?

答:不进行任何的处理。不会再次重分区。

思考一个问题:Spark还有其他分区器吗?

答:有一个RangePartitioner,在sortBy中使用

思考一个问题:如果想按照自己的方法进行数据分区怎么办?

答:自定义分区器

// 哈希分区器
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val mapRdd: RDD[(Int, String)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
mapRdd.mapPartitionsWithIndex{(index,values)=>values.map((index,_))
}.collect().foreach(println)结果
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))

自定义分区规则

要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。

(1)numPartitions:Int:返回创建出来的分区数。

(2)getPartition(key: Any):Int:返回给定键的分区编号(0 到 numPartitions-1)。

(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。

// main方法
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val mapRdd: RDD[(Int, String)] = rdd.partitionBy(new MyPartition(2))
mapRdd.mapPartitionsWithIndex{(index,values)=>values.map((index,_))
}.collect().foreach(println)// 主要代码
class MyPartition(num:Int) extends Partitioner{override def numPartitions: Int = numoverride def getPartition(key: Any): Int = {if(key.isInstanceOf[Int]){val i: Int = key.asInstanceOf[Int]if(i%2==0){0}else{1}}else{0}}}结果
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))

2. reduceByKey() 按照K聚合V

函数签名

 def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {reduceByKey(defaultPartitioner(self), func)}def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

函数说明

该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

形参func:表示对相同Key的value处理的逻辑

形参numPartitions:聚合结果后的,分区的数量

函数返回值:聚合以后的结果,返回值数据类型和原数据value类型一致



触发一个shuffle就会划分一个新阶段

val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
val mapRdd: RDD[(String, Int)] = rdd.reduceByKey((v1,v2)=>v1+v2)
// 打印修改后的RDD中数据
mapRdd.collect().foreach(println)结果
(a,6)
(b,7)

3. groupByKey() 按照K重新分组

函数签名

def groupByKey(): RDD[(K, Iterable[V])]def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

函数说明

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。该操作可以指定分区器或者分区数(默认使用HashPartitioner)


val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
// 调用groupByKey后,返回数据的类型为元组
// 元组的第一个元素:表示的是用于分组的key
// 元组的第二个元组:表示的是分组后,相同key的value的集合
val mapRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey(2)
// 打印修改后的RDD中数据
mapRdd.mapPartitionsWithIndex{(index,values)=>values.map((index,_))
}.collect().foreach(println)结果
(0,(b,CompactBuffer(5, 2)))
(1,(a,CompactBuffer(1, 5)))

思考一个问题:reduceByKey和groupByKey的区别?

reduceByKey:先根据数据的Key进行分组,然后再对相同Key的value进行数据聚合处理,此算子会对value再shuffle之前进行预聚合处理(聚合逻辑和参数逻辑一致),返回值类型RDD[(K, V)]

groupByKey:根据数据的key进行分组,返回值类型RDD[(K, Iterable[V])]

reduceByKey与groupByKey相比,多了对value的聚合操作,

注意,并且reduceByKey会对value再shuffle之前进行预聚合,如果最后的结果不需要对value进行一些聚合操作,那一定不能使用reduceByKey,否则会出现结果数据不准的情况

两个算子在实现相同的业务功能时,reduceByKey存在预聚和功能,所以性能比较高,推荐使用。但是,不是说一定就采用这个方法,需要根据场景来选择

思考一个问题:groupBy 和 groupByKey 的区别?

groupBy:根据指定的规则对数据进行分组,有逻辑形参,返回值类型RDD[(K, Iterable[T])]

groupByKey:直接根据数据的key进行分组,没有逻辑形参,返回值类型RDD[(K, Iterable[T])]

4. aggregateByKey() 按照K处理分区内和分区间逻辑

函数签名

def aggregateByKey[U: ClassTag]
(zeroValue: U) //初始值(初始值只参与每个分区内相同key的第一次运算,而且初始值是与value进行操作)
(seqOp: (U, V) => U, //分区内的计算规则combOp: (U, U) => U //分区间的计算规则
): RDD[(K, U)]

函数说明

1)zeroValue(初始值):给每一个分区中的每一种key一个初始值。

这个初始值的理解:这个初始值就是与第一个值进行比较,保证第一次对比下去。

(2)seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value。

(3)combOp(分区间):函数用于合并每个分区中的结果。

使用场景:分区内分区间 的计算逻辑不同时

根据key进行聚合,将数据根据不同的规则进行分区内计算和分区间计算,计算逻辑均是针对于value的操作


取出每个分区内相同key的最大值然后分区间相加// aggregateByKey算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数zeroValue:U 表示初始值(初始值只参与,每个分区内相同key的第一次运算,而且初始值是与value进行操作)
// 2. 第二个参数列表中含有两个参数
//    2.1 第一个参数seqOp:(U, V) => U 表示分区内的计算规则
//    2.2 第二个参数combOp:(U, U) => U 表示分区间的计算规则
val rdd = sc.makeRDD(List(("a",1),("a",2),("c",3),("b",4),("c",5),("c",6)),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
//                                              => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)val resultRDD = rdd.aggregateByKey(10)((x, y) => math.max(x,y),(x, y) => x + y)resultRDD.collect().foreach(println)

5. foldByKey() 分区内和分区间相同的aggregateByKey()

函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

函数说明

使用场景:分区内分区间 的计算逻辑相同时

当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

val rdd = sc.makeRDD(List(("a",1),("b",1),("b",1),("a",1)),2)rdd.foldByKey(0)(_+_).collect().foreach(println)结果
(b,2)
(a,2)

6. combineByKey() 转换结构后分区内和分区间操作

函数签名

def combineByKey[C](createCombiner: V => C, //改变相同key的第一个计算的value的结构,后续操作的value返回值都是此结构mergeValue: (C, V) => C, //分区内的计算规则mergeCombiners: (C, C) => C //分区间的计算规则
): RDD[(K, C)]

函数说明

(1)createCombiner(转换数据的结构): combineByKey()

会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。

如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。

(2)mergeValue(分区内):

如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。

(3)mergeCombiners(分区间):

由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners()方法将各个分区的结果进行合并。

使用场景:当计算时发现key的value不符合计算规则的格式时,可以选择conbineByKey

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

小练习:将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey((_, 1), //88 => (88,1)将value转换格式(值,个数)(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //因为它不知道你将value变成什么样的结构,所有要加类型 //分区内逻辑,将value值相加,次数加1(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
) // 分区间,将value值相加,次数相加combineRdd.map{case ( key, ( total, count ) ) => (key, total / count )}.collect().foreach(println)

思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?

从源码的角度来讲,四个算子的底层逻辑是相同的。

  • reduceByKey:不会对第一个value进行处理,分区内和分区间计算规则相同。

  • aggregateByKey:会将初始值和第一个value使用分区内计算规则进行计算。

  • foldByKey:会将初始值和第一个value使用分区内计算规则,分区内和分区间的计算规则相同。

  • combineByKey:第一个参数就是对第一个value进行处理,所有无需初始值

从源码的角度发现,如上4个算子底层逻辑是相同,唯一不同的区别是参数不同。

参数1: createCombiner,分区内相同key的第一个v的转换逻辑
参数2: mergeValue,分区内部的计算逻辑
参数3: mergeCombiners,分区间的计算逻辑def combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true, //开启map端预聚合serializer: Serializer = null)
reduceByKey
源码如下:参数1:没有任何的转换,对key的第一个value没有转换参数2和参数3相同,即分区内和分区间的计算逻辑保持一致。combineByKeyWithClassTag[V]((v: V) => v, func, func   )aggregateByKey
源码如下:参数1:传递的初始值会和每一个不同key的第一个value按照分区内计算逻辑进行计算参数2:分区内计算逻辑参数3:分区间的计算逻辑combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp,combOp       )foldByKey
源码如下:参数1:传递的初始值会和每一个不同key的第一个value按照分区内计算逻辑进行计算参数2和参数3一致:分区内和分区间的计算逻辑保持一致combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),cleanedFunc, cleanedFunc       )combineByKey
源码如下:参数1:分区内每个相同key的第一个v的转换逻辑,所以无需传递初始值参数2:分区内计算逻辑参数3:分区间的计算逻辑combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners  )

7. sortByKey() 排序

函数签名

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] {val part = new RangePartitioner(numPartitions, self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

函数说明

有两个形参,均有默认值:

形参1:ascending: Boolean = true

排序的顺序,默认是升序,如果需要降序,则输入false

形参2:numPartitions: Int = self.partitions.length

排序以后分区的数量,默认等于上一个rdd的分区的数量。

还可以自定义分区的规则。步骤:

  1. 继承与ordered,并混入serializable

  2. 重写compare方法,指定排序比较的规则

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

小功能:设置key为自定义类User

class User extends Ordered[User] with Serializable{override def compare(that: User): Int = {if (this.name > that.name){1}else if (this.name == that.name){this.age - that.age}else{-1}}
}

8. mapValues() 只对V进行操作

函数签名

def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {val cleanF = self.context.clean(f)new MapPartitionsRDD[(K, U), (K, V)](self,(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },preservesPartitioning = true)}

函数说明

针对于(K,V)形式的类型只对V进行操作

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
val mapRdd: RDD[(Int, String)] = rdd.mapValues(_+">>>>")
// 打印修改后的RDD中数据
mapRdd.collect().foreach(println)结果
(1,a>>>>)
(1,d>>>>)
(2,b>>>>)
(3,c>>>>)

9. join() 连接

函数签名

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

函数说明

在类型为(K,V)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V1,V2))的RDD

情况1:如果当前RDD中key在连接的RDD中没有,那么这条数据就不会被关联,数据则没有

情况2:如果当前RDD中相同的Key有多条数据,且另外一个RDD与子相同的key也有多条数据,那么就出现了笛卡尔积错误

正常情况
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)//(1,("a",4)),(2,("b",5),(3,("c",6))
少key的情况
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("c",2)))
rdd.join(rdd1).collect().mkString(",") //(a,(1,21)),(b,(1,2))
多重复key的情况
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("a",2)))
val rdd3: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("c",2),("a",2)))
rdd2.join(rdd3).collect().mkString(",")会出现数据重复,笛卡尔乘积的现象
//(a,(1,21)),(a,(1,2)),(a,(2,21)),(a,(2,2)),(b,(1,2))

思考一个问题:如果key存在不相等呢?

如果key不相等,对应的数据无法连接;如果key有重复的,那么数据会多次连接

10. leftOuterJoin() 左外连接

函数签名

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

函数说明

类似于SQL语句的左外连接

注意返回值:

  • 如果两个RDD有相同的key,则为:(a,(1,Some(21)))

  • 如果主RDD中的key,在从RDD没有对应的key,则为:(d,(2,None))

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("d",2),("a",2)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("a",2)))rdd.leftOuterJoin(rdd1).collect().mkString(",")//(a,(1,Some(21))),(a,(1,Some(2))),(a,(2,Some(21))),(a,(2,Some(2))),(b,(1,Some(2))),(d,(2,None))

11. cogroup() 联合

函数签名

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

函数说明

在类型为(K,V)(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

返回值:(K,(Iterable<V>,Iterable<W>)),是一个元组

第一个元素:RDD的key

第二个元素:还是一个元组

  • 元组的第一个元素:当前相同key的所有value的集合,是一个迭代器

  • 元组的第二个元素:另外一个RDD的key的所有value的集合,是一个迭代器

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)// (a,(CompactBuffer(1,2),CompactBuffer(1)))
// (c,(CompactBuffer(3),CompactBuffer(2, 3)))val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("c",1)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("b",2),("d",1)))
rdd.cogroup(rdd1).collect().foreach(println)// (a,(CompactBuffer(1),CompactBuffer(21)))
// (b,(CompactBuffer(1),CompactBuffer(2, 2)))
// (c,(CompactBuffer(1),CompactBuffer()))
// (d,(CompactBuffer(),CompactBuffer(1)))

双Value 类型

双Value:表示是两个RDD之间进行操作,类似sacla中集合的并集(union)、交集(intersect)、差集(diff)、拉链(zip

1. intersection() 交集

函数签名

def intersection(other: RDD[T]): RDD[T]

函数说明

1、数据打乱重组,有shuffle过程

2、返回的RDD的分区数量,为两个RDD最大的分区数量

3、两个RDD的数据类型必须保持一致,否者编译时报错

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[Int] = dataRDD1.intersection(dataRDD2)

2. union() 并集

函数签名

def union(other: RDD[T]): RDD[T]

函数说明

1、分区:分区合并

2、数据:数据合并

3、两个RDD的数据类型必须保持一致,否者编译不通过

对源RDD和参数RDD求并集后返回一个新的RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[Int] = dataRDD1.union(dataRDD2)

3. subtract() 差集

函数签名

def subtract(other: RDD[T]): RDD[T]

函数说明

1、分区:返回的RDD的分区数量,等于调用这个方法的RDD的分区数量

2、有数据打乱重组过程,有shuffle过程

3、数据:返回当前RDD除去和参数RDD共同的数据集

4、两个RDD的数据类型必须保持一致,否者编译时报错

以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[Int] = dataRDD1.subtract(dataRDD2)

4. zip() 拉链

函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

函数说明

1、分区数量相同,每个分区的数据量不相等,

报错:Can only zip RDDs with same number of elements in each partition

  只有两个RDD的每个分区数据量相同才能拉链

2、分区数量不相同,每个分区的数量量相同,

报错:Can’t zip RDDs with unequal numbers of partitions

  RDD的分区数量不同不能拉链

综上:只有两个RDD的分区数量和每个分区数据量相等,才能拉链(拉链的规则需要对应)

3、返回的RDD的数据是元组

将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[(Int, Int)] = dataRDD1.zip(dataRDD2)// (1,3)(2,4)(3,5)(4,6)

思考一个问题:如果两个RDD数据类型不一致怎么办?

编译时报错,会发生错误

思考一个问题:如果两个RDD数据分区不一致怎么办?

如果数据分区不一致,会发生错误

思考一个问题:如果两个RDD分区数据数量不一致怎么办?

如果数据分区中数据量不一致,也会发生错误。

spark算子详细介绍(v、k-v、vv类型)相关推荐

  1. php中 b=a-=k =p,详细介绍下“K=K、K=A、A=B”代表的是什么意思?

    有客户经常问到"K=K.K=A.A=B"代表的是什么意思? 表示瓦楞纸箱纸质的要求,面纸和里纸为: K纸(K2.K1)进口牛卡,克重为130G和170G,有些业务员为了更高利润,会 ...

  2. K近邻算法和KD树详细介绍及其原理详解

    相关文章 K近邻算法和KD树详细介绍及其原理详解 朴素贝叶斯算法和拉普拉斯平滑详细介绍及其原理详解 决策树算法和CART决策树算法详细介绍及其原理详解 线性回归算法和逻辑斯谛回归算法详细介绍及其原理详 ...

  3. 『NLP学习笔记』Transformer技术详细介绍

    Transformer技术详细介绍! 文章目录 一. 整体结构图 二. 输入部分 2.1. 词向量 2.2. 位置编码 三. 注意力机制 3.1. 注意力机制的本质 3.2. 举例说明 3.3. Tr ...

  4. spark算子_Spark常用算子

    Spark的算子分类: 从大方向说,Spark算子大致可以分为以下两类: (1)Transformation变换/转换算子:这种变换并不触发提交作业,这种算子是延迟执行的,也就是说从一个RDD转换生成 ...

  5. Spark算子:统计RDD分区中的元素及数量

    Spark RDD是被分区的,在生成RDD时候,一般可以指定分区的数量,如果不指定分区数量,当RDD从集合创建时候,则默认为该程序所分配到的资源的CPU核数,如果是从HDFS文件创建,默认为文件的Bl ...

  6. java中char占的二进制,java数据类型与二进制详细介绍

    java数据类型与二进制详细介绍 在java中 Int 类型的变量占 4个字节 Long 类型的变量占8个字节 一个程序就是一个世界,变量是这个程序的基本单位. Java基本数据类型 1. 整数类型 ...

  7. web图片裁切插件 cropper.js 详细介绍

    cropper.js一个用来处理图片的插件,可以使用它来实现图片的各种模式下的裁切效果,当我们在做一个上传头像或者上传图片功能的时候,需要用户裁切出用户想要的图片位置就可以利用这个插件来实现','cr ...

  8. python语言中运算符号_详细介绍Python语言中的按位运算符

    <从问题到程序:用Python学编程和计算>--2.11 补充材料 本节书摘来自华章计算机<从问题到程序:用Python学编程和计算>一书中的第2章,第2.11节,作者:裘宗燕 ...

  9. 理解索引:MySQL执行计划详细介绍

    为什么80%的码农都做不了架构师?>>>    最近有个需求,要修改现有存储结构,涉及查询条件和查询效率的考量,看了几篇索引和HBase相关的文章,回忆了相关知识,结合项目需求,说说 ...

最新文章

  1. 模型压缩95%:Lite Transformer,MIT韩松等人
  2. vs2005 下的发邮件代码
  3. Java 的Comparator比较器用法
  4. 新手使用vue-router传参时注意事项
  5. 面向对象之类的内建函数
  6. java 语法 泛型_java-解密泛型语法
  7. Parameter '**' not found. Available parameters are [0, 1, param1, param2]解决办法
  8. 初识数据中心Mesos
  9. 户口所在地代码查询_毕业生如何查询档案存放地及存档问题?
  10. C#中List〈string〉和string[]数组之间的相互转换
  11. python中update是啥意思_python中update的基本使用方法详解
  12. imagenet 千分类标签翻译
  13. 全文检索服务 _ ElasticSearch
  14. win10卸载软件_win10系统卸载软件超详细教程
  15. MBA教学目标、内容和方法
  16. stm32毕业设计 单片机万能红外遥控器
  17. 小程序跳转到另一个小程序很慢很卡
  18. 用Java写了一个类QQ界面聊天小项目,可在线聊天(附源码)
  19. 记录Windows下有趣的cmd命令(持续更新)
  20. Redis-Cluster 主节点故障后集群恢复耗时调优原理

热门文章

  1. go redis incr的使用
  2. Linux嵌入式设备文件系统修改为ext4格式
  3. 全球及中国多晶透明陶瓷行业供需及竞争形势分析报告2021~2026年
  4. chrome14-使用snippets辅助debugging
  5. 面试连环炮:从HashSet开始,一路怼到CPU
  6. Caffeine使用篇 - Eviction
  7. [LifeHack]Hack决策系统
  8. 快速提升网站排名_使用快排优化的方法
  9. C语言学生管理系统(期末作业,超详细哟,拿走不谢!!!)
  10. 2019年c++/c,java,python,前端,数据结构,ps等资料大全