从Flink SQL doesn't support consuming update and delete changes 错误谈起
前言
相信我们在初学Flink SQL时,多少遇到过像这样的错误信息:
org.apache.flink.table.api.TableException:
X[算子名] doesn't support consuming update and delete changes which is produced by node Y[算子名]
为什么有些下游算子不能接受上游算子发来的UPDATE和DELETE消息呢?本文以1.13版本为准来简单地捋一下。
回顾ChangelogMode
笔者之前写过一篇自定义Flink SQL Connector的简明教程,其中提到在定义DynamicTableSink
(以及ScanTableSource
)的时候,都需要覆写getChangelogMode()
方法,告诉Planner这个Connector可以接受或产生的数据变化类型。变化的标记由四种RowKind
表示,即INSERT(+I)、UPDATE_BEFORE(-U)、UPDATE_AFTER(+U)和DELETE(-D):
/** Insertion operation. */INSERT("+I", (byte) 0),/*** Update operation with the previous content of the updated row.** <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that* needs to retract the previous row first. It is useful in cases of a non-idempotent update,* i.e., an update of a row that is not uniquely identifiable by a key.*/UPDATE_BEFORE("-U", (byte) 1),/*** Update operation with new content of the updated row.** <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that* needs to retract the previous row first. OR it describes an idempotent update, i.e., an* update of a row that is uniquely identifiable by a key.*/UPDATE_AFTER("+U", (byte) 2),/** Deletion operation. */DELETE("-D", (byte) 3);
但是,ChangelogMode
只能作用于Connector,中间算子(其实也包含Source/Sink)产生和接受哪些类型的变化在Planner内部是如何定义的?下面了解一下相关的两种RelTrait
s。
ModifyKindSet / UpdateKind Trait
复习一下之前讲过的RelTrait
和RelTraitDef
的含义:
A set of physical properties & their definitions carried by a relational expression.
站在RelNode
的角度上讲,ChangelogMode
确实可以作为附加在其上的物理属性。Blink Planner的物理计划层使用了两个RelTrait
来承载数据变化的语义。第一个是ModifyKindSetTrait
,表示INSERT(I)、UPDATE(U)和DELETE(D)三者组成的集合,部分代码如下,比较容易理解:
object ModifyKindSetTrait {/*** An empty [[ModifyKindSetTrait]] which doesn't contain any [[ModifyKind]].*/val EMPTY = new ModifyKindSetTrait(ModifyKindSet.newBuilder().build())/*** Insert-only [[ModifyKindSetTrait]].*/val INSERT_ONLY = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)/*** A modify [[ModifyKindSetTrait]] that contains all change operations.*/val ALL_CHANGES = new ModifyKindSetTrait(ModifyKindSet.ALL_CHANGES)/*** Creates an instance of [[ModifyKindSetTrait]] from th given [[ChangelogMode]].*/def fromChangelogMode(changelogMode: ChangelogMode): ModifyKindSetTrait = {val builder = ModifyKindSet.newBuilderchangelogMode.getContainedKinds.foreach {case RowKind.INSERT => builder.addContainedKind(ModifyKind.INSERT)case RowKind.DELETE => builder.addContainedKind(ModifyKind.DELETE)case _ => builder.addContainedKind(ModifyKind.UPDATE) // otherwise updates}new ModifyKindSetTrait(builder.build)}
}
第二个则是UpdateKindTrait
,表示UPDATE_BEFORE(-U)和UPDATE_AFTER(+U),也不难。注意它除了可以从ChangelogMode
转换而来,还可以从ModifyKindSet
转换而来:
object UpdateKindTrait {/*** An [[UpdateKindTrait]] that describes the node doesn't provide any kind of updates* as a provided trait, or requires nothing about kind of updates as a required trait.** <p>It also indicates that the [[ModifyKindSetTrait]] of current node doesn't contain* [[ModifyKind#UPDATE]] operation.*/val NONE = new UpdateKindTrait(UpdateKind.NONE)/*** An [[UpdateKindTrait]] that describes the node produces update changes just as a* single row of [[org.apache.flink.types.RowKind#UPDATE_AFTER]]*/val ONLY_UPDATE_AFTER = new UpdateKindTrait(UpdateKind.ONLY_UPDATE_AFTER)/*** An [[UpdateKindTrait]] that describes the node produces update changes consists of* a row of [[org.apache.flink.types.RowKind#UPDATE_BEFORE]] and* [[org.apache.flink.types.RowKind#UPDATE_AFTER]].*/val BEFORE_AND_AFTER = new UpdateKindTrait(UpdateKind.BEFORE_AND_AFTER)/*** Returns ONLY_UPDATE_AFTER [[UpdateKindTrait]] if there is update changes.* Otherwise, returns NONE [[UpdateKindTrait]].*/def onlyAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {UpdateKind.ONLY_UPDATE_AFTER} else {UpdateKind.NONE}new UpdateKindTrait(updateKind)}/*** Returns BEFORE_AND_AFTER [[UpdateKindTrait]] if there is update changes.* Otherwise, returns NONE [[UpdateKindTrait]].*/def beforeAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {UpdateKind.BEFORE_AND_AFTER} else {UpdateKind.NONE}new UpdateKindTrait(updateKind)}/*** Creates an instance of [[UpdateKindTrait]] from the given [[ChangelogMode]].*/def fromChangelogMode(changelogMode: ChangelogMode): UpdateKindTrait = {val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)(hasUpdateBefore, hasUpdateAfter) match {case (true, true) => BEFORE_AND_AFTERcase (false, true) => ONLY_UPDATE_AFTERcase (true, false) =>throw new IllegalArgumentException("Unsupported changelog mode: " +ChangelogPlanUtils.stringifyChangelogMode(Some(changelogMode)))case (false, false) => NONE}}
}
RelTrait相容性
补充一个之前的Calcite入门讲义里略去的点,就是RelTrait
的核心方法:
boolean satisfies(RelTrait trait);
它用于判断此RelTrait
与另外一个RelTrait
的相容性,亦即T1是否满足T2的约束。显然,如果T1与T2相同,或者T1比T2更严格,那么此方法返回true
,否则返回false
。举个栗子,对于RelCollation
而言,(ORDER BY a, b) satisfies (ORDER BY a)
就是成立的,反过来则不成立。
ModifyKindSetTrait#satisfies()
方法的定义如下,注释写得很清楚,即T1是T2的子集:
override def satisfies(relTrait: RelTrait): Boolean = relTrait match {case other: ModifyKindSetTrait =>// it’s satisfied when modify kinds are included in the required set,// e.g. [I,U] satisfy [I,U,D]// [I,U,D] not satisfy [I,D]this.modifyKindSet.getContainedKinds.forall(other.modifyKindSet.contains)case _ => false}
UpdateKindTrait#satisfies()
则要求两者完全相同:
override def satisfies(relTrait: RelTrait): Boolean = relTrait match {case other: UpdateKindTrait =>// should totally matchother.updateKind == this.updateKindcase _ => false}
接下来就可以进入Blink Planner的相关逻辑了。
物理计划阶段的ChangelogMode推断
Blink Planner通过名为FlinkChangelogModeInferenceProgram
的优化程序来为每个StreamPhysicalRel
推断出ChangelogMode
信息,并检查产生的ModifyKindSetTrait
和UpdateKindTrait
的上下游相容性。主要的逻辑分为两步:
// step1: satisfy ModifyKindSet traitval physicalRoot = root.asInstanceOf[StreamPhysicalRel]val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(physicalRoot,// we do not propagate the ModifyKindSet requirement and requester among blocks// set default ModifyKindSet requirement and requester for rootModifyKindSetTrait.ALL_CHANGES,"ROOT")// step2: satisfy UpdateKind traitval rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet)// use the required UpdateKindTrait from parent blocksval requiredUpdateKindTraits = if (rootModifyKindSet.contains(ModifyKind.UPDATE)) {if (context.isUpdateBeforeRequired) {Seq(UpdateKindTrait.BEFORE_AND_AFTER)} else {// update_before is not required, and input contains updates// try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTERSeq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER)}} else {// there is no updatesSeq(UpdateKindTrait.NONE)}
可见是通过两个特殊定义的Visitor(参见访问者模式)对物理计划树进行遍历与转换。以SatisfyModifyKindSetTraitVisitor
为例,它的visit()
方法代码框架如下,也体现了Scala模式匹配的强大之处。
def visit(rel: StreamPhysicalRel,requiredTrait: ModifyKindSetTrait,requester: String): StreamPhysicalRel = rel match {case sink: StreamPhysicalSink =>val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(sink.tableSink.getChangelogMode(queryModifyKindSet))val children = visitChildren(sink, sinkRequiredTrait, name)val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)// ignore required trait from context, because sink is the true rootsink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]case sink: StreamPhysicalLegacySink[_] => // ......case deduplicate: StreamPhysicalDeduplicate =>// deduplicate only support insert only as inputval children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {// only proctime first row deduplicate does not produce UPDATE changesModifyKindSetTrait.INSERT_ONLY} else {// other deduplicate produce update changesModifyKindSetTrait.ALL_CHANGES}createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)case agg: StreamPhysicalGroupAggregate =>// agg support all changes in inputval children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)val inputModifyKindSet = getModifyKindSet(children.head)val builder = ModifyKindSet.newBuilder().addContainedKind(ModifyKind.INSERT).addContainedKind(ModifyKind.UPDATE)if (inputModifyKindSet.contains(ModifyKind.UPDATE) ||inputModifyKindSet.contains(ModifyKind.DELETE)) {builder.addContainedKind(ModifyKind.DELETE)}val providedTrait = new ModifyKindSetTrait(builder.build())createNewNode(agg, children, providedTrait, requiredTrait, requester)case tagg: StreamPhysicalGroupTableAggregateBase => // ......case agg: StreamPhysicalPythonGroupAggregate => // ......case window: StreamPhysicalGroupWindowAggregateBase =>// WindowAggregate and WindowTableAggregate support insert-only in inputval children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)val builder = ModifyKindSet.newBuilder().addContainedKind(ModifyKind.INSERT)if (window.emitStrategy.produceUpdates) {builder.addContainedKind(ModifyKind.UPDATE)}val providedTrait = new ModifyKindSetTrait(builder.build())createNewNode(window, children, providedTrait, requiredTrait, requester)case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank =>// WindowAggregate and WindowRank support insert-only in inputval children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)val providedTrait = ModifyKindSetTrait.INSERT_ONLYcreateNewNode(rel, children, providedTrait, requiredTrait, requester)case limit: StreamPhysicalLimit => // ......case _: StreamPhysicalRank | _: StreamPhysicalSortLimit => // ......case sort: StreamPhysicalSort => // ......case cep: StreamPhysicalMatch => // ......case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |_: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate => // ......case join: StreamPhysicalJoin => // ......case windowJoin: StreamPhysicalWindowJoin => // ......case temporalJoin: StreamPhysicalTemporalJoin => // ......case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |_: StreamPhysicalLookupJoin | _: StreamPhysicalExchange |_: StreamPhysicalExpand | _: StreamPhysicalMiniBatchAssigner |_: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction =>// transparent forward requiredTrait to childrenval children = visitChildren(rel, requiredTrait, requester)val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)// forward children modecreateNewNode(rel, children, childrenTrait, requiredTrait, requester)case union: StreamPhysicalUnion => // ......case normalize: StreamPhysicalChangelogNormalize => // ......case ts: StreamPhysicalTableSourceScan =>// ScanTableSource supports produces updates and deletionsval providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)createNewNode(ts, List(), providedTrait, requiredTrait, requester)case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |_: StreamPhysicalValues => // ......case scan: StreamPhysicalIntermediateTableScan => // ......case _ =>throw new UnsupportedOperationException(s"Unsupported visit for ${rel.getClass.getSimpleName}")}
可见,这个访问者以Sink为根开始推断ChangelogMode
,requiredTrait
参数就是父节点需要子节点满足的ModifyKindSetTrait
,一直传播到Source为止。对于那些不会对变化语义产生影响的节点(如Calc
、Exchange
等),则会直接将requiredTrait
转发到子节点。
在这过程中,若有父节点和子节点Trait不相容的情况出现,就会抛出文章开头所述的错误信息,见createNewNode()
方法:
private def createNewNode(node: StreamPhysicalRel,children: List[StreamPhysicalRel],providedTrait: ModifyKindSetTrait,requiredTrait: ModifyKindSetTrait,requestedOwner: String): StreamPhysicalRel = {if (!providedTrait.satisfies(requiredTrait)) {val diff = providedTrait.modifyKindSet.minus(requiredTrait.modifyKindSet)val diffString = diff.getContainedKinds.toList.sorted // for deterministic error message.map(_.toString.toLowerCase).mkString(" and ")// creates a new node based on the new children, to have a more correct node description// e.g. description of GroupAggregate is based on the ModifyKindSetTrait of childrenval tempNode = node.copy(node.getTraitSet, children).asInstanceOf[StreamPhysicalRel]val nodeString = tempNode.getRelDetailedDescriptionthrow new TableException(s"$requestedOwner doesn't support consuming $diffString changes " +s"which is produced by node $nodeString")}val newTraitSet = node.getTraitSet.plus(providedTrait)node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]}
为了方便理解,来用一条包含去重+窗口聚合逻辑的SQL语句做说明:
SELECT userId, COUNT(DISTINCT orderId)
FROM (SELECT * FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime [ASC | DESC]) AS rnFROM rtdw_ods.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset') */) WHERE rn = 1
) GROUP BY userId, TUMBLE(procTime, INTERVAL '5' SECOND);
经过试验可以发现,如果去重保留第一条数据(即ORDER BY procTime ASC
),那么这条语句可以正常执行。但若是保留最后一条数据(即ORDER BY procTime DESC
),就会抛出如下的异常:
Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[LastRow], key=[suborderid], order=[PROCTIME])at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:166)
......
再次参考代码可得知,GroupWindowAggregate
只能接受子节点的INSERT语义,但是Deduplicate
在保留最后一条的情况下会产生回撤语义,故无法执行。
SatisfyUpdateKindTraitVisitor
的处理方式类似,不再赘述。
Hack一下
通过查看执行层的GroupWindowAggregate
代码,可知它其实是能够支持回撤流输入的。我们只需要对FlinkChangelogModeInferenceProgram
做三处简单的改动就能达到目的:
SatisfyModifyKindSetTraitVisitor#visit()
方法:
将StreamPhysicalGroupWindowAggregateBase
判断分支中visitChildren
方法的requiredChildrenTrait
参数由ModifyKindSetTrait.INSERT_ONLY
改成ModifyKindSetTrait.ALL_CHANGES
,表示它接受所有变更类型。SatisfyUpdateKindTraitVisitor#visit()
方法:- 将第3个判断分支的条件最后加上
| _: StreamPhysicalGroupWindowAggregateBase
,表示它接受UpdateKindTrait.BEFORE_AND_AFTER
(对于回撤流)和UpdateKindTrait.NONE
(对于只追加流)。 - 相应地,在第4个判断分支的条件中删掉
_: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate
两者。
- 将第3个判断分支的条件最后加上
重新构建flink-table-planner-blink模块,再提交上一节的保留最后一条去重+窗口聚合的SQL,发现可以正常执行,且结果正确。
The End
明后两天就是Flink Forward Asia 2021 Online咯~
晚安晚安。
http://www.taodudu.cc/news/show-2919766.html
相关文章:
- mysql between and 日期
- TypeError: Can‘t mix strings and bytes in path components
- Oracle数据库Bitand()函数用法(计算位移)
- 问题解决: ValueError: Can't Handle mix of binary and continuous
- BETWEEN AND 操作符
- lftp下载文件无法覆盖,提示 file already existst and xfer:clobber is unset 问题解决
- 学习MATLAB的第一天,梳理一些见到的函数。1.matlab中sin、cos、tan三角函数问题。2.abs函数。3.vpa函数。4.disp函数。5.class函数。6.logical函数。
- 颜色表大全 | HTML Color Table
- PS 颜色表大全-颜色中文名(1)
- 【CSS】:中文颜色名称对照
- shawl.qiu Javascript 前景色背景色调色类 / BgColorScheme v1.0
- 背景颜色对照表
- 【WEB】前端系统配色方案(全览)
- 【Web前端】配色方案(全览)
- 颜色搜集表
- 日语表示颜色的单词
- 配色用的调和色
- 中文颜色名称与RGB颜色对照表
- UI设计 AndroidIOS开发推荐用色
- 颜色对照表、偏色修正表
- 色彩大全,色彩配色大全
- 记一次APP去壳破解重新打包
- clusterprolifer gsea 富集分析
- R语言GO富集分析
- WebSestalt,好用的富集分析工具,介绍及使用教程
- r语言进行go富集分析_GO富集柱状图
- 【富集分析】
- 使用clusterProfiler进行GO富集分析
- r语言进行go富集分析_R语言GEO数据挖掘-功能富集分析
- linux下的go富集分析,GO富集分析示例【华为云技术分享】
从Flink SQL doesn't support consuming update and delete changes 错误谈起相关推荐
- Flink sql:Table sink doesn‘t support consuming update and delete changes which is produced by node
一.问题描述 Flink sql将kafka作为join的输出,报错: Exception in thread "main" org.apache.flink.table.api. ...
- toAppendStream doesn‘t support consuming update and delete changes which is produced by node XXX
bug如下: Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStrea ...
- Table sink ‘default_catalog.default_database.t3‘ doesn‘t support consuming update and delete changes
报错:Table sink 'default_catalog.default_database.t3' doesn't support consuming update and delete chan ...
- toAppendStream doesn‘t support consuming update changes which is produced by node GroupAggregate
Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn ...
- 【Flink】FLink SQL TableException: Table sink doesn‘t support consuming update changes which is
文章目录 1.概述 2.原因 3.解决方案 1.概述 在做实验:[Flink]Flink SQL 读取 CSV 文件 最后尝试将数据写入到csv文件的时候报错. Flink SQL> INSER ...
- 【SQL】Attempt to do update or delete using transaction manager that does not support these operations
在Hive SQL中执行update或者delete语句,报错: FAILED: SemanticException [Error 10294]: Attempt to do update or de ...
- Flink SQL的打印输出的几种方式
打广告!!!!!!!!!!!!!!!!!!!!!!!!!: 先跟鸡哥打个广告 ,博客地址: https://me.csdn.net/weixin_47482194 写的博客很有水平的,上了几次官网推荐 ...
- 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用
从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...
- Flink SQL 1.11 新功能与最佳实践
#2020云栖大会#阿里云海量offer来啦!投简历.赢阿里云限量礼品及阿里云ACA认证免费考试资格!>>> 整理者:陈婧敏(清樾) 本文整理自 Apache Flink PMC,阿 ...
最新文章
- php的匿名函数和闭包函数
- 10 个常见的 Linux 终端仿真器
- Ubuntu 16.04 QT ‘usr/bin/ld cannot find -IGL‘
- Leetcode-53:最大子序和
- 独立思考者模型:寻找潜藏在表象背后的真相 探寻真相的方法
- 近期吉他练习曲目《爱的罗曼史》,安排每天晚上睡觉前练习1小时!
- 【经典回放】多种语言系列数据结构线性表之二:链表
- PCL——向PCD文件写入点云数据
- python mql4跟单_MT4软件本地跟单方法的实现 -
- 西门子PLC是怎么控制伺服电机的?
- 读 Irving M. Copi 之《逻辑学导论》
- 硬件设计1:常用元器件的选型理论依据
- 深度linux关闭搜狗后没法输入法,Deepin v20无法安装搜狗官方输入法 for Linux,附原因和解决办法...
- 星际争霸2服务器未能创建游戏,星际争霸2游戏进不去解决方法
- 阿里云个人网站备案流程
- 【商业源码】生日大放送-Newlife商业源码分享
- 如何管理一台集群的虚拟机
- 易优cms uiarclist 文档列表可视化标签
- Nginx与服务器集群
- 机器学习中的Inductive bias理解