总结一下遇到的sparksql大小表join情况。

一、数据倾斜

使用sparksql对一张大表和小表join时发现executor节点数据倾斜严重,最终执行超时失败了。查看日志发现对应的执行计划是Sort-Merge Join,查了下该方式适合两张大表join,会把两张表

  1. shuffle:按照join key进行重新分区,两张表数据会分布到整个集群;
  2. sort:对分区中的两表数据,分别进行排序;
  3. merge:对排好序的两张分区表数据执行join操作。分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边。
    大小表join可以使用broadcastjoin,将小表先广播分发至executor上,取小表根据key连接,避免了数据shuffle。

二、源码

1.执行join执行计划的SparkStrategy

查看spark中源码,入口为org.apache.spark.sql.execution.SparkStrategy
查看apply方法,可以看到执行不同join,canBuildRight与canBuildLeft判断join方式,canBroadcast方法根据表大小判断是否广播。

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {...}private def canBuildRight(joinType: JoinType): Boolean = joinType match {case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => truecase _ => false}private def canBuildLeft(joinType: JoinType): Boolean = joinType match {case _: InnerLike | RightOuter => truecase _ => false}/*** Matches a plan whose output should be small enough to be used in broadcast join.*/private def canBroadcast(plan: LogicalPlan): Boolean = {plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold}

autoBroadcastJoinThreshold默认10M,plan.stats.sizeInBytes表的大小从hive元数据中取得,默认是long最大值,而本次从视图中读表没有大小,因此不会广播。所以可以将数据导入hdfs建立外表,spark源码中通过hdfs文件获取文件大小,进而取得sizeInBytes 。

2.查询提示(hint)

spark提供了hint方式指定需要广播的表,修改join的sql如下,源码中还会继续计算表的大小判断是否广播:

select /*+ BROADCAST(small) */ big.col_1,big.col_2,small.col_1
from big left outer join small on small.col_1= big.col_2/*** The hint for broadcast hash join or broadcast nested loop join, depending on the availability of* equi-join keys.*/
case object BROADCAST extends JoinStrategyHint {override def displayName: String = "broadcast"override def hintAliases: Set[String] = Set("BROADCAST","BROADCASTJOIN","MAPJOIN")
}

spark3源码中根据是否等值join,解析是否包含hint提示选择不同的执行计划。

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {// If it is an equi-join, we first look at the join hints w.r.t. the following order://   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides//      have the broadcast hints, choose the smaller side (based on stats) to broadcast.//   2. sort merge hint: pick sort merge join if join keys are sortable.//   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both//      sides have the shuffle hash hints, choose the smaller side (based on stats) as the//      build side.//   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.//// If there is no hint or the hints are not applicable, we follow these rules one by one://   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type//      is supported. If both sides are small, choose the smaller side (based on stats)//      to broadcast.//   2. Pick shuffle hash join if one side is small enough to build local hash map, and is//      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.//   3. Pick sort merge join if the join keys are sortable.//   4. Pick cartesian product if join type is inner like.//   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have//      other choice.case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {val wantToBuildLeft = canBuildLeft(joinType) && buildLeftval wantToBuildRight = canBuildRight(joinType) && buildRightgetBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide =>Seq(joins.BroadcastHashJoinExec(leftKeys,rightKeys,joinType,buildSide,nonEquiCond,planLater(left),planLater(right)))}}def createShuffleHashJoin(buildLeft: Boolean, buildRight: Boolean) = {val wantToBuildLeft = canBuildLeft(joinType) && buildLeftval wantToBuildRight = canBuildRight(joinType) && buildRightgetBuildSide(wantToBuildLeft, wantToBuildRight, left, right).map { buildSide=>Seq(joins.ShuffledHashJoinExec(leftKeys,rightKeys,joinType,buildSide,nonEquiCond,planLater(left),planLater(right)))}}def createSortMergeJoin() = {if (RowOrdering.isOrderable(leftKeys)) {Some(Seq(joins.SortMergeJoinExec(leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))} else {None}}def createCartesianProduct() = {if (joinType.isInstanceOf[InnerLike]) {// `CartesianProductExec` can't implicitly evaluate equal join condition, here we should// pass the original condition which includes both equal and non-equal conditions.Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))} else {None}}def createJoinWithoutHint() = {createBroadcastHashJoin(canBroadcast(left) && !hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH)),canBroadcast(right) && !hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH))).orElse {if (!conf.preferSortMergeJoin) {createShuffleHashJoin(canBuildLocalHashMap(left) && muchSmaller(left, right),canBuildLocalHashMap(right) && muchSmaller(right, left))} else {None}}.orElse(createSortMergeJoin()).orElse(createCartesianProduct()).getOrElse {// This join could be very slow or OOMval buildSide = getSmallerSide(left, right)Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, nonEquiCond))}}createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)).orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }.orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint))).orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }.getOrElse(createJoinWithoutHint())// If it is not an equi-join, we first look at the join hints w.r.t. the following order://   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast//      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,//      choose the left side for right join, and choose right side for left join.//   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.//// If there is no hint or the hints are not applicable, we follow these rules one by one://   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left//      side is broadcast-able and it's left join, or only right side is broadcast-able and//      it's right join, we skip this rule. If both sides are small, broadcasts the smaller//      side for inner and full joins, broadcasts the left side for right join, and broadcasts//      right side for left join.//   2. Pick cartesian product if join type is inner like.//   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have//      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the//      left side for right join, and broadcasts right side for left join.case logical.Join(left, right, joinType, condition, hint) =>val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {getSmallerSide(left, right)} else {// For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if// it's a right join, and broadcast right side if it's a left join.// TODO: revisit it. If left side is much smaller than the right side, it may be better// to broadcast the left side even if it's a left join.if (canBuildLeft(joinType)) BuildLeft else BuildRight}def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {val maybeBuildSide = if (buildLeft && buildRight) {Some(desiredBuildSide)} else if (buildLeft) {Some(BuildLeft)} else if (buildRight) {Some(BuildRight)} else {None}maybeBuildSide.map { buildSide =>Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition))}}def createCartesianProduct() = {if (joinType.isInstanceOf[InnerLike]) {Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))} else {None}}def createJoinWithoutHint() = {createBroadcastNLJoin(canBroadcast(left), canBroadcast(right)).orElse(createCartesianProduct()).getOrElse {// This join could be very slow or OOMSeq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), desiredBuildSide, joinType, condition))}}createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)).orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }.getOrElse(createJoinWithoutHint())// --- Cases where this strategy does not apply ---------------------------------------------case _ => Nil}

参考

[1] https://www.cnblogs.com/ssqq5200936/p/13084824.html
[2] https://blog.csdn.net/dabokele/article/details/65963401
[3] https://blog.csdn.net/MrLevo520/article/details/104024814/
[4] https://www.cnblogs.com/importbigdata/p/11247299.html

sparksql中大小表jion相关推荐

  1. 在 sql server 中,查询 数据库的大小 和 数据库中各表的大小

    2019独角兽企业重金招聘Python工程师标准>>> 在 sql server 中,查询 数据库的大小 和 数据库中各表的大小 其实本来只想找一个方法能查询一下 数据库 的大小,没 ...

  2. mysql 每条记录大小_计算数据库中各个表的数据量和每行记录所占用空间

    很多时候我们都需要计算数据库中各个表的数据量和每行记录所占用空间 这里共享一个脚本 CREATE TABLE #tablespaceinfo ( nameinfo VARCHAR(500) , row ...

  3. mysql表空间大小_浅谈mysql中各种表空间(tablespaces)的概念

    mysql中,会涉及到各种表空间的概念,虽然,很多方面这些概念和Oracle有相似性,但也有很多不同的地方,初学者很容易被这些概念弄的晕头转向,从而,混淆这些概念的区别和理解,下面,就简要介绍和说明一 ...

  4. SQL Server:统计数据库中每张表的大小

    1. 统计数据库中每张表的大小 1.1 首先执行下面的命令 exec sp_MSforeachtable @command1="sp_spaceused '?'"; 1.2 检测当 ...

  5. mysql 查看所有表的引擎_MySQL查看数据库、表的占用空间大小以及某个库中所有表的引擎类型...

    本文章来给大家介绍一些常用的MySQL查看数据库.表的占用空间大小sql命令吧,希望此教程 对各位同学会有所帮助. 查看各库的大小代码如下复制代码 SELECT SUM(DATA_LENGTH)+SU ...

  6. 查看MySQL数据库中每个表占用的空间大小

    如果想知道MySQL数据库中每个表占用的空间.表记录的行数的话,可以打开MySQL的 information_schema 数据库.在该库中有一个 TABLES 表,这个表主要字段分别是: TABLE ...

  7. Mysql查询表中每行数据大小_计算数据库中各个表的数据量和每行记录所占用空间的脚本-转载来自(博客园 桦仔)...

    本文出处: 感谢桦仔 的分享精神! 很多时候我们都需要计算数据库中各个表的数据量和每行记录所占用空间 这里共享一个脚本 CREATE TABLE #tablespaceinfo ( nameinfo  ...

  8. 查询mysql数据库表占用空间大小_查看 MySQL 数据库中每个表占用的空间大小-阿里云开发者社区...

    如果想知道MySQL数据库中每个表占用的空间.表记录的行数的话,可以打开MySQL的 information_schema 数据库.在该库中有一个 TABLES 表,这个表主要字段分别是: TABLE ...

  9. hive:统计hive中所有表的大小和创建时间

    目的:获取hive中所有表的创建时间和表大小 本文给出了一个既可以统计hive中非分区表也可以统计分区表的表大小的方式. 由于业务上没有针对分区数据统计的需求,所以可以使用此种方式统计. 如果业务需要 ...

最新文章

  1. 记录下,我们平时开发当中不得不知道的HTTP状态码
  2. hadoop W3SCHOOL
  3. 【FFmpeg】FFmpeg常用基本命令
  4. 力扣刷题【20,21,26,27,35】
  5. centos php 版本升级 至5.3 wordpress3.7
  6. X86 CPU特性之(3)-kaiser
  7. Java中introduce方法_Java基础—继承
  8. 理解Android安全机制
  9. BZOJ 1878: [SDOI2009]HH的项链【莫队】
  10. 数据结构与算法(三):链表
  11. 计算机汉字怎么制作的,如何制作自己的字体?教你快速制作自己手写字体
  12. 知道一点怎么设直线方程_已知两点坐标怎样求直线方程
  13. Pycharm使用技巧:Split Vertically/Horizontally(垂直/水平拆分窗口)
  14. SAP RETAIL 自动补货WRP1R事务代码报错 - Forecast values for determining target stock do not exist -
  15. python 获取当前日期和时间_python获取当前日期和时间的方法
  16. SM2算法加解密中的C1,C2,C3
  17. 神经网络 深度神经网络,深度神经网络训练
  18. 03、【电脑维修】防火墙丢失,找不到 windows firewall服务, windows defender firewall服务被禁用或防火墙无法打开
  19. 札记-20190531
  20. 腾讯投资未来智安,企业为什么要使用ssl数字证书?

热门文章

  1. 字符串比较大小(C语言)
  2. Android深色模式下,看不见字的解决办法
  3. 功率放大器设计方案(包含原理图+PCB+BOM表)
  4. 北雄安 南佛山:区块链政务应用C位城市已出线 ——区块链市政应用四级梯队盘点...
  5. 十三水牌型 图片_十三水,得玩法到底有多少种!
  6. uniapp vue页面嵌套webview组件 ,自定义webview大小
  7. 南邮汇编语言程序设计实验二————用户登录验证程序的设计
  8. B/S C/S架构分析与区别
  9. 基于Java+SpringBoot+vue+element实现毕业就业招聘系统
  10. Ubuntu使用apt管理安装软件包