目录

常用算子

基本算子

分区操作函数算子

重分区函数算子

1)、增加分区函数

2)、减少分区函数

3)、调整分区函数

​​​​​​​聚合函数算子

​​​​​​​Scala集合中的聚合函数

​​​​​​​RDD中的聚合函数

​​​​​​​​​​​​​PairRDDFunctions聚合函数

面试题:groupByKey和reduceByKey

关联函数

排序函数-求TopKey


常用算子

RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数

主要常见使用函数如下,一一通过演示范例讲解。

基本算子

RDD中map、filter、flatMap及foreach等函数为最基本函数,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。

  1.  map 算子:

    1. map(f:T=>U) : RDD[T]=>RDD[U],表示将 RDD 经由某一函数 f 后,转变为另一个RDD。
  1.  flatMap 算子:

    1. flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]),表示将 RDD 经由某一函数 f 后,转变为一个新的 RDD,但是与 map 不同,RDD 中的每一个元素会被映射成新的 0 到多个元素(f 函数返回的是一个序列 Seq)。
  1.  filter 算子:

    1. filter(f:T=>Bool) : RDD[T]=>RDD[T],表示将 RDD 经由某一函数 f 后,只保留 f 返回为 true 的数据,组成新的 RDD。
  2.  foreach 算子:
    1. foreach(func),将函数 func 应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。关于 foreach,在后续章节中还会使用,到时会详细介绍它的使用方法及注意事项。
  1.  saveAsTextFile 算子:

    1. saveAsTextFile(path:String),数据集内部的元素会调用其 toString 方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS 等。

​​​​​​​分区操作函数算子

每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替、foreache函数使用foreachPartition代替。

针对词频统计WordCount代码进行修改,针对分区数据操作,示例代码如下:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}/*** 分区操作函数:mapPartitions和foreachPartition*/
object SparkIterTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、从文件系统加载数据,创建RDD数据集val inputRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)// 2、处理数据,调用RDD集合中函数(类比于Scala集合类中列表List)/*def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]*/val wordcountsRDD: RDD[(String, Int)] = inputRDD// 将每行数据按照分隔符进行分割,将数据扁平化.flatMap(_.trim.split("\\s+"))// 针对每个分区数据操作.mapPartitions{ iter =>// iter 表示RDD中每个分区中的数据,存储在迭代器中,相当于列表Listiter.map((_, 1))}// 按照Key聚合统计, 先按照Key分组,再聚合统计(此函数局部聚合,再进行全局聚合).reduceByKey(_+_)// 3、输出结果RDD到本地文件系统wordcountsRDD.foreachPartition{ iter =>// 获取各个分区IDval partitionId: Int = TaskContext.getPartitionId()// val xx: Iterator[(String, Int)] = datasiter.foreach{ case (word, count) =>println(s"p-${partitionId}: word = $word, count = $count")}}// 应用程序运行结束,关闭资源sc.stop()}
}

为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???

  1. 应用场景:处理网站日志数据,数据量为10GB,统计各个省份PV和UV

    1. 假设10GB日志数据,从HDFS上读取的,此时RDD的分区数目:80 分区;
    2. 但是分析PV和UV有多少条数据:34,存储在80个分区中,实际项目中降低分区数目,比如设置为2个分区。

​​​​​​​重分区函数算子

如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。

1)、增加分区函数

函数名称:repartition,此函数使用的谨慎,会产生Shuffle。

注意: repartition底层调用coalesce(numPartitions, shuffle=true)

2)、减少分区函数

函数名称:coalesce,shuffle参数默认为false,不会产生Shuffle,默认只能减少分区

比如RDD的分区数目为10个分区,此时调用rdd.coalesce(12),不会对RDD进行任何操作

3)、调整分区函数

在PairRDDFunctions中partitionBy函数:

import org.apache.spark.Partitioner/*** 自定义分区器,实现RDD分区,在进行Shuffle过程中*/class MyPartitioner extends Partitioner{// 确定分区数目override def numPartitions: Int = 3// 依据Key,确定所属分区,返回值:0,...,2override def getPartition(key: Any): Int = {// 获取每个单词第一个字符val firstChar: Int = key.asInstanceOf[String].charAt(0).toIntif(firstChar >= 97 && firstChar <= 122){0  // 小写字母开头单词,在第一个分区}else if(firstChar >= 65 && firstChar <= 90){1 // 大写字母开头单词,在第二个分区}else{2 // 非大小字母开头单词,在第三个分区}}}

范例演示代码,适当使用函数调整RDD分区数目:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中分区函数,调整RDD分区数目,可以增加分区和减少分区*/
object SparkPartitionTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 读取本地文件系统文本文件数据val datasRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)// 增加RDD分区数val etlRDD: RDD[String] = datasRDD.repartition(3)println(s"EtlRDD 分区数目 = ${etlRDD.getNumPartitions}")// 词频统计val resultRDD: RDD[(String, Int)] = etlRDD// 数据分析,考虑过滤脏数据.filter(line => null != line && line.trim.length > 0)// 分割单词,注意去除左右空格.flatMap(line => line.trim.split("\\s+"))// 转换为二元组,表示单词出现一次.mapPartitions{iter =>iter.map((_, 1))}// 分组聚合,按照Key单词.reduceByKey(_+_)//resultRDD.partitionBy(传入自定义分区器)// 输出结果RDDresultRDD// 对结果RDD降低分区数目.coalesce(1).foreachPartition(iter => iter.foreach(println))// 应用程序运行结束,关闭资源sc.stop()}
}

在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????

第一点:增加分区数目

当处理的数据很多的时候,可以考虑增加RDD的分区数

 第二点:减少分区数目

其一:当对RDD数据进行过滤操作(filter函数)后,考虑是否降低RDD分区数目

其二:当对结果RDD存储到外部系统

​​​​​​​聚合函数算子

在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就是其中聚合函数的使用。

​​​​​​​Scala集合中的聚合函数

回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列表List中聚合函数reduce和fold源码如下:

通过代码,看看列表List中聚合函数使用:

运行截图如下所示:

fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:

​​​​​​​RDD中的聚合函数

在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:

案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:

运行原理分析:

使用RDD中fold聚合函数:

查看RDD中高级聚合函数aggregate,函数声明如下:

seqOp函数的第一个参数是累加器,第一次执行时,会把zeroValue赋给累加器。第一次之后会把返回值赋给累加器,作为下一次运算的第一个参数。

seqOP函数每个分区下的每个key有个累加器,combOp函数全部分区有几个key就有几个累加器。如果某个key只存在于一个分区下,不会对他执行combOp函数

业务需求:使用aggregate函数实现RDD中最大的两个数据,分析如下:

核心业务代码如下:

运行结果原理剖析示意图:

上述完整范例演示代码:


package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}import scala.collection.mutable
import scala.collection.mutable.ListBuffer/*** RDD中聚合函数:reduce、aggregate函数*/
object SparkAggTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 模拟数据,1 到 10 的列表,通过并行方式创建RDDval datas = 1 to 10val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2)// 查看每个分区中的数据datasRDD.foreachPartition { iter =>println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}")}println("==================使用reduce函数聚合=======================")// 使用reduce函数聚合val result: Int = datasRDD.reduce((tmp, item) => {println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")tmp + item})println(result)println("==================使用fold函数聚合=======================")// 使用fold函数聚合val result2: Int = datasRDD.fold(0)((tmp, item) => {println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")tmp + item})println(result2)println("================使用aggregate函数获取最大的两个值=========================")// 使用aggregate函数获取最大的两个值val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())(// 分区内聚合函数,每个分区内数据如何聚合  seqOp: (U, T) => U,(u, t) => {println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t")// 将元素加入到列表中u += t //// 降序排序val top = u.sorted.takeRight(2)// 返回top},// 分区间聚合函数,每个分区聚合的结果如何聚合 combOp: (U, U) => U(u1, u2) => {println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2")u1 ++= u2 // 将列表的数据合并,到u1中//u1.sorted.takeRight(2)})println(top2)// 应用程序运行结束,关闭资源sc.stop()}
}

​​​​​​​​​​​​​PairRDDFunctions聚合函数

在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey函数将相同Key的Value进行聚合操作的,省去先分组再聚合

第一类:分组函数groupByKey

第二类:分组聚合函数reduceByKeyfoldByKey

但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。

第三类:分组聚合函数aggregateByKey

在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。

演示范例代码如下:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中聚合函数,针对RDD中数据类型Key/Value对:*      groupByKey*      reduceByKey/foldByKey*      aggregateByKey*/
object SparkAggByKeyTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、并行化集合创建RDD数据集val linesSeq: Seq[String] = Seq("hello me you her","hello you her","hello her","hello")val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)// 2、分割单词,转换为二元组val wordsRDD: RDD[(String, Int)] = inputRDD.flatMap(_.split("\\s+")).map((_,1))println("==使用groupByKey函数分组,再使用mapValues函数聚合==")val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()val resultRDD: RDD[(String, Int)] = wordsGroupRDD.mapValues(_.sum)println(resultRDD.collectAsMap())println("==使用reduceByKey或foldByKey分组聚合==")val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey(_+_)println(resultRDD2.collectAsMap())val resultRDD3 = wordsRDD.foldByKey(0)(_+_)println(resultRDD3.collectAsMap())println("==使用aggregateByKey聚合==")/*def aggregateByKey[U: ClassTag](zeroValue: U) // 聚合中间临时变量初始值,类似fold函数zeroValue(seqOp: (U, V) => U, // 各个分区内数据聚合操作函数combOp: (U, U) => U // 分区间聚合结果的聚合操作函数): RDD[(K, U)]*/val resultRDD4 = wordsRDD.aggregateByKey(0)((tmp: Int, item: Int) => {tmp + item},(tmp: Int, result: Int) => {tmp + result})println(resultRDD4.collectAsMap())// 应用程序运行结束,关闭资源sc.stop()}}

面试题:groupByKey和reduceByKey

RDD中groupByKey和reduceByKey区别???

groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同key的值聚合到一起。

reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。有预聚合

关联函数

当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。

首先回顾一下SQL JOIN,用Venn图表示如下:

RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:

具体看一下join(等值连接)函数说明:

范例演示代码:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** RDD中关联函数Join,针对RDD中数据类型为Key/Value对*/
object SparkJoinTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 模拟数据集val empRDD: RDD[(Int, String)] = sc.parallelize(Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")))val deptRDD: RDD[(Int, String)] = sc.parallelize(Seq((1001, "sales"), (1002, "tech")))/*def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]*/val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)println(joinRDD.collectAsMap())/*def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]*/val leftJoinRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)println(leftJoinRDD.collectAsMap())// 应用程序运行结束,关闭资源sc.stop()}
}

排序函数-求TopKey

在上述词频统计WordCount代码基础上,对统计出的每个单词的词频Count,按照降序排序,获取词频次数最多Top3单词

RDD中关于排序函数有如下三个:

1)、sortByKey:针对RDD中数据类型key/value对时,按照Key进行排序

2)、sortBy:针对RDD中数据指定排序规则

3)、top:按照RDD中数据采用降序方式排序,如果是Key/Value对,按照Key降序排序

具体演示代码如下,注意慎用top函数。

package cn.itcast.helloimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 获取词频最高三个单词*/
object WordCountTopKey {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")val inputRDD: RDD[String] = sc.textFile("data/input/words.txt")val wordsRDD = inputRDD.flatMap(line => line.split("\\s+"))val tuplesRDD: RDD[(String, Int)] = wordsRDD.map((_, 1))val wordCountsRDD: RDD[(String, Int)] = tuplesRDD.reduceByKey(_+_)wordCountsRDD.foreach(println)// 按照词频count降序排序获取前3个单词, 有三种方式println("======================== sortByKey =========================")// 方式一:按照Key排序sortByKey函数,/*def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.length): RDD[(K, V)]*/wordCountsRDD.map(tuple => tuple.swap) //.map(tuple => (tuple._2, tuple._1)).sortByKey(ascending = false)//逆序.take(3).foreach(println)println("======================== sortBy =========================")// 方式二:sortBy函数, 底层调用sortByKey函数/*def sortBy[K](f: (T) => K, // T 表示RDD集合中数据类型,此处为二元组ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]*/wordCountsRDD.sortBy(tuple => tuple._2, ascending = false).take(3).foreach(println)println("======================== top =========================")// 方式三:top函数,含义获取最大值,传递排序规则, 慎用/*def top(num: Int)(implicit ord: Ordering[T]): Array[T]*/wordCountsRDD.top(3)(Ordering.by(_._2)).foreach(println)sc.stop()}
}

2021年大数据Spark(十五):Spark Core的RDD常用算子相关推荐

  1. 2021年大数据Kafka(五):❤️Kafka的java API编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...

  2. 2021年大数据HBase(五):HBase的相关操作JavaAPI方式

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 HBase的相关操作-JavaAPI方式 一.需求说明 ...

  3. 2021年大数据Hive(五):Hive的内置函数(数学、字符串、日期、条件、转换、行转列)

    全网最详细的Hive文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 系列历史文章 前言 Hive的内置函数 一.数学函数 1. 取整函数: round ...

  4. 2021年大数据发展十大趋势:抓准一个,就能掌握先机!

    导读:如何激活数据价值.真正从大数据中"淘金",成为2021年大数据的重中之重.大数据究竟如何持续不断地影响组织和机构,以及它给这个世界带来了何种影响,本文特别梳理出2021年大数 ...

  5. 2021年大数据ELK(五):Elasticsearch中的核心概念

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Elasticsearch中的核心概念 一.索引 index 二 ...

  6. 2021年大数据基础(五):​​​​​​​​​​​​​​​​​​​​​分布式技术

    2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 分布式技术 为什么需要 ...

  7. 2021年大数据ZooKeeper(五):ZooKeeper Java API操作

    目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...

  8. 客快物流大数据项目(十五):DockeFile常用命令

    目录 DockeFile常用命令 一.FROM 二.​​​​​​​MAINTAINER 三.​​​​​​​RUN

  9. 2021年大数据Flink(五):Standalone-HA高可用集群模式

    目录 Standalone-HA高可用集群模式 原理 操作 1.集群规划 2.启动ZooKeeper 3.启动HDFS 4.停止Flink集群 5.修改flink-conf.yaml 6.修改mast ...

最新文章

  1. facebook 直播_什么时候是在Facebook Live上直播的最佳时间? 我分析了5,000个Facebook帖子以找出答案。...
  2. 利用async和await异步操作解决node.js里面fs模块异步读写,同步结果的问题
  3. python字典里可以放列表吗_学习python之列表及字典
  4. 102. 最佳牛围栏【二分 / 思维 不错】
  5. 关于linux的服务器搭建,关于搭建linux日志服务器
  6. 微信小程序自定义弹窗实例
  7. YBTOJ:前缀询问(trie树)
  8. linux-macbook内核,技术|用 Linux 让旧 MacBook 重获新生
  9. php linux 删除文件夹,linux下如何删除文件夹
  10. Lodop打印设计界面生成代码带”...(省略)”
  11. 教育|教授因被指控“奴役”博士生遭学校解雇,反手将学校告上法庭并获赔偿......
  12. ext--fileset控件示例
  13. Flask备注4(Structure)
  14. 先有电脑然后才有手机,为什么当时手机不叫“手脑”呢?
  15. Mac 如何查看电脑的蓝牙版本信息
  16. 论文笔记_S2D.73_ICCV2021_单目深度估计的可解释深度网络研究
  17. php fseek函数,php fseek函数怎么用
  18. QTableView遍历
  19. POJ2545-丑数
  20. linux忘记密码,使用星号密码查看器,查看SSH工具记录的密码

热门文章

  1. 【golang程序包推荐分享】go-ini、viper、godoc
  2. 2022-2028年中国基金业投资分析及前景预测报告
  3. 后端怎么防止重复提交?(常用的做法)
  4. 2022-2028年中国数字化档案加工行业市场深度分析及发展策略分析报告
  5. Python 标准库之 shutil
  6. 时间处理_pandas_时间处理小结
  7. tensorflow问题
  8. LeetCode简单题之同构字符串
  9. 2021 年音视频技术与发展
  10. 一些量化(quantization)技巧