1. Spark 算子从功能上可以分为以下两类:

1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

RDD转换算子根据数据处理方式的不同将算子整体上分为Value类型、双Value类型Key-Value类型

2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

2. Value型Transformation算子

map mapPartitions mapPartitionsWithIndex flatMap
glom groupBy filter sample
distinct coalesce repartition sortBy

面试题:请列举Spark的transformation算子(不少于8个),并简述功能(重点)
1)map (func):返回一个新的 RDD,该 RDD 由每一个输入元素经过func 函数转换后组成.
2)mapPartitions(func):类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的RDD上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而 mapPartitions 被调用M 次,一个函数一次处理所有分区。
3)reduceByKey(func,[numTask]):在一个(K,V)的 RDD 上调用,返回一个(K,V)的RDD,使用定的 reduce 函数,将相同key 的值聚合到一起,reduce 任务的个数可以通过第二个可选的参数来设置。
4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv 对的 RDD 中,,按 key 将value 进行分组合并,合并时,将每个 value 和初始值作为 seq 函数的参数,进行计算,返回的结果作为一个新的 kv 对,然后再将结果按照 key 进行合并,最后将每个分组的 value 传递给 combine 函数进行计算(先将前两个value 进行计算,将返回结果和下一个 value 传给 combine 函数,以此类推),将 key 与计算结果作为一个新的 kv 对输出。
5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):
对相同 K,把 V 合并成一个集合。

1.createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作 createCombiner()的函数来创建那个键对应的累加器的初始值
2.mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并 
3.mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的mergeCombiners() 方法将各个分区的结果进行合并。

1) map

➢ 函数签名
def map[U: ClassTag](f: T => U): RDD[U]
➢ 函数说明
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

需求1:将数组中每个数乘以2

package com.meiyuan.bigdata.spark.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCore")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4),2)// 【1,2】,【3,4】rdd.saveAsTextFile("output")val mapRDD = rdd.map(_*2)// 【2,4】,【6,8】mapRDD.saveAsTextFile("output1")mapRDD.collect().foreach(println)sc.stop()}
}

总结:

① rdd的计算一个分区内的数据是一个一个执行逻辑,只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
② 分区内数据的执行是有序的。不同分区数据计算是无序的(并行无法确定先后)
③ 一个partition一个task发给Executor不会跨区执行任务
④ 分区不变, 数据转换之后所在的分区位置也不变

❖ 小功能:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark01_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - mapval rdd = sc.textFile("data/apache.log")// 数据格式:IP地址+用户ID+访问时间戳+时区+请求方式// 这里面的String就是请求资源的路径val mapRDD: RDD[String] = rdd.map(line => {val data = line.split(" ")data(6) // 6表示文件位置6(从0开始数)})mapRDD.collect().foreach(println)sc.stop()}
}

2) mapPartitions

➢ 函数签名
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

需求:分区为2,下面数据一共操作几次?

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - mapPartitionsval rdd = sc.makeRDD(List(1,2,3,4), 2)// 一个分区数据全部拿到做操作;通过迭代器得到迭代器// 内存中的操作, 类似于批处理, 1和2一起操作, 3和4一起操作; 2次val mpRDD: RDD[Int] = rdd.mapPartitions(iter => {// 打印两次println(">>>>>>>>>>")iter.map(_ * 2) })mpRDD.collect().foreach(println)sc.stop()}
}

总结:

① 以分区为单位进行数据转换操作, 而不是一个一个操作
② 但是会将整个分区的数据加载到内存进行引用
③ 如果处理完的数据是不会被释放掉,存在对象的引用
④ 在内存较小,数据量较大的场合下,容易出现内存溢出。

❖ 小功能:获取每个数据分区的最大值

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - mapPartitionsval rdd = sc.makeRDD(List(1,2,3,4), 2)// 【1,2】,【3,4】//  每个分区最大值【2】,【4】val mpRDD = rdd.mapPartitions(iter => {// 迭代器得到迭代器,求迭代器最大值是一个值所以需要包起来变成迭代器List(iter.max).iterator })mpRDD.collect().foreach(println)sc.stop()}}

面试题:map和mapPartitions的区别?
➢ 数据处理角度
Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
➢ 功能的角度
Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
➢ 性能的角度
Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。

3) mapPartitionsWithIndex

➢ 函数签名
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

需求1:求数字的分区

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - mapPartitionsWithIndexval rdd = sc.makeRDD(List(1,2,3,4))val mpiRDD = rdd.mapPartitionsWithIndex((index, iter) => {// 求数字的分区(分区号码, 数字)(一共8个分区)// 1,   2,    3,   4//(1,1)(3,2),(5,3),(7,4)iter.map(num => {(index, num)})})mpiRDD.collect().foreach(println)sc.stop()}
}

❖ 小功能:获取第二个数据分区的数据

4) flatMap

➢ 函数签名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
➢ 函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

需求:拆词

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - flatMapval rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))// 一个字符串拆成一个一个单词// 简约写法 val flatRDD: RDD[String] = rdd.flatMap(_.split(" "))val flatRDD: RDD[String] = rdd.flatMap(s => {s.split(" ")})flatRDD.collect().foreach(println)sc.stop()}
}

❖ 小功能:将List(List(1,2),3,List(4,5))进行扁平化操作

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Operator_Transform2 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - flatMapval rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))val flatRDD = rdd.flatMap(data => {data match {case list:List[_] => listcase dat => List(dat)}})flatRDD.collect().foreach(println)sc.stop()}
}

5) glom

➢ 函数签名
def glom(): RDD[Array[T]]
➢ 函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

List => Int 整体拆成个体flatMap操作
Int => Array 相反的操作,个体变成整体,glom操作

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @author meiyuan* @create 2021-09-17 2:59 PM*/
object Spark05_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - glomval rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)val glomRDD: RDD[Array[Int]] = rdd.glom()// data是两个分区的两个数组, 所以需要循环// 1,2// 3,4glomRDD.collect().foreach(data => println(data.mkString(",")))sc.stop()}
}

❖ 小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark05_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - glomval rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)// 【1,2】,【3,4】// 【2】,【4】 分区内取最大值// 【6】 分区之间求和// 分区数据变成数组val glomRDD: RDD[Array[Int]] = rdd.glom()                                       // 求每个分区最大值val maxRDD: RDD[Int] = glomRDD.map(array => {array.max} )println(maxRDD.collect().sum) // 采集回来也是数组sc.stop()}
}

6) groupBy

➢ 函数签名
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
➢ 函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark06_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - groupByval rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组// 相同的key值的数据会放置在一个组中def groupFunction(num:Int) = {num % 2}val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)// (0,CompactBuffer(2, 4))// (1,CompactBuffer(1, 3))groupRDD.collect().foreach(println)sc.stop()}
}

❖ 小功能:将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark06_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - groupByval rdd  = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2)// 分组和分区没有必然的关系val groupRDD = rdd.groupBy(_.charAt(0))// (h,CompactBuffer(hive, hbase))// (H,CompactBuffer(Hello, Hadoop))groupRDD.collect().foreach(println)sc.stop()}
}

❖ 小功能:从服务器日志数据apache.log中获取每个时间段访问量。

parse函数是把字符串转换成日期
format函数是把日期转换成字符串

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport java.text.SimpleDateFormat
import java.util.Dateimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark06_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - groupByval rdd = sc.textFile("data/apache.log")// 从服务器日志数据中获取每个时间段的访问量val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(line => {val data = line.split(" ")val time = data(3)// 格式化 - 字符串格式需要统一val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")val date: Date = sdf.parse(time)// 字符串格式需要统一val sdf1 = new SimpleDateFormat("HH")val hour: String = sdf1.format(date) // 时间段(hour, 1) // 时间点出现了一次}).groupBy(_._1)timeRDD.map{case ( hour, iter ) => {(hour, iter.size)}}.collect.foreach(println)sc.stop()}
}

❖ 小功能:WordCount (看part5)

7) filter

➢ 函数签名
def filter(f: T => Boolean): RDD[T]
➢ 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

需求:奇数

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark07_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - filterval rdd = sc.makeRDD(List(1,2,3,4))val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)filterRDD.collect().foreach(println)sc.stop()}}

❖ 小功能:从服务器日志数据apache.log中获取2015年5月17日的请求路径

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark07_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - filterval rdd = sc.textFile("data/apache.log")// 从服务器日志数据中获取2015年5月17日的请求路径// 不用map是因为不是光返回time, 还要请求路径数据rdd.filter(line => {val data = line.split(" ")val time = data(3)time.startsWith("17/05/2015")}).collect().foreach(println)sc.stop()}
}

8) sample

➢ 函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
➢ 函数说明
根据指定的规则从数据集中抽取数据

sample算子需要传递三个参数
1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
2. 第二个参数表示,
如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
3. 第三个参数表示,抽取数据时随机算法的种子
如果不传递第三个参数,那么使用的是当前系统时间

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark08_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))// 结果 1,5,5,6,6,6,6,7,7,7,8,8,9,9,9,9 随机的println(rdd.sample(true,2//1).collect().mkString(","))sc.stop()}
}

9) distinct

➢ 函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
将数据集中重复的数据去重

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark09_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - filterval rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)// reduceByKey聚合规则根据key聚合不管第二个,直接返回第一个// (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)// (1, null)(1, null)(1, null)// (null, null) => null// (1, null) => 1val rdd1: RDD[Int] = rdd.distinct()rdd1.collect().foreach(println)sc.stop()}
}

总结

scala底层是用HashSet去重,而RDD是用分布式处理方式去重

10) coalesce

➢ 函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
➢ 函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark10_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)//        val newRDD: RDD[Int] = rdd.coalesce(2) // 默认不shuffleval newRDD: RDD[Int] = rdd.coalesce(2,true)newRDD.saveAsTextFile("output")sc.stop()}
}

总结:

① coalesce方法默认情况下不会将分区的数据打乱重新组合,这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
② 如果想要让数据均衡,可以进行shuffle处理, 第二个参数为true => 数据打乱没有规律,不会是【1,2,3】【4,5,6】

11) repartition

➢ 函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark11_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - filterval rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)//val newRDD: RDD[Int] = rdd.coalesce(3, true)// 扩大分区, 均匀分配数据变少, 并行计算能力增加val newRDD: RDD[Int] = rdd.repartition(3) newRDD.saveAsTextFile("output")sc.stop()}
}

总结:

① coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。所以如果想要实现扩大分区的效果,需要使用shuffle操作
② spark提供了一个简化的操作
缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle

面试题:Repartition和Coalesce 的关系与区别,能简单说说吗?
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。

12) sortBy

➢ 函数签名
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
➢ 函数说明
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程

需求: list排序

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark12_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - sortBy 根据制定规则排序, 默认分区数量不变val rdd = sc.makeRDD(List(6,2,4,5,3,1), 2)val newRDD: RDD[Int] = rdd.sortBy(num=>num)// 分2区,数据从小到大排序【1,2,3】【4,5,6】newRDD.saveAsTextFile("output")sc.stop()}
}

需求:tuple对按照第一个元素排序

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark12_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - sortByval rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)// val newRDD = rdd.sortBy(t=>t._1) 按字符串排序11和2比较1在2前面// 按数字排序,false是降序val newRDD = rdd.sortBy(t=>t._1.toInt, false) // (11,2)// (2,3)// (1,1)newRDD.collect().foreach(println)sc.stop()}
}

总结:

① sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
② sortBy默认情况下,不会改变分区。但是中间存在shuffle操作

3. 双Value型Transformation算子

intersection union subtract zip

13) intersection

➢ 函数签名
def intersection(other: RDD[T]): RDD[T]
➢ 函数说明
对源RDD和参数RDD求交集后返回一个新的RDD

14) union

➢ 函数签名
def union(other: RDD[T]): RDD[T]
➢ 函数说明
对源RDD和参数RDD求并集后返回一个新的RDD

15) subtract

➢ 函数签名
def subtract(other: RDD[T]): RDD[T]
➢ 函数说明
以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集

16) zip

➢ 函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
➢ 函数说明
将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark13_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - 双Value类型val rdd1 = sc.makeRDD(List(1,2,3,4))val rdd2 = sc.makeRDD(List(3,4,5,6))val rdd7 = sc.makeRDD(List("3","4","5","6"))// 交集 : 【3,4】val rdd3: RDD[Int] = rdd1.intersection(rdd2)//val rdd8 = rdd1.intersection(rdd7)println(rdd3.collect().mkString(","))// 并集 : 【1,2,3,4,3,4,5,6】val rdd4: RDD[Int] = rdd1.union(rdd2)println(rdd4.collect().mkString(","))// 差集 : 【1,2】val rdd5: RDD[Int] = rdd1.subtract(rdd2)println(rdd5.collect().mkString(","))// 拉链 : 【1-3,2-4,3-5,4-6】val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)val rdd8 = rdd1.zip(rdd7)println(rdd6.collect().mkString(","))sc.stop()}
}
package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark13_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - 双Value类型val rdd1 = sc.makeRDD(List(1,2,3,4),2)val rdd2 = sc.makeRDD(List(3,4,5,6),2)val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)println(rdd6.collect().mkString(","))sc.stop()}
}

总结:

① 交集(intersection),并集(union)和差集(subtract)要求两个数据源数据类型保持一致
② 拉链(zip)操作两个数据源的类型可以不一致
③ Can't zip RDDs with unequal numbers of partitions: List(2, 4) - 两个数据源要求分区数量要保持一致
     val rdd1 = sc.makeRDD(List(1,2,3,4),2)
     val rdd2 = sc.makeRDD(List(3,4,5,6),4)
④ Can only zip RDDs with same number of elements in each partition - 两个数据源要求分区中数据数量保持一致
     val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2)
     val rdd2 = sc.makeRDD(List(3,4,5,6),2)

3. Key-Value型Transformation算子

partitionBy reduceByKey groupByKey aggregateByKey foldByKey
combineByKey sortByKey join leftOuterJoin cogroup

17) partitionBy

➢ 函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
➢ 函数说明
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

partitionBy方法是 PairRDDFunctions提供的,这里从 RDD => PairRDDFunctions 利用开发OCP原则,通过隐式转换 (二次编译), 伴生对象有一个方法rddToPairRDDFunctions 把RDD变为PairRDDFunctions

区分partitionBy和coalesce和repartition:
① partitionBy根据指定的分区规则对数据进行重分区
② 区分coalesce和repartition是分区数量的改变
③ group by只对数据分组和分区无关

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}object Spark14_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd = sc.makeRDD(List(1,2,3,4),2)// int => tuple 对偶元组形成键值可以使用val mapRDD:RDD[(Int, Int)] = rdd.map((_,1)) val newRDD = mapRDD.partitionBy(new HashPartitioner(2))// 当前就返回他自己, 所以这一步没有意义newRDD.partitionBy(new HashPartitioner(2))newRDD.saveAsTextFile("output")sc.stop()}
}

总结:

如果重分区的分区器和当前RDD的分区器一样怎么办? 在底层源码判断了当前的分区器和传入的分区器类型和分区数量。首先匹配分区的类型是否相同,如果相同判断分区数量是否相同, 如果类型和数量都相同,返回它自己。
Spark是否有其他分区器?RangePartitioner 排序使用
如果想要按自己的方法进行数据分区怎么办?可以自己写分区器, 改变数据存放的位置

18) reduceByKey

➢ 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
➢ 函数说明
可以将数据按照相同的Key对Value进行聚合

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark15_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {println(s"x = ${x}, y = ${y}")x + y} )// x = 1, y = 2// x = 3, y = 3// (a,6)// (b,4)reduceRDD.collect().foreach(println)sc.stop()}
}

总结:
① reduceByKey中如果key的数据只有一个,是不会参与运算的。
② reduceByKey相同key的value两两聚合,reduceByKey支持分区内预聚合功能(落盘之前就聚合在一起),可以有效减少shuffle时落盘的数据量,提升shuffle的性能。
分区内 (同一个分区内数据做聚合),分区间 (落盘之后就是多个分区间做聚合),分区内和分区间计算规则是相同的。
③ 如果分区内和分区间计算规则不相同可以用aggregatedByKey,比如
【1,2】,【3,4】 => 分区内求最大值【2】,【4】 分区间求和【6】

❖ 小功能:WordCount

19) groupByKey

➢ 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
➢ 函数说明
将数据源的数据根据key对value进行分组

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark16_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()// (a,CompactBuffer(1, 2, 3))// (b,CompactBuffer(4))groupRDD.collect().foreach(println)// (groupBy(tuple._1)按tuple的第一个元素分组val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)// (a,CompactBuffer((a,1), (a,2), (a,3)))// (b,CompactBuffer((b,4)))groupRDD1.collect().foreach(println)sc.stop()}
}

总结:

groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
元组中的第一个元素就是key,key是确定的, value单独拿出来
元组中的第二个元素就是相同key的value的集合

groupBy:按哪个条件分组不确定,不一定是通过key分组,并且没有单独把value拿出来,而是把整体进行分组。

❖ 小功能:WordCount

面试题:reduceByKey与groupByKey的区别,哪一种更具优势?

从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,返回结果是RDD[k,v],这样会减少落盘的数据量,而groupByKey只是按照key进行分组,不存在数据量减少的问题,reduceByKey性能比较高。【spark中,shuffle操作必须落盘处理,shuffle操作的性能非常低(与磁盘交互)】
从功能的角度:reduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

所以,在实际开发过程中,reduceByKey比groupByKey,更建议使用。但是需要注意是否会影响业务逻辑。

20) aggregateByKey

➢ 函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
➢ 函数说明
将数据根据不同的规则进行分区内计算和分区间计算

(初始值为0)

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark17_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5), ("a", 6)),2)// math.min(x, y)// math.max(x, y)// 分区内取最大值// (a,5) (a,1) => (a,5) (a,2) =>(a,5); (b,5) (b,3) => (b,5)// (b,4) (b,5) => (b,5) (b,5) => (b,5);  (a,5) (a,6) => (a,6)// 分区间求和 (b,5) (b,5) => (b,10); (a,5) (a,6) => (a,11)// (b,10)// (a,11)// (b,12)// (a,9)// (b,12)// (a,9)rdd.aggregateByKey(5)((x, y) => math.max(x, y),(x, y) => x + y).collect.foreach(println)// 分区内和分区间也可以做相同规则的计算// 分区内// (a,3),(b,3)// (b,9),(a,6)// 分区间// (b,3),(b,9) => (b,12)// (a,3),(a,6) => (a,9)rdd.aggregateByKey(0)((x, y) => x + y,(x, y) => x + y).collect.foreach(println)// scala简化原则rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)sc.stop()}
}

总结:
①aggregateByKey存在函数柯里化,有两个参数列表, 没有默认值
a. 第一个参数列表,需要传递一个参数,表示为初始值
    主要用于当碰见第一个key的时候,和value进行分区内计算
b. 第二个参数列表需要传递2个参数
    第一个参数表示分区内计算规则
    第二个参数表示分区间计算规则

需求:取得不同分区当中相同key的的平均值

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark18_RDD_Operator_Transform3 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)// 取得不同分区当中相同key的的平均值val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5), ("a", 6)),2)// aggregateByKey最终的返回数据结果应该和初始值的类型保持一致//val aggRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _)//aggRDD.collect.foreach(println)// 获取相同key的数据的平均值 => (a, 3),(b, 4)// 第一个0是相同key计算的初始值,如a - 1,2,6, 第二个0是key出现的次数// (0,0)表示相同key的初始值// 返回的RDD[(K, U)] K就是key,即String, U就是tupleval newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )(( t, v ) => {(t._1 + v, t._2 + 1) // v代表数量,分区内(数据相加, 次数相加)},(t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2) // 分区间相加})val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, cnt) => {num / cnt}}// (b,4)// (a,3)resultRDD.collect().foreach(println)sc.stop()}
}

21) foldByKey

➢ 函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
➢ 函数说明
当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark17_RDD_Operator_Transform2 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5), ("a", 6)),2)//rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)// 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法rdd.foldByKey(0)(_+_).collect.foreach(println)sc.stop()}
}

总结:

① aggregateByKey最终的返回数据结果应该和初始值的类型保持一致

22) combineByKey

➢ 函数签名
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
➢ 函数说明
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark19_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5), ("a", 6)),2)val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(// 之前的方法是有初始值, 所以就需要在tuple那个给数据类型,否则易报错v => (v, 1), // 分区内规则( t:(Int, Int), v ) => {(t._1 + v, t._2 + 1)},// 分区间规则(t1:(Int, Int), t2:(Int, Int)) => {(t1._1 + t2._1, t1._2 + t2._2)})val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, cnt) => {num / cnt}}resultRDD.collect().foreach(println)sc.stop()}
}

总结:
① combineByKey : 方法需要三个参数
第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
第二个参数表示:分区内的计算规则
第三个参数表示:分区间的计算规则

② aggregateByKey设置了初始值,但是key的次数并没有包括设置初始值的数量,不太合理,所以使用combineByKey对第一个数据结构进行转换,从a,1到a,(1,1)

③ mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。

面试题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
reduceByKey: 相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同AggregateByKey:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

23) sortByKey

➢ 函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
➢ 函数说明
在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的

24) join

➢ 函数签名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
➢ 函数说明
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark21_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd1 = sc.makeRDD(List(("a", 1), ("a", 2), ("c", 3)))val rdd2 = sc.makeRDD(List(("a", 5), ("c", 6),("a", 4)))val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)// (a,(1,5))// (a,(1,4))// (a,(2,5))// (a,(2,4))// (c,(3,6))joinRDD.collect().foreach(println)sc.stop()}
}

总结:
① join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
② 如果两个数据源中key没有匹配上,那么数据不会出现在结果中
③ 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。

25) leftOuterJoin

➢ 函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
➢ 函数说明
类似于SQL语句的左外连接

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.{SparkConf, SparkContext}object Spark22_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2)//, ("c", 3)))val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5),("c", 6)))val leftJoinRDD = rdd1.leftOuterJoin(rdd2)// (a,(1,Some(4)))// (b,(2,Some(5)))leftJoinRDD.collect().foreach(println)//        val rightJoinRDD = rdd1.rightOuterJoin(rdd2)//(a,(Some(1),4))// (b,(Some(2),5))// (c,(None,6))
//        rightJoinRDD.collect().foreach(println)sc.stop()}
}

26) cogroup

➢ 函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
➢ 函数说明
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark23_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子 - (Key - Value类型)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2)//, ("c", 3)))val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5),("c", 6),("c", 7)))val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)// (a,(CompactBuffer(1),CompactBuffer(4)))// (b,(CompactBuffer(2),CompactBuffer(5)))// (c,(CompactBuffer(),CompactBuffer(6, 7)))cgRDD.collect().foreach(println)sc.stop()}
}

总结:
cogroup : connect + group (分组,连接) 同一个key分组在一起然后连接在一起

面试题:有哪些会引起Shuffle过程的Spark算子呢?

shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key拉取到同一个节点上,进行聚合或join等操作。

① 重分区算子:repartition、coalesce

② ByKey算子:groupByKey、reduceByKey、aggregateByKey、combineByKey、sortByKey、sortBy

③ join 算子: cogroup、join、leftOuterJoin、intersection、subtract、subtractByKey

④ 去重算子: distinct

4. Actions算子 

reduce collect count first take
takeOrdered aggregate fold countByKey save 相关算子
foreach

所谓的行动算子,其实就是触发作业(Job)执行的方法,底层代码调用的是环境对象的runJob方法,底层代码中会创建ActiveJob,并提交执行。

请列举Spark的action算子(不少于6个),并简述功能(重点) 标黄色是必备

1) reduce

➢ 函数签名
def reduce(f: (T, T) => T): T
➢ 函数说明
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))// reduce 两两聚合1和2变为3, 3和3聚合为6,6和4聚合为10val i: Int = rdd.reduce(_+_) println(i) // 10sc.stop()}
}

2) collect

➢ 函数签名
def collect(): Array[T]
➢ 函数说明
在驱动程序中,以数组Array的形式返回数据集的所有元素

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))// collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组val ints: Array[Int] = rdd.collect()// 结果为1,2,3,4println(ints.mkString(","))sc.stop()}
}

3) count

➢ 函数签名
def count(): Long
➢ 函数说明
返回RDD中元素的个数

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))// count : 数据源中数据的个数val cnt = rdd.count()// 结果为4println(cnt)sc.stop()}
}

4) first

➢ 函数签名
def first(): T
➢ 函数说明
返回RDD中的第一个元素

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))// first : 获取数据源中数据的第一个val first = rdd.first()// 结果是1println(first)sc.stop()}
}

5) take

➢ 函数签名
def take(num: Int): Array[T]
➢ 函数说明
返回一个由RDD的前n个元素组成的数组

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))// take : 获取N个数据val ints: Array[Int] = rdd.take(3)// 1,2,3println(ints.mkString(","))sc.stop()}
}

6) takeOrdered

➢ 函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
➢ 函数说明
返回该RDD排序后的前n个元素组成的数组

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark02_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))// takeOrdered : 数据排序后,取N个数据, 默认升序排列, 降序把第二个参数传进去val rdd1 = sc.makeRDD(List(4,2,3,1))val ints1: Array[Int] = rdd1.takeOrdered(3)(Ordering[Int].reverse)// 4,3,2println(ints1.mkString(","))sc.stop()}
}

7) aggregate

➢ 函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
➢ 函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

aggregateByKey : 初始值只会参与分区内计算
aggregate : 初始值会参与分区内计算,并且参与分区间计算

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4),2)   val result = rdd.aggregate(10)(_+_, _+_)//10 + 13 + 17 = 40println(result)sc.stop()}
}

8) fold

➢ 函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
➢ 函数说明
折叠操作,aggregate的简化版操作

分区内和分区间计算规则相同

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark03_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4),2)   val result = rdd.fold(10)(_+_)// 40println(result)sc.stop()}
}

9) countByKey

➢ 函数签名
def countByKey(): Map[K, Long]
➢ 函数说明
统计每种key的个数

需求:统计每个value出现的次数

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,1,1,4),2)// TODO - 行动算子// 统计每个value出现的次数val intToLong: collection.Map[Int, Long] = rdd.countByValue()// Map(4 -> 1, 1 -> 3)println(intToLong)sc.stop()}
}

需求:统计每个key出现的次数

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark04_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3)))// 统计每个key出现的次数和value没有关系 val stringToLong: collection.Map[String, Long] = rdd.countByKey()// Map(a -> 3)println(stringToLong)sc.stop()}
}

10) save 相关算子

➢ 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
➢ 函数说明
将数据保存到不同格式的文件中

package com.meiyuan.bigdata.spark.core.rdd.operator.actionimport org.apache.spark.{SparkConf, SparkContext}object Spark05_RDD_Operator_Action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)//val rdd = sc.makeRDD(List(1,1,1,4),2)val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3)))rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")// saveAsSequenceFile方法要求数据的格式必须为K-V类型rdd.saveAsSequenceFile("output2")sc.stop()}
}

11) foreach

➢ 函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
➢ 函数说明
分布式遍历RDD中的每一个元素,调用指定函数

总结:
foreach外边的代码在Driver端执行

foreach内的代码在Executor执行

foreach用于换行打印

5. 不同算子实现的不同写法的WordCount

package com.meiyuan.bigdata.spark.core.wcimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark03_WordCount {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)wordcount1(sc)wordcount2(sc)wordcount3(sc)wordcount4(sc)wordcount5(sc)wordcount6(sc)wordcount7(sc)wordcount8(sc)wordcount9(sc)sc.stop()}// groupBydef wordcount1(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)wordCount.collect().foreach(println)println("wordcount1========================================")}//  // groupByKey (shuffle,数据量大性能不高)def wordcount2(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1)) // 要求数据有key和value类型val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)wordCount.collect().foreach(println)println("wordcount2========================================")}// reduceByKeydef wordcount3(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)wordCount.collect().foreach(println)println("wordcount3========================================")}// aggregateByKeydef wordcount4(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)wordCount.collect().foreach(println)println("wordcount4========================================")}// foldByKeydef wordcount5(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)wordCount.collect().foreach(println)println("wordcount5========================================")}// combineByKeydef wordcount6(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.combineByKey(v=>v,(x:Int, y) => x + y,(x:Int, y:Int) => x + y)wordCount.collect().foreach(println)println("wordcount6========================================")}// countByKeydef wordcount7(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: collection.Map[String, Long] = wordOne.countByKey()wordCount.foreach(println)println("wordcount7========================================")}// countByValuedef wordcount8(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordCount: collection.Map[String, Long] = words.countByValue()wordCount.foreach(println)println("wordcount8========================================")}// reduce, aggregate, fold (scala)def wordcount9(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))// 【(word, count),(word, count)】// word => Map[(word,1)]val mapWord = words.map(word => {mutable.Map[String, Long]((word,1))})// 把两个map合并在一起, 需要相同key将value做聚合, 不同key做累加// map2这里是k-v对, 用foreach取出来k-v对, map1进行聚合val wordCount = mapWord.reduce((map1, map2) => {map2.foreach{case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}map1})println(wordCount)println("wordcount9========================================")}
}

6. 需求分析

需求 1:统计出每一个省份每个广告被点击数量排行的Top3

1) 数据准备
agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
2) 需求分析如下:
1. 获取原始数据:时间戳,省份,城市,用户,广告
2. 将原始数据进行结构的转换。方便统计
时间戳,省份,城市,用户,广告 => ( ( 省份,广告 ), 1 )
3. 将转换结构后的数据,进行分组聚合
 ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )
4. 将聚合的结果进行结构的转换
( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) 模式匹配
5.  将转换结构后的数据根据省份进行分组
( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )
6. 将分组后的数据组内排序(降序),取前3名
7. 采集数据打印在控制台
3) 功能实现

package com.meiyuan.bigdata.spark.core.rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark24_RDD_Req {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 案例实操// 统计每一个省份每个广告被点击数量排行的Top3// 1. 获取原始数据:时间戳,省份,城市,用户,广告val dataRDD = sc.textFile("data/agent.log")// 2. 将原始数据进行结构的转换。方便统计//    时间戳,省份,城市,用户,广告//    =>//    ( ( 省份,广告 ), 1 )val mapRDD = dataRDD.map(line => {val data = line.split(" ")(( data(1), data(4) ), 1)})// 3. 将转换结构后的数据,进行分组聚合//    ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_)// 4. 将聚合的结果进行结构的转换//    ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) 模式匹配val newMapRDD = reduceRDD.map{case ( (prv, ad), sum ) => {(prv, (ad, sum))}}// 5. 将转换结构后的数据根据省份进行分组//    ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()// 6. 将分组后的数据组内排序(降序),取前3名val resultRDD = groupRDD.mapValues( // key保持不变只对value操作进行数据转换iter => {iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)} // 可迭代的集合不能排序, ._2代表迭代器中tuple的第二个元素)// 7. 采集数据打印在控制台resultRDD.collect().foreach(println)sc.stop()}
}

需求2~4数据

// 用户访问动作表
case class UserVisitAction(
date: String,// 用户点击行为的日期
user_id: Long,// 用户的 ID
session_id: String,//Session 的 ID
page_id: Long,// 某个页面的 ID
action_time: String,// 动作的时间点
search_keyword: String,// 用户搜索的关键词
click_category_id: Long,// 某一个商品品类的 ID
click_product_id: Long,// 某一个商品的 ID
order_category_ids: String,// 一次订单中所有品类的 ID 集合
order_product_ids: String,// 一次订单中所有商品的 ID 集合
pay_category_ids: String,// 一次支付中所有品类的 ID 集合
pay_product_ids: String,// 一次支付中所有商品的 ID 集合
city_id: Long
) // 城市 id

数据主要包含用户的 4 种行为: 搜索,点击,下单,支付 。 数据规则如下:
➢ 数据文件中每行数据采用 下划线 分隔数据
➢ 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
➢ 如果搜索关键字为 null, 表示数据不是搜索数据
➢ 如果点击的品类 ID 和产品 ID 为 -1 ,表示数据不是点击数据
➢ 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个, id 之
间采用 逗号 分隔,如果本次不是下单行为,则数据采用 null 表示
➢ 支付行为和下单行为类似

需求 2 Top10 热门品类

1) 需求优化::先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

方案1:分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
需求分析如下:
1.读取原始日志数据
2.统计品类的点击数量: (品 类ID, 点击数量) 【先过滤出-1不是点击行为再聚合】
3. 统计品类的下单数量: (品类ID, 下单数量)
【先过滤出null不是下单行为且扁平化order_category_ids(品类id用逗号隔开)再聚合】
4. 统计品类的支付数量: (品类ID, 支付数量) 
【先过滤出null不是支付行为且扁平化order_category_ids再聚合】
5. 将品类进行排序,并且取前10名
点击数量排序, 下单数量排序, 支付数量排序
元组排序:先比较第一个, 再比较第二个,再比较第三个,依此类推
( 品类ID, (点击数量,下单数量, 支付数量) )
cogroup = connect + group
将两个不同数据源连接在一起, join, 拉链zip, leftOuterJoin, cogroup
① join × 数据源需要有相同的key才能连一起, 即有点击不一定有下单
② zip × 分区的数量和分区元素的数量有要求, 但是题目没说要把相同品类放在一起, 所以zip的连接是和数量以及位置有关
③ leftOuterJoin不确定哪个是主表 ×
④ cogroup √ 会在自己的数据源中建立分组, 和另外一个数据源做connect连接, connect + group 即使不存在也会有组的概念的存在
6.将结果采集到控制台打印出来

功能实现:注意迭代器的使用

package com.meiyuan.bigdata.spark.core.reqimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark01_Req1_HotCategoryTop10Analysis {def main(args: Array[String]): Unit = {// TODO : Top10热门品类val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparConf)// 1. 读取原始日志数据val actionRDD = sc.textFile("data/user_visit_action.txt")// 2. 统计品类的点击数量:(品类ID,点击数量)// 点击的品类ID为-1, 表示数据不是点击数据, click_category_id是7,索引是6val clickActionRDD = actionRDD.filter(action => {val data = action.split("_")data(6) != "-1"})val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(action => {val data = action.split("_")(data(6), 1) // (点击品类ID,1)}).reduceByKey(_ + _)// 3. 统计品类的下单数量:(品类ID,下单数量)// 如果本次不是下单行为,则数据采用null表示val orderActionRDD = actionRDD.filter(action => {val data = action.split("_")data(8) != "null"})// 针对下单行为, 一次可以下单多个商品, 所以品类ID和产品ID可以是多个// 所以需要把多个品类ID, 拆成一个一个, 比如:orderID => 1,2,3// 【(1,1),(2,1),(3,1)】, 所以是扁平化操作val orderCountRDD = orderActionRDD.flatMap(action => {val data = action.split("_")val cid = data(8)val cids = cid.split(",") //多个品类ID是用逗号隔开cids.map(id=>(id, 1))}).reduceByKey(_+_)// 4. 统计品类的支付数量:(品类ID,支付数量)val payActionRDD = actionRDD.filter(action => {val data = action.split("_")data(10) != "null"})// orderid => 1,2,3// 【(1,1),(2,1),(3,1)】val payCountRDD = payActionRDD.flatMap(action => {val data = action.split("_")val cid = data(10)val cids = cid.split(",")cids.map(id=>(id, 1))}).reduceByKey(_+_)// 5. 将品类进行排序,并且取前10名//    点击数量排序,下单数量排序,支付数量排序//    元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推//    ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =clickCountRDD.cogroup(orderCountRDD, payCountRDD)val analysisRDD = cogroupRDD.mapValues{case ( clickIter, orderIter, payIter ) => {var clickCnt = 0val iter1 = clickIter.iteratorif ( iter1.hasNext ) {clickCnt = iter1.next()}var orderCnt = 0val iter2 = orderIter.iteratorif ( iter2.hasNext ) {orderCnt = iter2.next()}var payCnt = 0val iter3 = payIter.iteratorif ( iter3.hasNext ) {payCnt = iter3.next()}( clickCnt, orderCnt, payCnt )}}// 品类是._1, 所以是._2val resultRDD = analysisRDD.sortBy(_._2, false).take(10)// 6. 将结果采集到控制台打印出来resultRDD.foreach(println)sc.stop()}
}

方案2:一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))

方案1优化:
actionRDD重复使用:actionRDD.cache() 
cogroup性能较低 (分区规则不同就会有可能启动shuffle),第5步重写成下面的逻辑
(品类ID, 点击数量) => (品类ID, (点击数量, 0, 0))
(品类ID, 下单数量) => (品类ID, (0, 下单数量, 0))
 两两聚合在一起    => (品类ID, (点击数量, 下单数量, 0))
(品类ID, 支付数量) => (品类ID, (0, 0, 支付数量))
两两聚合在一起     => (品类ID, (点击数量, 下单数量, 支付数量))
最终和方案1结果一样的( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )

在方案1基础上优化的功能实现

package com.meiyuan.bigdata.spark.core.reqimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark02_Req1_HotCategoryTop10Analysis1 {def main(args: Array[String]): Unit = {// TODO : Top10热门品类val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparConf)// Q : actionRDD重复使用// Q : cogroup性能可能较低 (如果分区不相同就会有shuffle)// 1. 读取原始日志数据val actionRDD = sc.textFile("data/user_visit_action.txt")actionRDD.cache() // actionRDD重复使用解决方案// 2. 统计品类的点击数量:(品类ID,点击数量)val clickActionRDD = actionRDD.filter(action => {val data = action.split("_")data(6) != "-1"})val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(action => {val data = action.split("_")(data(6), 1)}).reduceByKey(_ + _)// 3. 统计品类的下单数量:(品类ID,下单数量)val orderActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(8) != "null"})// orderid => 1,2,3// 【(1,1),(2,1),(3,1)】val orderCountRDD = orderActionRDD.flatMap(action => {val datas = action.split("_")val cid = datas(8)val cids = cid.split(",")cids.map(id=>(id, 1))}).reduceByKey(_+_)// 4. 统计品类的支付数量:(品类ID,支付数量)val payActionRDD = actionRDD.filter(action => {val datas = action.split("_")datas(10) != "null"})// orderid => 1,2,3// 【(1,1),(2,1),(3,1)】val payCountRDD = payActionRDD.flatMap(action => {val datas = action.split("_")val cid = datas(10)val cids = cid.split(",")cids.map(id=>(id, 1))}).reduceByKey(_+_)// (品类ID, 点击数量) => (品类ID, (点击数量, 0, 0))// (品类ID, 下单数量) => (品类ID, (0, 下单数量, 0))//  两两聚合在一起    => (品类ID, (点击数量, 下单数量, 0))// (品类ID, 支付数量) => (品类ID, (0, 0, 支付数量))//  两两聚合在一起    => (品类ID, (点击数量, 下单数量, 支付数量))// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )// 5. 将品类进行排序,并且取前10名//    点击数量排序,下单数量排序,支付数量排序//    元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推//    ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )val rdd1 = clickCountRDD.map{case ( cid, cnt ) => {(cid, (cnt, 0, 0))}}val rdd2 = orderCountRDD.map{case ( cid, cnt ) => {(cid, (0, cnt, 0))}}val rdd3 = payCountRDD.map{case ( cid, cnt ) => {(cid, (0, 0, cnt))}}// 将三个数据源合并在一起,统一进行聚合计算val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)// 相同的key两两聚合  连三次val analysisRDD = sourceRDD.reduceByKey(( t1, t2 ) => {( t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3 )})val resultRDD = analysisRDD.sortBy(_._2, false).take(10)// 6. 将结果采集到控制台打印出来resultRDD.foreach(println)sc.stop()}
}

方案3:一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))

方案2优化:
存在大量的shuffle操作(reduceByKey - 不同的数据源)
reduceByKey 聚合算子,spark会提供优化(预聚合),缓存(不需要重复读取),但是这个案例是来自不同数据源的reduceByKey,会有shuffle操作。实际上,这里不用在reduceByKey之后再分别统计数量。在一开始就按照下面的数据结构统计,这样只会有一次reduceByKey:
点击的场合 : ( 品类ID,( 1, 0, 0 ) )
下单的场合 : ( 品类ID,( 0, 1, 0 ) )
支付的场合 : ( 品类ID,( 0, 0, 1 ) )

功能实现

package com.meiyuan.bigdata.spark.core.reqimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark03_Req1_HotCategoryTop10Analysis2 {def main(args: Array[String]): Unit = {// TODO : Top10热门品类val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparConf)// Q : 存在大量的shuffle操作(reduceByKey - 不同的数据源)// reduceByKey 聚合算子,spark会提供优化,缓存// 1. 读取原始日志数据val actionRDD = sc.textFile("data/user_visit_action.txt")// 2. 将数据转换结构//    点击的场合 : ( 品类ID,( 1, 0, 0 ) )//    下单的场合 : ( 品类ID,( 0, 1, 0 ) )//    支付的场合 : ( 品类ID,( 0, 0, 1 ) )// word没变, count微变 之前讲的方法是word变了, count没变// 一次下单可能会有多个品类ID, 所以用flatMap,返回结果应该是Listval flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {val data = action.split("_")if (data(6) != "-1") {// 点击的场合List((data(6), (1, 0, 0)))} else if (data(8) != "null") {// 下单的场合val ids = data(8).split(",")ids.map(id => (id, (0, 1, 0)))} else if (data(10) != "null") {// 支付的场合val ids = data(10).split(",")ids.map(id => (id, (0, 0, 1)))} else {Nil}})// 3. 将相同的品类ID的数据进行分组聚合//    ( 品类ID,( 点击数量, 下单数量, 支付数量 ) )val analysisRDD = flatRDD.reduceByKey((t1, t2) => {( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )})// 4. 将统计结果根据数量进行降序处理,取前10名val resultRDD = analysisRDD.sortBy(_._2, false).take(10)// 5. 将结果采集到控制台打印出来resultRDD.foreach(println)sc.stop()}
}

方案4:使用累加器的方式聚合数据 - 不使用shuffle操作

功能实现

package com.meiyuan.bigdata.spark.core.reqimport org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark04_Req1_HotCategoryTop10Analysis3 {def main(args: Array[String]): Unit = {// TODO : Top10热门品类val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparConf)// 1. 读取原始日志数据val actionRDD = sc.textFile("data/user_visit_action.txt")val acc = new HotCategoryAccumulatorsc.register(acc, "hotCategory")// 2. 将数据转换结构actionRDD.foreach(action => {val data = action.split("_")if (data(6) != "-1") {// 点击的场合acc.add((data(6), "click"))} else if (data(8) != "null") {// 下单的场合val ids = data(8).split(",")ids.foreach(id => {acc.add( (id, "order") )})} else if (data(10) != "null") {// 支付的场合val ids = data(10).split(",")ids.foreach(id => {acc.add( (id, "pay") )})}})val accVal: mutable.Map[String, HotCategory] = acc.valueval categories: mutable.Iterable[HotCategory] = accVal.map(_._2)// 可迭代的集合不能排序, 需要转为toList; 自定义降序排列val sort = categories.toList.sortWith((left, right) => {if ( left.clickCnt > right.clickCnt ) {true} else if (left.clickCnt == right.clickCnt) {if ( left.orderCnt > right.orderCnt ) {true} else if (left.orderCnt == right.orderCnt) {left.payCnt > right.payCnt} else {false}} else {false}})// 5. 将结果采集到控制台打印出来sort.take(10).foreach(println)sc.stop()}// case class 参数默认是val,下面的clickCnt、orderCnt和payCnt可以改,需要加varcase class HotCategory( cid:String, var clickCnt : Int, var orderCnt : Int, var payCnt : Int )/*** 自定义累加器* 1. 继承AccumulatorV2,定义泛型*    IN : ( 品类ID, 行为类型 )*    OUT : mutable.Map[String, HotCategory]* 2. 重写方法(6)*/class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]{private val hcMap = mutable.Map[String, HotCategory]()override def isZero: Boolean = {hcMap.isEmpty}override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {new HotCategoryAccumulator()}override def reset(): Unit = {hcMap.clear()}override def add(v: (String, String)): Unit = {val cid = v._1val actionType = v._2val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0,0,0))if ( actionType == "click" ) {category.clickCnt += 1} else if (actionType == "order") {category.orderCnt += 1} else if (actionType == "pay") {category.payCnt += 1}hcMap.update(cid, category)}override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {val map1 = this.hcMapval map2 = other.valuemap2.foreach{case ( cid, hc ) => {val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0,0,0))category.clickCnt += hc.clickCntcategory.orderCnt += hc.orderCntcategory.payCnt += hc.payCntmap1.update(cid, category)}}}override def value: mutable.Map[String, HotCategory] = hcMap}
}

需求 3 Top10 热门品类中每个品类的Top10活跃Session统计

1) 需求说明:在需求一的基础上,增加每个品类用户session的点击统计

2) 功能实现:在需求2得到Top10Ids的基础上做需求3

package com.meiyuan.bigdata.spark.core.reqimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark05_Req2_HotCategoryTop10SessionAnalysis {def main(args: Array[String]): Unit = {// TODO : Top10热门品类val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparConf)val actionRDD = sc.textFile("data/user_visit_action.txt")actionRDD.cache()val top10Ids: Array[String] = top10Category(actionRDD)// 1. 过滤原始数据,保留点击和前10品类IDval filterActionRDD = actionRDD.filter(action => {val data = action.split("_")if ( data(6) != "-1" ) {top10Ids.contains(data(6))} else {false}})// 2. 根据品类ID和sessionid进行点击量的统计val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(action => {val data = action.split("_")((data(6), data(2)), 1)}).reduceByKey(_ + _)// 3. 将统计的结果进行结构的转换//  (( 品类ID,sessionId ),sum) => ( 品类ID,(sessionId, sum) )val mapRDD = reduceRDD.map{case ( (cid, sid), sum ) => {( cid, (sid, sum) )}}// 4. 相同的品类进行分组val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()// 5. 将分组后的数据进行点击量的排序,取前10名val resultRDD = groupRDD.mapValues(iter => {iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)})resultRDD.collect().foreach(println)sc.stop()}def top10Category(actionRDD:RDD[String]) = {val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(action => {val data = action.split("_")if (data(6) != "-1") {// 点击的场合List((data(6), (1, 0, 0)))} else if (data(8) != "null") {// 下单的场合val ids = data(8).split(",")ids.map(id => (id, (0, 1, 0)))} else if (data(10) != "null") {// 支付的场合val ids = data(10).split(",")ids.map(id => (id, (0, 0, 1)))} else {Nil}})val analysisRDD = flatRDD.reduceByKey((t1, t2) => {( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 )})analysisRDD.sortBy(_._2, false).take(10).map(_._1)}
}

需求 4 页面单跳转换率统计

1) 页面跳转率定义:计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。

2) 功能实现: 

获取源数据=>根据用户的回话进行分组=>分组后组内根据时间进行排序保证页面跳转顺序是对的=>只保留页面后分别计算分母和分子

分母比较简单,即wordCount

分子需要保证连续的页面访问,相邻的元素形成整体,在统计出现的次数

页面单跳转率=分子/分母

package com.meiyuan.bigdata.spark.core.reqimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark06_Req3_PageflowAnalysis {def main(args: Array[String]): Unit = {// TODO : Top10热门品类val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")val sc = new SparkContext(sparConf)val actionRDD = sc.textFile("data/user_visit_action.txt")val actionDataRDD = actionRDD.map(action => {val data = action.split("_")UserVisitAction(data(0),data(1).toLong,data(2),data(3).toLong,data(4),data(5),data(6).toLong,data(7).toLong,data(8),data(9),data(10),data(11),data(12).toLong)})actionDataRDD.cache()// TODO 对指定的页面连续跳转进行统计// 1-2,2-3,3-4,4-5,5-6,6-7val ids = List[Long](1,2,3,4,5,6,7)val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)// TODO 计算分母val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(action => {ids.init.contains(action.page_id) // 除了最后一个7}).map(action => {(action.page_id, 1L)}).reduceByKey(_ + _).collect().toMap// TODO 计算分子// 根据session进行分组val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)// 分组后,根据访问时间进行排序(升序)val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(iter => {val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)// 【1,2,3,4】// 【1,2】,【2,3】,【3,4】// 【1-2,2-3,3-4】// Sliding : 滑窗// 【1,2,3,4】// 【2,3,4】// zip : 拉链val flowIds: List[Long] = sortList.map(_.page_id)val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)// 将不合法的页面跳转进行过滤pageflowIds.filter(t => {okflowIds.contains(t)}).map(t => {(t, 1)})})// ((1,2),1)val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list=>list)// ((1,2),1) => ((1,2),sum)val dataRDD = flatRDD.reduceByKey(_+_)// TODO 计算单跳转换率// 分子除以分母dataRDD.foreach{case ( (pageid1, pageid2), sum ) => {val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + ( sum.toDouble/lon ))}}sc.stop()}//用户访问动作表case class UserVisitAction(date: String,//用户点击行为的日期user_id: Long,//用户的IDsession_id: String,//Session的IDpage_id: Long,//某个页面的IDaction_time: String,//动作的时间点search_keyword: String,//用户搜索的关键词click_category_id: Long,//某一个商品品类的IDclick_product_id: Long,//某一个商品的IDorder_category_ids: String,//一次订单中所有品类的ID集合order_product_ids: String,//一次订单中所有商品的ID集合pay_category_ids: String,//一次支付中所有品类的ID集合pay_product_ids: String,//一次支付中所有商品的ID集合city_id: Long)//城市 id
}

在实际工作中,这种写法不好扩展功能和维护。而是按照下面的三重架构模式编写代码:

7. 其他相关面试题

1. 如何使用Spark实现TopN的获取(描述思路或使用伪代码)?
方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)
注意:当数据量太大时,会导致OOM
方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序

2. 现场写一个笔试题
hdfs文件,文件每行的格式为作品ID,用户id,用户性别。请用一个spark任务实现以下功能:统计每个作品对应的用户(去重后)的性别分布。输出格式如下:作品ID,男性用户数量,女性用户数量
答案:

sc.textfile().flatmap(.split(",")) //分割成作品ID,用户id,用户性别.map(((_.1,_._2),1)) //((作品id,用户性别),1).reduceByKey(_+_) //((作品id,用户性别),n).map(_._1._1,_._1._2,_._2) //(作品id,用户性别,n)

参考资料:

https://blog.csdn.net/yangshengwei230612/article/details/115383518

spark常用算子_chbxw-CSDN博客

spark系列6:常用RDD介绍与演示_涤生手记-CSDN博客

尚硅谷大数据Spark之RDD转换算子学习笔记及面试题相关推荐

  1. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  2. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  3. 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...

  4. 尚硅谷大数据技术Scala教程-笔记04【集合】

    视频地址:尚硅谷大数据技术之Scala入门到精通教程(小白快速上手scala)_哔哩哔哩_bilibili 尚硅谷大数据技术Scala教程-笔记01[Scala课程简介.Scala入门.变量和数据类型 ...

  5. 《尚硅谷大数据Hadoop》教程

    尚硅谷大数据Hadoop教程 概论 入门 HDFS MapReduce YARN 由于对这方面的知识只是做一个了解,所以详细的东西并不会做笔记. 概论 大数据的特点 海量.高速.多样.低价值密度 入门 ...

  6. 尚硅谷大数据视频_Hive视频教程

    这次分享的是尚硅谷大数据教程视频的第五份--Hive Hive是基于Hadoop的一个数据仓库工具,将繁琐的MapReduce程序变成了简单方便的SQL语句实现,深受广大软件开发工程师喜爱.Hive同 ...

  7. 尚硅谷大数据项目之电商数仓(4即席查询数据仓库)

    尚硅谷大数据项目之电商数仓(即席查询) (作者:尚硅谷大数据研发部) 版本:V4.0 第1章 Presto 1.1 Presto简介 1.1.1 Presto概念 1.1.2 Presto架构 1.1 ...

  8. 2019尚硅谷大数据Javaweb篇三 Ajax、JSTL、会话技术、过滤器、监听器、xml、json

    2019尚硅谷大数据 Javaweb篇三Ajax.JSTL.会话技术.过滤器.监听器 tags: 大数据 2019尚学堂 categories: Ajax异步请求 JSTL中的if和forEach 会 ...

  9. 电商数仓描述_笔记-尚硅谷大数据项目数据仓库-电商数仓V1.2新版

    架构 项目框架 数仓架构 存储压缩 Snappy与LZO LZO安装: 读取LZO文件时,需要先创建索引,才可以进行切片. 框架版本选型Apache:运维麻烦,需要自己调研兼容性. CDH:国内使用最 ...

  10. 尚硅谷大数据技术Hadoop教程-笔记03【Hadoop-HDFS】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

最新文章

  1. c++局域网主动ftp_如何在局域网中实现 ARP 攻击
  2. OpenCV中的waitkey()详解
  3. php零拷贝,百万并发「零拷贝」技术系列之初探门径
  4. 进程、线程和协程的区别和联系(TX)
  5. linux进程增删改查,iptables的增删改查
  6. pca降维的基本思想_百面机器学习 第四章 降维 PCA
  7. Effective C++ 学习笔记(24)
  8. 2020 年 6 月编程语言排行榜,Rust 第一次进入榜单前 20。
  9. 数据分析师的30种死法
  10. 重装系统后计算机无法启动,重装系统后电脑为什么启动不了?云骑士告诉你怎么办?...
  11. 阿里云张振尧:阿里云边缘云驱动5G时代行业新价值
  12. Web IDE优势在哪?详解Web版数据库管理工具SQL Studio
  13. MapReduce中名字的通俗解释--故事会
  14. 计算机未响应硬盘,最近电脑打开磁盘或文件夹老程序未响应为什么啊,有什么办法可以解决?...
  15. 重学C++笔记之(十三)友元、异常和其他
  16. ubuntu16.04Cuda8.0安装opencv3.1
  17. python习题练习
  18. 注会 第三章 存货
  19. 谷歌浏览器打开普通用户_Chrome浏览器的用户账户和密码如何导入其他浏览器
  20. 【python】yolov5的torch与torchvision环境问题

热门文章

  1. hi3798mv300救砖包_迪优美特V26_hi3798mv300线刷固件升级包纯净系统
  2. Matlab —— 电路仿真
  3. 人工智能AI - 学习/实践
  4. Cobalt Strike Profile 学习记录
  5. 多功能照片图片处理器小程序源码/流量主系列小程序源码
  6. Django(71)图片处理器django-imagekit
  7. Java学习路线(转)
  8. 小白安装linux系统一键,小白如何快速安装vos3000,一键快速安装VOS
  9. 微信和qq默认表情代码对照表及表情文件下载
  10. 信息和信息技术的概念,发展和应用