1.概述

转载学习加深印象:一条 SQL 在 Apache Spark 之旅(上)

Spark SQL 是 Spark 众多组件中技术最复杂的组件之一,它同时支持 SQL 查询和 DataFrame DSL。通过引入了 SQL 的支持,大大降低了开发人员的学习和使用成本。目前,整个 SQL 、Spark ML、Spark Graph 以及 Structured Streaming 都是运行在 Catalyst Optimization & Tungsten Execution 之上的,如下图所示:


所以,正常的 SQL 执行先会经过 SQL Parser 解析 SQL,然后经过 Catalyst 优化器处理,最后到 Spark 执行。而 Catalyst 的过程又分为很多个过程,其中包括:

  1. Analysis:主要利用 Catalog 信息将 Unresolved Logical Plan 解析成 Analyzed logical plan;

  2. Logical Optimizations:利用一些 Rule (规则)将 Analyzed logical plan 解析成 Optimized Logical Plan;

  3. Physical Planning:前面的 logical plan 不能被 Spark 执行,而这个过程是把 logical plan 转换成多个 physical plans,然后利用代价模型(cost model)选择最佳的 physical plan;

  4. Code Generation:这个过程会把 SQL 查询生成 Java 字节码。

所以整个 SQL 的执行过程可以使用下图表示:


其中蓝色部分就是 Catalyst 优化器处理的部分,也是本文重点介绍的部分。下面我们以一条简单的 SQL 为例,从 High-level 角度介绍 一条 SQL 在 Spark 之旅。本文我们用到的 SQL 查询语句如下:

SELECT sum(v)FROM (SELECTt1.id,1 + 2 + t1.value AS vFROM t1 JOIN t2WHEREt1.id = t2.id ANDt1.cid = 1 ANDt1.did = t1.cid + 1 ANDt2.id > 5) iteblog

2.SQL 解析阶段 - SparkSqlParser

为了能够在 Spark 中运行 SQL 查询,第一步肯定是需要解析这条 SQL。在 Spark 1.x 版本中,SQL 的解析有两种方法:

基于 Scala parser combinator 实现
基于 Hive 的 SQL 解析

可以通过 spark.sql.dialect 来设置。虽然 SQL 的解析引擎可以选择,但是这种方案有以下几个问题:Scala parser combinator 解析器有时候会给出错误信息,而且在定义语法中存在冲突不会发出警告;而 Hive SQL 解析引擎依赖于 Hive,这导致扩展性不好。

为了解决这个问题,从 Spark 2.0.0 版本开始引入了第三方语法解析器工具 ANTLR(详情参见 SPARK-12362),Antlr 是一款强大的语法生成器工具,可用于读取、处理、执行和翻译结构化的文本或二进制文件,是当前 Java 语言中使用最为广泛的语法生成器工具,我们常见的大数据 SQL 解析都用到了这个工具,包括 Hive、Cassandra、Phoenix、Pig 以及 presto 等。目前最新版本的 Spark 使用的是 ANTLR4,通过这个对 SQL 进行词法分析并构建语法树。

具体的,Spark 基于 presto 的语法文件定义了 Spark SQL 语法文件 SqlBase.g4(路径 spark-2.4.3\sql\catalyst\src\main\antlr4\org\apache\spark\sql\catalyst\parser\SqlBase.g4),这个文件定义了 Spark SQL 支持的 SQL 语法。如果我们需要自定义新的语法,需要在这个文件定义好相关语法。然后使用 ANTLR4 对 SqlBase.g4 文件自动解析生成几个 Java 类,其中就包含重要的词法分析器 SqlBaseLexer.java 和语法分析器SqlBaseParser.java。运行上面的 SQL 会使用 SqlBaseLexer 来解析关键词以及各种标识符等;然后使用 SqlBaseParser 来构建语法树。整个过程就类似于下图。


生成语法树之后,使用 AstBuilder 将语法树转换成 LogicalPlan,这个 LogicalPlan 也被称为 Unresolved LogicalPlan。解析后的逻辑计划如下:

== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias `iteblog`+- 'Project ['t1.id, ((1 + 2) + 't1.value) AS v#16]+- 'Filter ((('t1.id = 't2.id) && ('t1.cid = 1)) && (('t1.did = ('t1.cid + 1)) && ('t2.id > 5)))+- 'Join Inner:- 'UnresolvedRelation `t1`+- 'UnresolvedRelation `t2`

图片表示如下:


Unresolved LogicalPlan 是从下往上看的,t1 和 t2 两张表被生成了 UnresolvedRelation,过滤的条件、选择的列以及聚合字段都知道了,SQL 之旅的第一个过程就算完成了。

3.绑定逻辑计划阶段 - Analyzer

在 SQL 解析阶段生成了 Unresolved LogicalPlan,从上图可以看出逻辑算子树中包含了 UnresolvedRelation 和 unresolvedalias 等对象。Unresolved LogicalPlan 仅仅是一种数据结构,不包含任何数据信息,比如不知道数据源、数据类型,不同的列来自于哪张表等。Analyzer 阶段会使用事先定义好的 Rule 以及 SessionCatalog 等信息对 Unresolved LogicalPlan 进行 transform。SessionCatalog 主要用于各种函数资源信息和元数据信息(数据库、数据表、数据视图、数据分区与函数等)的统一管理。而Rule 是定义在 Analyzer 里面的,如下具体如下:

lazy val batches: Seq[Batch] = Seq(Batch("Hints", fixedPoint,new ResolveHints.ResolveBroadcastHints(conf),ResolveHints.ResolveCoalesceHints,ResolveHints.RemoveAllHints),Batch("Simple Sanity Check", Once,LookupFunctions),Batch("Substitution", fixedPoint,CTESubstitution,WindowsSubstitution,EliminateUnions,new SubstituteUnresolvedOrdinals(conf)),Batch("Resolution", fixedPoint,ResolveTableValuedFunctions ::                    //解析表的函数ResolveRelations ::                               //解析表或视图ResolveReferences ::                              //解析列ResolveCreateNamedStruct ::ResolveDeserializer ::                            //解析反序列化操作类ResolveNewInstance ::ResolveUpCast ::                                  //解析类型转换ResolveGroupingAnalytics ::ResolvePivot ::ResolveOrdinalInOrderByAndGroupBy ::ResolveAggAliasInGroupBy ::ResolveMissingReferences ::ExtractGenerator ::ResolveGenerate ::ResolveFunctions ::                               //解析函数ResolveAliases ::                                 //解析表别名ResolveSubquery ::                                //解析子查询ResolveSubqueryColumnAliases ::ResolveWindowOrder ::ResolveWindowFrame ::ResolveNaturalAndUsingJoin ::ResolveOutputRelation ::ExtractWindowExpressions ::GlobalAggregates ::ResolveAggregateFunctions ::TimeWindowing ::ResolveInlineTables(conf) ::ResolveHigherOrderFunctions(catalog) ::ResolveLambdaVariables(conf) ::ResolveTimeZone(conf) ::ResolveRandomSeed ::TypeCoercion.typeCoercionRules(conf) ++extendedResolutionRules : _*),Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),Batch("View", Once,AliasViewChild(conf)),Batch("Nondeterministic", Once,PullOutNondeterministic),Batch("UDF", Once,HandleNullInputsForUDF),Batch("FixNullability", Once,FixNullability),Batch("Subquery", Once,UpdateOuterReferences),Batch("Cleanup", fixedPoint,CleanupAliases)
)

从上面代码可以看出,多个性质类似的 Rule 组成一个 Batch,比如上面名为 Hints 的 Batch就是由很多个 Hints Rule 组成;而多个 Batch 构成一个 batches。这些 batches 会由 RuleExecutor 执行,先按一个一个 Batch 顺序执行,然后对 Batch 里面的每个 Rule 顺序执行。每个 Batch 会之心一次(Once)或多次(FixedPoint,由
spark.sql.optimizer.maxIterations 参数决定),执行过程如下:


所以上面的 SQL 经过这个阶段生成的 Analyzed Logical Plan 如下:

== Analyzed Logical Plan ==
sum(v): bigint
Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L]
+- SubqueryAlias `iteblog`+- Project [id#0, ((1 + 2) + value#1) AS v#16]+- Filter (((id#0 = id#8) && (cid#2 = 1)) && ((did#3 = (cid#2 + 1)) && (id#8 > 5)))+- Join Inner:- SubqueryAlias `t1`:  +- Relation[id#0,value#1,cid#2,did#3] csv+- SubqueryAlias `t2`+- Relation[id#8,value#9,cid#10,did#11] csv

从上面的结果可以看出,t1 和 t2 表已经解析成带有 id、value、cid 以及 did 四个列的表,其中这个表的数据源来自于 csv 文件。而且每个列的位置和数据类型已经确定了,sum 被解析成 Aggregate 函数了。下面是从 Unresolved LogicalPlan 转换到 Analyzed Logical Plan 对比图。


到这里, Analyzed LogicalPlan 就完全生成了。由于篇幅的原因,剩余的 SQL 处理我将在下一篇文章进行介绍,包括逻辑计划优化、代码生成等东西,敬请关注。

【Spark】一条 SQL 在 Apache Spark 之旅(上)相关推荐

  1. spark shell 启动 出现org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK

    启动spark shell报错: Setting default log level to "WARN". To adjust logging level use sc.setLo ...

  2. spark学习-52-Spark的org.apache.spark.SparkException: Task not serializable

    1.概述 报错这个一般是org.apache.spark.SparkException: Task not serializable 17/12/06 14:20:10 INFO MemoryStor ...

  3. MySQL数值扩大一百倍_Mysql优化----一条SQL百倍提升之旅

    在实现业务逻辑的时候,有些复杂一点逻辑会用数据库子查询去实现,但是sql用子查询会带来性能问题,下面就一个例子来说明,怎么优化子查询,来提升查询速度 mysql> desc update t_s ...

  4. Apache Spark 3.0 SQL DataFrame和DataSet指南

    目录 简介 SQL 数据集和数据框 入门 起点:SparkSession Scala语言 Java语言 Python语言 R语言 创建DataFrame Scala语言 Java语言 Python语言 ...

  5. apache spark_使用Apache Spark SQL探索标普500和石油价格

    apache spark 这篇文章将使用Apache Spark SQL和DataFrames查询,比较和探索过去5年中的S&P 500,Exxon和Anadarko Petroleum Co ...

  6. Apache Spark基础知识

    我的spark学习笔记,基于Spark 2.4.0 目录 一.简介 二.RDD编程 1 RDD介绍 2 RDD操作 2.0 读操作 2.1 常用Tramsformation算子 2.2 常用Actio ...

  7. 大火的Apache Spark也有诸多不完美

    现在如果你想要选择一个解决方案来处理企业中的大数据并不是难事,毕竟有很多数据处理框架可以任君选择,如Apache Samza,Apache Storm .Apache Spark等等.Apache S ...

  8. hadoop + spark+ hive 集群搭建(apache版本)

    0. 引言 hadoop 集群,初学者顺利将它搭起来,肯定要经过很多的坑.经过一个星期的折腾,我总算将集群正常跑起来了,所以,想将集群搭建的过程整理记录,分享出来,让大家作一个参考. 由于搭建过程比较 ...

  9. 使用Apache Spark让MySQL查询速度提升10倍以上

    作者:Alexander Rubin 链接:http://coyee.com/article/11012-how-apache-spark-makes-your-slow-mysql-queries- ...

最新文章

  1. Linux Test Project 测试套件说明
  2. 这几种Java异常处理方法,你会吗?
  3. 研究生申请:就一个字
  4. 【Linux独家秘方】Linux 定时备份oracle库的数据
  5. 表空间oracle查询,Oracle表和表空间查询
  6. 集合及其常见操作,创建,增加,删除,查找
  7. P2900 [USACO08MAR]土地征用Land Acquisition
  8. Linux下unzip乱码问题解决
  9. UG软件制图的几个重要技巧,你知道吗?
  10. 传智播客黑马程序员28期JavaEE基础及就业班视频教程
  11. Spire.Office for Java 7.5.4
  12. 在js中对数值进行取整、四舍五入等方法汇总
  13. 最近三年的百度产品经理面试与笔试题完整版
  14. 华为电脑用鸿蒙系统了吗,华为的鸿蒙系统可以用在电脑上吗?
  15. C语言snmp编程视频,使用net-snmp API编程_C语言教程_C++教程_C语言培训_C++教程培训_C/C++频道_中国IT实验室...
  16. python获取人民币汇率数据
  17. 淘宝卖家如何通过宝贝详情页装修提高店铺转化?
  18. 数据结构-链表及相关算法
  19. html页面推送功能的实现,想实现Web页面内容自动更新?你需要了解WEB实时推送技术!...
  20. 竞品分析-奈雪的茶VS喜茶:在新式茶饮的赛道上,谁会率先触线抵达终点?

热门文章

  1. 洲明科技与意法半导体合作开发新一代LED显示屏
  2. 苹果汽车项目团队添猛将?外媒称Apple Watch负责人已调入
  3. “先享后付”风靡深圳 “买鸭”618交易额增3倍助线下消费复苏
  4. 10元/小时?大涨价后,共享充电宝行业迎来“大地震”!
  5. 蛋壳公寓回应破产传闻:没有破产 也不会跑路
  6. 教育部认可公众号博主是自由职业,网友:公号被封算失业吗?
  7. 直播不是造车,特斯拉开店玩直播却栽了个“跟头”
  8. 搭载MIUI for Watch,支持eSIM独立通话!小米手表首发1299元起
  9. 雷军:小米CC9 Pro人像镜头简直太奢华了
  10. 防不胜防!微信借钱语音确认仍被骗:我可真是太难了