Spark总结之RDD(四)
Spark总结之RDD(四)
1. 背景
- Spark针对RDD的整体设计思想很多地方和MapReduce相似,但又做了很多优化.
- Spark整体API设计针对分布式数据处理做了很多优化,并且做了更高层级的抽象,API使用更加简单便捷.例如RDD\DataSet\DataFrame\DStream等
- 本文主要关于RDD的介绍,RDD类型较多,大的分类是Transformation和Action类型,但具体的RDD中又可以进一步细分为基础的RDD和在基础RDD之上的高级RDD(内部调用基础RDD以及各种操作)
- 好的API设计确实是使用便利,而把复杂的内部实现屏蔽起来.并且会对API接口做分层处理,Spark的RDD的API设计就很好体现了这一点,后续源码会有提及.
2. RDD类型
2.1 cogroup
顾名思义,协分组,可以看成是将多个RDD聚合然后根据key进行划分.
val conf: SparkConf = new SparkConf().setAppName(classOf[CogroupTest].getName)conf.setMaster("spark://linux101:7077")val sc = new SparkContext(conf)// 创建rddval rdd1: RDD[(String, Int)] = sc.makeRDD(List(("tome", 1), ("tome", 2), ("jim", 1), ("tony", 1), ("sanny", 1)), 2)val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("tome", 3), ("tome", 1), ("jim", 4), ("tony", 6), ("sanny", 1)), 2)// cogroup,将多个rdd联合起来,进行groupByKey的操作。// 注意这种方式面对大数据场景,由于没有reduceByKey这种提前局部聚合,效率较低// 使用union,然后reduceByKey相对更加高效一些val res: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)println(res.collect().toBuffer)Thread.sleep(10000000)
运算结果
ArrayBuffer((jim,(CompactBuffer(1),CompactBuffer(4))), (tony,(CompactBuffer(1),CompactBuffer(6))), (sanny,(CompactBuffer(1),CompactBuffer(1))), (tome,(CompactBuffer(1, 2),CompactBuffer(3, 1))))
可以看出,最后的运算结果,是2个RDD中相同的key,value是2者合并起来的结果集合
实际cogroup的源码如下
注意有多参数的方法重载
最终实现这里,可以看到,就是创建了一个CoGroupedRDD,然后这个RDD使用mapValues对内部的数据做处理
处理逻辑就是当前调用cogroup的RDD的values,以及作为参数的RDD的values,其中的values转换为Iterable
返回值最终是RDD[(K, (Iterable[V], Iterable[W]))] ,可以看出最后生成的是对偶数组,key就是2个RDD中的key,value就是2个RDD的value对应数据形成的迭代器,2个迭代器组合而成的对偶数组.
所以整体可以把cogoup看做是把2个或多个RDD相同key的数据抽离出来,放在一起,但结果是以Tuple形式,Tuple中就是不同RDD的values形成的迭代器.
cogroup是产生了shuffle的
http://windows:4040/jobs/job/?id=0 web页面查看信息
上述可以看出,这里有3个stage, 每个stage是2个分区,并行处理2个task
注意,如果是需要对多个RDD的数据进行处理,可以先union,再局部聚合reduceByKey,这样更加高效.不过数据量少时,使用cogroup性能耗时上不会有太大差异
注意,shuffle的本质和英文名字类似,洗牌.Saprk和MapReduce的shuffle一样,都是把数据打散
数据打散就意味着需要有一个规则将数据划分到不同的区域去,一般是分区器来做这个事情.默认一般是hashPartitioner
实际shuffle分为2个阶段,shuffle write,shuffle read.也就是shuffle前期,数据从内存溢出写到磁盘,然后下游需要拉取这些写道磁盘的数据
shuffle和是否网络传输无关因为上下游有可能在同一个计算机节点上,直接本地通信即可.
shuffle会产生对应的map task
2.2 intersection
顾名思义,就是求RDD之间的交集,隐含着的条件就是2个RDD的数据类型需要一致.(是指RDD产生出来的数据类型需要一致)
val conf: SparkConf = new SparkConf().setAppName("IntersectionTest").setMaster("local[*]")val sc = new SparkContext(conf)// 交集val rdd1: RDD[Int] = sc.makeRDD(List(2, 3, 4, 5), 2)val rdd2: RDD[Int] = sc.makeRDD(List( 4, 5, 6, 7), 2)// 就跟集合操作一样,交集就是求出2个集合中共同拥有的元素// 所以隐含着一个条件,数据类型需要一样,因为需要2者之间可以对比和判断val res: RDD[Int] = rdd1.intersection(rdd2)// 打印结果println(res.collect().toBuffer)sc.stop()
结果
ArrayBuffer(4, 5)
源码实现
第一步是把数据转换成key value形式,2个RDD的数据,key就是本身的数据,value是null
然后是转换后的数据进行cogroup操作,这样得出的就是相同的key,对应数据都放在一起.key, value就是2个迭代器组成的tuple,迭代器中数据是null类型
然后对数据做filter筛选操作, 这里使用的是偏函数,匹配模式.如果是交集,那么肯定第一个迭代器leftGroup和第二个迭代器rightGroup中数据都不为空,这样才算是交集数据
最后是将筛选后的数据,把keys取出来,因为要的也是交集数据,value只是一个一个的null,无用
从上述源码实现来看,cogroup可以看成是一个相对底层的算子,intersection则是一个相对高级的算子.
整体实现思路来看,其实spark的api屏蔽了分布式处理的细节,转而改为对抽象数据集合RDD的操作,就跟scala的集合转换和处理是一样的.
注意这里使用了cogroup,则一定会有shuffle
2.3 join
- 顾名思义,这个算子和sql中的join非常相似,都是将不同数据集合中根据相同字段以及值进行拼接组合.
- 这也就意味着,进行组合的字段类型,值必须相同才能够组合.类似于join on的字段一样.
- 默认,都是RDD中数据的第一个字段进行组合关联,所以一般这里的数据都是tuple2对偶数组
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"),(2, "dd"),(1, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"),(2, "pm"),(1, "coder")))val res: RDD[(Int, (String, String))] = rdd1.join(rdd2)println(res.collect().toBuffer)sc.stop()
源码
从源码可以看出,2个RDD的数据先进行cogroup处理,
然后使用flatMapValues,针对values进行操作
map中操作是一个嵌套for循环,就是先遍历2个RDD中第一个RDD的数据,然后遍历第二个RDD中数据,最后将匹配的数据汇总搜集起来
最后使用flat操作展平,因为最后得到的结果可能是key有多个数据,value也有多个数据,而需要的数据是按照key分别展平的数据
2.4 leftOuterJoin
和sql中一样,leftjoin,会以左侧数据为准,就算左侧数据没有字段能够和右侧的数据集匹配上,数据也会保留.
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"),(2, "dd"),(3, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"),(2, "pm"),(1, "coder")))val res: RDD[(Int, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)println(res.collect().toBuffer)sc.stop()
运行结果
ArrayBuffer((1,(1,Some(ui))), (1,(1,Some(coder))), (2,(dd,Some(pm))), (3,(haha,None)))
从上述结果可以看出,就算右侧数据集合没有匹配上数据,左侧的数据也会被保留.只不过以None进行替代
源码
可以看出,2个RDD之间使用了cogroup操作
在此基础上,使用flatmapValues对值进行操作
map操作逻辑中,先判断第二个RDD中数据是否为空,是的话,则返回数据是(key, None)类型
如果第二个RDD中数据不为空,则进行嵌套for循环,然后把循环的数据使用yield搜集起来,注意这里的数据是对偶元组,第二个RDD中数据是Some(V)类型,就是Option值
Option值是针对Java中空数值或者对象的一个很大提升,显示地提示程序员这里可能为空,可以很明显降低null pointer地几率.
这是一种很有意思地编程接口设计思路,既然无法避免,那就让它让直接暴露出来,因为好处很大,所以swift也吸收了这种思路,也提供了option这种数据类型,可选值类型
2.5 rightOuterJoin
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"),(2, "dd"),(1, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"),(2, "pm"),(10, "coder")))val res: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)println(res.collect().toBuffer)sc.stop()
运行结果
ArrayBuffer((1,(Some(1),ui)), (1,(Some(haha),ui)), (2,(Some(dd),pm)), (10,(None,coder)))
注意这里,join是以右侧数据为主,就算没有匹配上,右侧数据不会被丢弃,只是会把左侧地数据以None标记,占位并保存下来.
源码
这里可以看出,先cogroup操作,然后使用flatmapValues对数据进行处理
处理逻辑中,也是先判断第一个RDD的数据是否为空,是的话,数据就会转换为(None, value)的形式
如果第一个RDD的数据不为空,则使用嵌套for循环,然后把数据搜集起来,形成一个一个对偶元组
最后使用flat操作,把数据展平
2.6 fullOuterJoin
和sql中一样,fulljoin,不管2侧数据集合是否有匹配,都会把数据保留下来.
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// join 2个RDD进行joinval rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "1"), (2, "dd"), (7, "haha")))val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "ui"), (2, "pm"), (6, "coder")))val res: RDD[(Int, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)println(res.collect().toBuffer)sc.stop()
运行结果
ArrayBuffer((1,(Some(1),Some(ui))), (2,(Some(dd),Some(pm))), (6,(None,Some(coder))), (7,(Some(haha),None)))
源码
从上述可以看出,先2个RDD之间先进行cogroup操作,然后进行flatMapValues操作
map逻辑是使用偏函数,进行匹配处理
如果第二个RDD中数据为空,Seq(),则将数据转换为(Some(V), None)的形式
如果第一个RDD中数据为空,也类似操作
如果2个都有数据,则使用嵌套for循环,将数据以(Some(v), Some(w))形式搜集起来
最后是flatten操作,数据展平,因为搜集出来的数据可能key,value都是数组形式,需要展平
2.7 substract
求2个集合之间的差集,就是去掉2个差集中相同部分.
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// 差集val rdd1: RDD[Int] = sc.makeRDD(List(1, 3, 5, 6))val rdd2: RDD[Int] = sc.makeRDD(List(5, 6, 7, 8, 9))val res: RDD[Int] = rdd1.subtract(rdd2)println(res.collect().toBuffer)sc.stop()
运行结果
ArrayBuffer(1, 3)
源码
上述源码可以看出,最核心代码是RDD做了转换
转换逻辑是先将数据转成key value形式,value是null
然后再使用subtractByKey进行处理,传入的参数是第二个RDD的数据,
第二个RDD的数据也转换为key value形式
最终结果是把keys取出来,因为value没有 作用,只是进行转换所需.
以下是核心的代码
可以看出,整体思路就是将RDD1中数据转换为key value形式,value就是各个相同key对应的value组成的数据ArrayBuffer.
然后这个map以移除RDD2数据转换成的map对应的keys,这样就可以把差集数据求出来,因为相同key对应的数据已经被移除,所以留下的就是差集数据
2.8 substractByKey
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)val rdd3: RDD[(String, Int)] = sc.parallelize(List(("x", 1), ("r", 2), ("t", 3)))val rdd4: RDD[(String, Int)] = sc.parallelize(List(("x", 3), ("t", 2), ("z", 3)))val res2: RDD[(String, Int)] = rdd3.subtractByKey(rdd4)println(res2.collect().toBuffer)
运行结果
ArrayBuffer((r,2))
这个算子比较底层,最终是创建了SubtractedRDD,源代码在上述的substract中
2.9 repartition
顾名思义,重新对数据做分区.
在Spark中,如果发生数据倾斜或者防止,往往可以通过对数据重新分区来解决数据倾斜
另外,解决数据倾斜,可以换分区器,如果是哈希分区,可以把key打散,可以增加内存,可以提前聚合等等操作.都能缓解数据倾斜
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)// 重新分区val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 4)val rdd1WithIndex: RDD[(Int, Int)] = rdd1.mapPartitionsWithIndex((index, e) => {e.map(x => (index, x))})val rdd2: RDD[Int] = rdd1.repartition(4)val rdd2WithIndex: RDD[(Int, Int)] = rdd2.mapPartitionsWithIndex((index, e) => {e.map(x => (index, x))})println("rdd1WithIndex:"+ rdd1WithIndex.collect().toBuffer)println("rdd2WithIndex:"+ rdd2WithIndex.collect().toBuffer)sc.stop()
运行结果
rdd1WithIndex:ArrayBuffer((0,1), (0,2), (1,3), (1,4), (2,5), (2,6), (3,7), (3,8))rdd2WithIndex:ArrayBuffer((0,2), (0,5), (0,7), (1,3), (1,6), (1,8), (2,4), (3,1))
源码
从上述源码可以看出,最终是生成了一个CoalescedRDD对象
如果需要shuffle,则会先计算shuffle中各个数据分到哪个分区index
注意这里核心代码是重写的compute等代码,但具体作用需要运行起来更加直观
注意,coalesce如果设置参数为false,或者默认调用,是不会产生shuffle的
2.11 coalesce
数据重新分区,可以设置是否shuffle.如果是大分区降低为小分区,可以设置不shuffle,这样数据直接按照区进行聚合即可.
但如果小分区数变成大分区数,则一定需要shuffle,因为数据需要打散到不同分区上去.
val conf: SparkConf = new SparkConf().setAppName("JoinTest").setMaster("local[*]")val sc = new SparkContext(conf)val rdd3: RDD[ Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 4)val indexedRDD1: RDD[(Int, Int)] = rdd3.mapPartitionsWithIndex((index, iter) => {iter.map(x => (index, x))})// 重新分区,但默认不做shuffle,所以这里本质只是聚合val res: RDD[Int] = rdd3.coalesce(3)val indexedResRDD: RDD[(Int, Int)] = res.mapPartitionsWithIndex((index, iter) => {iter.map(x => (index, x))})// 打印println("indexedRDD1: " + indexedRDD1.collect().toBuffer)println("indexedResRDD: " + indexedResRDD.collect().toBuffer)sc.stop()
运行结果
indexedResRDD: ArrayBuffer((0,1), (0,2), (1,3), (1,4), (2,5), (2,6), (2,7), (2,8), (2,9))
Spark总结之RDD(四)相关推荐
- spark基础之RDD详解
一 什么是RDD,有什么特点? RDD: Resilient Distributed Dataset,弹性分布式数据集. 特点: # 它是一种数据的集合 # 它可以被分区,每一个分区分布在不同的集群中 ...
- 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理
文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...
- Spark二之RDD
一.Spark RDD 概述 Resilient Distributed Dataset(http://spark.apache.org/docs/latest/rdd-programming-gui ...
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- 2021年大数据Spark(十七):Spark Core的RDD持久化
目录 RDD 持久化 引入 API 缓存/持久化函数 缓存/持久化级别 释放缓存/持久化 代码演示 总结:何时使用缓存/持久化 RDD 持久化 引入 在实际开发中某些RDD的计算或转换可能会比较耗费时 ...
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- spark 中的RDD编程:基于Java api
1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...
- 02、体验Spark shell下RDD编程
02.体验Spark shell下RDD编程 1.Spark RDD介绍 RDD是Resilient Distributed Dataset,中文翻译是弹性分布式数据集.该类是Spark是核心类成员之 ...
- Spark中,RDD概述(五大属性,弹性介绍,5个特性)
1 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在Spa ...
最新文章
- 大数据将改变信息生命周期管理
- 安装与使用 Trac 来管理项目
- ListDataView:让你的List可以被任何Site引用
- html垂直线性渐变,html5线性渐变
- 编译WINDOWS版SDL2:You should run hg revert SDL_config.h
- 文献管理工具全家桶(引文网络,追踪,管理,多平台同步,快速下载)
- DBImport v3.0 中文版发布:支持各大数据库数据互导(IT人员必备工具)
- 百度云如何免费扩容至2055G?
- 关于opencv的rows和cols的理解
- 看看十二星座哪个更适合当程序员
- luogu P4643 [国家集训队]阿狸和桃子的游戏
- 团队合作开发的两种文档工具
- dubbo中文官网地址
- RMF客户消费行为评分模型
- 看得懂的猪周期,牧原们却不一定跨得过
- 【C语言】C语言中赋值语句规则
- 软件测试工程师必备技能——Linux基础知识
- 2019年各大互联网公司端午节礼盒指南
- panic: <Ormer> table: `.` not found, make sure it was registered with `RegisterModel()`
- CenterNet - CornerNet
热门文章
- miui11可用的位置模拟器_MIUI11内测体验包
- 南方周末:股神炒股一周年祭 24万本金仅剩7千
- 直播app开发公司手把手搭建一套简单的直播系统
- Android Studio设置-单行注释格式化时的缩进处理
- 深入 WEP和 WPA密码原理 1
- 东华大学计算机博士毕业要求,东华大学研究生在学期间发表学术论文要求的暂行规定(修订)...
- V-if 的常见用法
- A_A03_007 CH32串口软件安装与CH32程序串口下载
- 偷偷看了同事的代码找到了优雅代码的秘密
- 电线线缆铜芯和铝芯有什么区别?哪个更好呢?