Spark SQL Catalyst源代码分析之TreeNode Library
/** Spark SQL源代码分析系列文章*/
前几篇文章介绍了Spark SQL的Catalyst的核心执行流程、SqlParser,和Analyzer,本来打算直接写Optimizer的,可是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个能够更好的理解Optimizer是怎样对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释。
一、TreeNode类型
TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个TreeNode组成。TreeNode本身是一个BaseType <: TreeNode[BaseType] 的类型,而且实现了Product这个trait,这样能够存放异构的元素了。
TreeNode有三种形态:BinaryNode、UnaryNode、Leaf Node.
在Catalyst里,这些Node都是继承自Logical Plan,能够说每个TreeNode节点就是一个Logical Plan(包括Expression)(直接继承自TreeNode)
主要继承关系类图例如以下:
1、BinaryNode
二元节点,即有左右孩子的二叉节点
[[TreeNode]] that has two children, [[left]] and [[right]].
trait BinaryNode[BaseType <: TreeNode[BaseType]] {def left: BaseTypedef right: BaseTypedef children = Seq(left, right)
}
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {self: Product =>
}
节点定义比較简单,左孩子,右孩子都是BaseType。 children是一个Seq(left, right)
以下列出主要继承二元节点的类,能够当查询手冊用 :)
这里提示下寻经常常使用的二元节点:Join和Union
2、UnaryNode
一元节点,即仅仅有一个孩子节点
A [[TreeNode]] with a single [[child]].
trait UnaryNode[BaseType <: TreeNode[BaseType]] {def child: BaseTypedef children = child :: Nil
}
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {self: Product =>
}
以下列出主要继承一元节点的类,能够当查询手冊用 :)
经常使用的二元节点有,Project,Subquery,Filter,Limit ...等
3、Leaf Node
叶子节点,没有孩子节点的节点。
A [[TreeNode]] with no children.
trait LeafNode[BaseType <: TreeNode[BaseType]] {def children = Nil
}
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {self: Product =>// Leaf nodes by definition cannot reference any input attributes.override def references = Set.empty
}
以下列出主要继承叶子节点的类,能够当查询手冊用 :)
提示经常使用的叶子节点: Command类系列,一些Funtion函数,以及Unresolved Relation...etc.
二、TreeNode 核心方法
currentId
一颗树里的TreeNode有个唯一的id,类型是java.util.concurrent.atomic.AtomicLong原子类型。
private val currentId = new java.util.concurrent.atomic.AtomicLongprotected def nextId() = currentId.getAndIncrement()
sameInstance
推断2个实例是否是同一个的时候,仅仅须要推断TreeNode的id。
def sameInstance(other: TreeNode[_]): Boolean = {this.id == other.id}
fastEquals,更经常使用的一个快捷的判定方法,没有重写Object.Equals,这样防止scala编译器生成case class equals 方法
def fastEquals(other: TreeNode[_]): Boolean = {sameInstance(other) || this == other}
map,flatMap,collect都是递归的对子节点进行应用PartialFunction,其他方法还有非常多,篇幅有限这里不一一描写叙述了。
2.1、核心方法 transform 方法
transform该方法接受一个PartialFunction,就是就是前一篇文章Analyzer里提到的Batch里面的Rule。
是会将Rule迭代应用到该节点的全部子节点,最后返回这个节点的副本(一个和当前节点不同的节点,后面会介绍,事实上就是利用反射来返回一个改动后的节点)。
假设rule没有对一个节点进行PartialFunction的操作,就返回这个节点本身。
来看一个样例:
object GlobalAggregates extends Rule[LogicalPlan] {def apply(plan: LogicalPlan): LogicalPlan = plan transform { //apply方法这里调用了logical plan(TreeNode) 的transform方法来应用一个PartialFunction。case Project(projectList, child) if containsAggregates(projectList) =>Aggregate(Nil, projectList, child)}def containsAggregates(exprs: Seq[Expression]): Boolean = {exprs.foreach(_.foreach {case agg: AggregateExpression => return truecase _ =>})false}}
这种方法真正的调用是transformChildrenDown,这里提到了用先序遍历来对子节点进行递归的Rule应用。
假设在对当前节点应用rule成功,改动后的节点afterRule,来对其children节点进行rule的应用。
transformDown方法:
/*** Returns a copy of this node where `rule` has been recursively applied to it and all of its* children (pre-order). When `rule` does not apply to a given node it is left unchanged.* @param rule the function used to transform this nodes children*/def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {val afterRule = rule.applyOrElse(this, identity[BaseType])// Check if unchanged and then possibly return old copy to avoid gc churn.if (this fastEquals afterRule) {transformChildrenDown(rule) //改动前节点this.transformChildrenDown(rule)} else {afterRule.transformChildrenDown(rule) //改动后节点进行transformChildrenDown}}
最重要的方法transformChildrenDown:
对children节点进行递归的调用PartialFunction,利用终于返回的newArgs来生成一个新的节点,这里调用了makeCopy()来生成节点。
transformChildrenDown方法:
/*** Returns a copy of this node where `rule` has been recursively applied to all the children of* this node. When `rule` does not apply to a given node it is left unchanged.* @param rule the function used to transform this nodes children*/def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {var changed = falseval newArgs = productIterator.map {case arg: TreeNode[_] if children contains arg =>val newChild = arg.asInstanceOf[BaseType].transformDown(rule) //递归子节点应用ruleif (!(newChild fastEquals arg)) {changed = truenewChild} else {arg}case Some(arg: TreeNode[_]) if children contains arg =>val newChild = arg.asInstanceOf[BaseType].transformDown(rule)if (!(newChild fastEquals arg)) {changed = trueSome(newChild)} else {Some(arg)}case m: Map[_,_] => mcase args: Traversable[_] => args.map {case arg: TreeNode[_] if children contains arg =>val newChild = arg.asInstanceOf[BaseType].transformDown(rule)if (!(newChild fastEquals arg)) {changed = truenewChild} else {arg}case other => other}case nonChild: AnyRef => nonChildcase null => null}.toArrayif (changed) makeCopy(newArgs) else this //依据作用结果返回的newArgs数组,反射生成新的节点副本。}
makeCopy方法,反射生成节点副本
/*** Creates a copy of this type of tree node after a transformation.* Must be overridden by child classes that have constructor arguments* that are not present in the productIterator.* @param newArgs the new product arguments.*/def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {try {val defaultCtor = getClass.getConstructors.head //反射获取默认构造函数的第一个if (otherCopyArgs.isEmpty) {defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] //反射生成当前节点类型的节点} else {defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] //假设还有其他參数,++}} catch {case e: java.lang.IllegalArgumentException =>throw new TreeNodeException(this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName? "+ s"Exception message: ${e.getMessage}.")}}
三、TreeNode实例
<span style="font-size:12px;">sbt/sbt hive/console
Using /usr/java/default as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /app/hadoop/shengli/spark/project/project
[info] Loading project definition from /app/hadoop/shengli/spark/project
[info] Set current project to root (in build file:/app/hadoop/shengli/spark/)
[info] Starting scala interpreter...
[info]
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.parquet.ParquetTestDatascala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")</span>
3.1、UnResolve Logical Plan
scala> query.queryExecution.logical
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [*]Join Inner, Some(('a.key = 'b.key))Subquery aProject [*]UnresolvedRelation None, src, NoneSubquery bProject [*]UnresolvedRelation None, src, None
假设画成树是这种,仅个人理解:
![](/assets/blank.gif)
3.2、Analyzed Logical Plan
![](/assets/blank.gif)
3.3、Optimized Plan
scala> query.queryExecution.optimizedPlan
res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#0,value#1,key#2,value#3]Join Inner, Some((key#0 = key#2))MetastoreRelation default, src, NoneMetastoreRelation default, src, None
3.4、executedPlan
scala> query.queryExecution.executedPlan
res4: org.apache.spark.sql.execution.SparkPlan =
Project [key#0:0,value#1:1,key#2:2,value#3:3]HashJoin [key#0], [key#2], BuildRightExchange (HashPartitioning [key#0:0], 150)HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), NoneExchange (HashPartitioning [key#2:0], 150)HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None
生成的物理运行树如图:
![](/assets/blank.gif)
四、总结:
TreeNode的transform方法是核心的方法,它接受一个rule,会对当前节点的孩子节点进行递归的调用rule,最后会返回一个TreeNode的copy,这样的操作就是transformation,贯穿了Spark SQL运行的几个核心阶段,如Analyze,Optimize阶段。
最后用一个实际的样例,展示出来Spark SQL的运行树生成流程。
我眼下的理解就是这些,假设分析不到位的地方,请大家多多指正。
Spark SQL Catalyst源代码分析之TreeNode Library相关推荐
- Spark SQL Catalyst源代码分析Optimizer
/** Spark SQL源代码分析系列*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将具体解说S ...
- 递归下降语法分析器的构建_一文了解函数式查询优化器Spark SQL Catalyst
大数据技术与架构点击右侧关注,大数据开发领域最强公众号! 暴走大数据点击右侧关注,暴走大数据!记录一下个人对sparkSql的catalyst这个函数式的可扩展的查询优化器的理解,目录如下: 0. O ...
- Spark SQL 处理流程分析 (一)
下面的代码演示了通过Case Class进行表Schema定义的例子: // sc is an existing SparkContext. val sqlContext = new org.apac ...
- Spark SQL / Catalyst 内部原理 与 RBO
Spark SQL 架构 Spark SQL 的整体架构如下图所示 从上图可见,无论是直接使用 SQL 语句还是使用 DataFrame,都会经过如下步骤转换成 DAG 对 RDD 的操作 Parse ...
- Spark SQL 源代码分析系列
从决定写Spark SQL文章的源代码分析,到现在一个月的时间,一个又一个几乎相同的结束很快,在这里也做了一个综合指数,方便阅读,下面是读取顺序 :) 第一章 Spark SQL源代码分析之核心流程 ...
- spark sql运行时候出现cannot resolve '`bid`' given input columns: [bid, name, iphone];
测试代码如下: object ReadFileTest {def main(args: Array[String]): Unit = {//创建spark环境val spark: SparkSessi ...
- Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...
从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...
- Spark SQL 的SQL处理引擎分析
目录 一.补充以Sql Server 为例,SQL语句的执行逻辑顺序 1.From阶段 2.Where 阶段 3.Group By 4.Having 5.Select 6.Order By 7.SQL ...
- 【Spark】扩展Spark Catalyst,打造自定义的Spark SQL引擎
1.概述 转载自:扩展Spark Catalyst,打造自定义的Spark SQL引擎 Apache Spark是大数据处理领域最常用的计算引擎之一,被应用在各种各样的场景中,除了易用的API,稳定高 ...
最新文章
- [软考]信息系统项目管理师考试大纲
- npm安装教程 集成npm webpack vue-cli
- Laravel Breadcrumbs 自动面包屑导航
- LeetCode 121:买卖股票的最佳时机 思考分析
- ofstream 向文件写数据
- python 嵌入式界面_运用Python和PyQT开发嵌入式ARM的界面
- iPhone唯一标识符
- LogServer日志详解
- 【适合程序员的代码笔记软件】Quiver 3.2.6 for Mac
- 基于C#木门归方程序 下料机 锯片 CNC 拆单软件 全屋定制设计拆单软件橱柜衣柜拆单设计制造管理一体化软件 有屋软件
- 核磁共振成像读片指南(一)
- 从投入产出简析直复营销
- k8s 一套代码部署两个实例
- 如何创新地解决光缆运维痛点?
- vs2019 installer Projects 【打包程序 下载地址】
- 蓝牙Sig Mesh 概念入门⑤——Mesh通信消息格式详解
- s5pv210开发与学习:1.3之SD卡学习
- 聊聊 Ubuntu 18.04 静态IP设置
- 分享腾讯官方二维码生成接口地址
- DP4301无线433M收发芯片智能家居芯片