/** Spark SQL Source Code Analysis Series */

The previous articles explained Spark SQL's core execution flow and how the Sql Parser in Spark SQL's Catalyst framework takes the user's SQL input and parses it into an Unresolved Logical Plan.

Recall that Spark SQL's execution flow has another core component, the Analyzer. This article looks at what the Analyzer does in Spark SQL.

The Analyzer lives in Catalyst's analysis package. Its main job is to resolve the Logical Plan that the Sql Parser could not resolve.

1. Analyzer Construction

The Analyzer uses the Catalog and the FunctionRegistry to turn UnresolvedAttribute and UnresolvedRelation nodes into fully typed objects in catalyst.

The Analyzer contains a fixedPoint object and a Seq[Batch]:

class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators))
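For context, this is roughly how SQLContext wires an Analyzer up in Spark 1.0 (paraphrased, not a verbatim quote; HiveContext does the same with its own catalog and function registry and caseSensitive = false):

  // Paraphrased from SQLContext (Spark 1.0): a SimpleCatalog plus an empty function registry.
  protected[sql] lazy val catalog: Catalog = new SimpleCatalog
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)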

A few of the objects used by the Analyzer:

FixedPoint: effectively an upper bound on the number of iterations.

  /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
  case class FixedPoint(maxIterations: Int) extends Strategy

Batch: a batch of rules. A Batch groups a series of Rules and applies them under a single strategy (a strategy is essentially just a name for how many times to iterate, e.g. Once).

  /** A batch of rules. */
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

Rule: a rule that is applied to the Logical Plan in order to turn Unresolved nodes into Resolved ones.

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}

Strategy: the maximum number of executions. If the execution reaches a fix point before maxIterations, the strategy stops and the rules are no longer applied.

  /**
   * An execution strategy for rules that indicates the maximum number of executions. If the
   * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
   */
  abstract class Strategy { def maxIterations: Int }
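To make the Rule/Batch/Strategy triple concrete, here is a minimal sketch (not from the Spark source; the names RemoveTrivialFilters and TrivialExecutor are made up) of a rule that drops always-true filters, wired into its own RuleExecutor:

  import org.apache.spark.sql.catalyst.expressions.Literal
  import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
  import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

  // A hypothetical rule: a WHERE clause that is literally true filters nothing, so drop it.
  object RemoveTrivialFilters extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Filter(Literal(true, _), child) => child
    }
  }

  // A hypothetical executor: one Batch, applied repeatedly until the plan stops changing
  // or 10 iterations have run.
  object TrivialExecutor extends RuleExecutor[LogicalPlan] {
    val batches = Batch("Trivial", FixedPoint(10), RemoveTrivialFilters) :: Nil
  }

  // usage: TrivialExecutor(somePlan)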

Analysis is driven by the strategies and Rules defined in these Batches, which are applied to the unresolved logical plan.

The Analyzer class itself does not define an execution method; that comes from its parent class RuleExecutor[LogicalPlan]. Analyzer also mixes in HiveTypeCoercion, a trait that models Hive's automatic type-compatibility conversions.

RuleExecutor: the environment in which Rules run. It executes the Batches, each containing a series of Rules, and it does so serially.

The actual execution logic is defined in apply:

As you can see, it is a while loop: the rules of each batch are applied to the current plan, and the process iterates until a fix point or the maximum number of iterations is reached.

 def apply(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy).
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val result = rule(plan) // invoke each Rule's apply method, resolving Unresolved Relations, Attributes and Functions
            if (!result.fastEquals(plan)) {
              logger.trace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
            result // return the plan after the rule has been applied
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) { // stop once the iteration count exceeds the strategy's max iterations
          logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) { // stop once the plan no longer changes between iterations (plans carry unique ids, so fastEquals can detect this)
          logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logger.debug(
          s"""
            |=== Result of Batch ${batch.name} ===
            |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
          """.stripMargin)
      } else {
        logger.trace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan // return the Resolved Logical Plan
  }
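From the outside, running analysis is just a call to this apply method. In SQLContext's QueryExecution (Spark 1.0, paraphrased from memory) it looks roughly like this:

  lazy val analyzed: LogicalPlan = analyzer(logical)   // `logical` is the Unresolved Logical Plan from the parser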

2. The Rules

As of Spark SQL 1.0.0, all of these Rules are defined as inner objects in Analyzer.scala.
The batches field defines four Batches:
MultiInstanceRelations, CaseInsensitiveAttributeReferences, Resolution, and AnalysisOperators.
These four Batches group the Rules by category, and each category is resolved with its own strategy.

2.1 MultiInstanceRelation

If the same relation instance appears more than once in the Logical Plan, the NewRelationInstances rule is applied:
  Batch("MultiInstanceRelations", Once, NewRelationInstances)
trait MultiInstanceRelation {
  def newInstance: this.type
}

object NewRelationInstances extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // apply a partial function to the logical plan to collect every MultiInstanceRelation it contains
    val localRelations = plan collect { case l: MultiInstanceRelation => l }

    val multiAppearance = localRelations
      .groupBy(identity[MultiInstanceRelation])   // group by identity
      .filter { case (_, ls) => ls.size > 1 }     // keep only relations that appear more than once
      .map(_._1)
      .toSet

    // rewrite the plan so that each instance gets a unique expression id
    plan transform {
      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
    }
  }
}
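A typical situation where this matters is a self-join: the same registered table appears twice in the plan, and without fresh expression ids the attributes of the two sides would collide. A hedged illustration (the table t and its registration are assumed):

  // The relation backing t occurs twice; NewRelationInstances gives one occurrence a new
  // instance so that a.id and b.id end up with distinct expression ids.
  sqlContext.sql("SELECT * FROM t a JOIN t b ON a.id = b.id")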

2.2 LowercaseAttributeReferences

This is again a partial function applied to the current plan: it lowercases the alias of every matching UnresolvedRelation and the alias of every Subquery.
In short, this Rule makes attribute names case-insensitive by converting everything to lower case.
  object LowercaseAttributeReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) =>
        UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
      case Subquery(alias, child) => Subquery(alias.toLowerCase, child)
      case q: LogicalPlan => q transformExpressions {
        case s: Star => s.copy(table = s.table.map(_.toLowerCase))
        case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
        case Alias(c, name) => Alias(c, name.toLowerCase)()
        case GetField(c, name) => GetField(c, name.toLowerCase)
      }
    }
  }

2.3 ResolveReferences

Converts every UnresolvedAttribute produced by the Sql parser into the corresponding concrete catalyst.expressions.AttributeReference.
It does this by calling the logical plan's resolve method, which turns the attribute into a NamedExpression.
  object ResolveReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case q: LogicalPlan if q.childrenResolved =>
        logger.trace(s"Attempting to resolve ${q.simpleString}")
        q transformExpressions {
          case u @ UnresolvedAttribute(name) =>
            // Leave unchanged if resolution fails.  Hopefully will be resolved next round.
            val result = q.resolve(name).getOrElse(u) // resolve the attribute into a NamedExpression
            logger.debug(s"Resolving $u to $result")
            result
        }
    }
  }
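The heavy lifting happens in LogicalPlan.resolve. A much simplified, conceptual sketch of what it does (not the actual Spark code): match the name against the attributes produced by the node's children.

  import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}

  // Conceptual only: the real resolve also handles qualified names such as "a.mobile"
  // and nested fields.
  def resolveSketch(name: String, childOutput: Seq[Attribute]): Option[NamedExpression] =
    childOutput.find(_.name == name)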

2.4 ResolveRelations

This one is easy to understand. Remember the Sql parser: for a query such as select * from src, the src table parses into an UnresolvedRelation node.
In this step, ResolveRelations consults the catalog object.

The Catalog object maintains a HashMap from tableName to Logical Plan.

The Catalog is used to look up the structure of the current table and resolve its columns; an UnresolvedRelation thus becomes a tableWithQualifiers (i.e. the table plus its columns).
This also explains why I drew a catalog in the flow diagram: it is the metadata the Analyzer needs while it works.
object ResolveRelations extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      catalog.lookupRelation(databaseName, name, alias)
  }
}
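For reference, the in-memory SimpleCatalog behind SQLContext looks roughly like this (paraphrased from the Spark 1.0 source; details such as error handling may differ):

  class SimpleCatalog extends Catalog {
    val tables = new mutable.HashMap[String, LogicalPlan]()

    def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit =
      tables += ((tableName, plan))

    def lookupRelation(
        databaseName: Option[String],
        tableName: String,
        alias: Option[String] = None): LogicalPlan = {
      val table = tables.getOrElse(tableName, sys.error(s"Table Not Found: $tableName"))
      val tableWithQualifiers = Subquery(tableName, table)
      // Wrap in a Subquery named after the alias (if any) so attributes get the right qualifier.
      alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
    }
  }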

2.5 ImplicitGenerate

If the select clause contains exactly one expression, and that expression is a Generator (a Generator maps one input row to N output rows), then when the Project node is encountered during analysis it can be rewritten as a Generate node (Generate applies a function to the input stream to produce a new stream).
  object ImplicitGenerate extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Project(Seq(Alias(g: Generator, _)), child) =>
        Generate(g, join = false, outer = false, None, child)
    }
  }
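For example, with HiveContext (the table nested_table and its array column arr are hypothetical; explode is a Hive UDTF, which Catalyst models as a Generator):

  // The parsed plan is a Project whose only expression is the Generator explode(arr);
  // ImplicitGenerate rewrites that Project into a Generate node.
  hiveContext.hql("SELECT explode(arr) FROM nested_table")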

2.6 StarExpansion

In a Project operator, if the projection contains the * symbol (i.e. a select * statement), all references are expanded: the * in select * is replaced by the actual columns.
  object StarExpansion extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      // Wait until children are resolved
      case p: LogicalPlan if !p.childrenResolved => p
      // If the projection list contains Stars, expand it.
      case p @ Project(projectList, child) if containsStar(projectList) =>
        Project(
          projectList.flatMap {
            case s: Star => s.expand(child.output) // expand(input: Seq[Attribute]) turns the input Attributes into a Seq[NamedExpression]
            case o => o :: Nil
          },
          child)
      case t: ScriptTransformation if containsStar(t.input) =>
        t.copy(input = t.input.flatMap {
          case s: Star => s.expand(t.child.output)
          case o => o :: Nil
        })
      // If the aggregate function argument contains Stars, expand it.
      case a: Aggregate if containsStar(a.aggregateExpressions) =>
        a.copy(aggregateExpressions = a.aggregateExpressions.flatMap {
          case s: Star => s.expand(a.child.output)
          case o => o :: Nil
        })
    }

    /**
     * Returns true if `exprs` contains a [[Star]].
     */
    protected def containsStar(exprs: Seq[Expression]): Boolean =
      exprs.collect { case _: Star => true }.nonEmpty
  }

2.7 ResolveFunctions

This is almost identical to ResolveReferences, except that it resolves UDFs:
each UDF is looked up in the FunctionRegistry.
  object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan =>
        q transformExpressions {
          case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
            registry.lookupFunction(name, children) // check whether the UDF is registered
        }
    }
  }

2.8 GlobalAggregates

Global aggregation: when a Project's projection list contains aggregate expressions, the Project is rewritten as an Aggregate.

  object GlobalAggregates extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Project(projectList, child) if containsAggregates(projectList) =>
        Aggregate(Nil, projectList, child)
    }

    def containsAggregates(exprs: Seq[Expression]): Boolean = {
      exprs.foreach(_.foreach {
        case agg: AggregateExpression => return true
        case _ =>
      })
      false
    }
  }
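A query with an aggregate in the select list but no GROUP BY is the typical case (the table t is assumed to be registered):

  // "SELECT count(1) FROM t" parses into Project [COUNT(1)]; because the projection contains
  // an AggregateExpression, GlobalAggregates turns it into Aggregate(Nil, [COUNT(1)], child),
  // i.e. a global aggregation with an empty grouping list.
  sqlContext.sql("SELECT count(1) FROM t")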

2.9 typeCoercionRules

These rules provide Hive-compatible SQL behaviour, e.g. converting between String and Int without an explicit cast xxx as yyy, as in StringToIntegralCasts.
  val typeCoercionRules =
    PropagateTypes ::
    ConvertNaNs ::
    WidenTypes ::
    PromoteStrings ::
    BooleanComparisons ::
    BooleanCasts ::
    StringToIntegralCasts ::
    FunctionArgumentConversion ::
    CastNulls ::
    Nil
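The debug output in section 3 below shows these rules in action: the predicate pfrom_id = 0.0 compares a non-double column with a double literal, and the coercion rules insert the cast automatically:

  // No explicit cast is written, yet after analysis the filter reads
  // CAST(pfrom_id#5, DoubleType) = 0.0 (see the Batch Resolution output below).
  sqlContext.sql("SELECT mobile FROM temp_shengli_mobile WHERE pfrom_id = 0.0")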

2.10 EliminateAnalysisOperators

Removes the analysis-only operators. Only two kinds are handled here: Subquery and LowerCaseSchema; both node types are removed from the Logical Plan.
object EliminateAnalysisOperators extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child // when a Subquery is matched, return its child rather than the node itself, i.e. the node is removed
    case LowerCaseSchema(child) => child
  }
}

3. In Practice

Here is a debugging example that demonstrates how the rules above are applied to an Unresolved Logical Plan.
When the SQL statement is submitted, ResolveReferences is indeed invoked to resolve mobile into a NamedExpression.
You can follow the execution flow in the output below: the left side is the Unresolved Logical Plan, the right side is the Resolved Logical Plan.
First the Resolution Batch runs, e.g. the ResolveRelations rule turns the UnresolvedRelation into a SparkLogicalPlan and finds its column attributes through the Catalog.
Then the AnalysisOperators Batch runs, e.g. EliminateAnalysisOperators removes the Subquery node.
The formatting may not display well; scroll to the right to see the full result. :)
val exec = sqlContext.sql("select mobile as mb, sid as id, mobile*2 multi2mobile, count(1) times from (select * from temp_shengli_mobile)a where pfrom_id=0.0 group by mobile, sid,  mobile*2")
14/07/21 18:23:32 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line47.$eval.$print()
14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'pfrom_id to pfrom_id#5
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'sid to sid#1
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'sid to sid#1
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer:
=== Result of Batch Resolution ===
!Aggregate ['mobile,'sid,('mobile * 2) AS c2#27], ['mobile AS mb#23,'sid AS id#24,('mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L]   Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]
! Filter ('pfrom_id = 0.0)                                                                                                                   Filter (CAST(pfrom_id#5, DoubleType) = 0.0)
  Subquery a                                                                                                                                  Subquery a
!   Project [*]                                                                                                                                Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]
!    UnresolvedRelation None, temp_shengli_mobile, None                                                                                         Subquery temp_shengli_mobile
!                                                                                                                                                SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)
14/07/21 18:23:33 DEBUG Analyzer:
=== Result of Batch AnalysisOperators ===
!Aggregate ['mobile,'sid,('mobile * 2) AS c2#27], ['mobile AS mb#23,'sid AS id#24,('mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L]   Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]
! Filter ('pfrom_id = 0.0)                                                                                                                   Filter (CAST(pfrom_id#5, DoubleType) = 0.0)
!  Subquery a                                                                                                                                 Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]
!   Project [*]                                                                                                                                SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)
!    UnresolvedRelation None, temp_shengli_mobile, None                                                                                     
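To reproduce this without turning on DEBUG logging, the plans can also be inspected directly through queryExecution (Spark 1.0 API, from memory):

  val rdd = sqlContext.sql("select mobile as mb, sid as id from temp_shengli_mobile where pfrom_id=0.0")
  println(rdd.queryExecution.logical)   // the Unresolved Logical Plan from the parser
  println(rdd.queryExecution.analyzed)  // the plan after the Analyzer has run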

4. Summary

This article analyzed, from the source code, the flow the Analyzer follows when it analyzes the Unresolved Logical Plan produced by the Sql Parser.
The flow is: instantiate an Analyzer, define the Batches, then walk through these Batches inside the RuleExecutor, running the Rules in each Batch. Each Rule resolves part of the Unresolved Logical Plan; some cases cannot be resolved in a single pass and need several iterations, until either the maximum number of iterations or a fix point is reached. The most frequently used Rules here are ResolveReferences, ResolveRelations, StarExpansion, GlobalAggregates, typeCoercionRules and EliminateAnalysisOperators.
——EOF——

Reposted from: https://www.cnblogs.com/yangykaifa/p/6955694.html
