文章目录

  • 背景
  • 规则
    • UnionPullUpConstantsRule
      • 做了什么?
      • 怎么做的?
    • AggregateProjectPullUpConstantsRule
      • 做了什么?
      • 怎么做的?

背景

之前设计的tranform存在一个bug,可以通过将limit上提来解决,所以学习一下Calcite上提的规则。顺便也为之后的公共表达式消除打下基础。

规则

UnionPullUpConstantsRule

做了什么?

从名字不难看出是将union的常量上提,例如下面的SQL有两个常量2

select 2, deptno, job from emp as e1
union all
select 2, deptno, job from emp as e2

优化前的plan如下

LogicalUnion(all=[true])LogicalProject(EXPR$0=[2], DEPTNO=[$7], JOB=[$2])LogicalTableScan(table=[[CATALOG, SALES, EMP]])LogicalProject(EXPR$0=[2], DEPTNO=[$7], JOB=[$2])LogicalTableScan(table=[[CATALOG, SALES, EMP]])

优化后的plan如下,查询常量2的操作在union之后

虽然这个case不足以看出优化的效果,但是将union改为union distinct后就能发现该优化可以解决distinct的计算量(结合其他filter下推的情况该优化效果更明显)

LogicalProject(EXPR$0=[2], DEPTNO=[$0], JOB=[$1])LogicalUnion(all=[true])LogicalProject(DEPTNO=[$7], JOB=[$2])LogicalTableScan(table=[[CATALOG, SALES, EMP]])LogicalProject(DEPTNO=[$7], JOB=[$2])LogicalTableScan(table=[[CATALOG, SALES, EMP]])

从上面的例子可以看出,这个规则修改了两个LogicalProject并且在LogicalUnion上添加了一个LogicalProject,规则对应的代码应该不会很简单

怎么做的?

同多数规则一样,是在onMatch方法进行处理的

  1. 检验常量
    首先提取了常量判断是否需要优化
final Union union = call.rel(0);
// 行表达式的builder,其实可以放在下面,这段代码并不会用到
final RexBuilder rexBuilder = union.getCluster().getRexBuilder();
// Metadata记录着一些query的信息
final RelMetadataQuery mq = call.getMetadataQuery();
// 通过getPulledUpPredicates拿到谓词
final RelOptPredicateList predicates = mq.getPulledUpPredicates(union);
// 谓词为空就无需优化了
if (predicates == null) {return;
}
// 谓词不为空的时候,再从中提取常量
final Map<Integer, RexNode> constants = new HashMap<>();
for (Map.Entry<RexNode, RexNode> e : predicates.constantMap.entrySet()) {if (e.getKey() instanceof RexInputRef) {constants.put(((RexInputRef) e.getKey()).getIndex(), e.getValue());}
}
// 常量也无需优化,直接返回
// None of the expressions are constant. Nothing to do.
if (constants.isEmpty()) {return;
}
  1. 构建顶部LogicalProject的表达式
    再往下走就开始真正的优化了,刚刚创建的RexBuilder会在下面被用上
 // Create expressions for Project operators before and after the UnionList<RelDataTypeField> fields = union.getRowType().getFieldList();//union的字段List<RexNode> topChildExprs = new ArrayList<>(); // 表达式或者常量List<String> topChildExprsFields = new ArrayList<>(); //表达式对应的字段名List<RexNode> refs = new ArrayList<>(); // 指向表达式的引用// 引用的builderImmutableBitSet.Builder refsIndexBuilder = ImmutableBitSet.builder();for (RelDataTypeField field : fields) {final RexNode constant = constants.get(field.getIndex());// 常量直接放入即可if (constant != null) {topChildExprs.add(constant);topChildExprsFields.add(field.getName());} else {// 非常量的话要构建应用final RexNode expr = rexBuilder.makeInputRef(union, field.getIndex());topChildExprs.add(expr);topChildExprsFields.add(field.getName());refs.add(expr);refsIndexBuilder.set(field.getIndex());}}
ImmutableBitSet refsIndex = refsIndexBuilder.build();
// Update top Project positions
final Mappings.TargetMapping mapping =RelOptUtil.permutation(refs, union.getInput(0).getRowType()).inverse();
topChildExprs = ImmutableList.copyOf(RexUtil.apply(mapping, topChildExprs));

整个这一步就是用fileds构建好顶部Project的表达式

图为testPullConstantThroughUnion这个单测


3. 构建新的plan
之前提到了要更改Union前的两个LogicalProject,并增加一个Union后的LogicalProject即是在此

// Create new Project-Union-Project sequences
final RelBuilder relBuilder = call.builder();
for (RelNode input : union.getInputs()) {List<Pair<RexNode, String>> newChildExprs = new ArrayList<>();for (int j : refsIndex) {newChildExprs.add(Pair.of(rexBuilder.makeInputRef(input, j),input.getRowType().getFieldList().get(j).getName()));}if (newChildExprs.isEmpty()) {// At least a single item in project is required.newChildExprs.add(Pair.of(topChildExprs.get(0), topChildExprsFields.get(0)));}// Add the input with project on toprelBuilder.push(input);relBuilder.project(Pair.left(newChildExprs), Pair.right(newChildExprs));
}
relBuilder.union(union.all, union.getInputs().size());
// Create top Project fixing nullability of fields
relBuilder.project(topChildExprs, topChildExprsFields);
relBuilder.convert(union.getRowType(), false);
  1. 收尾
    所有的RelOptRule都是通过transformTo提交relBuilder的结果
    call.transformTo(relBuilder.build());

AggregateProjectPullUpConstantsRule

做了什么?

看名字就知道和aggregate相关,也有比较简单的单测,如下

select job, empno, sal, sum(sal) as s
from emp where empno = 10
group by job, empno, sal

优化前,聚合的时候还要聚合EMPNO的数据

LogicalAggregate(group=[{0, 1, 2}], S=[SUM($2)])LogicalProject(JOB=[$2], EMPNO=[$0], SAL=[$5])LogicalFilter(condition=[=($0, 10)])LogicalTableScan(table=[[CATALOG, SALES, EMP]])

优化后,聚合的时候就不用聚合EMPNO的数据,节约了一部分计算量

LogicalProject(JOB=[$0], EMPNO=[10], SAL=[$1], S=[$2])LogicalAggregate(group=[{0, 1}], S=[SUM($1)])LogicalProject(JOB=[$2], SAL=[$5])LogicalFilter(condition=[=($0, 10)])LogicalTableScan(table=[[CATALOG, SALES, EMP]])

和UnionPullUpConstantsRule一样,改了LogicalAggregate前的LogicalProject,并增加了一个LogicalAggregate后的LogicalProject,只是多改了一个LogicalAggregate

怎么做的?

  1. 检验常量
    和UnionPullUpConstantsRule非常相似,没有常量就直接return不用优化了
 final Aggregate aggregate = call.rel(0);final RelNode input = call.rel(1);final int groupCount = aggregate.getGroupCount();if (groupCount == 1) {// No room for optimization since we cannot convert from non-empty// GROUP BY list to the empty one.return;}final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();final RelMetadataQuery mq = call.getMetadataQuery();final RelOptPredicateList predicates =mq.getPulledUpPredicates(aggregate.getInput());if (predicates == null) {return;}final NavigableMap<Integer, RexNode> map = new TreeMap<>();for (int key : aggregate.getGroupSet()) {final RexInputRef ref =rexBuilder.makeInputRef(aggregate.getInput(), key);if (predicates.constantMap.containsKey(ref)) {map.put(key, predicates.constantMap.get(ref));}}// None of the group expressions are constant. Nothing to do.if (map.isEmpty()) {return;}
  1. 构建Aggregate
    因为聚合的数据少了,所以也需要调整一下LogicalAggregate的key
if (groupCount == map.size()) {// At least a single item in group by is required.// Otherwise "GROUP BY 1, 2" might be altered to "GROUP BY ()".// Removing of the first element is not optimal here,// however it will allow us to use fast path below (just trim// groupCount).map.remove(map.navigableKeySet().first());
}ImmutableBitSet newGroupSet = aggregate.getGroupSet();
for (int key : map.keySet()) {newGroupSet = newGroupSet.clear(key);
}
final int newGroupCount = newGroupSet.cardinality();// If the constants are on the trailing edge of the group list, we just
// reduce the group count.
final RelBuilder relBuilder = call.builder();
relBuilder.push(input);// Clone aggregate calls.
final List<AggregateCall> newAggCalls = new ArrayList<>();
for (AggregateCall aggCall : aggregate.getAggCallList()) {newAggCalls.add(aggCall.adaptTo(input, aggCall.getArgList(), aggCall.filterArg,groupCount, newGroupCount));
}
relBuilder.aggregate(relBuilder.groupKey(newGroupSet), newAggCalls);

注意relBuilder.aggregate()的时候也对aggregate前的project进行了裁剪

  1. 构建Project
    主要是收集顶部project需要的表达式,然后构建
// Create a projection back again.
List<Pair<RexNode, String>> projects = new ArrayList<>();
int source = 0;
for (RelDataTypeField field : aggregate.getRowType().getFieldList()) {RexNode expr;final int i = field.getIndex();if (i >= groupCount) {// Aggregate expressions' names and positions are unchanged.expr = relBuilder.field(i - map.size());} else {int pos = aggregate.getGroupSet().nth(i);if (map.containsKey(pos)) {// Re-generate the constant expression in the project.RelDataType originalType =aggregate.getRowType().getFieldList().get(projects.size()).getType();if (!originalType.equals(map.get(pos).getType())) {expr = rexBuilder.makeCast(originalType, map.get(pos), true);} else {expr = map.get(pos);}} else {// Project the aggregation expression, in its original// position.expr = relBuilder.field(source);++source;}}projects.add(Pair.of(expr, field.getName()));
}
relBuilder.project(Pair.left(projects), Pair.right(projects)); // inverse
  1. 收尾
    call.transformTo(relBuilder.build());提交优化后的plan

Calcite的PullUp规则相关推荐

  1. Apache Calcite官方文档中文版-概览-1.背景

    第一部分 概览 1. 背景   Apache Calcite是一个动态数据管理框架.它包含了许多组成典型数据管理系统的经典模块,但省略了一些关键性的功能: 数据存储,数据处理算法和元数据存储库.    ...

  2. Apache Calcite 论文翻译

    Apache Calcite 论文原稿: https://arxiv.org/pdf/1802.10233.pdf 文章目录 Apache Calcite 论文 1.简介 1.1. 引言 2.相关工作 ...

  3. Apache Calcite官方文档中文版- 概览-2. 教程

    第一部分 概览 2. 教程   本章针对Calcite的连接建立提供了循序渐进的教程,使用一个简单的适配器来将一个CSV文件目录以包含Schema信息的tables形式呈现,并提供了一个完全SQL接口 ...

  4. Flink 流批一体的实践与探索

    自 Google Dataflow 模型被提出以来,流批一体就成为分布式计算引擎最为主流的发展趋势.流批一体意味着计算引擎同时具备流计算的低延迟和批计算的高吞吐高稳定性,提供统一编程接口开发两种场景的 ...

  5. Flink SQL在B站的实践

    本期作者 FlinkSql团队 B站实时平台flinksql团队,负责flink引擎sql功能的研发,支持的业务包括实时计算,流批一体以及数据湖等 01 FlinkSql在B站 目前在B站,线上大概有 ...

  6. Calcite优化规则之ProjectAggregateMergeRule

    文章目录 前言 分析 命中条件 单元测试 testProjectAggregateMergeNoOp testProjectAggregateMergeSum0 testProjectAggregat ...

  7. calcite 启发式优化器(HepPlanner)原理与自定义优化规则实现

    文章目录 HepPlanner 思考

  8. sql 忽略大小写_Flink使用Calcite解析Sql做维表关联(一)

    点击箭头处"蓝色字",关注我们哦!! 维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐.规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算 ...

  9. calcite mysql_Apache顶级项目 Calcite使用介绍

    什么是Calcite Apache Calcite是一个动态数据管理框架,它具备很多典型数据库管理系统的功能,比如SQL解析.SQL校验.SQL查询优化.SQL生成以及数据连接查询等,但是又省略了一些 ...

最新文章

  1. RocketMQ命令整理
  2. XML篇---可配置化的取值方式[便于维护]
  3. vue实现移动端圆形旋钮插件
  4. Netflix Curator 使用 Zookeeper 编程
  5. 报名 | 网易MCtalk: 5G+AI新时代 探索音视频技术创新与实践
  6. qt文件逐行读取_qt读取txt文件并绘图 qt逐行读取txt文件
  7. 【PHP】xampp配置多个监听端口和不同的网站目录(转)
  8. Linux中的.bash_ 文件的详细说明
  9. gitlab搭建之互备模式
  10. 为SSD加速 实战4KB对齐技巧3/3
  11. html css手册,CSS 参考手册
  12. OLAP 系统选型,选 Apache Kylin 还是 ClickHouse?
  13. 第八届2005汕头国际音响大展
  14. 查看搜狗浏览器插件的本地安装位置
  15. 面试中问什么问题最能让面试官记忆犹新?
  16. 主叫号码信息识别及传送
  17. Android颜色转化
  18. AutoCAD常用快捷键
  19. android字库,android L 上切换字库方法
  20. 个人所得税计算器程序c语言,基于C语言实现的个人所得税纳税系统.docx

热门文章

  1. imx6ull中断体系之GICV2
  2. Elasticsearch索引新增字段
  3. 【LINUX】Linux之Ubuntu系统安装搜狗输入法
  4. 杭电oj 1426 Sudoku Killer(深度优先搜索 回溯)
  5. 【 云原生 | kubernetes 】- tekton构建CI/CD流水线(二)
  6. 《Android开发卷——设置圆形头像,Android截取圆形图片》
  7. 正则校验必须由数字 字母 和 特殊符号组成的正则
  8. 网页版视频怎么加快播放速度
  9. Python入门实例——测试代码
  10. exception的使用