The Analyzer's main responsibility is to take the Logical Plan that the Sql Parser left unresolved and resolve it.
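A quick way to see both sides of that transition is the DataFrame API's extended explain, which prints the parsed (unresolved), analyzed, optimized and physical plans together. A minimal sketch, assuming a table named test has already been registered as in the examples later in this article:

// sketch: observe the unresolved vs. analyzed plan of a query against the registered "test" table
val df = sqlContext.sql("SELECT dev_chnid FROM test WHERE id > 1")
df.explain(true)  // extended explain also prints the Parsed (unresolved) and Analyzed Logical Plans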

lazy val analyzed: LogicalPlan = analyzer.execute(logical) // the analyzed LogicalPlan
protected[sql] lazy val analyzer: Analyzer =
  new Analyzer(catalog, functionRegistry, conf) {
    override val extendedResolutionRules =
      ExtractPythonUdfs ::
      sources.PreInsertCastAndRename ::
      Nil

    override val extendedCheckRules = Seq(
      sources.PreWriteCheck(catalog))
  }
class Analyzer(
    catalog: Catalog,
    registry: FunctionRegistry,
    conf: CatalystConf,
    maxIterations: Int = 100)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis {

  def resolver: Resolver = {
    if (conf.caseSensitiveAnalysis) {
      caseSensitiveResolution
    } else {
      caseInsensitiveResolution
    }
  }

  val fixedPoint = FixedPoint(maxIterations)

  /**
   * Override to provide additional rules for the "Resolution" batch.
   */
  val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

  // each Batch stands for a different strategy
  lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution ::
      WindowsSubstitution ::
      Nil : _*),
    Batch("Resolution", fixedPoint,
      // resolve table names through the catalog
      ResolveRelations ::
      // resolve attributes produced by operations on child nodes, usually introduced by aliases such as a.id
      ResolveReferences ::
      ResolveGroupingAnalytics ::
      // in a SELECT, the ORDER BY columns are often not listed in the projection; they must be
      // fetched as well and then dropped again after sorting
      ResolveSortReferences ::
      ResolveGenerate ::
      // resolve functions
      ResolveFunctions ::
      ExtractWindowExpressions ::
      // resolve global aggregate functions, e.g. select sum(score) from table
      GlobalAggregates ::
      // resolve aggregate filter conditions after HAVING, e.g. having sum(score) > 400
      UnresolvedHavingClauseAttributes ::
      TrimGroupingAliases ::
      // typeCoercionRules are Hive's type-coercion rules
      typeCoercionRules ++
      extendedResolutionRules : _*))
…
}

Here, in val analyzed: LogicalPlan = analyzer.execute(logical), logical is the unresolved logical plan produced by the SqlParser and analyzed is the analyzed logical plan. So what exactly does execute (inherited from RuleExecutor) do?

def execute(plan: TreeType): TreeType = {
  var curPlan = plan

  batches.foreach { batch =>            // process each Batch in turn
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      // Only when applying all of this batch's rules no longer changes the plan is the batch
      // considered done; as long as the plan keeps changing, keep iterating.
      // fold determines the traversal order: foldLeft starts from the left and walks right,
      // foldRight starts from the right and walks left.
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val result = rule(plan)       // apply rule.apply to this plan, transforming its TreeNodes
          // this added log line shows the result of applying each rule to the plan,
          // which is handy for the walkthroughs below
          logInfo(s"plan (${plan}) \n result (${result}) \n rule (${rule})")
          if (!result.fastEquals(plan)) {
            logTrace(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logDebug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logTrace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan
}
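Stripped of logging, the loop boils down to: fold the current plan through every rule of the batch, and repeat until a full pass no longer changes the plan or maxIterations is reached. A standalone sketch of the idea (illustration only, not Spark's actual code):

// minimal sketch of a fixed-point rule executor
def runBatch[T](plan: T, rules: Seq[T => T], maxIterations: Int): T = {
  var cur = plan
  var last = cur
  var iteration = 0
  do {
    last = cur
    cur = rules.foldLeft(cur) { (p, rule) => rule(p) }  // apply every rule, left to right
    iteration += 1
  } while (cur != last && iteration < maxIterations)    // stop at a fixed point or at the iteration cap
  cur
}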

The key is this call:

val result = rule(plan) // apply rule.apply to this plan, transforming its TreeNodes

rule(plan) invokes the apply method of the corresponding Rule[LogicalPlan] object, for example ResolveRelations and ResolveReferences:

object ResolveRelations extends Rule[LogicalPlan] {
  def getTable(u: UnresolvedRelation): LogicalPlan = {
    try {
      catalog.lookupRelation(u.tableIdentifier, u.alias)
    } catch {
      case _: NoSuchTableException =>
        u.failAnalysis(s"no such table ${u.tableName}")
    }
  }

  // takes a logical plan and returns a logical plan; transform traverses the nodes and applies
  // the rule to each of them. transform delegates to transformDown, which is essentially a
  // pre-order traversal of the tree
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
      i.copy(table = EliminateSubQueries(getTable(u)))
    case u: UnresolvedRelation =>
      getTable(u)
  }
}

object ResolveReferences extends Rule[LogicalPlan] {
  // transformUp is essentially a post-order traversal of the tree
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p: LogicalPlan if !p.childrenResolved => p

    // If the projection list contains Stars, expand it.
    case p @ Project(projectList, child) if containsStar(projectList) =>
      Project(
        projectList.flatMap {
          case s: Star => s.expand(child.output, resolver)
          case Alias(f @ UnresolvedFunction(_, args), name) if containsStar(args) =>
            val expandedArgs = args.flatMap {
              case s: Star => s.expand(child.output, resolver)
              case o => o :: Nil
            }
            Alias(child = f.copy(children = expandedArgs), name)() :: Nil
          case Alias(c @ CreateArray(args), name) if containsStar(args) =>
            val expandedArgs = args.flatMap {
              case s: Star => s.expand(child.output, resolver)
              case o => o :: Nil
            }
            ……
The difference is simply the standard tree traversal order: transform (transformDown) visits a node before its children (pre-order), while transformUp visits the children first and the node last (post-order).
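A toy example makes the visiting order concrete (illustration only; Node is a made-up class, Catalyst's TreeNode behaves analogously):

// toy tree to illustrate transform (pre-order) vs transformUp (post-order) visiting order
case class Node(name: String, children: Seq[Node] = Nil)

def preOrder(n: Node, visit: Node => Unit): Unit = {   // like transform / transformDown
  visit(n); n.children.foreach(preOrder(_, visit))
}
def postOrder(n: Node, visit: Node => Unit): Unit = {  // like transformUp
  n.children.foreach(postOrder(_, visit)); visit(n)
}

val tree = Node("Project", Seq(Node("Filter", Seq(Node("Relation")))))
preOrder(tree, n => print(n.name + " "))   // prints: Project Filter Relation
println()
postOrder(tree, n => print(n.name + " "))  // prints: Relation Filter Project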

Next, let's walk through a few representative Rule[LogicalPlan] implementations.

3.1 ResolveRelations

Resolves an UnresolvedRelation into a resolved relation.
object ResolveRelations extends Rule[LogicalPlan] {
  def getTable(u: UnresolvedRelation): LogicalPlan = {
    try {
      catalog.lookupRelation(u.tableIdentifier, u.alias)
    } catch {
      case _: NoSuchTableException =>
        u.failAnalysis(s"no such table ${u.tableName}")
    }
  }

  // takes a logical plan and returns a logical plan; transform traverses the nodes and applies the rule to each of them
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
      i.copy(table = EliminateSubQueries(getTable(u)))
    case u: UnresolvedRelation =>
      // when an UnresolvedRelation is hit, look up in the catalog which real data-source relation this table name refers to
      getTable(u)
  }
}
The relation for this table name was registered earlier, when dataFrame.registerTempTable(source) was called:
dataFrame.registerTempTable(source)

Let's look at DataFrame.registerTempTable:

/**
 * Registers this [[DataFrame]] as a temporary table using the given name. The lifetime of this
 * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
 *
 * @group basic
 * @since 1.3.0
 */
def registerTempTable(tableName: String): Unit = {
  sqlContext.registerDataFrameAsTable(this, tableName)
}
/**
 * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
 * only during the lifetime of this instance of SQLContext.
 */
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
  catalog.registerTable(Seq(tableName), df.logicalPlan) // one table name maps to one logicalPlan
}
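Conceptually the temp-table catalog is just a map from table name to LogicalPlan, and lookupRelation hands the registered plan back wrapped in a Subquery node carrying the table name (which is why Subquery test shows up in the logs below). A simplified sketch of that idea, not the real SimpleCatalog (TinyCatalog is a made-up name):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

// sketch: only illustrates the name -> plan bookkeeping; the real catalog also handles
// case sensitivity and database-qualified names
class TinyCatalog {
  private val tables = mutable.HashMap[String, LogicalPlan]()

  def registerTable(name: String, plan: LogicalPlan): Unit = tables(name) = plan

  def lookupRelation(name: String, alias: Option[String] = None): LogicalPlan =
    Subquery(alias.getOrElse(name), tables(name))  // wrap the plan so its attributes are qualified by the table name
}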

And this logicalPlan is exactly the logicalPlan held inside the DataFrame.

DataFrame dataFrame = sqlContext.parquetFile(hdfsPath); // where this dataFrame's logicalPlan comes from
def parquetFile(paths: String*): DataFrame = {
  if (paths.isEmpty) {
    emptyDataFrame
  } else if (conf.parquetUseDataSourceApi) {
    // currently this branch is taken
    read.parquet(paths : _*)
  } else {
    DataFrame(this,
      parquet.ParquetRelation(paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
  }
}
def parquet(paths: String*): DataFrame = {
  if (paths.isEmpty) {
    sqlContext.emptyDataFrame
  } else {
    val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
    sqlContext.baseRelationToDataFrame(
      // the relation that is eventually built is a ParquetRelation2
      new ParquetRelation2(globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))
  }
}

Now let's look at the log output:

plan->
'Sort ['car_num ASC], false
 'Aggregate ['dev_chnid], ['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct]
  'Filter ('id > 1)
   'UnresolvedRelation [test], None
result->
'Sort ['car_num ASC], false
 'Aggregate ['dev_chnid], ['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct]
  'Filter ('id > 1)
   Subquery test
    Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$@51db8cdb

After the rule ResolveRelations is applied, 'UnresolvedRelation [test], None is resolved into:

Subquery test

Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010

3.2 ResolveReferences

Resolves the output attributes of each node; every LogicalPlan outputs a set of attributes. For example, when select * appears, the * has to be expanded into all the columns it stands for.

object ResolveReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p: LogicalPlan if !p.childrenResolved => p

    // If the projection list contains Stars, expand it.
    case p @ Project(projectList, child) if containsStar(projectList) =>
      // if a '*' appears, it must be expanded
      Project(
        projectList.flatMap {
          case s: Star => s.expand(child.output, resolver)
          case Alias(f @ UnresolvedFunction(_, args), name) if containsStar(args) =>
            val expandedArgs = args.flatMap {
              case s: Star => s.expand(child.output, resolver)
              case o => o :: Nil
            }
            Alias(child = f.copy(children = expandedArgs), name)() :: Nil
          case Alias(c @ CreateArray(args), name) if containsStar(args) =>
            val expandedArgs = args.flatMap {
              case s: Star => s.expand(child.output, resolver)
              case o => o :: Nil
            }
            Alias(c.copy(children = expandedArgs), name)() :: Nil
          case Alias(c @ CreateStruct(args), name) if containsStar(args) =>
            val expandedArgs = args.flatMap {
              case s: Star => s.expand(child.output, resolver)
              case o => o :: Nil
            }
            Alias(c.copy(children = expandedArgs), name)() :: Nil
          case o => o :: Nil
        },
        child)

    case t: ScriptTransformation if containsStar(t.input) =>
      t.copy(
        input = t.input.flatMap {
          case s: Star => s.expand(t.child.output, resolver)
          case o => o :: Nil
        })
    ……
}
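The essence of the star handling is small: whenever an element of the projection list is a star, substitute the child's output attributes; everything else passes through unchanged. A stripped-down sketch of that expansion on plain strings (illustration only):

// simplified sketch of star expansion: replace "*" with the child's output column names
def expandStar(projectList: Seq[String], childOutput: Seq[String]): Seq[String] =
  projectList.flatMap {
    case "*"   => childOutput  // '*' expands to every output attribute of the child
    case other => Seq(other)
  }

expandStar(Seq("*"), Seq("id", "dev_chnid", "car_num"))
// -> Seq("id", "dev_chnid", "car_num")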

For example, given the following SQL statement:

String sql = "SELECT * from test ";

the log output is as follows:

plan->
'Project [*]
 Subquery test
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010result->
Project [id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42]   // '*' has been resolved into concrete columns
 Subquery test
  Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$@7878966d

3.3 ResolveSortReferences

In a SELECT statement the ORDER BY columns are often not listed in the projection; they still have to be fetched during the query and dropped again once sorting is done. Likewise, when an aggregation and a sort appear together and the sort column is not among the aggregate expressions, that column has to be added to the aggregation as well:

object ResolveSortReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case s @ Sort(ordering, global, p @ Project(projectList, child))
        if !s.resolved && p.resolved =>
      val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, p, child)

      // If this rule was not a no-op, return the transformed plan, otherwise return the original.
      if (missing.nonEmpty) {
        // Add missing attributes and then project them away after the sort.
        // i.e. add the ordering columns that do not appear in p's output to p
        Project(p.output,
          Sort(resolvedOrdering, global,
            Project(projectList ++ missing, child)))
      } else {
        logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}")
        s // Nothing we can do here. Return original plan.
      }

    case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
        if !s.resolved && a.resolved =>
      val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
      // A small hack to create an object that will allow us to resolve any references that
      // refer to named expressions that are present in the grouping expressions.
      val groupingRelation = LocalRelation(
        grouping.collect { case ne: NamedExpression => ne.toAttribute })

      val (resolvedOrdering, missing) = resolveAndFindMissing(ordering, a, groupingRelation)

      if (missing.nonEmpty) {
        // Add missing grouping exprs and then project them away after the sort.
        // i.e. push the ordering columns that are missing from the aggregate into the aggregate
        Project(a.output,
          Sort(resolvedOrdering, global,
            Aggregate(grouping, aggs ++ missing, child)))
      } else {
        s // Nothing we can do here. Return original plan.
      }
  }
}

For example, given the following SQL statement:

String sql = "SELECT dev_chnid from test order by id";

the log output is as follows:

plan->
'Sort ['id ASC], true   // id does not appear in the Project below
 Project [dev_chnid#26]
  Subquery test
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
result->
Project [dev_chnid#26]
 Sort [id#0L ASC], true
  Project [dev_chnid#26,id#0L]   // fetch both columns together first
   Subquery test
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSortReferences$@2fa28f15
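In DataFrame terms the rewrite is roughly: select the projected column plus the missing sort column, sort, then project the helper column away again. A hedged sketch, assuming df refers to the test table registered earlier:

val df = sqlContext.table("test")    // assumes the temp table registered earlier in this article
val rewritten = df
  .select("dev_chnid", "id")         // fetch the missing ORDER BY column as well
  .sort("id")                        // Sort on id
  .select("dev_chnid")               // then project the helper column away again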

3.4 ResolveFunctions

Resolves UDFs (user-defined functions). Spark supports user-defined functions: users can define the UDFs they actually need in Spark SQL to process their data. Anyone who has used Spark SQL has probably run into the limitation that its built-in function library is quite small; beyond the most basic functions it cannot cover the needs of a typical project, and UDFs solve that problem.

So how do you use a user-defined function? Let's look at a piece of code first:

SQLContext sqlContext = new SQLContext(jsc);
UDFRegistration udfRegistration = new UDFRegistration(sqlContext); // registration goes through UDFRegistration
DataFrame dataFrame = sqlContext.parquetFile(hdfsPath);
dataFrame.registerTempTable(source);
udfRegistration.register("strlength", new UDF1<String, Integer>() {
    @Override
    public Integer call(String str) throws Exception {
        return (Integer) str.length();
    }
}, DataType.fromCaseClassString("IntegerType")); // returns the length of the given string
String sql = "SELECT strlength(dev_chnid) from test";
DataFrame result = sqlContext.sql(sql);
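For completeness, the same UDF can be registered from Scala through sqlContext.udf, which is exactly this UDFRegistration; a hedged equivalent of the Java snippet above:

// Scala equivalent of the Java registration above (sketch)
sqlContext.udf.register("strlength", (s: String) => s.length)  // the return type is inferred from the closure
val result = sqlContext.sql("SELECT strlength(dev_chnid) from test")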
So a user registers a custom function through UDFRegistration. How does ResolveFunctions then resolve it? Read on:
object ResolveFunctions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan =>
      q transformExpressions {
        case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
          registry.lookupFunction(name, children) // look the function up through the registry
      }
  }
}
protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(conf)
class SimpleFunctionRegistry(val conf: CatalystConf) extends FunctionRegistry {
  val functionBuilders = StringKeyHashMap[FunctionBuilder](conf.caseSensitiveAnalysis)

  override def registerFunction(name: String, builder: FunctionBuilder): Unit = {
    functionBuilders.put(name, builder)
  }

  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
    functionBuilders(name)(children)
  }
}
class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
/**
 * Register a user-defined function with 1 arguments.
 * @since 1.3.0
 */
def register(name: String, f: UDF1[_, _], returnType: DataType) = {
  // internally this still registers through functionRegistry
  functionRegistry.registerFunction(
    name,
    (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), returnType, e))
}
}

The log output is then as follows:

plan->
'Project ['strlength(dev_chnid#26) AS c0#43]
 Subquery test
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
result->
Project [scalaUDF(dev_chnid#26) AS c0#43]   // strlength has been resolved into a scalaUDF
 Subquery test
Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$@2b8199b7

3.5 GlobalAggregates

Resolves global aggregate functions in the SELECT list, for example select MAX(ID).

object GlobalAggregates extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // if the projection list contains an aggregate expression, turn the Project into an Aggregate
    case Project(projectList, child) if containsAggregates(projectList) =>
      Aggregate(Nil, projectList, child)
  }

  def containsAggregates(exprs: Seq[Expression]): Boolean = {
    exprs.foreach(_.foreach {
      case agg: AggregateExpression => return true
      case _ =>
    })
    false
  }
}

For example, given the following SQL statement:

String sql = "SELECT MAX(id) from test";

the log output is as follows:

16-07-19 14:17:59,708 INFO org.apache.spark.sql.SQLContext$$anon$1(Logging.scala:59) ##
plan->
'Project [MAX(id#0L) AS c0#43L]
 Subquery test
  Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
result->
Aggregate [MAX(id#0L) AS c0#43L]   // the Project has been resolved into an Aggregate
 Subquery test
  Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates$@4a9e419a
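For reference, the analyzed plan has the same shape the DataFrame aggregation API builds directly, i.e. an Aggregate with an empty grouping list (sketch, assuming the test table from above):

import org.apache.spark.sql.functions.max

val df = sqlContext.table("test")
val maxId = df.agg(max("id"))  // global aggregate: an Aggregate with no grouping expressions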

3.6 UnresolvedHavingClauseAttributes

Resolves the filter condition after a HAVING clause; if the filtered column does not appear after SELECT, it is added.

object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _))
        if aggregate.resolved && containsAggregate(havingCondition) => {

      val evaluatedCondition = Alias(havingCondition, "havingCondition")()
      // merge the filter's condition into the aggregate expressions
      val aggExprsWithHaving = evaluatedCondition +: originalAggExprs

      // make the condition an output of the aggregate, filter on it, then project it away
      Project(aggregate.output,
        Filter(evaluatedCondition.toAttribute,
          aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
    }
  }

  protected def containsAggregate(condition: Expression): Boolean =
    condition.collect { case ae: AggregateExpression => ae }.nonEmpty
}

For example, given the following SQL statement:

String sql = "SELECT SUM(car_speed) from test group by dev_chnname HAVING SUM(id) > 1"; // id does not appear after SELECT

the log output is as follows:

16-07-19 15:41:43,410 INFO  org.apache.spark.sql.SQLContext$$anon$1(Logging.scala:59) ##
plan->
'Filter (SUM('id) > 1)
 Aggregate [dev_chnname#4], [SUM(car_speed#8) AS c0#43]
  Subquery test
   Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
result->
'Project [c0#43]
 'Filter 'havingCondition
  'Aggregate [dev_chnname#4], [(SUM('id) > 1) AS havingCondition#44,SUM(car_speed#8) AS c0#43]   // SUM(id) is pushed down into the aggregate here
   Subquery test
    Relation[id#0L,dev_id#1,dev_chnnum#2L,dev_name#3,dev_chnname#4,car_num#5,car_numtype#6,car_numcolor#7,car_speed#8,car_type#9,car_color#10,car_length#11L,car_direct#12,car_way_code#13,cap_time#14L,cap_date#15L,inf_note#16,max_speed#17,min_speed#18,car_img_url#19,car_img1_url#20,car_img2_url#21,car_img3_url#22,car_img4_url#23,car_img5_url#24,rec_stat#25,dev_chnid#26,car_img_count#27,save_flag#28,dc_cleanflag#29,pic_id#30,car_img_plate_top#31L,car_img_plate_left#32L,car_img_plate_bottom#33L,car_img_plate_right#34L,car_brand#35L,issafetybelt#36,isvisor#37,bind_stat#38,car_num_pic#39,combined_pic_url#40,verify_memo#41,rec_stat_tmp#42] org.apache.spark.sql.parquet.ParquetRelation2@2a400010
rule->org.apache.spark.sql.catalyst.analysis.Analyzer$UnresolvedHavingClauseAttributes$@631ea30a
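The rewrite mirrors what one would write by hand with the DataFrame API: compute the HAVING expression as an extra aggregate column, filter on it, then project it away. A hedged sketch, again assuming the test table from above:

import org.apache.spark.sql.functions.{col, sum}

val df = sqlContext.table("test")
val havingRewritten = df
  .groupBy("dev_chnname")
  .agg(sum("car_speed").as("c0"), sum("id").as("havingCondition"))  // extra aggregate output for the HAVING condition
  .filter(col("havingCondition") > 1)                               // Filter on that output
  .select("c0")                                                     // then project it away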
