Spark SQL Catalyst Source Code Analysis: Optimizer
/** Spark SQL source code analysis series */
The previous articles in this series covered Catalyst's core execution flow, the SqlParser and Analyzer, and the core TreeNode library. This article explains the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is expressed inside Catalyst, together with some hands-on experiments of my own, so that you come away with an intuitive picture of the Optimizer.
The Optimizer's main responsibility is to take the resolved Logical Plan produced by the Analyzer and optimize the syntax tree according to the different optimization-strategy Batches. It optimizes both Logical Plan nodes and Expressions, and it is the prerequisite step before conversion into a physical execution plan.
1. Optimizer
Optimizer is the only class under Catalyst's optimizer package. It works much the same way as the Analyzer: both extend RuleExecutor[LogicalPlan], and both run a series of Batch operations:
The batches in Optimizer contain three groups of optimization strategies: 1. Combine Limits (merge Limit operators), 2. ConstantFolding (fold constant expressions), 3. Filter Pushdown (push filters down). The rule objects used by each Batch are all defined alongside Optimizer:
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
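For intuition about what FixedPoint(100) means, here is a minimal self-contained sketch of the fixed-point loop, using toy stand-in types of my own rather than the real Catalyst classes: each batch applies its rules in order, and the whole pass repeats until the plan stops changing or the iteration cap is hit.

// Toy sketch of RuleExecutor's fixed-point batch execution (hypothetical types).
type Plan = List[Int]                       // stand-in for a LogicalPlan
type Rule = Plan => Plan

case class Batch(name: String, maxIterations: Int, rules: Rule*)

def execute(plan: Plan, batches: Seq[Batch]): Plan =
  batches.foldLeft(plan) { (start, batch) =>
    var current = start
    var changed = true
    var iteration = 0
    while (changed && iteration < batch.maxIterations) {
      val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current             // fixed point: a full pass changed nothing
      current = next
      iteration += 1
    }
    current
  }

val dedup: Rule = _.distinct                // a "rule" that removes duplicates
val sort: Rule = _.sorted                   // a "rule" that normalizes ordering
println(execute(List(3, 1, 3, 2), Seq(Batch("demo", 100, dedup, sort))))  // List(1, 2, 3)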
One more thing worth noting: the Optimizer optimizes not only the Logical Plan but also the Expressions inside it, so it helps to know the relevant Expression classes. Two members matter most here: references and outputSet. references is the set of Expressions (attributes) that a Logical Plan or Expression node depends on, while outputSet is the set of all Attributes the Logical Plan outputs:
For example, Aggregate is a Logical Plan; its references are the deduplicated union of the references of its group-by expressions and its aggregate expressions.
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
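To make these two members concrete, a tiny sketch, with plain Set[String] standing in for Catalyst's attribute sets: the columns a child produces but nothing above ever references are exactly the pruning candidates, which is the child.outputSet -- a.references computation that ColumnPruning performs later in this article.

// Hypothetical stand-in: attributes modeled as plain strings.
val outputSet  = Set("key#1", "value#2")    // what the child node outputs
val references = Set("key#1")               // what the parent node actually uses

val prunable = outputSet -- references      // Set(value#2): safe to prune
println(s"prunable columns: $prunable")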
2. Optimization Strategies in Detail
2.1 Batch: Combine Limits
/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // ll is the outer Limit with expression le; nl is its child Limit with expression ne
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      // compare the two expressions: if ne is smaller than le, keep ne, otherwise keep le
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}
Given the SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")
scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None
The subquery limits to 100 rows while the outer query limits to 10. There is clearly no need for the subquery to fetch that many rows, since the outer query only needs 10, so the two are merged: Limit 10 and Limit 100 become Limit 10.
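As a sanity check, here is the merged expression If(LessThan(ne, le), ne, le) evaluated on toy expression classes of my own (not the real Catalyst ones): with ne = 100 and le = 10, LessThan is false, so the If picks le and the merged operator is Limit 10.

// Toy expression tree mirroring the rewrite target If(LessThan(ne, le), ne, le).
sealed trait Expr { def eval: Any }
case class Literal(v: Any) extends Expr { def eval: Any = v }
case class LessThan(l: Expr, r: Expr) extends Expr {
  def eval: Any = l.eval.asInstanceOf[Int] < r.eval.asInstanceOf[Int]
}
case class If(pred: Expr, t: Expr, f: Expr) extends Expr {
  def eval: Any = if (pred.eval.asInstanceOf[Boolean]) t.eval else f.eval
}

val ne = Literal(100)                       // the inner limit's expression
val le = Literal(10)                        // the outer limit's expression
println(If(LessThan(ne, le), ne, le).eval)  // 10: the smaller limit wins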
2.2 Batch: ConstantFolding
This Batch contains the rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.
2.2.1 Rule: NullPropagation
First, a word about the Literal. It is a class whose companion object can wrap a value of any basic type (this is groundwork for what follows):
object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}
Note that Literal is a LeafExpression whose core method is eval: given a Row, it evaluates the expression and returns the value:
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"
  type EvaluatedType = Any
  override def eval(input: Row): Any = value
}
Now let's look at what NullPropagation does. NullPropagation is an optimization that replaces Expressions that can be statically evaluated with equivalent Literal values, and keeps NULL values from propagating up through the SQL expression tree.
/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)       // count(null) always yields 0, so replace with the constant 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)  // sum over the constant 0 is the constant 0
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)   // likewise for the average of a constant 0
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)        // a non-nullable column is never null
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)      // indexing into null yields null
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ Coalesce(children) => {
        // drop children that are statically the null literal
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) =>                    // a constant predicate picks one branch statically
        if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
          case Literal(candidate, _) if candidate == v => true
          case _ => false
        })) => Literal(true, BooleanType)                                     // a literal found in the IN list makes the predicate true
      // Put exceptional cases above if any
      case e: BinaryArithmetic => e.children match {                          // null on either side of an arithmetic op yields null
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}
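The Coalesce case deserves a second look: children that are statically the null literal can never supply a value, so they are dropped; if nothing survives, the whole Coalesce becomes a null literal, and a single survivor replaces the Coalesce outright. A self-contained sketch of just that logic, on toy types of my own:

sealed trait Expr
case object NullLit extends Expr                 // stands in for Literal(null, _)
case class Col(name: String) extends Expr        // a non-literal child
case class Coalesce(children: List[Expr]) extends Expr

def simplifyCoalesce(e: Expr): Expr = e match {
  case Coalesce(children) =>
    children.filterNot(_ == NullLit) match {
      case Nil           => NullLit              // every child was a null literal
      case single :: Nil => single               // one survivor replaces the Coalesce
      case survivors     => Coalesce(survivors)  // keep only the useful children
    }
  case other => other
}

println(simplifyCoalesce(Coalesce(List(NullLit, Col("key"), NullLit))))  // Col(key)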
Given the SQL: val query = sql("select count(null) from temp_shengli where key is not null")
scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L]   // counting null here
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
Applying NullPropagation by hand:
scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L]   // optimized to the constant 0
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
2.2.2 Rule: ConstantFolding
ConstantFolding replaces every foldable expression with the Literal obtained by evaluating it once at planning time:
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {      // first transform the plan
    case q: LogicalPlan => q transformExpressionsDown {             // then transform each plan's expressions top-down
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType)     // call eval to compute the constant result
    }
  }
}
Given the SQL: val query = sql("select 1+2+3+4 from temp_shengli")
scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21]   // still a constant expression here
 MetastoreRelation default, src, None
After optimization:
scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21]   // after optimization, folded straight to 10
 MetastoreRelation default, src, None
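The folding itself is just bottom-up evaluation of subtrees whose leaves are all constants. A minimal sketch on a toy arithmetic tree of my own (in the real Catalyst, foldable and eval(null) play these roles):

sealed trait Expr
case class Lit(v: Int) extends Expr
case class Col(name: String) extends Expr        // not foldable: unknown until runtime
case class Add(l: Expr, r: Expr) extends Expr

def fold(e: Expr): Expr = e match {
  case Add(l, r) => (fold(l), fold(r)) match {
    case (Lit(a), Lit(b)) => Lit(a + b)          // both sides constant: evaluate now
    case (fl, fr)         => Add(fl, fr)         // otherwise keep the partially folded node
  }
  case other => other
}

// (((1 + 2) + 3) + 4) folds to Lit(10), mirroring Project [10 AS c0#21] above
println(fold(Add(Add(Add(Lit(1), Lit(2)), Lit(3)), Lit(4))))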
2.2.3 Rule: BooleanSimplification
This rule optimizes boolean expressions, a bit like short-circuit evaluation of boolean expressions in Java, and the implementation is rather elegant. The question it asks is: can a two-sided boolean expression be decided by evaluating only one side, saving the cost of the other? That is what simplifying a boolean expression means here. See my inline comments:
/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>            // an AND expression: exp1 AND exp2
        (left, right) match {                   // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r                // left is true: the result is the right side
          case (l, Literal(true, BooleanType)) => l                // right is true: the result is the left side
          case (Literal(false, BooleanType), _) => Literal(false)  // left is false: false, whatever the right is
          case (_, Literal(false, BooleanType)) => Literal(false)  // either side false makes the AND false
          case (_, _) => and
        }
      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)    // left is true: true without looking at the right
          case (_, Literal(true, BooleanType)) => Literal(true)    // either side true makes the OR true
          case (Literal(false, BooleanType), r) => r               // left is false: the result is the right side
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}
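The same algebra in a self-contained form (toy types of my own): an And/Or with a known boolean literal on one side collapses without ever touching the other side; when neither side is known, the node is left alone.

sealed trait BExpr
case class BLit(v: Boolean) extends BExpr
case class Pred(sql: String) extends BExpr       // an opaque, non-constant predicate
case class And(l: BExpr, r: BExpr) extends BExpr
case class Or(l: BExpr, r: BExpr) extends BExpr

def simplify(e: BExpr): BExpr = e match {
  case And(l, r) => (simplify(l), simplify(r)) match {
    case (BLit(true), rr)  => rr                 // true AND x   => x
    case (ll, BLit(true))  => ll                 // x AND true   => x
    case (BLit(false), _)  => BLit(false)        // false AND _  => false
    case (_, BLit(false))  => BLit(false)        // _ AND false  => false
    case (ll, rr)          => And(ll, rr)
  }
  case Or(l, r) => (simplify(l), simplify(r)) match {
    case (BLit(true), _)   => BLit(true)         // true OR _    => true
    case (_, BLit(true))   => BLit(true)         // _ OR true    => true
    case (BLit(false), rr) => rr                 // false OR x   => x
    case (ll, BLit(false)) => ll                 // x OR false   => x
    case (ll, rr)          => Or(ll, rr)
  }
  case other => other
}

println(simplify(And(BLit(true), Or(Pred("key > 100"), BLit(false)))))  // Pred(key > 100)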
2.3 Batch: Filter Pushdown
2.3.1 Rule: CombineFilters
/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}
Given the SQL: val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")
Before optimization: notice that one Filter is the grandchild of the other (with a Project in between):
scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter (key#27 > 80)        // filter > 80
  Project [key#27]
   Filter (key#27 > 100)     // filter > 100
    MetastoreRelation default, src, None
After optimization: a Filter condition can in fact be an arbitrarily complex boolean expression:
scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80))   // merged into a single Filter
  MetastoreRelation default, src, None
2.3.2 Filter Pushdown
Filter Pushdown means pushing the filter operator down the plan. The principle is to filter out unneeded rows as early as possible, reducing the cost of every operator above.
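This article does not reproduce the source of PushPredicateThroughProject, but the mechanics can be sketched on toy plan types of my own; the sketch handles only the simple pass-through case and skips the alias rewriting the real rule performs.

sealed trait Plan
case class Relation(table: String) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class Filter(column: String, child: Plan) extends Plan   // predicate on one column

// Swap a Filter below the Project it sits on, provided the projection simply
// passes the filtered column through (no aliases in this toy).
def pushDown(plan: Plan): Plan = plan match {
  case Filter(col, Project(cols, child)) if cols.contains(col) =>
    Project(cols, pushDown(Filter(col, child)))
  case Project(cols, child) => Project(cols, pushDown(child))
  case Filter(col, child)   => Filter(col, pushDown(child))
  case leaf                 => leaf
}

println(pushDown(Filter("key", Project(Seq("key", "value"), Relation("temp_shengli")))))
// Project(List(key, value), Filter(key, Relation(temp_shengli)))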
Given the SQL: val query = sql("select key from (select * from temp_shengli)a where key>100")
The generated logical plan is:
scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)       // select key, value first, then Filter
  Project [key#31,value#32]
   MetastoreRelation default, src, None
The optimized plan is:
query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)       // Filter first, then select
  MetastoreRelation default, src, None
2.3.3 Rule: ColumnPruning
ColumnPruning cuts away columns that are never referenced further up the plan, so less data needs to be produced. It covers three cases, explained in the comments below:
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    // If the child's outputSet minus a.references is non-empty, wrap the child in a
    // Project that keeps only a.references.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join: prune the columns
    // of the left and right subtrees.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      // Collect the references required either above the Join or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) =>
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS c ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects.
    case Project(projectList, child) if child.output == projectList => child
  }
}
1. Column pruning under an Aggregate. (Judging from the plan below, the query was along the lines of select 1 + 1 as shengli, key from temp_shengli group by key.) Before optimization:
res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52]   // before optimization, both key and value are selected by default
  MetastoreRelation default, temp_shengli, None
After optimization (ColumnPruning1, like ColumnPruning2 below, is my own debug copy of the rule):
scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]            // after pruning, value is dropped; only key is selected
  MetastoreRelation default, temp_shengli, None
2. In a Join, both the left and right children can have their columns pruned.
Given the SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key =b.key ")
Before optimization:
scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]  // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]  // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
After optimization (ColumnPruning2 is my own copy of the rule, instrumented for debugging):
scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]           // after pruning, the left child selects only key
   MetastoreRelation default, temp_shengli, None
  Project [key#37]           // after pruning, the right child selects only key
   MetastoreRelation default, temp_shengli, None
3. Merging the columns of adjacent Projects (projection collapsing):
Given the SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")
Before optimization:
scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None
After optimization:
scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57]   // the subquery's c is substituted into the outer select and evaluated directly
 MetastoreRelation default, temp_shengli, None
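The substitution step can be seen in isolation with toy expression types of my own: build aliasMap from each alias the inner projection defines to its defining expression, rewrite the outer projection's expressions through that map, and the inner Project becomes removable.

sealed trait Expr
case class Lit(v: Int) extends Expr
case class Ref(name: String) extends Expr        // reference to a projected column
case class Add(l: Expr, r: Expr) extends Expr

def substitute(e: Expr, aliasMap: Map[String, Expr]): Expr = e match {
  case Ref(n) if aliasMap.contains(n) => aliasMap(n)   // inline the alias's definition
  case Add(l, r) => Add(substitute(l, aliasMap), substitute(r, aliasMap))
  case other => other
}

// Inner projection SELECT 1 + 1 AS c yields aliasMap: c -> (1 + 1)
val aliasMap = Map[String, Expr]("c" -> Add(Lit(1), Lit(1)))
// Outer projection SELECT c + 1 becomes (1 + 1) + 1, ready for constant folding
println(substitute(Add(Ref("c"), Lit(1)), aliasMap))   // Add(Add(Lit(1),Lit(1)),Lit(1))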
3. Summary
This article introduced the Optimizer's role in Catalyst: it takes the analyzed Logical Plan and applies Rules, via transform, to both Logical Plan nodes and Expressions, merging and optimizing the nodes of the tree. The main optimization strategies boil down to a few broad groups: merging (limits, filters, adjacent projections), column pruning, and filter pushdown.
Catalyst keeps evolving; this article is based on Spark 1.0.0 only, and optimization strategies added later may be covered in follow-up posts.
Discussion is welcome; let's improve together!
——EOF——
Original article; please credit the source when republishing:
Reposted from the blog of OopsOutOfMemory (Shengli). Author: OopsOutOfMemory
Permalink: http://blog.csdn.net/oopsoom/article/details/38121259
Note: this article is released under the Attribution-NonCommercial-NoDerivatives 2.5 China Mainland (CC BY-NC-ND 2.5 CN) license. Reposting, sharing, and commenting are welcome, but please keep the author attribution and the link to this article. For commercial use or licensing matters, please contact me.