阅读本篇博文时,请先理解RDD的描述及作业调度:[《深入理解Spark 2.1 Core (一):RDD的原理与源码分析 》](http://blog.csdn.net/u011239443/article/details/53894611#t16)

Join数据是我们在Spark操作中的很重要的一部分。Spark Core 和Spark SQL的基本类型都支持join操作。虽然join很常用而且功能很强大,但是我们使用它的时候,我们不得不考虑网络传输和所处理的数据集过大的问题。在Spark Core中,DAG优化器不像SQL优化器,它不能够重命令或者下压过滤。所以,Spark操作顺序对于Spark Core显得尤为重要。

这篇博文,我们将介绍RDD类型的join操作。通常来说,join是一中非常昂贵的操作。对于每个child RDD 的partition,它从各个parent RDD 的 各个 partition中,获取到与该child RDD partition上的key相对应的所有值。

假设有两个parent RDD,它们都没有已知的partitioner(可以理解为该RDD到其child RDD 重分区函数),那么它们就需要shuffle,使得它们共享同一个partitioner,并且有着相同key的数据会在同一个partition里面。如上图“join with inputs not co-partitioned”。

若parent RDD有已知的partitioner(若已知的partitioner相同,两个RDD会协同,那么就能避免网络传输,两个parent RDD 的相同partition会在同一个节点上),那么可能如上图的“join with inputs co-partitioned”,只能产生窄依赖。

和大多数的K/V操作一样,随着key的数量的增加和记录之间的距离的增加(需要寻找其所在的partition),join的花费会越来越高。

选择join的类型

默认情况下,Spark只会对两个RDD的key的值进行join。在有多个相同key值的情况下,会生成所有的K/V对。所以,标准join的最好的情况是,两个RDD有相同的key集合,而且该key集合中的key都是互斥的。若有重复的key,数据量会急剧的扩大以至于导致性能问题。若有个key只在一个RDD中出现了,那么你将失去那行数据。所以,有以下几条建议:

  1. 若两个RDD都有有重复的key,join操作会使得数据量会急剧的扩大。所有,最好先使用distinct或者combineByKey操作来减少key空间或者用cogroup来处理重复的key,而不是产生所有的交叉结果。在combine时,进行机智的分区,可以避免第二次shuffle。
  2. 如果只在一个RDD出现,那你将在无意中丢失你的数据。所以使用外连接会更加安全,这样你就能确保左边的RDD或者右边的RDD的数据完整性,在join之后再过滤数据。
  3. 如果我们容易得到RDD的可以的有用的子集合,那么我们可以先用filter或者reduce,如何在再用join。

总之,join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。

假设,你有一个RDD存着(熊猫id,分数),另外一个RDD存着(熊猫id,邮箱地址)。若你想给每只可爱的熊猫的邮箱发送她所得的最高的分数,你可以将RDD根据id进行join,然后计算最高的分数,如下:

def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {val joinedRDD = scoreRDD.join(addressRDD)joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
  • 1
  • 2
  • 3
  • 4
  • 5

然而,这可能不会比先减少分数数据的方案快。先计算最高的分数,那么每个熊猫的分数数据就只有一行,接下来再join地址数据:

def joinScoresWithAddress2( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)bestScoreData.join(addressRDD)
}
  • 1
  • 2
  • 3
  • 4
  • 5

若每个熊猫有1000个不同的分数,那么这种做法的shuffle量就比上一种的小了1000倍。

如果你想要左外连接,保留分数数据中地址数据所没有的熊猫,那么你可以用leftOuterJoin来替代join。Spark还有fullOuterJoin和rightOuter,可以根据你想保留的记录选择使用。丢失的值会以None的形式存在:

def outerJoinScoresWithAddress( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, Option[String]))]= {val joinedRDD = scoreRDD.leftOuterJoin(addressRDD)joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
  • 1
  • 2
  • 3
  • 4
  • 5

选择执行过程

由于join数据时,Spark需要被join的数据在相同的分区。所有,默认实现是进行shuffled hash join。shuffled hash join会将第二个数据集按照第一个数据分区,这么一来有着相同hash值的key就会在相同的分区中了。虽然这种方法有用,但由于它需要shuffle,所以很昂贵。而以下情况可以避免shuffle:

  1. 连个RDD都有已知的partitioner
  2. 其中一个数据足够小到保存到内存这么一来我们就可以使用广播 hash join

注意,若RDD协同了,那么网络传输和shuffle都能避免。

通过分配已知Partitioner来加速Join

Spark是一个分布式的计算引擎,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算。这种思想应用到Join上便是Shuffle Hash Join了。利用key相同必然分区相同的这个原理,Spark将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join。其原理如下图: 

而这仅仅适用于join with inputs co-partitions的情况。当在join with inputs not co-partitions

首先将两张表按照join keys进行了重新shuffle,保证join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。

但是shuffle操作十分昂贵,为了避免shuffle,我们可以通过使用下面的方案:

如果你在做join之前,其中一个RDD(RDD_A)不得不先进行一个shuffle操作,比如说aggregateByKey或者reduceByKey。你可以将该shuffle操作所使用的partitioner设置为另外一个RDD(RDD_B)的partitioner(若RDD_B的partitioner为None,则根据RDD_Bd的分区数new一个HashPartitioner),并持久化RDD_A,这样一来就可以避免两个RDD join时的shuffle了:

def joinScoresWithAddress3( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {val addressDataPartitioner = addressRDD.partitioner match {case (Some(p)) => pcase (None) => new HashPartitioner(addressRDD.partitions.length)}val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner, (x, y) => if(x > y) x else y)bestScoreData.cachebestScoreData.join(addressRDD)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

建议在重分区后总是进行持久化。

广播 Hash Join

若RDD_B小到足以存到内存,那么我们可以使用广播变量将它push到各个节点。这种方法不需要shuffle,而是RDD_A的各个分区,分别会被RDD_B中相关的值join上,形成Child RDD 对应的分区。有时候Spark会很机智的自动帮你做这件事。

部分手动广播 Hash Join

有时候,我们的RDD_B并不能足够小到都能装进内存,但是有些RDD_A中的key会重复很多次,这时候你就可以想着只广播RDD_B中在RDD_A中出现最频繁的那些值。当一种key值在RDD_A中多到一个partition都装不下时,这种方法会非常有用。在这种情况下,你可以对RDD_A使用countByKeyApprox来近似得到哪些key需要广播。然后,你将从RDD_B中filter出来需要广播的RDD_B_0和不要广播的RDD_B_1,将RDD_B_0 collect成本地的HashMap。使用sc.broadcast广播该HashMap,使得每个节点都有一个备份,与RDD_A手动的执行join,得到结果RDD_C_1。再根据HashMap将RDD_A中多次重复的key值去掉,生成RDD_A_1。对RDD_A_1和RDD_B_1进行标准的join,得到结果RDD_C_0,并unoin上RDD_C_1,得到最终的结果。

这种方法虽然有点复杂,但是在对高度倾斜的数据进行处理时的效果很好。

RDD Join 性能调优相关推荐

  1. Spark性能调优-RDD算子调优篇

    Spark性能调优-RDD算子调优篇 RDD算子调优 1. RDD复用 在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示: 对上图中的RDD计算架构进行修改,得到 ...

  2. Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析

    Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...

  3. Spark商业案例与性能调优实战100课》第3课:商业案例之通过RDD分析大数据电影点评系各种类型的最喜爱电影TopN及性能优化技巧

    Spark商业案例与性能调优实战100课>第3课:商业案例之通过RDD分析大数据电影点评系各种类型的最喜爱电影TopN及性能优化技 源代码 package com.dt.spark.coresi ...

  4. spark 算子例子_Spark性能调优方法

    公众号后台回复关键词:pyspark,获取本项目github地址. Spark程序可以快如闪电⚡️,也可以慢如蜗牛?. 它的性能取决于用户使用它的方式. 一般来说,如果有可能,用户应当尽可能多地使用S ...

  5. Apache Spark Jobs 性能调优(二)

    Apache Spark Jobs 性能调优(二) 调试资源分配 调试并发 压缩你的数据结构 数据格式 在这篇文章中,首先完成在 Part I 中提到的一些东西.作者将尽量覆盖到影响 Spark 程序 ...

  6. Apache Spark Jobs 性能调优(一)

    Apache Spark Jobs 性能调优(一) Spark 是如何执行程序的 选择正确的 Operator 什么时候不发生 Shuffle 什么情况下 Shuffle 越多越好 二次排序 结论 当 ...

  7. Spark的性能调优

    下面这些关于Spark的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我自己总结的. 基本概念和原则 首先,要搞清楚Spark的几个基本概念和原则,否则系统的性能调优无从谈起: 每一台 ...

  8. 大数据培训:Spark性能调优与参数配置

    Spark性能调优-基础篇 众所周知,正确的参数配置对提升Spark的使用效率具有极大助力,帮助相关数据开发.分析人员更高效地使用Spark进行离线批处理和SQL报表分析等作业. 推荐参数配置模板如下 ...

  9. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

最新文章

  1. Java入门教程系列【1】Java基本数据类型 小白必入系列
  2. Web 单点登录系统
  3. 华为鸿蒙vogtloop30pro价格,华为Mate30系列基本确认:首发麒麟985+鸿蒙系统,价格感人!...
  4. 软件能力成熟度CMMI3管理过程域
  5. 用“混序”替代鲍尔默
  6. 数据结构C语言版之线性表
  7. Android 硬编码
  8. 微分方程数值解法的matlab程序
  9. 计算机图形学 全局光照及方法,实时全局光照渲染研究
  10. oracle 日志查看教程,Oracle 查看日志
  11. oracle递归查询出现死循环,一次马失前蹄的SQL优化:递归查询引发的血案
  12. vb远程访问dde服务器,做wincc与VB的dde连接一定要用ddeserver吗?
  13. illegal instruction
  14. OSPF-1.ospf基础及工作流程
  15. 详细直播平台开发细节,提供成品直播系统源码
  16. NORDIC Thing:52 Android App 学习之二:手机 App 蓝牙服务发现及数据读取
  17. 码率控制(一):理解码率控制模式(x264,x264,vpx)
  18. 新的音乐播放器 xmms2
  19. CentOS 之 libc-dev 安装
  20. 莫把“李鬼”当成“李逵”,警惕元宇宙、区块链网络陷阱

热门文章

  1. git-SSH连接配置
  2. 常见的问题:https://localhost:1158/em 无法打开
  3. 算法面试题:找出由两个有序列表合并而成的新列表中的第n个元素
  4. 测试笔的使用_宽带故障怎么办?毕亚兹红光笔1秒定位光纤故障,快速解决问题...
  5. linux内核获取cpu,如何在Linux上使用C获取CPU信息,例如内核数量?
  6. 数据库-MySQL-数据库和表
  7. 张小龙Linux微信,微信至今没有黑暗模式,原来是张小龙“全责”?
  8. MySQL innodb_page_size
  9. 直播预告丨 统一数据操作平台— CloudQuery 应用指南
  10. 承上 DBlink 与 SCN | 新增视图找出外部 SCN 跳变