/** Spark SQL Source Code Analysis Series */

The previous articles in this series covered the core execution flow of Spark SQL's Catalyst, the SqlParser, the Analyzer, and the core TreeNode library. This article explains the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is expressed inside Catalyst, adding some hands-on experiments of my own, so that you come away with an intuitive picture of the Optimizer.

The Optimizer's main responsibility is to take the Logical Plan that the Analyzer has resolved and optimize the syntax tree according to the different optimization Batches, optimizing both Logical Plan nodes and Expressions. It is also the prerequisite step before conversion to a physical execution plan, as the figure below illustrates:

1. Optimizer

The Optimizer class is the only class under catalyst's optimizer package. The way the Optimizer works is in fact similar to the Analyzer: both extend RuleExecutor[LogicalPlan], and both run a series of Batch operations:

The batches in Optimizer cover three kinds of optimization strategies: 1. Combine Limits, which merges Limit operators; 2. ConstantFolding, which folds constants; 3. Filter Pushdown, which pushes filters down. The rule objects referenced by each Batch are all defined alongside Optimizer:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
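
As an aside, the same Batch / FixedPoint pattern can be reused to assemble a reduced optimizer of your own, which is handy when you want to watch a single Batch in isolation from the spark-shell. The following is only a sketch under Spark 1.0.0; MyMiniOptimizer is an illustrative name and not part of Spark:

import org.apache.spark.sql.catalyst.optimizer.{CombineLimits, ConstantFolding}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// A hypothetical cut-down optimizer that only merges Limits and folds constants.
object MyMiniOptimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Limits and constants", FixedPoint(100),   // run the rules repeatedly until a fixed point
      CombineLimits,
      ConstantFolding) :: Nil
}

// Usage in the spark-shell, mirroring the rule invocations later in this article:
// MyMiniOptimizer(query.queryExecution.analyzed)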

One more point: the Optimizer not only optimizes the Logical Plan, it also optimizes the Expressions inside the Logical Plan, so it is worth knowing the Expression-related classes. The two members used most here are references and outputSet: references is the set of Expressions (attributes) that a Logical Plan node or Expression node depends on, while outputSet is the set of Attributes that a Logical Plan outputs:

For example, Aggregate is a Logical Plan; its references are the deduplicated union of the references of its group-by expressions and its aggregate expressions.

case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
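
To get a feel for these two sets, they can be inspected directly in the spark-shell. This is only a sketch; it reuses the temp_shengli table from the examples later in this article, and the attribute ids will of course differ on your machine:

// The analyzed plan of a group-by query has an Aggregate node at the top.
val plan = sql("select key, count(value) from temp_shengli group by key").queryExecution.analyzed

plan.outputSet    // the Attributes the Aggregate outputs (key plus the count column)
plan.references   // the Attributes it depends on: the union of group-by and aggregate expression references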

2. The Optimization Strategies in Detail

  The Optimizer's strategies transform not only the plan but also its expressions; at their core they all traverse the tree and apply an optimization Rule at each node. One thing to note: transform on a Logical Plan is a pre-order (top-down) traversal, whereas Expressions are usually transformed bottom-up (post-order) via transformExpressionsUp, although some rules, such as ConstantFolding below, use the top-down transformExpressionsDown instead:
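
As a toy illustration of this traversal API (a sketch, not an actual Spark rule), the rule below walks a plan with transform and prints every Filter condition it visits; since transform is top-down, an outer Filter is reported before any Filter nested under it:

import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object PrintFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f: Filter =>
      println("visiting Filter: " + f.condition)   // pre-order: parent nodes are visited before their children
      f                                            // return the node unchanged
  }
}

// Usage: PrintFilters(query.queryExecution.analyzed)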

2.1 Batch: Combine Limits

If two Limit operators appear and one is the direct child of the other, the two are merged into one.
/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // ll is the outer Limit and le its expression; nl is ll's child Limit and ne its expression;
    // grandChild is the plan underneath both Limits
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      // compare the two expressions: if ne is smaller than le take ne, otherwise take le
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}

Given SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")

scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None

The subquery limits to 100 while the outer query limits to 10. There is clearly no need for the subquery to fetch that many rows, since the outer layer only needs 10, so Limit 10 and Limit 100 are merged into Limit 10. (Strictly speaking, CombineLimits by itself rewrites the pair into Limit If((100 < 10), 100, 10); the ConstantFolding batch, which runs to a fixed point, then collapses that expression to 10.)

2.2 Batch: ConstantFolding

This Batch contains the rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.

2.2.1 Rule: NullPropagation

First, a quick word about the Literal class: it can in fact wrap a value of any of the basic types into a typed literal (background for what follows).

object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}

Note that Literal is a LeafExpression; its core method is eval, which, given a Row, evaluates the expression and returns its value:

case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty

  override def toString = if (value != null) value.toString else "null"

  type EvaluatedType = Any
  override def eval(input: Row): Any = value
}
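
A quick spark-shell sanity check makes the foldable / eval pair concrete (a sketch; Add lives in the same catalyst expressions package):

import org.apache.spark.sql.catalyst.expressions.{Add, Literal}

val one = Literal(1)             // matched by the apply above as Int => Literal(1, IntegerType)
val sum = Add(one, Literal(2))   // the expression tree 1 + 2

sum.foldable                     // true: both children are Literals, so the whole tree is foldable
sum.eval(null)                   // 3: a foldable expression needs no input Row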

Now let's see what NullPropagation actually does.

NullPropagation is an optimization that replaces Expressions that can be statically evaluated with equivalent Literal values, and it also stops NULL values from propagating up the SQL expression tree.

/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)      // count(null) folds to 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType) // sum of a constant 0 folds to 0
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)  // average of a constant 0 folds to 0.0
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ Coalesce(children) => {
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
        case Literal(candidate, _) if candidate == v => true
        case _ => false
      })) => Literal(true, BooleanType)
      // Put exceptional cases above if any
      case e: BinaryArithmetic => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}

Given SQL: val query = sql("select count(null) from temp_shengli where key is not null")

scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L]   // still counting null here
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None

Apply NullPropagation:

scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L]   // optimized to a constant 0
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None

2.2.2 Rule: ConstantFolding

  Constant folding is a kind of Expression optimization: constants that can be computed right away are evaluated inside the plan itself, instead of generating objects at physical execution time to compute them:
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // first transform the plan nodes
    case q: LogicalPlan => q transformExpressionsDown {          // then transform each node's expressions, top-down
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType)  // call eval to compute the result right away
    }
  }
}

Given SQL: val query = sql("select 1+2+3+4 from temp_shengli")

scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21]   // still a constant expression at this point
 MetastoreRelation default, src, None

After optimization:

scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21]   // after optimization, folded directly to 10
 MetastoreRelation default, src, None

2.2.3 Rule: BooleanSimplification

This rule optimizes boolean expressions, a bit like short-circuit evaluation of boolean expressions in Java, and the implementation is quite elegant.

The idea is to check whether a two-sided boolean expression can be decided by evaluating only one side, so that the other side never needs to be evaluated; this is what "simplifying the boolean expression" means here.

See the comments I added below:

/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>            // an AND, i.e. exp1 and exp2
        (left, right) match {                   // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r                // left is true, return the right side
          case (l, Literal(true, BooleanType)) => l                // right is true, return the left side
          case (Literal(false, BooleanType), _) => Literal(false)  // left is false, the whole AND is false
          case (_, Literal(false, BooleanType)) => Literal(false)  // either side being false makes the AND false
          case (_, _) => and
        }
      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)    // left is true, no need to look at the right side
          case (_, Literal(true, BooleanType)) => Literal(true)    // either side being true makes the OR true
          case (Literal(false, BooleanType), r) => r               // left is false, the result is whatever the right side is
          case (l, Literal(false, BooleanType)) => l               // right is false, the result is whatever the left side is
          case (_, _) => or
        }
    }
  }
}
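
BooleanSimplification is not demonstrated with SQL above, so here is a small, admittedly contrived sketch in the same style as the NullPropagation invocation earlier. It manually wraps an analyzed plan in an extra Filter whose condition contains boolean literals, purely to give the rule something to simplify (the attribute ids and printed output will differ on your setup):

import org.apache.spark.sql.catalyst.expressions.{And, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Filter

val base = sql("select key from temp_shengli where key > 100").queryExecution.analyzed
// Condition `true AND false`; the left side is a literal true, so the rule should
// collapse the AND to its right side.
val wrapped = Filter(And(Literal(true), Literal(false)), base)

BooleanSimplification(wrapped)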

2.3 Batch: Filter Pushdown

The Filter Pushdown batch contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning.
PS: I feel the name "Filter Pushdown" does not quite cover everything in this batch, for example ColumnPruning (column pruning).

2.3.1 Combine Filters

 Merges two adjacent Filters, much like Combine Limits above. Merging the two nodes reduces the depth of the tree and therefore the cost of running the filtering twice.
/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}

Given SQL: val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")

Before optimization: we can see that one Filter is the grandchild of the other (a Project sits between them).

scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter (key#27 > 80)        // filter > 80
  Project [key#27]
   Filter (key#27 > 100)     // filter > 100
    MetastoreRelation default, src, None

After optimization: a filter can in fact also be expressed as one more complex boolean expression. (Note that in the analyzed plan the two Filters are not adjacent; in the full optimizer, PushPredicateThroughProject first pushes the outer Filter below the Project so that the two Filters become adjacent, and CombineFilters can then merge them.)

scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80))   // merged into a single Filter
  MetastoreRelation default, src, None

2.3.2 Filter Pushdown

Filter Pushdown, as the name says, pushes filters down.

The principle is to filter out unneeded rows as early as possible, so that later operators have less work to do.

Given SQL: val query = sql("select key from (select * from temp_shengli)a where key>100")

The generated logical plan is:

scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)           // first select key, value, then Filter
  Project [key#31,value#32]
   MetastoreRelation default, src, None

The optimized plan is:

scala> query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)           // filter first, then select
  MetastoreRelation default, src, None
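
The rule doing the work in this example is PushPredicateThroughProject from the same batch; the adjacent Projects are then merged by ColumnPruning, described next. As a sketch, the rule can also be applied on its own, in the same style as the rule invocations above:

// Should move Filter (key#31 > 100) below the inner Project; the final optimizedPlan above
// additionally merges the two adjacent Projects.
PushPredicateThroughProject(query.queryExecution.analyzed)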

2.3.3 ColumnPruning

  Column pruning is used quite a lot; it simply avoids selecting columns that are not needed.
  Column pruning can be applied in three places:
  1. In aggregations, unneeded columns can be pruned.
  2. In joins, the left and right children can be pruned.
  3. The columns of adjacent Projects can be merged.

object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    // If the child's outputSet minus a.references is non-empty, replace the Aggregate's child
    // with a Project over just a.references.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join, i.e. prune the columns
    // of the join's left and right subtrees.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      // Collect the list of all references required either above or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) =>
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects
    case Project(projectList, child) if child.output == projectList => child
  }
}
Three examples below illustrate the three cases.

1. Column pruning in an aggregation

Given SQL: val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")

Before optimization:

res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52]   // before optimization, both key and value are selected by default
  MetastoreRelation default, temp_shengli, None

After optimization:

scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]   // after optimization, value is pruned away and only key is selected
  MetastoreRelation default, temp_shengli, None

2. Column pruning on both children of a join

Given SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b  on a.key =b.key ")

Before optimization:

scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]   // an extra column (value) is selected here
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]   // an extra column (value) is selected here
   MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning2 is a copy of the rule I made for my own debugging):

scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]   // after pruning, the left child only needs to select key
   MetastoreRelation default, temp_shengli, None
  Project [key#37]   // after pruning, the right child only needs to select key
   MetastoreRelation default, temp_shengli, None

3. Merging the columns of adjacent Projects (pruning)

Given SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")

Before optimization:

scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None

After optimization:

scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57]   // the subquery's c is substituted into the outer select and folded
 MetastoreRelation default, temp_shengli, None

3. Summary

This article described the Optimizer's role inside Catalyst: it takes the analyzed Logical Plan and applies Rules via transform to both the Logical Plan and its Expressions, merging and optimizing the tree's nodes. The main optimization strategies boil down to a few categories: operator merging, column pruning, and filter pushdown.

Catalyst keeps evolving; this article is based only on Spark 1.0.0. Optimization strategies added later may be covered in follow-up posts.

Everyone is welcome to discuss; let's improve together!

——EOF——

This is an original article; please credit the source when reposting:

Reposted from: OopsOutOfMemory (盛利)'s blog, author: OopsOutOfMemory

Original link: http://blog.csdn.net/oopsoom/article/details/38121259

Note: this article is published under the Creative Commons BY-NC-ND 2.5 CN (署名-非商业性使用-禁止演绎) license. Reposting, sharing, and comments are welcome, but please keep the author attribution and the article link. For commercial use or licensing questions, please contact me.
