Background

Across Spark's different join types, some predicates can be pushed down and some cannot. Today we'll run some hands-on experiments and work out, from both the observed behavior and the source code, what is actually going on.

Version & environment

Spark 3.2.0
MacBook Pro

Theory

1. See Hive's OuterJoinBehavior documentation.

Let's first pin down the terms it uses; they appear throughout this post:

  • Preserved row table: the table in an outer join all of whose rows must appear in the result (the left table of a left outer join, the right table of a right outer join).
  • Null supplying table: the table whose columns get filled with nulls when a preserved row has no match.
  • During-join predicate: a predicate that sits in the ON clause of the join.
  • After-join predicate: a predicate that sits in the WHERE clause and is applied after the join.
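
To make the two predicate positions concrete, here is the same filter written both ways, using the tbl1/tbl2 views registered in the demo program later in this post:

 // during-join predicate: lives in the ON clause
 spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")
 // after-join predicate: lives in the WHERE clause, evaluated after the join
 spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")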

2. Join types

Based on the current Spark version, join types fall into the following categories; these are the join types we'll verify:

  • inner
  • outer | full | fullouter
  • leftouter | left
  • rightouter | right
  • leftsemi | semi
  • leftanti | anti
  • cross

Because a during-join predicate and an after-join predicate behave identically for full outer joins, inner joins, and left semi/anti joins, we won't discuss those. A cross join has no ON clause at all, so we skip it too.

Note: theory is only theory; real implementations add optimizations on top of it, and the two do differ.
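
One quick way to convince yourself of the inner-join claim above is to compare the optimized plans of the two spellings. A minimal sketch, assuming the tbl1/tbl2 temp views registered in the demo program below:

 // during-join vs. after-join predicate on an inner join:
 // the optimizer moves both to the same place, so the optimized plans match
 val onClause = spark.sql(
   "select * from tbl1 join tbl2 on intNum1 == intNum2 and intNum1 > 1")
 val whereClause = spark.sql(
   "select * from tbl1 join tbl2 on intNum1 == intNum2 where intNum1 > 1")
 println(onClause.queryExecution.optimizedPlan)
 println(whereClause.queryExecution.optimizedPlan)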

3. SQL parsing

In Spark, every SQL statement goes through the following stages:

Unresolved Logical Plan -> Analyzed Logical Plan -> Optimized Logical Plan -> Spark Plan -> Executed Plan
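
Each stage can be inspected directly on a DataFrame's QueryExecution object; a minimal sketch (dfResult is the DataFrame built in the demo program below):

 val qe = dfResult.queryExecution
 println(qe.logical)       // unresolved logical plan
 println(qe.analyzed)      // analyzed logical plan
 println(qe.optimizedPlan) // optimized logical plan
 println(qe.sparkPlan)     // physical plan, before the preparation rules
 println(qe.executedPlan)  // the plan that actually runs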

The conclusion first

outer join               preserved row table   null supplying table
during-join predicate    not pushed down       pushed down
after-join predicate     pushed down           pushed down

Hands-on analysis

Run the following code:

 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker.RuleSummary
 import org.apache.spark.sql.internal.SQLConf

 def main(args: Array[String]): Unit = {
   val spark = SparkSession.builder()
     .appName("delta-merge")
     .config("spark.master", "local[1]")
     .config("spark.app.name", "demo")
     .config("spark.sql.adaptive.autoBroadcastJoinThreshold", -1)
     .config("spark.sql.autoBroadcastJoinThreshold", -1)
     .config(SQLConf.PLAN_CHANGE_LOG_LEVEL.key, "warn")
     .getOrCreate()
   spark.sparkContext.setLogLevel("info")
   import spark.implicits._

   val df1 = Seq(
     (BigDecimal("11"), 1),
     (BigDecimal("22"), 2),
     (BigDecimal("33"), 3)).toDF("decNum1", "intNum1")
   df1.write.mode(SaveMode.Overwrite).parquet("df1.parquet")

   val df2 = Seq(
     (BigDecimal("1111"), 1),
     (BigDecimal("2222"), 2),
     (BigDecimal("4444"), 4)).toDF("decNum2", "intNum2")
   df2.write.mode(SaveMode.Overwrite).parquet("df2.parquet")

   spark.sql("select null > 2").show(2)

   val dfP1 = spark.read.parquet("df1.parquet")
   val dfP2 = spark.read.parquet("df2.parquet")
   dfP1.createOrReplaceTempView("tbl1")
   dfP2.createOrReplaceTempView("tbl2")

   // swap in the SQL under test for each experiment below
   val dfResult = spark.sql("select * from tbl1 join tbl2 on intNum1 == intNum2 where intNum1 > 1")
   dfResult.show(40, false)
   dfResult.explain("extended")
   println("==========")
   dfResult.queryExecution.tracker.rules map {
     case (key, value: RuleSummary) if (value.numEffectiveInvocations > 0) =>
       println(s"$key, $value")
     case (_, _) =>
   }
   Thread.sleep(10000000L)
 }

spark.sql.adaptive.autoBroadcastJoinThreshold and spark.sql.autoBroadcastJoinThreshold are set to -1 to disable the conversion of SMJ (sort merge join) into BHJ (broadcast hash join); otherwise the broadcast join would hide the behavior we want to observe.
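
A quick way to confirm the setting took effect is to check that the physical plan still contains a sort merge join; a minimal sketch:

 // with both thresholds at -1 the join should stay a SortMergeJoin,
 // not a BroadcastHashJoin
 assert(dfResult.queryExecution.sparkPlan.toString.contains("SortMergeJoin"))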

SQLConf.PLAN_CHANGE_LOG_LEVEL.key and the SparkContext log level are adjusted so that every logical-plan and physical-plan rule the query passes through gets logged, which lets us see exactly how the SQL is transformed step by step.

dfResult.explain("extended") prints the plan at each stage in a clear, readable form, which makes the query easy to trace.

dfResult.queryExecution.tracker.rules prints the analysis and optimization rules applied during the logical-plan phase. Their order is arbitrary, because they are stored in a java.util.HashMap behind the scenes.
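
If you want stable output, sort the effective rules by name first; a small sketch:

 dfResult.queryExecution.tracker.rules.toSeq
   .filter { case (_, summary) => summary.numEffectiveInvocations > 0 } // only rules that changed the plan
   .sortBy { case (name, _) => name }                                   // HashMap order is arbitrary
   .foreach { case (name, summary) => println(s"$name, $summary") }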

  • leftouter, during-join predicate on the preserved row table
    Run
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")

The ResolveRelations rule merely resolves the parquet tables from the catalog metadata, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 'Project [*]                                                     'Project [*]
 +- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))   +- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))
!   :- 'UnresolvedRelation [tbl1], [], false                         :- SubqueryAlias tbl1
!   +- 'UnresolvedRelation [tbl2], [], false                         :  +- View (`tbl1`, [decNum1#33,intNum1#34])
!                                                                    :     +- Relation [decNum1#33,intNum1#34] parquet
!                                                                    +- SubqueryAlias tbl2
!                                                                       +- View (`tbl2`, [decNum2#37,intNum2#38])
!                                                                          +- Relation [decNum2#37,intNum2#38] parquet

The PushDownPredicates rule does fire, but it only swaps the order of the two conditions in the ON clause, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1))   Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
 :- Relation [decNum1#33,intNum1#34] parquet                        :- Relation [decNum1#33,intNum1#34] parquet
 +- Relation [decNum2#37,intNum2#38] parquet                        +- Relation [decNum2#37,intNum2#38] parquet

InferFiltersFromConstraints does push a predicate down, but onto the null supplying table, not the preserved row table, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))   Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
 :- Relation [decNum1#33,intNum1#34] parquet                        :- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet                        +- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
!                                                                      +- Relation [decNum2#37,intNum2#38] parquet

The source code shows how this is implemented:

 case LeftOuter | LeftAnti =>
   val allConstraints = getAllConstraints(left, right, conditionOpt)
   val newRight = inferNewFilter(right, allConstraints)
   join.copy(right = newRight)

Result; note that all three left-side rows survive, because a during-join predicate on the preserved row table only decides which rows find a match and never filters the preserved rows themselves:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

  • leftouter, during-join predicate on the null supplying table
    Run
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")

This time the PushDownPredicates rule behaves differently: it pushes the predicate straight down, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1))   Join LeftOuter, (intNum1#34 = intNum2#38)
 :- Relation [decNum1#33,intNum1#34] parquet                        :- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet                        +- Filter (intNum2#38 > 1)
!                                                                      +- Relation [decNum2#37,intNum2#38] parquet

The relevant source code:

 case LeftOuter | LeftAnti | ExistenceJoin(_) =>
   // push down the right side only join filter for right sub query
   val newLeft = left
   val newRight = rightJoinConditions.
     reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
   val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
   Join(newLeft, newRight, joinType, newJoinCond, hint)

The InferFiltersFromConstraints rule then merely adds an isnotnull(intNum2#38) check, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join LeftOuter, (intNum1#34 = intNum2#38)        Join LeftOuter, (intNum1#34 = intNum2#38)
 :- Relation [decNum1#33,intNum1#34] parquet      :- Relation [decNum1#33,intNum1#34] parquet
!+- Filter (intNum2#38 > 1)                       +- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
    +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet

Result:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

  • leftouter, after-join predicate on the preserved row table
    Run
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")

The PushDownPredicates rule pushes the filter down, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Filter (intNum1#34 > 1)                          Join LeftOuter, (intNum1#34 = intNum2#38)
!+- Join LeftOuter, (intNum1#34 = intNum2#38)     :- Filter (intNum1#34 > 1)
!   :- Relation [decNum1#33,intNum1#34] parquet   :  +- Relation [decNum1#33,intNum1#34] parquet
!   +- Relation [decNum2#37,intNum2#38] parquet   +- Relation [decNum2#37,intNum2#38] parquet

The InferFiltersFromConstraints rule derives an additional predicate, which gets pushed down to the null supplying table as well, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join LeftOuter, (intNum1#34 = intNum2#38)        Join LeftOuter, (intNum1#34 = intNum2#38)
!:- Filter (intNum1#34 > 1)                       :- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
 :  +- Relation [decNum1#33,intNum1#34] parquet   :  +- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet      +- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
!                                                    +- Relation [decNum2#37,intNum2#38] parquet

The result is as follows:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted); one way to verify the pushdown is sketched below.
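
Since the plan screenshots are not reproduced here, a handy substitute is to look for PushedFilters in the formatted plan, which confirms the filter actually reaches the parquet scan (a sketch; the exact output text varies by Spark version):

 dfResult.explain("formatted")
 // in the scan node for tbl1, expect something along the lines of:
 //   PushedFilters: [IsNotNull(intNum1), GreaterThan(intNum1,1)]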

  • leftouter, after-join predicate on the null supplying table
    Run:
    val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")

This time an additional rule fires: EliminateOuterJoin, which rewrites the left outer join into an inner join, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
 Filter (intNum2#38 > 1)                          Filter (intNum2#38 > 1)
!+- Join LeftOuter, (intNum1#34 = intNum2#38)     +- Join Inner, (intNum1#34 = intNum2#38)
    :- Relation [decNum1#33,intNum1#34] parquet      :- Relation [decNum1#33,intNum1#34] parquet
    +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet
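
The rewrite is safe because the WHERE predicate intNum2 > 1 can never be true on a null-padded row, so every extra row the outer join would produce is filtered out anyway. The heart of the rule, condensed from Spark's EliminateOuterJoin implementation (helper names as in the Spark source), picks the new join type like this:

 private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
   val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
   val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
   val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

   // does some filter predicate reject nulls coming from that side?
   lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
   lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

   join.joinType match {
     case RightOuter if leftHasNonNullPredicate => Inner
     case LeftOuter if rightHasNonNullPredicate => Inner
     case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
     case FullOuter if leftHasNonNullPredicate => LeftOuter
     case FullOuter if rightHasNonNullPredicate => RightOuter
     case o => o
   }
 }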

The PushDownPredicates and InferFiltersFromConstraints steps then play out exactly as in the "leftouter, after-join predicate on the preserved row table" case, except that the join type is now inner (thanks to the EliminateOuterJoin rewrite), so the predicate gets pushed down as well.
The result is as follows:


+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

  • rightouter, during-join predicate on the preserved row table
    Run:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")

The PushDownPredicates rule only reorders the join conditions, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1))   Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
 :- Relation [decNum1#33,intNum1#34] parquet                         :- Relation [decNum1#33,intNum1#34] parquet
 +- Relation [decNum2#37,intNum2#38] parquet                         +- Relation [decNum2#37,intNum2#38] parquet

while InferFiltersFromConstraints derives a filter and pushes it down, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))   Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
!:- Relation [decNum1#33,intNum1#34] parquet                         :- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
!+- Relation [decNum2#37,intNum2#38] parquet                         :  +- Relation [decNum1#33,intNum1#34] parquet
!                                                                    +- Relation [decNum2#37,intNum2#38] parquet

Result:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

  • rightouter, during-join predicate on the null supplying table
    Run:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")

The PushDownPredicates rule pushes the predicate down to the null supplying table, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1))   Join RightOuter, (intNum1#34 = intNum2#38)
!:- Relation [decNum1#33,intNum1#34] parquet                         :- Filter (intNum1#34 > 1)
!+- Relation [decNum2#37,intNum2#38] parquet                         :  +- Relation [decNum1#33,intNum1#34] parquet
!                                                                    +- Relation [decNum2#37,intNum2#38] parquet

The InferFiltersFromConstraints rule then adds an isnotnull check:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join RightOuter, (intNum1#34 = intNum2#38)       Join RightOuter, (intNum1#34 = intNum2#38)
!:- Filter (intNum1#34 > 1)                       :- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
 :  +- Relation [decNum1#33,intNum1#34] parquet   :  +- Relation [decNum1#33,intNum1#34] parquet
 +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet

Result:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

  • rightouter, after-join predicate on the preserved row table
    Run:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")

The PushDownPredicates rule pushes the predicate on the preserved row table down below the join, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Filter (intNum2#38 > 1)                          Join RightOuter, (intNum1#34 = intNum2#38)
!+- Join RightOuter, (intNum1#34 = intNum2#38)    :- Relation [decNum1#33,intNum1#34] parquet
!   :- Relation [decNum1#33,intNum1#34] parquet   +- Filter (intNum2#38 > 1)
!   +- Relation [decNum2#37,intNum2#38] parquet      +- Relation [decNum2#37,intNum2#38] parquet

and InferFiltersFromConstraints derives further filters, as follows:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join RightOuter, (intNum1#34 = intNum2#38)       Join RightOuter, (intNum1#34 = intNum2#38)
!:- Relation [decNum1#33,intNum1#34] parquet      :- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
!+- Filter (intNum2#38 > 1)                       :  +- Relation [decNum1#33,intNum1#34] parquet
!   +- Relation [decNum2#37,intNum2#38] parquet   +- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
!                                                    +- Relation [decNum2#37,intNum2#38] parquet

Result:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

  • rightouter, after-join predicate on the null supplying table
    Run:
    val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")

The EliminateOuterJoin, PushDownPredicates, and InferFiltersFromConstraints analysis is the same as in the "leftouter, after-join predicate on the null supplying table" case, so we won't repeat it here.

Result:

+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+

The corresponding physical plan (plan screenshot omitted).

Conclusion

left join                preserved row table   null supplying table
during-join predicate    not pushed down       pushed down
after-join predicate     pushed down           pushed down

right join               preserved row table   null supplying table
during-join predicate    not pushed down       pushed down
after-join predicate     pushed down           pushed down

Merging the two tables gives:

outer join               preserved row table   null supplying table
during-join predicate    not pushed down       pushed down
after-join predicate     pushed down           pushed down

Compared with the theory, there is one real difference: in theory an after-join predicate on the null supplying table is not pushed down, while in practice Spark does push it down. This confirms the earlier remark that implementations optimize beyond the theory, much as practical Paxos implementations differ from the original protocol.

The source of the difference is the EliminateOuterJoin rule that Spark adds on top of the textbook behavior.
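
If you want to observe the textbook behavior, you can try excluding that rule from the optimizer (a sketch, assuming EliminateOuterJoin is excludable in your Spark version):

 // with the rule excluded, "left outer join ... where intNum2 > 1" keeps its
 // LeftOuter join type and the WHERE filter stays above the join
 spark.conf.set("spark.sql.optimizer.excludedRules",
   "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin")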
