Spark SQL Catalyst Source Code Analysis: The Analyzer
/** Spark SQL source code analysis series */
The previous articles in this series explained Spark SQL's core execution flow and how the Sql Parser of the Catalyst framework takes the user's SQL input and parses it into an Unresolved Logical Plan.
Recall that the execution flow contains another core component, the Analyzer; this article looks at the role the Analyzer plays in Spark SQL.
The Analyzer lives under Catalyst's analysis package. Its main responsibility is to resolve the parts of the Logical Plan that the Sql Parser could not.
1. Analyzer Construction
The Analyzer uses the Catalog and the FunctionRegistry to convert UnresolvedAttribute and UnresolvedRelation nodes into fully typed Catalyst objects.
The Analyzer holds a fixedPoint object and a Seq[Batch]:
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules : _*),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )
}
A few of the objects that appear in the Analyzer:
FixedPoint: essentially an upper bound on the number of iterations.
/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy
Batch: a batch consists of a series of Rules together with a Strategy (a Strategy is really just a name for how many times to iterate, e.g. Once).
/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
Rule: a rewrite rule; applying it to a Logical Plan turns Unresolved nodes into Resolved ones.
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}
Strategy: the maximum number of executions. If execution reaches a fix point before maxIterations, the strategy stops and the rules are no longer applied.
/**
 * An execution strategy for rules that indicates the maximum number of executions. If the
 * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
 */
abstract class Strategy { def maxIterations: Int }
The Analyzer resolves the unresolved logical plan by applying the Rules defined in these Batches according to each Batch's strategy.
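To see the shape of this machinery, here is a minimal, hypothetical sketch (not Spark source) of how a custom Rule plugs into a RuleExecutor; IdentityRule and MiniAnalyzer are made-up names:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// A made-up rule that matches every node and returns it unchanged; a real rule
// would pattern-match on specific plan nodes and rewrite them.
object IdentityRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case node => node
  }
}

// A made-up executor with a single batch, run to fixed point (at most 10 passes).
// Batch and FixedPoint are inherited from RuleExecutor, just as in the Analyzer.
object MiniAnalyzer extends RuleExecutor[LogicalPlan] {
  val batches = Seq(Batch("Demo", FixedPoint(10), IdentityRule))
}

Calling MiniAnalyzer(somePlan) would run every batch over the plan and return the transformed result, exactly the way the Analyzer is invoked on an unresolved logical plan.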
The Analyzer class itself does not define an execution method; that comes from its parent class, RuleExecutor[LogicalPlan]. The Analyzer also mixes in HiveTypeCoercion, a trait that follows Hive's rules for automatic type compatibility and conversion.
RuleExecutor: the execution environment for Rules. It runs the Batches, each containing a series of Rules, one after another; the whole process is serial.
The concrete execution logic is defined in apply:
As you can see, it is a while loop: the rules of each batch are applied to the current plan over and over until the plan reaches a fix point or the maximum number of iterations.
def apply(plan: TreeType): TreeType = {
  var curPlan = plan

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val result = rule(plan) // invoke each Rule's apply method to resolve UnresolvedRelations, Attributes and Functions
          if (!result.fastEquals(plan)) {
            logger.trace(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          result // return the plan after the rule has been applied
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) { // stop once the iteration count exceeds the strategy's maximum
        logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) { // stop once the plan no longer changes between iterations (plans can be compared cheaply via fastEquals)
        logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logger.debug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logger.trace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan // return the resolved Logical Plan
}
2. The Rules
2.1 MultiInstanceRelation

Batch("MultiInstanceRelations", Once, NewRelationInstances)
trait MultiInstanceRelation {
  def newInstance: this.type
}
object NewRelationInstances extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val localRelations = plan collect { case l: MultiInstanceRelation => l } // collect every MultiInstanceRelation in the plan via a partial function
    val multiAppearance = localRelations
      .groupBy(identity[MultiInstanceRelation]) // group by the relation itself
      .filter { case (_, ls) => ls.size > 1 }   // keep only relations that appear more than once
      .map(_._1)
      .toSet

    // Rewrite the plan so that each instance gets unique expression IDs.
    plan transform {
      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
    }
  }
}
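For intuition, consider a hypothetical self-join over the table used later in this article: the same relation appears twice in one plan, and without NewRelationInstances both sides would carry identical expression IDs, making a.sid and b.sid indistinguishable.

// Hypothetical self-join: temp_shengli_mobile appears twice in the plan, so
// NewRelationInstances calls newInstance on the duplicates to get fresh expression IDs.
val selfJoin = sqlContext.sql(
  "select a.sid from temp_shengli_mobile a join temp_shengli_mobile b on a.sid = b.sid")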
2.2 LowercaseAttributeReferences

object LowercaseAttributeReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
    case Subquery(alias, child) => Subquery(alias.toLowerCase, child)
    case q: LogicalPlan => q transformExpressions {
      case s: Star => s.copy(table = s.table.map(_.toLowerCase))
      case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
      case Alias(c, name) => Alias(c, name.toLowerCase)()
      case GetField(c, name) => GetField(c, name.toLowerCase)
    }
  }
}
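This rule only runs when the analyzer was built with caseSensitive = false (see the CaseInsensitiveAttributeReferences batch above); it lowercases relation aliases, attributes, stars, and field accesses so that later name matching can ignore case. A purely illustrative example, assuming a case-insensitive analyzer:

// Illustration only, assuming caseSensitive = false: MOBILE is lowercased by
// this rule, so ResolveReferences can later bind it to the column mobile#2.
val upper = sqlContext.sql("select MOBILE from temp_shengli_mobile")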
2.3 ResolveReferences

object ResolveReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case q: LogicalPlan if q.childrenResolved =>
      logger.trace(s"Attempting to resolve ${q.simpleString}")
      q transformExpressions {
        case u @ UnresolvedAttribute(name) =>
          // Leave unchanged if resolution fails. Hopefully will be resolved next round.
          val result = q.resolve(name).getOrElse(u) // resolve against the child output, yielding a NamedExpression
          logger.debug(s"Resolving $u to $result")
          result
      }
  }
}
2.4 ResolveRelations

The Catalog object maintains a HashMap from table name to Logical Plan, which lookupRelation consults.
object ResolveRelations extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      catalog.lookupRelation(databaseName, name, alias)
  }
}
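A minimal sketch of the mapping a Catalog maintains (class and method names here are illustrative, loosely modeled on Spark 1.0's SimpleCatalog, not Catalyst's exact trait):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

// Illustrative sketch of a catalog: a mutable table-name -> LogicalPlan map.
class MiniCatalog {
  private val tables = new mutable.HashMap[String, LogicalPlan]()

  def registerTable(tableName: String, plan: LogicalPlan): Unit =
    tables += tableName -> plan

  def lookupRelation(tableName: String, alias: Option[String] = None): LogicalPlan = {
    val table = tables.getOrElse(tableName, sys.error(s"Table not found: $tableName"))
    // Wrap the plan in a Subquery so its attributes can be qualified by the alias.
    alias.map(a => Subquery(a, table)).getOrElse(table)
  }
}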
2.5 ImplicitGenerate

object ImplicitGenerate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(Seq(Alias(g: Generator, _)), child) =>
      Generate(g, join = false, outer = false, None, child)
  }
}
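In other words, a Project whose only expression is a generator (a UDTF such as explode) becomes a Generate node. A hypothetical example (table and column names are made up, and explode is assumed to come from the Hive function registry):

// Hypothetical: Project(Seq(Alias(explode(items), ...)), orders) is rewritten
// into Generate(explode(items), join = false, outer = false, None, orders).
val exploded = hiveContext.hql("select explode(items) from orders")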
2.6 StarExpansion

object StarExpansion extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Wait until children are resolved
    case p: LogicalPlan if !p.childrenResolved => p
    // If the projection list contains Stars, expand it.
    case p @ Project(projectList, child) if containsStar(projectList) =>
      Project(
        projectList.flatMap {
          case s: Star => s.expand(child.output) // expand(input: Seq[Attribute]) turns the star into a Seq[NamedExpression] over the child's output
          case o => o :: Nil
        },
        child)
    case t: ScriptTransformation if containsStar(t.input) =>
      t.copy(input = t.input.flatMap {
        case s: Star => s.expand(t.child.output)
        case o => o :: Nil
      })
    // If the aggregate function argument contains Stars, expand it.
    case a: Aggregate if containsStar(a.aggregateExpressions) =>
      a.copy(aggregateExpressions = a.aggregateExpressions.flatMap {
        case s: Star => s.expand(a.child.output)
        case o => o :: Nil
      })
  }

  /**
   * Returns true if `exprs` contains a [[Star]].
   */
  protected def containsStar(exprs: Seq[Expression]): Boolean =
    exprs.collect { case _: Star => true }.nonEmpty
}
2.7 ResolveFunctions

object ResolveFunctions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan =>
      q transformExpressions {
        case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
          registry.lookupFunction(name, children) // check whether the function (e.g. a UDF) has been registered
      }
  }
}
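A minimal sketch of the registry side (registerFunction and the builder type are illustrative, not Catalyst's exact API; only the lookupFunction(name, children) shape above is taken from the source):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative sketch: map a function name to a builder that constructs the
// resolved expression from the (already resolved) child expressions.
class MiniFunctionRegistry {
  private val builders = mutable.Map.empty[String, Seq[Expression] => Expression]

  def registerFunction(name: String, builder: Seq[Expression] => Expression): Unit =
    builders += name.toLowerCase -> builder

  def lookupFunction(name: String, children: Seq[Expression]): Expression =
    builders.getOrElse(name.toLowerCase, sys.error(s"Undefined function: $name"))(children)
}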
2.8 GlobalAggregates
object GlobalAggregates extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(projectList, child) if containsAggregates(projectList) =>
      Aggregate(Nil, projectList, child)
  }

  def containsAggregates(exprs: Seq[Expression]): Boolean = {
    exprs.foreach(_.foreach {
      case agg: AggregateExpression => return true
      case _ =>
    })
    false
  }
}
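This turns a projection that contains aggregate functions but has no GROUP BY into a global Aggregate with an empty grouping list. A small example (query made up, reusing the table from the Practice section):

// select count(1) ... has no group by, so its Project([COUNT(1) AS cnt]) is
// rewritten to Aggregate(groupingExpressions = Nil, [COUNT(1) AS cnt], child).
val total = sqlContext.sql("select count(1) cnt from temp_shengli_mobile")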
2.9 typeCoercionRules
val typeCoercionRules =
  PropagateTypes ::
  ConvertNaNs ::
  WidenTypes ::
  PromoteStrings ::
  BooleanComparisons ::
  BooleanCasts ::
  StringToIntegralCasts ::
  FunctionArgumentConversion ::
  CastNulls ::
  Nil
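These coercion rules, ported from Hive's behavior via HiveTypeCoercion, make implicit conversions explicit by inserting casts. The effect shows up in the Practice section below: in the predicate pfrom_id = 0.0 the two sides have different types, so the analyzer rewrites it to Filter (CAST(pfrom_id#5, DoubleType) = 0.0).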
2.10 EliminateAnalysisOperators
object EliminateAnalysisOperators extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child // for a Subquery, return its child rather than the node itself, effectively removing it from the plan
    case LowerCaseSchema(child) => child
  }
}
3. In Practice
Let's run a query and look at the Analyzer's log output:
val exec = sqlContext.sql("select mobile as mb, sid as id, mobile*2 multi2mobile, count(1) times from (select * from temp_shengli_mobile)a where pfrom_id=0.0 group by mobile, sid, mobile*2")
14/07/21 18:23:32 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line47.$eval.$print()
14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'pfrom_id to pfrom_id#5
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'sid to sid#1
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'sid to sid#1
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer:
=== Result of Batch Resolution ===
!Aggregate ['mobile,'sid,('mobile * 2) AS c2#27], ['mobile AS mb#23,'sid AS id#24,('mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L] Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]
! Filter ('pfrom_id = 0.0)                Filter (CAST(pfrom_id#5, DoubleType) = 0.0)
  Subquery a                              Subquery a
! Project [*] Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]
! UnresolvedRelation None, temp_shengli_mobile, None Subquery temp_shengli_mobile
!  SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)

14/07/21 18:23:33 DEBUG Analyzer:
=== Result of Batch AnalysisOperators ===
!Aggregate ['mobile,'sid,('mobile * 2) AS c2#27], ['mobile AS mb#23,'sid AS id#24,('mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L] Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]
! Filter ('pfrom_id = 0.0) Filter (CAST(pfrom_id#5, DoubleType) = 0.0)
! Subquery a Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]
! Project [*] SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)
! UnresolvedRelation None, temp_shengli_mobile, None
4. Summary
The Analyzer, driven by RuleExecutor, repeatedly applies the rules of each Batch (MultiInstanceRelations, CaseInsensitiveAttributeReferences, Resolution, AnalysisOperators) until a fix point is reached. In the example above, the Resolution batch bound every unresolved attribute ('mobile, 'sid, 'pfrom_id) to a typed, numbered reference, expanded Project [*] into the full column list, inserted the casts required by type coercion, and resolved the UnresolvedRelation into the registered table; the AnalysisOperators batch then stripped the Subquery nodes, leaving a fully resolved Logical Plan ready for the Optimizer.
This is an original article; please credit the source when reposting:
Reposted from OopsOutOfMemory (Shengli)'s blog, author: OopsOutOfMemory
Original link: http://blog.csdn.net/oopsoom/article/details/38025185