Spark总结之RDD(四)

1. 背景

  1. Spark针对RDD的整体设计思想很多地方和MapReduce相似,但又做了很多优化.
  2. Spark整体API设计针对分布式数据处理做了很多优化,并且做了更高层级的抽象,API使用更加简单便捷.例如RDD\DataSet\DataFrame\DStream等
  3. 本文主要关于RDD的介绍,RDD类型较多,大的分类是Transformation和Action类型,但具体的RDD中又可以进一步细分为基础的RDD和在基础RDD之上的高级RDD(内部调用基础RDD以及各种操作)
  4. 好的API设计确实是使用便利,而把复杂的内部实现屏蔽起来.并且会对API接口做分层处理,Spark的RDD的API设计就很好体现了这一点,后续源码会有提及.

2. RDD类型

2.1 cogroup

顾名思义,协分组,可以看成是将多个RDD聚合然后根据key进行划分.

    val conf: SparkConf = new SparkConf().setAppName(classOf[CogroupTest].getName)conf.setMaster("spark://linux101:7077")val sc = new SparkContext(conf)// 创建rddval rdd1: RDD[(String, Int)] = sc.makeRDD(List(("tome", 1), ("tome", 2), ("jim", 1), ("tony", 1), ("sanny", 1)), 2)val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("tome", 3), ("tome", 1), ("jim", 4), ("tony", 6), ("sanny", 1)), 2)// cogroup,将多个rdd联合起来,进行groupByKey的操作。// 注意这种方式面对大数据场景,由于没有reduceByKey这种提前局部聚合,效率较低// 使用union,然后reduceByKey相对更加高效一些val res: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)println(res.collect().toBuffer)Thread.sleep(10000000)

运算结果

ArrayBuffer((jim,(CompactBuffer(1),CompactBuffer(4))), (tony,(CompactBuffer(1),CompactBuffer(6))), (sanny,(CompactBuffer(1),CompactBuffer(1))), (tome,(CompactBuffer(1, 2),CompactBuffer(3, 1))))

可以看出,最后的运算结果,是2个RDD中相同的key,value是2者合并起来的结果集合
实际cogroup的源码如下

注意有多参数的方法重载


最终实现这里,可以看到,就是创建了一个CoGroupedRDD,然后这个RDD使用mapValues对内部的数据做处理
处理逻辑就是当前调用cogroup的RDD的values,以及作为参数的RDD的values,其中的values转换为Iterable
返回值最终是RDD[(K, (Iterable[V], Iterable[W]))] ,可以看出最后生成的是对偶数组,key就是2个RDD中的key,value就是2个RDD的value对应数据形成的迭代器,2个迭代器组合而成的对偶数组.
所以整体可以把cogoup看做是把2个或多个RDD相同key的数据抽离出来,放在一起,但结果是以Tuple形式,Tuple中就是不同RDD的values形成的迭代器.
cogroup是产生了shuffle的
http://windows:4040/jobs/job/?id=0 web页面查看信息

上述可以看出,这里有3个stage, 每个stage是2个分区,并行处理2个task

注意,如果是需要对多个RDD的数据进行处理,可以先union,再局部聚合reduceByKey,这样更加高效.不过数据量少时,使用cogroup性能耗时上不会有太大差异

注意,shuffle的本质和英文名字类似,洗牌.Saprk和MapReduce的shuffle一样,都是把数据打散
数据打散就意味着需要有一个规则将数据划分到不同的区域去,一般是分区器来做这个事情.默认一般是hashPartitioner
实际shuffle分为2个阶段,shuffle write,shuffle read.也就是shuffle前期,数据从内存溢出写到磁盘,然后下游需要拉取这些写道磁盘的数据
shuffle和是否网络传输无关因为上下游有可能在同一个计算机节点上,直接本地通信即可.
shuffle会产生对应的map task

2.2 intersection

顾名思义,就是求RDD之间的交集,隐含着的条件就是2个RDD的数据类型需要一致.(是指RDD产生出来的数据类型需要一致)

    val conf: SparkConf = new SparkConf().setAppName("IntersectionTest").setMaster("local[*]")val sc = new SparkContext(conf)// 交集val rdd1: RDD[Int] = sc.makeRDD(List(2, 3, 4, 5), 2)val rdd2: RDD[Int] = sc.makeRDD(List( 4, 5, 6, 7), 2)// 就跟集合操作一样,交集就是求出2个集合中共同拥有的元素// 所以隐含着一个条件,数据类型需要一样,因为需要2者之间可以对比和判断val res: RDD[Int] = rdd1.intersection(rdd2)// 打印结果println(res.collect().toBuffer)sc.stop()

结果

ArrayBuffer(4, 5)

源码实现

第一步是把数据转换成key value形式,2个RDD的数据,key就是本身的数据,value是null
然后是转换后的数据进行cogroup操作,这样得出的就是相同的key,对应数据都放在一起.key, value就是2个迭代器组成的tuple,迭代器中数据是null类型
然后对数据做filter筛选操作, 这里使用的是偏函数,匹配模式.如果是交集,那么肯定第一个迭代器leftGroup和第二个迭代器rightGroup中数据都不为空,这样才算是交集数据
最后是将筛选后的数据,把keys取出来,因为要的也是交集数据,value只是一个一个的null,无用

从上述源码实现来看,cogroup可以看成是一个相对底层的算子,intersection则是一个相对高级的算子.

整体实现思路来看,其实spark的api屏蔽了分布式处理的细节,转而改为对抽象数据集合RDD的操作,就跟scala的集合转换和处理是一样的.
注意这里使用了cogroup,则一定会有shuffle

2.3 join

  1. 顾名思义,这个算子和sql中的join非常相似,都是将不同数据集合中根据相同字段以及值进行拼接组合.
  2. 这也就意味着,进行组合的字段类型,值必须相同才能够组合.类似于join on的字段一样.
  3. 默认,都是RDD中数据的第一个字段进行组合关联,所以一般这里的数据都是tuple2对偶数组
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"),(2, "dd"),(1, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"),(2, "pm"),(1, "coder")))val res: RDD[(Int, (String, String))] = rdd1.join(rdd2)println(res.collect().toBuffer)sc.stop()

源码

从源码可以看出,2个RDD的数据先进行cogroup处理,
然后使用flatMapValues,针对values进行操作
map中操作是一个嵌套for循环,就是先遍历2个RDD中第一个RDD的数据,然后遍历第二个RDD中数据,最后将匹配的数据汇总搜集起来
最后使用flat操作展平,因为最后得到的结果可能是key有多个数据,value也有多个数据,而需要的数据是按照key分别展平的数据

2.4 leftOuterJoin

和sql中一样,leftjoin,会以左侧数据为准,就算左侧数据没有字段能够和右侧的数据集匹配上,数据也会保留.

    val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"),(2, "dd"),(3, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"),(2, "pm"),(1, "coder")))val res: RDD[(Int, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)println(res.collect().toBuffer)sc.stop()

运行结果

ArrayBuffer((1,(1,Some(ui))), (1,(1,Some(coder))), (2,(dd,Some(pm))), (3,(haha,None)))

从上述结果可以看出,就算右侧数据集合没有匹配上数据,左侧的数据也会被保留.只不过以None进行替代

源码

可以看出,2个RDD之间使用了cogroup操作
在此基础上,使用flatmapValues对值进行操作
map操作逻辑中,先判断第二个RDD中数据是否为空,是的话,则返回数据是(key, None)类型
如果第二个RDD中数据不为空,则进行嵌套for循环,然后把循环的数据使用yield搜集起来,注意这里的数据是对偶元组,第二个RDD中数据是Some(V)类型,就是Option值
Option值是针对Java中空数值或者对象的一个很大提升,显示地提示程序员这里可能为空,可以很明显降低null pointer地几率.
这是一种很有意思地编程接口设计思路,既然无法避免,那就让它让直接暴露出来,因为好处很大,所以swift也吸收了这种思路,也提供了option这种数据类型,可选值类型

2.5 rightOuterJoin

val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"),(2, "dd"),(1, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"),(2, "pm"),(10, "coder")))val res: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)println(res.collect().toBuffer)sc.stop()

运行结果

ArrayBuffer((1,(Some(1),ui)), (1,(Some(haha),ui)), (2,(Some(dd),pm)), (10,(None,coder)))

注意这里,join是以右侧数据为主,就算没有匹配上,右侧数据不会被丢弃,只是会把左侧地数据以None标记,占位并保存下来.

源码

这里可以看出,先cogroup操作,然后使用flatmapValues对数据进行处理
处理逻辑中,也是先判断第一个RDD的数据是否为空,是的话,数据就会转换为(None, value)的形式
如果第一个RDD的数据不为空,则使用嵌套for循环,然后把数据搜集起来,形成一个一个对偶元组
最后使用flat操作,把数据展平

2.6 fullOuterJoin

和sql中一样,fulljoin,不管2侧数据集合是否有匹配,都会把数据保留下来.

val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"), (2, "dd"), (7, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"), (2, "pm"), (6, "coder")))val res: RDD[(Int, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)println(res.collect().toBuffer)sc.stop()

运行结果

ArrayBuffer((1,(Some(1),Some(ui))), (2,(Some(dd),Some(pm))), (6,(None,Some(coder))), (7,(Some(haha),None)))

源码

从上述可以看出,先2个RDD之间先进行cogroup操作,然后进行flatMapValues操作
map逻辑是使用偏函数,进行匹配处理
如果第二个RDD中数据为空,Seq(),则将数据转换为(Some(V), None)的形式
如果第一个RDD中数据为空,也类似操作
如果2个都有数据,则使用嵌套for循环,将数据以(Some(v), Some(w))形式搜集起来
最后是flatten操作,数据展平,因为搜集出来的数据可能key,value都是数组形式,需要展平

2.7 substract

求2个集合之间的差集,就是去掉2个差集中相同部分.

val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// 差集val rdd1: RDD[Int] = sc.makeRDD(List(1, 3, 5, 6))val rdd2: RDD[Int] = sc.makeRDD(List(5, 6, 7, 8, 9))val res: RDD[Int] = rdd1.subtract(rdd2)println(res.collect().toBuffer)sc.stop()

运行结果

ArrayBuffer(1, 3)

源码

上述源码可以看出,最核心代码是RDD做了转换
转换逻辑是先将数据转成key value形式,value是null
然后再使用subtractByKey进行处理,传入的参数是第二个RDD的数据,
第二个RDD的数据也转换为key value形式
最终结果是把keys取出来,因为value没有 作用,只是进行转换所需.



以下是核心的代码


可以看出,整体思路就是将RDD1中数据转换为key value形式,value就是各个相同key对应的value组成的数据ArrayBuffer.
然后这个map以移除RDD2数据转换成的map对应的keys,这样就可以把差集数据求出来,因为相同key对应的数据已经被移除,所以留下的就是差集数据

2.8 substractByKey

val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)val rdd3: RDD[(String, Int)] = sc.parallelize(List(("x", 1), ("r", 2), ("t", 3)))val rdd4: RDD[(String, Int)] = sc.parallelize(List(("x", 3), ("t", 2), ("z", 3)))val res2: RDD[(String, Int)] = rdd3.subtractByKey(rdd4)println(res2.collect().toBuffer)

运行结果

ArrayBuffer((r,2))

这个算子比较底层,最终是创建了SubtractedRDD,源代码在上述的substract中

2.9 repartition

顾名思义,重新对数据做分区.

在Spark中,如果发生数据倾斜或者防止,往往可以通过对数据重新分区来解决数据倾斜
另外,解决数据倾斜,可以换分区器,如果是哈希分区,可以把key打散,可以增加内存,可以提前聚合等等操作.都能缓解数据倾斜

val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// 重新分区val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 4)val rdd1WithIndex: RDD[(Int, Int)] = rdd1.mapPartitionsWithIndex((index, e) => {e.map(x => (index, x))})val rdd2: RDD[Int] = rdd1.repartition(4)val rdd2WithIndex: RDD[(Int, Int)] = rdd2.mapPartitionsWithIndex((index, e) => {e.map(x => (index, x))})println("rdd1WithIndex:"+ rdd1WithIndex.collect().toBuffer)println("rdd2WithIndex:"+ rdd2WithIndex.collect().toBuffer)sc.stop()

运行结果

rdd1WithIndex:ArrayBuffer((0,1), (0,2), (1,3), (1,4), (2,5), (2,6), (3,7), (3,8))rdd2WithIndex:ArrayBuffer((0,2), (0,5), (0,7), (1,3), (1,6), (1,8), (2,4), (3,1))

源码

从上述源码可以看出,最终是生成了一个CoalescedRDD对象
如果需要shuffle,则会先计算shuffle中各个数据分到哪个分区index

注意这里核心代码是重写的compute等代码,但具体作用需要运行起来更加直观
注意,coalesce如果设置参数为false,或者默认调用,是不会产生shuffle的

2.11 coalesce

数据重新分区,可以设置是否shuffle.如果是大分区降低为小分区,可以设置不shuffle,这样数据直接按照区进行聚合即可.
但如果小分区数变成大分区数,则一定需要shuffle,因为数据需要打散到不同分区上去.

val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)val rdd3: RDD[ Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 4)val indexedRDD1: RDD[(Int, Int)] = rdd3.mapPartitionsWithIndex((index, iter) => {iter.map(x => (index, x))})// 重新分区,但默认不做shuffle,所以这里本质只是聚合val res: RDD[Int] = rdd3.coalesce(3)val indexedResRDD: RDD[(Int, Int)] = res.mapPartitionsWithIndex((index, iter) => {iter.map(x => (index, x))})// 打印println("indexedRDD1: " + indexedRDD1.collect().toBuffer)println("indexedResRDD: " + indexedResRDD.collect().toBuffer)sc.stop()

运行结果

indexedResRDD: ArrayBuffer((0,1), (0,2), (1,3), (1,4), (2,5), (2,6), (2,7), (2,8), (2,9))

Spark总结之RDD(四)相关推荐

  1. spark基础之RDD详解

    一 什么是RDD,有什么特点? RDD: Resilient Distributed Dataset,弹性分布式数据集. 特点: # 它是一种数据的集合 # 它可以被分区,每一个分区分布在不同的集群中 ...

  2. 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理

    文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...

  3. Spark二之RDD

    一.Spark RDD 概述 Resilient Distributed Dataset(http://spark.apache.org/docs/latest/rdd-programming-gui ...

  4. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  5. 2021年大数据Spark(十七):Spark Core的RDD持久化

    目录 RDD 持久化 引入 API 缓存/持久化函数 缓存/持久化级别 释放缓存/持久化 代码演示 总结:何时使用缓存/持久化 RDD 持久化 引入 在实际开发中某些RDD的计算或转换可能会比较耗费时 ...

  6. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  7. spark 中的RDD编程:基于Java api

    1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...

  8. 02、体验Spark shell下RDD编程

    02.体验Spark shell下RDD编程 1.Spark RDD介绍 RDD是Resilient Distributed Dataset,中文翻译是弹性分布式数据集.该类是Spark是核心类成员之 ...

  9. Spark中,RDD概述(五大属性,弹性介绍,5个特性)

    1 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在Spa ...

最新文章

  1. 大数据将改变信息生命周期管理
  2. 安装与使用 Trac 来管理项目
  3. ListDataView:让你的List可以被任何Site引用
  4. html垂直线性渐变,html5线性渐变
  5. 编译WINDOWS版SDL2:You should run hg revert SDL_config.h
  6. 文献管理工具全家桶(引文网络,追踪,管理,多平台同步,快速下载)
  7. DBImport v3.0 中文版发布:支持各大数据库数据互导(IT人员必备工具)
  8. 百度云如何免费扩容至2055G?
  9. 关于opencv的rows和cols的理解
  10. 看看十二星座哪个更适合当程序员
  11. luogu P4643 [国家集训队]阿狸和桃子的游戏
  12. 团队合作开发的两种文档工具
  13. dubbo中文官网地址
  14. RMF客户消费行为评分模型
  15. 看得懂的猪周期,牧原们却不一定跨得过
  16. 【C语言】C语言中赋值语句规则
  17. 软件测试工程师必备技能——Linux基础知识
  18. 2019年各大互联网公司端午节礼盒指南
  19. panic: <Ormer> table: `.` not found, make sure it was registered with `RegisterModel()`
  20. CenterNet - CornerNet

热门文章

  1. miui11可用的位置模拟器_MIUI11内测体验包
  2. 南方周末:股神炒股一周年祭 24万本金仅剩7千
  3. 直播app开发公司手把手搭建一套简单的直播系统
  4. Android Studio设置-单行注释格式化时的缩进处理
  5. 深入 WEP和 WPA密码原理 1
  6. 东华大学计算机博士毕业要求,东华大学研究生在学期间发表学术论文要求的暂行规定(修订)...
  7. V-if 的常见用法
  8. A_A03_007 CH32串口软件安装与CH32程序串口下载
  9. 偷偷看了同事的代码找到了优雅代码的秘密
  10. 电线线缆铜芯和铝芯有什么区别?哪个更好呢?