Today I gave my teammates a talk titled "From Calcite to Tampering with Flink SQL". The Markdown version of the handout is pasted below.

The talk covers a lot of ground: Calcite basics, how the Blink planner executes queries, the optimizers and their optimization rules, and more. I will write dedicated follow-up articles on the key points later.


From Calcite to Tampering with Flink SQL

August 26th, 2021

For NiceTuan Real-Time Team


Prerequisites

  • Basic understanding of

    • Flink DataStream runtime (3-layered DAGs, stream partition, etc.)
    • Database system concepts
    • SQL queries
    • Scala language, just in case

(Review) Some Relational Algebra

  • Textbook - Database System Concepts 6th Edition [Abraham Silberschatz et al. 2011]

  • But Wikipedia is quite sufficient

    • Relational algebra is a theory that uses algebraic structures with a well-founded semantics for modeling data, and defining queries on it
    • The theory was introduced by Edgar F. Codd
  • Projection (Π)

  • Selection (σ)
  • Rename (ρ)
  • Natural join (⋈) & Equi-join
  • Left outer join (⟕)
  • Right outer join (⟖)
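
Putting a few of these operators together: the demo query used later in this talk (an equi-join, a selection, and a grouped aggregation) corresponds roughly to the expression below. This is only a sketch; γ denotes the extended grouping/aggregation operator, which is not part of the minimal operator set above.

$\Pi_{id,\,name,\,total\_score}\Big(\,_{id,\,name}\gamma_{\,\mathrm{SUM}(0.7\cdot score1 + 0.3\cdot score2)\,\to\,total\_score}\big(\sigma_{age<20\,\wedge\,score1>60}(student \bowtie_{id=student\_id} exam\_result)\big)\Big)$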

Calcite In A Nutshell

What is it

  • As you already know, "Flink does not reinvent the wheel, but leverages Apache Calcite to deal with most SQL-related works"

  • Apache Calcite is a foundational software framework that provides query processing, optimization, and query language support to many popular open-source data processing systems such as Apache Hive, Apache Storm, Apache Flink, Druid, and MapD

Architecture

  • From Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources [Edmon Begoli et al. SIGMOD 2018]

Fundamental Concepts

  • Catalog - A metadata store & handler for schema, tables, etc.

  • SqlNode - A parsed SQL tree (i.e. AST)

    • SqlLiteral - Constant value (1, FALSE, ...)
    • SqlIdentifier - Identifier
    • SqlCall - Call to functions, operators, etc.
    • SqlSelect / SqlJoin / SqlOrderBy / ...
  • RelNode - A relational (algebraic) expression

    • LogicalTableScan
    • LogicalProject
    • LogicalFilter
    • LogicalCalc
    • ...
  • RexNode - A (typed) row-level expression

    • RexLiteral
    • RexVariable
    • RexCall
    • ...
  • RelTrait & RelTraitDef - A set of physical properties & their definitions carried by a relational expression

    • Convention - Working scope, mainly a single data source
    • RelCollation - Ordering method of data (and sort keys)
    • RelDistribution - Distribution method of data
  • RelOptPlanner - A query optimizer, which transforms a relational expression into a semantically equivalent relational expression, according to a given set of rules and a cost model

    • HepPlanner - RBO, greedy, heuristic
    • VolcanoPlanner - CBO, dynamic programming, Volcano-flavored
  • RelOptRule - A (usually empirical) rule which defines the transformation routine for RBO

    • RelOptRuleOperand - Used by the rule to determine the section of RelNodes to be optimized
    • RuleSet - Self-explanatory
  • RelOptCost - An interface for optimizer cost in terms of number of rows processed, CPU cost, and I/O cost

  • RelMetadataProvider - An interface for obtaining metadata about relational expressions to support the optimization process

    • Min / max row count
    • Data size
    • Expression lineage
    • Distinctness / uniqueness
    • ...
  • RelOptCluster - The environment during the optimization of a query
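
To make a few of these types concrete, Calcite's RelBuilder can assemble a small RelNode/RexNode tree directly, without going through SQL. A minimal sketch, assuming the student table and the frameworkConfig that are set up in the demo sections below:

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;

RelBuilder relBuilder = RelBuilder.create(frameworkConfig);
RelNode relNode = relBuilder
    .scan("student")                                            // LogicalTableScan
    .filter(relBuilder.call(SqlStdOperatorTable.LESS_THAN,      // RexCall ...
        relBuilder.field("age"), relBuilder.literal(20)))       // ... over a RexInputRef and a RexLiteral
    .project(relBuilder.field("id"), relBuilder.field("name"))  // LogicalProject
    .build();
System.out.println(RelOptUtil.toString(relNode));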

Process Flow


A Quick Calcite Show

Prepare Schema and SQL

SchemaPlus rootSchema = Frameworks.createRootSchema(true);

rootSchema.add("student", new AbstractTable() {
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.Builder builder = new Builder(DEFAULT_TYPE_FACTORY);
        builder.add("id", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.BIGINT));
        builder.add("name", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.VARCHAR));
        builder.add("class", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.VARCHAR));
        builder.add("age", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.INTEGER));
        return builder.build();
    }
});

rootSchema.add("exam_result", new AbstractTable() {
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.Builder builder = new Builder(DEFAULT_TYPE_FACTORY);
        builder.add("student_id", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.BIGINT));
        builder.add("score1", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.FLOAT));
        builder.add("score2", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.FLOAT));
        return builder.build();
    }
});

String sql = /* language=SQL */
    "SELECT a.id, a.name, SUM(b.score1 * 0.7 + b.score2 * 0.3) AS total_score " +
    "FROM student a " +
    "INNER JOIN exam_result b ON a.id = b.student_id " +
    "WHERE a.age < 20 AND b.score1 > 60.0 " +
    "GROUP BY a.id, a.name";

Parsing

FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
    .parserConfig(SqlParser.config().withCaseSensitive(false).withLex(Lex.MYSQL_ANSI))
    .defaultSchema(rootSchema)
    .build();

SqlParser parser = SqlParser.create(sql);
SqlNode originalSqlNode = parser.parseStmt();
System.out.println(originalSqlNode.toString());
--- Original SqlNode ---
SELECT `A`.`ID`, `A`.`NAME`, SUM(`B`.`SCORE1` * 0.7 + `B`.`SCORE2` * 0.3) AS `TOTAL_SCORE`
FROM `STUDENT` AS `A`
INNER JOIN `EXAM_RESULT` AS `B` ON `A`.`ID` = `B`.`STUDENT_ID`
WHERE `A`.`AGE` < 20 AND `B`.`SCORE1` > 60.0
GROUP BY `A`.`ID`, `A`.`NAME`

Validation

Properties cxnConfig = new Properties();
cxnConfig.setProperty(
    CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
    String.valueOf(frameworkConfig.getParserConfig().caseSensitive()));

CalciteCatalogReader catalogReader = new CalciteCatalogReader(
    CalciteSchema.from(rootSchema),
    CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null),
    DEFAULT_TYPE_FACTORY,
    new CalciteConnectionConfigImpl(cxnConfig)
);

SqlValidator validator = new SqlValidatorImpl1(
    frameworkConfig.getOperatorTable(),
    catalogReader,
    DEFAULT_TYPE_FACTORY
);

SqlNode validatedSqlNode = validator.validate(originalSqlNode);
System.out.println(validatedSqlNode.toString());
--- Validated SqlNode ---
SELECT `A`.`ID`, `A`.`NAME`, SUM(`B`.`SCORE1` * 0.7 + `B`.`SCORE2` * 0.3) AS `TOTAL_SCORE`
FROM `STUDENT` AS `A`
INNER JOIN `EXAM_RESULT` AS `B` ON `A`.`id` = `B`.`student_id`
WHERE `A`.`age` < 20 AND `B`.`score1` > 60.0
GROUP BY `A`.`id`, `A`.`name`

Planning

RelOptCluster relOptCluster = RelOptCluster.create(new VolcanoPlanner(), new RexBuilder(DEFAULT_TYPE_FACTORY));

SqlToRelConverter relConverter = new SqlToRelConverter(
    null,
    validator,
    catalogReader,
    relOptCluster,
    frameworkConfig.getConvertletTable()
);

RelRoot relRoot = relConverter.convertQuery(validatedSqlNode, false, true);
RelNode originalRelNode = relRoot.rel;
System.out.println(RelOptUtil.toString(originalRelNode));
--- Original RelNode ---
LogicalProject(ID=[$0], NAME=[$1], TOTAL_SCORE=[$2])
  LogicalAggregate(group=[{0, 1}], TOTAL_SCORE=[SUM($2)])
    LogicalProject(id=[$0], name=[$1], $f2=[+(*($5, 0.7:DECIMAL(2, 1)), *($6, 0.3:DECIMAL(2, 1)))])
      LogicalFilter(condition=[AND(<($3, 20), >($5, 60.0:DECIMAL(3, 1)))])
        LogicalJoin(condition=[=($0, $4)], joinType=[inner])
          LogicalTableScan(table=[[student]])
          LogicalTableScan(table=[[exam_result]])

Optimization

  • Predicate (filter) pushdown past join into table scan using HepPlanner and FILTER_INTO_JOIN rule

$\sigma_{R.a\,\theta\,a' \,\wedge\, S.b\,\theta\,b'}(R \bowtie S) = (\sigma_{R.a\,\theta\,a'}\,R) \bowtie (\sigma_{S.b\,\theta\,b'}\,S)$

  • HepProgram defines the order of rules to be attempted
HepProgram hepProgram = new HepProgramBuilder()
    .addRuleInstance(CoreRules.FILTER_INTO_JOIN)
    .addMatchOrder(HepMatchOrder.BOTTOM_UP)
    .build();

HepPlanner hepPlanner = new HepPlanner(hepProgram);
hepPlanner.setRoot(originalRelNode);
RelNode optimizedRelNode = hepPlanner.findBestExp();
System.out.println(RelOptUtil.toString(optimizedRelNode));
--- Optimized RelNode ---
LogicalProject(ID=[$0], NAME=[$1], TOTAL_SCORE=[$2])
  LogicalAggregate(group=[{0, 1}], TOTAL_SCORE=[SUM($2)])
    LogicalProject(id=[$0], name=[$1], $f2=[+(*($5, 0.7:DECIMAL(2, 1)), *($6, 0.3:DECIMAL(2, 1)))])
      LogicalJoin(condition=[=($0, $4)], joinType=[inner])
        LogicalFilter(condition=[<($3, 20)])
          LogicalTableScan(table=[[student]])
        LogicalFilter(condition=[>($1, 60.0:DECIMAL(3, 1))])
          LogicalTableScan(table=[[exam_result]])
  • Rules can do a lot more...

Dive Into Blink Stream Planner

Overview

  • Parsing & validation
  • Logical planning
  • All-over optimization w/ physical planning
  • Execution planning & codegen (only a brief today)

SQL for Example

  • Will not cover sophisticated things (e.g. sub-queries, aggregate functions, window TVFs) for now
  • Just an ordinary streaming ETL process, which will be optimized later
INSERT INTO expdb.print_joined_result
SELECT FROM_UNIXTIME(a.ts / 1000, 'yyyy-MM-dd HH:mm:ss') AS tss, a.userId, a.eventType, a.siteId, b.site_name AS siteName
FROM expdb.kafka_analytics_access_log_app
/*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='DiveIntoBlinkExp') */ a
LEFT JOIN rtdw_dim.mysql_site_war_zone_mapping_relation
FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.siteId AS INT) = b.site_id
WHERE a.userId > 3 + 4;
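
The statement above is submitted through TableEnvironment#executeSql(), which is also where every call stack below starts. A minimal driver sketch (insertSql is a hypothetical variable holding the INSERT statement above; registration of the Hive catalog and the MySQL dimension table is omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Blink planner in streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Triggers the whole pipeline: parse -> validate -> convert to Operation -> optimize -> translate -> execute
tableEnv.executeSql(insertSql);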

Parsing & Validation

  • Build the flink-sql-parser module, and you'll get the exact parser for Flink SQL dialect
  • Call stack
// parse
parse:54, CalciteParser (org.apache.flink.table.planner.parse)
parse:96, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)

// validation
-- goes to org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator#validate()
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate:150, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
validate:108, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
convert:201, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:99, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • SqlNode tree

    • Note that FOR SYSTEM_TIME AS OF syntax is translated to a SqlSnapshot node

Logical Planning

  • Call stack

    • Obviously, these are a bunch of recursive processes
-- goes to Calcite SqlToRelConverter
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:168, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:160, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:967, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:936, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:275, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlInsert:595, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:268, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:99, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • Logical planning in Flink SQL yields a tree of Operations (e.g. ModifyOperation, QueryOperation)

    • Just wrappers of RelNodes
  • RelNode tree

    • SqlJoin → LogicalCorrelate (in Calcite this means nested-loop join)
    • SqlSnapshot → LogicalSnapshot
    • etc.
  • Output of EXPLAIN statement
-- In fact this is the original logical plan
== Abstract Syntax Tree ==
LogicalSink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- LogicalProject(tss=[FROM_UNIXTIME(/($0, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss')], userId=[$1], eventType=[$2], siteId=[$6], siteName=[$10])
   +- LogicalFilter(condition=[>($1, +(3, 4))])
      +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{6, 8}])
         :- LogicalProject(ts=[$0], userId=[$1], eventType=[$2], columnType=[$3], fromType=[$4], grouponId=[$5], siteId=[$6], merchandiseId=[$7], procTime=[PROCTIME()])
         :  +- LogicalTableScan(table=[[hive, expdb, kafka_analytics_access_log_app]], hints=[[[OPTIONS inheritPath:[] options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
         +- LogicalFilter(condition=[=(CAST($cor0.siteId):INTEGER, $0)])
            +- LogicalSnapshot(period=[$cor0.procTime])
               +- LogicalTableScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]])
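
The plan above is what the EXPLAIN statement prints as the AST; programmatically it can be obtained roughly like this (a sketch, reusing the hypothetical tableEnv and insertSql from the driver above):

// Prints the abstract syntax tree, the optimized physical plan and the optimized execution plan
System.out.println(tableEnv.explainSql(insertSql));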

All-Over Optimization w/ Physical Planning

  • Call stack

    • CommonSubGraphBasedOptimizer is a Flink-implemented optimizer that divides the logical plan into sub-graphs by SinkBlocks, and reuses common sub-graphs whenever possible
    • For most scenarios, the logical plan is merely a single tree (optimizeTree)
-- goes to org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram#optimize()
optimizeTree:163, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:79, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:284, PlannerBase (org.apache.flink.table.planner.delegation)
translate:168, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1516, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:738, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:854, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:728, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • FlinkChainedProgram breaks down into several FlinkHepPrograms (resembling Calcite's HepProgram), each of which defines the order of rules to be attempted by the HepPlanner

    • This time there are of course a lot more rules
    • Flink SQL handles the entire physical planning process with RelOptRules, alongside logical/physical optimization
  • All RuleSets are defined in FlinkStreamRuleSets; some of the rules are shipped natively with Calcite

  • FlinkStreamProgram actually builds up the program sequence

    • The names are quite straightforward though
    • At the end of LOGICAL, specialized ConverterRules will convert Calcite RelNode into FlinkLogicalRel
      • e.g. LogicalCalc → FlinkLogicalCalcConverter → FlinkLogicalCalc
      • i.e. the convention is converted to FLINK_LOGICAL
      • The logical optimization phase is somewhat hard to observe
  • The optimized StreamPhysicalRel tree

    • Physical planning rules are almost all ConverterRules

      • FlinkLogicalRel → StreamPhysicalRel, convention FLINK_LOGICAL → STREAM_PHYSICAL
      • e.g. FlinkLogicalCalc → StreamPhysicalCalcRule → StreamPhysicalCalc
    • HepRelVertex is the wrapper of RelNode in HepPlanner
  • Output of EXPLAIN statement
== Optimized Physical Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
         +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
  • Pick two rules for some explanation

  • TEMPORAL_JOIN_REWRITE - LogicalCorrelateToJoinFromLookupTableRuleWithFilter

This rule matches

+- LogicalCorrelate
   :- [RelNode related to stream table]
   +- LogicalFilter(condition)
      +- LogicalSnapshot(time_attr)
         +- [RelNode related to temporal table]

and transforms into

+- LogicalJoin(condition)
   :- [RelNode related to stream table]
   +- LogicalSnapshot(time_attr)
      +- [RelNode related to temporal table]
  • PHYSICAL - StreamPhysicalLookupJoinRule - SnapshotOnTableScanRule

This rule matches

+- FlinkLogicalJoin(condition)
   :- [RelNode related to stream table]
   +- FlinkLogicalSnapshot(time_attr)
      +- FlinkLogicalTableSourceScan [w/ LookupTableSource]

and transforms into StreamPhysicalLookupJoin

Execution Planning & Codegen

  • Call stack
-- goes to separate FlinkPhysicalRel#translateToExecNode()
generate:74, ExecNodeGraphGenerator (org.apache.flink.table.planner.plan.nodes.exec)
generate:54, ExecNodeGraphGenerator (org.apache.flink.table.planner.plan.nodes.exec)
translateToExecNodeGraph:312, PlannerBase (org.apache.flink.table.planner.delegation)
translate:164, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1518, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:740, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:856, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:730, TableEnvironmentImpl (org.apache.flink.table.api.internal)

-- goes to separate ExecNodeBase#translateToPlan() & StreamExecNode#translateToPlanInternal()
translateToPlan:70, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:165, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1518, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:740, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:856, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:730, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • The ExecNodeGraph DAG

    • JSON representation of this DAG can be acquired or executed by tableEnv.asInstanceOf[TableEnvironmentInternal].getJsonPlan(sql) / executeJsonPlan(plan)
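
For example, roughly (a sketch, again using the hypothetical tableEnv and insertSql; getJsonPlan() only accepts a single INSERT statement):

import org.apache.flink.table.api.internal.TableEnvironmentInternal;

TableEnvironmentInternal internalEnv = (TableEnvironmentInternal) tableEnv;
String jsonPlan = internalEnv.getJsonPlan(insertSql);   // serialize the ExecNodeGraph as JSON
System.out.println(jsonPlan);
// internalEnv.executeJsonPlan(jsonPlan);               // submit the job directly from the JSON plan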
  • Output of EXPLAIN statement
== Optimized Execution Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME((ts / 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[(userId > 7)])
         +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
  • StreamExecNode → Transformation → Generated DataStream Operator / Function code

    • e.g. StreamExecCalc → OneInputTransformation → OneInputStreamOperator / FlatMapFunction
  • Generated code will be dynamically compiled into Java class files through Janino

    • You can view all generated code by enabling DEBUG logging for CompileUtils
    • Too long, refer to https://pastebin.com/NCMSxh5h
  • We'll leave detailed explanation of this part for the next lecture


Get Our Hands Dirty

Question

  • Is there any hidden trouble in the simple example program shown above?
  • Try focusing on the LookupJoin and consider its cache locality

    • In the extreme case, the same looked-up key-value pair can be cached up to N times (once per parallel sub-task), since lookup requests for one key may land on any sub-task

Define An Option

  • Distributing lookup keys (according to hash) to sub-tasks seems better

  • In ExecutionConfigOptions...

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY =
    key("table.exec.lookup.distribute-by-key")
        .defaultValue(false)
        .withDescription("Specifies whether to distribute lookups to sub-tasks by hash value of lookup key.");

Customize A Rule

  • When to apply this rule? --- After physical planning

  • What should we do? --- Insert a hash-by-key operation before StreamPhysicalLookupJoin

    • FlinkRelDistribution will do the work
    • Physical redistribution means StreamPhysicalExchange node
  • Note that there are 5 kinds of RelTrait in Flink SQL

class HashDistributedLookupJoinRule extends RelOptRule(
  operand(classOf[StreamPhysicalLookupJoin], any()),
  "HashDistributedLookupJoinRule") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
    tableConfig.getConfiguration.getBoolean(ExecutionConfigOptions.TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY)
  }

  override def onMatch(call: RelOptRuleCall): Unit = {
    val originalLookupJoin: StreamPhysicalLookupJoin = call.rel(0)
    val joinInfo = originalLookupJoin.joinInfo
    val traitSet = originalLookupJoin.getTraitSet

    // require hash distribution on the lookup keys of the stream side
    val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys)
    val hashDistributedTraitSet = traitSet
      .replace(requiredDistribution)
      .replace(FlinkConventions.STREAM_PHYSICAL)
      .replace(RelCollations.EMPTY)
      .replace(traitSet.getTrait(ModifyKindSetTraitDef.INSTANCE))
      .replace(traitSet.getTrait(UpdateKindTraitDef.INSTANCE))

    // insert an Exchange over the lookup join's input (the stream side)
    val hashDistributedInput = new StreamPhysicalExchange(
      originalLookupJoin.getCluster,
      hashDistributedTraitSet,
      originalLookupJoin.getInput,
      requiredDistribution)

    call.transformTo(originalLookupJoin.copy(
      originalLookupJoin.getTraitSet,
      util.Arrays.asList(hashDistributedInput)))
  }
}

object HashDistributedLookupJoinRule {
  val INSTANCE: RelOptRule = new HashDistributedLookupJoinRule
}
  • Luckily, there's a helper method FlinkExpandConversionRule#satisfyDistribution() (also used in two-stage aggregation) that does exactly this
val hashDistributedInput = FlinkExpandConversionRule.satisfyDistribution(
  FlinkConventions.STREAM_PHYSICAL,
  originalLookupJoin.getInput,
  requiredDistribution
)

Put Into Rule Set

  • At the tail of FlinkStreamRuleSets
val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
  // hash distributed lookup join rule
  HashDistributedLookupJoinRule.INSTANCE,
  // optimize agg rule
  TwoStageOptimizedAggregateRule.INSTANCE,
  // incremental agg rule
  IncrementalAggregateRule.INSTANCE,
  // optimize window agg rule
  TwoStageOptimizedWindowAggregateRule.INSTANCE
)

Have A Try

  • Rebuild the flink-table-api-java & flink-table-planner-blink modules
  • SET table.exec.lookup.distribute-by-key=true
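
Outside the SQL client, the new option can also be enabled programmatically before submitting the statement (a sketch, using the hypothetical tableEnv from earlier):

tableEnv.getConfig().getConfiguration()
    .set(ExecutionConfigOptions.TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY, true);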
== Optimized Physical Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Exchange(distribution=[hash[siteId0]])
         +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
            +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])

== Optimized Execution Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME((ts / 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Exchange(distribution=[hash[siteId0]])
         +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[(userId > 7)])
            +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])

The End

