Spark outer join filter pushdown rules
Background
Spark's different join types allow some predicates to be pushed down while others cannot. In this post we run hands-on experiments and analyze, from both observed behavior and the source code, what is actually going on.
Version & environment
- Spark 3.2.0
- MacBook Pro
Theory
1. Terminology, following Hive's OuterJoinBehavior:
- Preserved Row table: the table all of whose rows are returned by the join.
- Null Supplying table: the table that supplies nulls for rows that do not match.
- During Join predicate: a predicate in the ON clause. For example, in R1 join R2 on R1.x = 5, the condition R1.x = 5 is a during-join predicate.
- After Join predicate: a predicate in the WHERE clause of the join query.
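To make these four terms concrete, here is a minimal pure-Python sketch of a left outer join (an illustration only, not Spark's implementation; all names are hypothetical). It shows the key distinction: a during-join predicate decides matching and never drops preserved-table rows, while an after-join predicate runs on the joined result and can drop them.

```python
# Minimal left outer join evaluator (pure-Python sketch, not Spark).
# In "R1 LEFT JOIN R2 ON cond": R1 is the preserved table (all of its
# rows come back), R2 is the null-supplying table (None for no match).

def left_outer_join(preserved, null_supplying, on):
    """The ON predicate is the during-join predicate: it only decides
    which rows match; it never removes rows of the preserved table."""
    out = []
    for l in preserved:
        matches = [r for r in null_supplying if on(l, r)]
        if matches:
            out.extend((l, r) for r in matches)
        else:
            out.append((l, None))  # null-supplying side is padded
    return out

def where(rows, pred):
    """A WHERE predicate is the after-join predicate: applied to the
    joined result, it CAN drop preserved-table rows."""
    return [row for row in rows if pred(row)]

r1 = [1, 2, 3]
r2 = [1, 2, 4]

# During-join predicate on the preserved side: rows 1 and 3 survive, null-padded.
during = left_outer_join(r1, r2, on=lambda l, r: l == r and l > 1)
# After-join predicate on the preserved side: row 1 disappears entirely.
after = where(left_outer_join(r1, r2, on=lambda l, r: l == r),
              lambda row: row[0] > 1)
```

The two predicate positions therefore produce different results on the preserved side, which is exactly why they get different pushdown rules.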
2. Join types
Based on the current Spark version, joins fall into the following types, which are also the types we will verify:
- inner
- outer | full | fullouter
- leftouter | left
- rightouter | right
- leftsemi | semi
- leftanti | anti
- cross
For full outer, inner, and left semi/anti joins there is no difference between during-join and after-join predicates, so we do not cover them here. A cross join has no ON clause at all, so we skip it as well.
Note: theory is only theory; real implementations add optimizations, so practice can differ from it.
3. SQL parsing
In Spark, every SQL statement goes through the following stages:
Unresolved Logical Plan -> Analyzed Logical Plan -> Optimized Logical Plan -> SparkPlan -> ExecutedPlan
Conclusion first

| outer join | preserved table | null-supplying table |
|---|---|---|
| during-join predicate | not pushed down | pushed down |
| after-join predicate | pushed down | pushed down |
Hands-on analysis
Run the following code:
```scala
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder()
    .appName("delta-merge")
    .config("spark.master", "local[1]")
    .config("spark.app.name", "demo")
    .config("spark.sql.adaptive.autoBroadcastJoinThreshold", -1)
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config(SQLConf.PLAN_CHANGE_LOG_LEVEL.key, "warn")
    .getOrCreate()
  spark.sparkContext.setLogLevel("info")
  import spark.implicits._

  val df1 = Seq(
    (BigDecimal("11"), 1),
    (BigDecimal("22"), 2),
    (BigDecimal("33"), 3)
  ).toDF("decNum1", "intNum1")
  df1.write.mode(SaveMode.Overwrite).parquet("df1.parquet")

  val df2 = Seq(
    (BigDecimal("1111"), 1),
    (BigDecimal("2222"), 2),
    (BigDecimal("4444"), 4)
  ).toDF("decNum2", "intNum2")
  df2.write.mode(SaveMode.Overwrite).parquet("df2.parquet")

  spark.sql("select null > 2").show(2)

  val dfP1 = spark.read.parquet("df1.parquet")
  val dfP2 = spark.read.parquet("df2.parquet")
  dfP1.createOrReplaceTempView("tbl1")
  dfP2.createOrReplaceTempView("tbl2")

  val dfResult = spark.sql(
    "select * from tbl1 join tbl2 on intNum1 == intNum2 where intNum1 > 1")
  dfResult.show(40, false)
  dfResult.explain("extended")
  println("==========")
  dfResult.queryExecution.tracker.rules map {
    case (key, value: RuleSummary) if (value.numEffectiveInvocations > 0) =>
      println(s"$key, $value")
    case (_, _) =>
  }
  Thread.sleep(10000000L)
}
```
Setting spark.sql.adaptive.autoBroadcastJoinThreshold and spark.sql.autoBroadcastJoinThreshold to -1 disables converting the sort merge join (SMJ) into a broadcast hash join (BHJ), so we can observe the plans we are after.
Adjusting SQLConf.PLAN_CHANGE_LOG_LEVEL.key and the SparkContext log level makes Spark print the logical-plan optimization rules and physical-plan rules the query goes through, so we can clearly follow every transformation the statement undergoes.
dfResult.explain("extended") prints the plan at each stage, which makes tracking easier.
dfResult.queryExecution.tracker.rules prints the analysis and optimization rules applied during the logical-plan phase; they appear in no particular order because they are stored in a java.util.HashMap on the backend.
- leftouter / during-join predicate / preserved table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")
The ResolveRelations rule simply resolves the parquet tables using catalog metadata:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
Before:
'Project [*]
+- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))
   :- 'UnresolvedRelation [tbl1], [], false
   +- 'UnresolvedRelation [tbl2], [], false

After:
'Project [*]
+- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))
   :- SubqueryAlias tbl1
   :  +- View (`tbl1`, [decNum1#33,intNum1#34])
   :     +- Relation [decNum1#33,intNum1#34] parquet
   +- SubqueryAlias tbl2
      +- View (`tbl2`, [decNum2#37,intNum2#38])
         +- Relation [decNum2#37,intNum2#38] parquet
```
The PushDownPredicates rule makes a change, but it only swaps the positions of the two conditions in the ON clause:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
Before:
Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet
```
InferFiltersFromConstraints does perform a pushdown, but it pushes to the null-supplying table, not the preserved table:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Before:
Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet
+- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
   +- Relation [decNum2#37,intNum2#38] parquet
```
The source code shows this implementation as well:
```scala
case LeftOuter | LeftAnti =>
  val allConstraints = getAllConstraints(left, right, conditionOpt)
  val newRight = inferNewFilter(right, allConstraints)
  join.copy(right = newRight)
```
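The inference that this rule performs can be sketched in plain Python: from the join equality intNum1 = intNum2 plus the constraint intNum1 > 1 it derives intNum2 > 1 and an isnotnull check for the other side. This is a toy illustration of the idea only, not Spark's actual constraint machinery, and the string-based rewrite is a deliberate simplification:

```python
# Toy constraint propagation (illustrative only, not Spark's implementation).
# Given column equalities and per-column comparison constraints, derive the
# constraints that can be applied to the other side of each equality.

def infer_filters(equalities, constraints):
    """equalities: list of (colA, colB); constraints: dict col -> predicate str.
    Returns inferred constraints for columns tied together by an equality."""
    inferred = {}
    for a, b in equalities:
        if a in constraints:
            # a = b and pred(a)  =>  pred(b) AND isnotnull(b)
            inferred[b] = f"({constraints[a].replace(a, b)}) AND isnotnull({b})"
        if b in constraints:
            inferred[a] = f"({constraints[b].replace(b, a)}) AND isnotnull({a})"
    return inferred

filters = infer_filters(
    equalities=[("intNum1", "intNum2")],
    constraints={"intNum1": "intNum1 > 1"},
)
# filters now maps intNum2 to the same filter the optimizer log shows above.
```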
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+
```
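The shape of this result can be reproduced with a short pure-Python simulation (an illustration, not Spark): the ON predicate intNum1 > 1 only affects matching, so rows 1 and 3 of the preserved table still appear, null-padded.

```python
# Reproduce the result shape: a during-join predicate on the preserved table
# turns non-qualifying preserved rows into null-padded rows; it never drops them.
tbl1 = [(11, 1), (22, 2), (33, 3)]        # (decNum1, intNum1)
tbl2 = [(1111, 1), (2222, 2), (4444, 4)]  # (decNum2, intNum2)

result = []
for dec1, i1 in tbl1:
    matched = [(dec2, i2) for dec2, i2 in tbl2 if i1 == i2 and i1 > 1]
    if matched:
        result.extend((dec1, i1, dec2, i2) for dec2, i2 in matched)
    else:
        result.append((dec1, i1, None, None))
```

This also shows why pushing intNum1 > 1 below the join would be wrong here: it would remove rows 1 and 3 entirely instead of null-padding them.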
The corresponding physical plan:
- leftouter / during-join predicate / null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")
This time PushDownPredicates behaves differently: it pushes the predicate straight down:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
Before:
Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join LeftOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet
+- Filter (intNum2#38 > 1)
   +- Relation [decNum2#37,intNum2#38] parquet
```
The relevant source code:
```scala
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
  // push down the right side only join filter for right sub query
  val newLeft = left
  val newRight = rightJoinConditions.reduceLeftOption(And)
    .map(Filter(_, right)).getOrElse(right)
  val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
  Join(newLeft, newRight, joinType, newJoinCond, hint)
```
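The split this code relies on, classifying each ON conjunct by which side's columns it references, can be sketched as follows (pure Python, hypothetical names; an illustration, not Spark's expression machinery):

```python
# Split an ON clause's conjuncts by the side whose columns they reference.
# For LEFT OUTER: only right-only conjuncts may be pushed below the join as a
# Filter; left-only and common conjuncts must stay in the join condition.

LEFT_COLS = {"decNum1", "intNum1"}
RIGHT_COLS = {"decNum2", "intNum2"}

def split_conjuncts(conjuncts):
    left_only, right_only, common = [], [], []
    for cols, expr in conjuncts:  # cols: set of columns the conjunct references
        if cols <= LEFT_COLS:
            left_only.append(expr)
        elif cols <= RIGHT_COLS:
            right_only.append(expr)
        else:
            common.append(expr)
    return left_only, right_only, common

on = [({"intNum1", "intNum2"}, "intNum1 = intNum2"),
      ({"intNum2"}, "intNum2 > 1")]
left_only, right_only, common = split_conjuncts(on)
# right_only -> becomes the Filter below the right child
# common     -> stays as the join condition
```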
InferFiltersFromConstraints then merely adds an isnotnull(intNum2#38) check:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Before:
Join LeftOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet
+- Filter (intNum2#38 > 1)
   +- Relation [decNum2#37,intNum2#38] parquet

After:
Join LeftOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet
+- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
   +- Relation [decNum2#37,intNum2#38] parquet
```
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+
```
The corresponding physical plan:
- leftouter / after-join predicate / preserved table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")
PushDownPredicates pushes the filter down:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
Before:
Filter (intNum1#34 > 1)
+- Join LeftOuter, (intNum1#34 = intNum2#38)
   :- Relation [decNum1#33,intNum1#34] parquet
   +- Relation [decNum2#37,intNum2#38] parquet

After:
Join LeftOuter, (intNum1#34 = intNum2#38)
:- Filter (intNum1#34 > 1)
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet
```
InferFiltersFromConstraints derives additional predicates and pushes one down to the null-supplying table as well:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Before:
Join LeftOuter, (intNum1#34 = intNum2#38)
:- Filter (intNum1#34 > 1)
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join LeftOuter, (intNum1#34 = intNum2#38)
:- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
   +- Relation [decNum2#37,intNum2#38] parquet
```
The run result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+
```
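Why is this pushdown safe? For the preserved table, filtering before the join and filtering after it give identical results, because the predicate only inspects preserved-side columns, which are never null-padded. A quick pure-Python check (an illustration, not Spark) confirms this:

```python
# After-join predicate on the preserved table: pushing it below the join
# is semantics-preserving.

def left_outer(left, right):
    out = []
    for l in left:
        ms = [r for r in right if l[1] == r[1]]
        if ms:
            out.extend((l, r) for r in ms)
        else:
            out.append((l, None))
    return out

tbl1 = [(11, 1), (22, 2), (33, 3)]
tbl2 = [(1111, 1), (2222, 2), (4444, 4)]

# Filter after the join (the WHERE position)...
after = [row for row in left_outer(tbl1, tbl2) if row[0][1] > 1]
# ...equals pushing the filter below the join on the preserved side.
pushed = left_outer([l for l in tbl1 if l[1] > 1], tbl2)
assert after == pushed
```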
The corresponding physical plan:
- leftouter / after-join predicate / null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")
This time an additional rule, EliminateOuterJoin, fires; it rewrites the left outer join into an inner join:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Before:
Filter (intNum2#38 > 1)
+- Join LeftOuter, (intNum1#34 = intNum2#38)
   :- Relation [decNum1#33,intNum1#34] parquet
   +- Relation [decNum2#37,intNum2#38] parquet

After:
Filter (intNum2#38 > 1)
+- Join Inner, (intNum1#34 = intNum2#38)
   :- Relation [decNum1#33,intNum1#34] parquet
   +- Relation [decNum2#37,intNum2#38] parquet
```
PushDownPredicates and InferFiltersFromConstraints then behave exactly as in the leftouter / after-join predicate / preserved table case, except that the join type is now inner (thanks to EliminateOuterJoin), so the predicate is pushed down as well.
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+
```
The corresponding physical plan:
- rightouter / during-join predicate / preserved table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")
PushDownPredicates only changes the position of the join conditions:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
Before:
Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet
```
InferFiltersFromConstraints, however, derives a pushdown:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Before:
Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
:- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet
```
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+
```
The corresponding physical plan:
- rightouter / during-join predicate / null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")
PushDownPredicates pushes the predicate down to the null-supplying table:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
Before:
Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1))
:- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join RightOuter, (intNum1#34 = intNum2#38)
:- Filter (intNum1#34 > 1)
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet
```
InferFiltersFromConstraints then adds the isnotnull check:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Before:
Join RightOuter, (intNum1#34 = intNum2#38)
:- Filter (intNum1#34 > 1)
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet

After:
Join RightOuter, (intNum1#34 = intNum2#38)
:- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet
```
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+
```
The corresponding physical plan:
- rightouter / after-join predicate / preserved table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")
PushDownPredicates pushes the preserved table's predicate below the join:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
Before:
Filter (intNum2#38 > 1)
+- Join RightOuter, (intNum1#34 = intNum2#38)
   :- Relation [decNum1#33,intNum1#34] parquet
   +- Relation [decNum2#37,intNum2#38] parquet

After:
Join RightOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet
+- Filter (intNum2#38 > 1)
   +- Relation [decNum2#37,intNum2#38] parquet
```
InferFiltersFromConstraints then derives further filters:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Before:
Join RightOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet
+- Filter (intNum2#38 > 1)
   +- Relation [decNum2#37,intNum2#38] parquet

After:
Join RightOuter, (intNum1#34 = intNum2#38)
:- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
:  +- Relation [decNum1#33,intNum1#34] parquet
+- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
   +- Relation [decNum2#37,intNum2#38] parquet
```
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+
```
The corresponding physical plan:
- rightouter / after-join predicate / null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")
The analysis of EliminateOuterJoin, PushDownPredicates, and InferFiltersFromConstraints is the same as in the leftouter / after-join predicate / null-supplying table case, so we do not repeat it here.
Result:
```
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+
```
The corresponding physical plan:
Conclusion

| left join | preserved table | null-supplying table |
|---|---|---|
| during-join predicate | not pushed down | pushed down |
| after-join predicate | pushed down | pushed down |

| right join | preserved table | null-supplying table |
|---|---|---|
| during-join predicate | not pushed down | pushed down |
| after-join predicate | pushed down | pushed down |

Merging the two:

| outer join | preserved table | null-supplying table |
|---|---|---|
| during-join predicate | not pushed down | pushed down |
| after-join predicate | pushed down | pushed down |
Comparing this with the theory: in theory an after-join predicate on the null-supplying table is not pushed down, yet in practice we observed that it is. This confirms the earlier remark that real implementations optimize beyond the theory, much as practical Paxos deployments diverge from the original protocol description.
The source of this difference is the extra EliminateOuterJoin rule that Spark applies.
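The correctness argument behind EliminateOuterJoin can be sketched in pure Python (an illustration, not Spark's implementation): under SQL's three-valued logic a WHERE predicate over a null-supplying column is null-rejecting (null > 1 is not true, which is what the select null > 2 probe in the test program demonstrates), so the null-padded rows are discarded anyway and the outer join collapses to an inner join:

```python
# Why EliminateOuterJoin is valid:
#   LEFT OUTER JOIN ... WHERE right_col > 1  ==  INNER JOIN ... WHERE right_col > 1
# because the WHERE rejects every null-padded row.

def sql_gt(value, threshold):
    # three-valued logic: a comparison with null is unknown, i.e. not true
    return False if value is None else value > threshold

tbl1 = [(11, 1), (22, 2), (33, 3)]        # (decNum1, intNum1)
tbl2 = [(1111, 1), (2222, 2), (4444, 4)]  # (decNum2, intNum2)

left_outer_rows = []
for dec1, i1 in tbl1:
    ms = [(d2, i2) for d2, i2 in tbl2 if i1 == i2]
    if ms:
        left_outer_rows.extend((dec1, i1, d2, i2) for d2, i2 in ms)
    else:
        left_outer_rows.append((dec1, i1, None, None))

inner_rows = [(d1, i1, d2, i2)
              for d1, i1 in tbl1 for d2, i2 in tbl2 if i1 == i2]

outer_then_where = [r for r in left_outer_rows if sql_gt(r[3], 1)]
inner_then_where = [r for r in inner_rows if sql_gt(r[3], 1)]
assert outer_then_where == inner_then_where
```

Once the join is inner, the usual inner-join pushdown applies, which is exactly what the optimizer log above shows.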