Preface

When first learning Flink SQL, most of us have run into error messages like this one:

org.apache.flink.table.api.TableException:
X [operator name] doesn't support consuming update and delete changes which is produced by node Y [operator name]

Why can't some downstream operators accept the UPDATE and DELETE messages emitted by their upstream operators? This article takes a quick look at the mechanism, based on Flink 1.13.

ChangelogMode Revisited

In an earlier post I wrote a short tutorial on building a custom Flink SQL connector, which mentioned that when defining a DynamicTableSink (as well as a ScanTableSource) you must override the getChangelogMode() method to tell the planner which kinds of changes the connector can accept or produce. Changes are tagged with one of four RowKind values: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), and DELETE (-D):

/** Insertion operation. */
INSERT("+I", (byte) 0),

/**
 * Update operation with the previous content of the updated row.
 *
 * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
 * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
 * i.e., an update of a row that is not uniquely identifiable by a key.
 */
UPDATE_BEFORE("-U", (byte) 1),

/**
 * Update operation with new content of the updated row.
 *
 * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
 * update of a row that is uniquely identifiable by a key.
 */
UPDATE_AFTER("+U", (byte) 2),

/** Deletion operation. */
DELETE("-D", (byte) 3);
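To make the four kinds concrete, here is a self-contained sketch in plain Java (deliberately not using Flink's classes; the enum merely mirrors Flink's RowKind short strings) of how a keyed running count turns an append-only input into a changelog that contains -U/+U pairs:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangelogDemo {

    // Mirrors Flink's RowKind short strings: +I, -U, +U, -D.
    enum RowKind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");
        final String shortString;
        RowKind(String s) { this.shortString = s; }
    }

    // A running COUNT(*) GROUP BY key: the first row per key emits +I,
    // every later row retracts the old count (-U) and asserts the new one (+U).
    static List<String> countPerKey(List<String> keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String key : keys) {
            Integer old = counts.get(key);
            if (old == null) {
                counts.put(key, 1);
                changelog.add(RowKind.INSERT.shortString + "(" + key + ",1)");
            } else {
                changelog.add(RowKind.UPDATE_BEFORE.shortString + "(" + key + "," + old + ")");
                counts.put(key, old + 1);
                changelog.add(RowKind.UPDATE_AFTER.shortString + "(" + key + "," + (old + 1) + ")");
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        System.out.println(countPerKey(List.of("a", "a", "b")));
    }
}
```

Feeding the keys a, a, b produces the changelog +I(a,1), -U(a,1), +U(a,2), +I(b,1), which is exactly the retract pattern the planner must account for downstream.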

However, a ChangelogMode can only be attached to a connector. How does the planner internally define which change types the intermediate operators (and, in fact, sources and sinks too) produce and accept? The answer lies in two RelTraits, described below.

ModifyKindSet / UpdateKind Trait

Let's first review the meaning of RelTrait and RelTraitDef covered in the earlier Calcite primer:

A set of physical properties & their definitions carried by a relational expression.

From a RelNode's point of view, ChangelogMode can indeed serve as a physical property attached to the node. The physical planning layer of the Blink planner uses two RelTraits to carry the change semantics. The first is ModifyKindSetTrait, which represents a set drawn from INSERT (I), UPDATE (U), and DELETE (D). Part of its code follows and is easy to understand:

object ModifyKindSetTrait {

  /**
   * An empty [[ModifyKindSetTrait]] which doesn't contain any [[ModifyKind]].
   */
  val EMPTY = new ModifyKindSetTrait(ModifyKindSet.newBuilder().build())

  /**
   * Insert-only [[ModifyKindSetTrait]].
   */
  val INSERT_ONLY = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)

  /**
   * A modify [[ModifyKindSetTrait]] that contains all change operations.
   */
  val ALL_CHANGES = new ModifyKindSetTrait(ModifyKindSet.ALL_CHANGES)

  /**
   * Creates an instance of [[ModifyKindSetTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): ModifyKindSetTrait = {
    val builder = ModifyKindSet.newBuilder
    changelogMode.getContainedKinds.foreach {
      case RowKind.INSERT => builder.addContainedKind(ModifyKind.INSERT)
      case RowKind.DELETE => builder.addContainedKind(ModifyKind.DELETE)
      case _ => builder.addContainedKind(ModifyKind.UPDATE) // otherwise updates
    }
    new ModifyKindSetTrait(builder.build)
  }
}
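The mapping performed by fromChangelogMode() can be sketched with plain Java EnumSets (an illustrative model, not Flink's actual ModifyKindSet builder): both UPDATE_BEFORE and UPDATE_AFTER collapse into the single modify kind UPDATE:

```java
import java.util.EnumSet;

class ModifyKindSetSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // Models ModifyKindSetTrait.fromChangelogMode: -U and +U both become UPDATE.
    static EnumSet<ModifyKind> fromChangelogMode(EnumSet<RowKind> mode) {
        EnumSet<ModifyKind> result = EnumSet.noneOf(ModifyKind.class);
        for (RowKind kind : mode) {
            switch (kind) {
                case INSERT: result.add(ModifyKind.INSERT); break;
                case DELETE: result.add(ModifyKind.DELETE); break;
                default:     result.add(ModifyKind.UPDATE); break; // -U or +U
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fromChangelogMode(EnumSet.allOf(RowKind.class)));
    }
}
```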

The second is UpdateKindTrait, which covers UPDATE_BEFORE (-U) and UPDATE_AFTER (+U) and is just as simple. Note that besides being created from a ChangelogMode, it can also be derived from a ModifyKindSet:

object UpdateKindTrait {

  /**
   * An [[UpdateKindTrait]] that describes the node doesn't provide any kind of updates
   * as a provided trait, or requires nothing about kind of updates as a required trait.
   *
   * <p>It also indicates that the [[ModifyKindSetTrait]] of current node doesn't contain
   * [[ModifyKind#UPDATE]] operation.
   */
  val NONE = new UpdateKindTrait(UpdateKind.NONE)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes just as a
   * single row of [[org.apache.flink.types.RowKind#UPDATE_AFTER]]
   */
  val ONLY_UPDATE_AFTER = new UpdateKindTrait(UpdateKind.ONLY_UPDATE_AFTER)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes consists of
   * a row of [[org.apache.flink.types.RowKind#UPDATE_BEFORE]] and
   * [[org.apache.flink.types.RowKind#UPDATE_AFTER]].
   */
  val BEFORE_AND_AFTER = new UpdateKindTrait(UpdateKind.BEFORE_AND_AFTER)

  /**
   * Returns ONLY_UPDATE_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def onlyAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.ONLY_UPDATE_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Returns BEFORE_AND_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def beforeAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.BEFORE_AND_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Creates an instance of [[UpdateKindTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): UpdateKindTrait = {
    val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
    val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
    (hasUpdateBefore, hasUpdateAfter) match {
      case (true, true) => BEFORE_AND_AFTER
      case (false, true) => ONLY_UPDATE_AFTER
      case (true, false) =>
        throw new IllegalArgumentException("Unsupported changelog mode: " +
          ChangelogPlanUtils.stringifyChangelogMode(Some(changelogMode)))
      case (false, false) => NONE
    }
  }
}
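The fromChangelogMode() conversion at the end, including the rejected "-U without +U" combination, can be modeled like this (plain Java, illustrative only):

```java
import java.util.EnumSet;

class UpdateKindSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }
    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Models UpdateKindTrait.fromChangelogMode: a mode with UPDATE_BEFORE
    // but no UPDATE_AFTER is rejected as unsupported.
    static UpdateKind fromChangelogMode(EnumSet<RowKind> mode) {
        boolean hasBefore = mode.contains(RowKind.UPDATE_BEFORE);
        boolean hasAfter = mode.contains(RowKind.UPDATE_AFTER);
        if (hasBefore && hasAfter) return UpdateKind.BEFORE_AND_AFTER;
        if (!hasBefore && hasAfter) return UpdateKind.ONLY_UPDATE_AFTER;
        if (hasBefore) throw new IllegalArgumentException("Unsupported changelog mode");
        return UpdateKind.NONE;
    }

    public static void main(String[] args) {
        System.out.println(fromChangelogMode(EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER)));
    }
}
```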

RelTrait Compatibility

Let me add a point that the earlier Calcite primer skipped, namely the core method of RelTrait:

boolean satisfies(RelTrait trait);

It determines whether this RelTrait is compatible with another one, i.e. whether T1 satisfies the constraint imposed by T2. Obviously, the method returns true if T1 is identical to T2 or stricter than T2, and false otherwise. For example, for RelCollation, (ORDER BY a, b) satisfies (ORDER BY a) holds, but the converse does not.
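The RelCollation example can be modeled as a simple prefix check (a hand-written illustration, not Calcite's actual implementation):

```java
import java.util.List;

class CollationSatisfies {
    // (ORDER BY a, b) satisfies (ORDER BY a): the provided sort keys
    // must start with the required sort keys, in the same order.
    static boolean satisfies(List<String> provided, List<String> required) {
        if (required.size() > provided.size()) return false;
        return provided.subList(0, required.size()).equals(required);
    }

    public static void main(String[] args) {
        System.out.println(satisfies(List.of("a", "b"), List.of("a")));
    }
}
```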

ModifyKindSetTrait#satisfies() is defined as follows; as its comment makes clear, it requires T1 to be a subset of T2:

override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
  case other: ModifyKindSetTrait =>
    // it's satisfied when modify kinds are included in the required set,
    // e.g. [I,U] satisfy [I,U,D]
    //      [I,U,D] not satisfy [I,D]
    this.modifyKindSet.getContainedKinds.forall(other.modifyKindSet.contains)
  case _ => false
}
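The subset semantics boil down to a containsAll check, as in this plain-Java sketch (not Flink's code):

```java
import java.util.EnumSet;

class ModifyKindSatisfies {
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // Models ModifyKindSetTrait#satisfies: the provided kinds must be
    // a subset of the required kinds.
    static boolean satisfies(EnumSet<ModifyKind> provided, EnumSet<ModifyKind> required) {
        return required.containsAll(provided);
    }

    public static void main(String[] args) {
        // [I,U] satisfies [I,U,D], but [I,U,D] does not satisfy [I,D].
        System.out.println(satisfies(EnumSet.of(ModifyKind.INSERT, ModifyKind.UPDATE),
                                     EnumSet.allOf(ModifyKind.class)));
    }
}
```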

UpdateKindTrait#satisfies(), by contrast, requires the two traits to match exactly:

override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
  case other: UpdateKindTrait =>
    // should totally match
    other.updateKind == this.updateKind
  case _ => false
}

With these pieces in place, we can dive into the relevant logic of the Blink planner.

ChangelogMode Inference During Physical Planning

The Blink planner uses an optimization program named FlinkChangelogModeInferenceProgram to infer the ChangelogMode of every StreamPhysicalRel and to check the upstream/downstream compatibility of the resulting ModifyKindSetTrait and UpdateKindTrait. The main logic consists of two steps:

// step1: satisfy ModifyKindSet trait
val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(
  physicalRoot,
  // we do not propagate the ModifyKindSet requirement and requester among blocks
  // set default ModifyKindSet requirement and requester for root
  ModifyKindSetTrait.ALL_CHANGES,
  "ROOT")

// step2: satisfy UpdateKind trait
val rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet)
// use the required UpdateKindTrait from parent blocks
val requiredUpdateKindTraits = if (rootModifyKindSet.contains(ModifyKind.UPDATE)) {
  if (context.isUpdateBeforeRequired) {
    Seq(UpdateKindTrait.BEFORE_AND_AFTER)
  } else {
    // update_before is not required, and input contains updates
    // try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTER
    Seq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER)
  }
} else {
  // there is no updates
  Seq(UpdateKindTrait.NONE)
}
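Step 2's choice of candidate UpdateKindTraits can be restated as a small pure function. The sketch below is a plain-Java model of the Scala snippet above, not planner code:

```java
import java.util.EnumSet;
import java.util.List;

class UpdateKindRequirement {
    enum ModifyKind { INSERT, UPDATE, DELETE }
    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Models step 2: which UpdateKind traits the planner tries for the root,
    // depending on whether updates exist and whether UPDATE_BEFORE is required.
    static List<UpdateKind> requiredTraits(EnumSet<ModifyKind> rootSet,
                                           boolean updateBeforeRequired) {
        if (rootSet.contains(ModifyKind.UPDATE)) {
            if (updateBeforeRequired) {
                return List.of(UpdateKind.BEFORE_AND_AFTER);
            }
            // try ONLY_UPDATE_AFTER first, then fall back to BEFORE_AND_AFTER
            return List.of(UpdateKind.ONLY_UPDATE_AFTER, UpdateKind.BEFORE_AND_AFTER);
        }
        // no updates at all
        return List.of(UpdateKind.NONE);
    }

    public static void main(String[] args) {
        System.out.println(requiredTraits(EnumSet.allOf(ModifyKind.class), false));
    }
}
```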

As shown, the physical plan tree is traversed and transformed by two purpose-built visitors (see the visitor pattern). Taking SatisfyModifyKindSetTraitVisitor as an example, the skeleton of its visit() method is shown below; it also shows off the power of Scala's pattern matching.

def visit(
    rel: StreamPhysicalRel,
    requiredTrait: ModifyKindSetTrait,
    requester: String): StreamPhysicalRel = rel match {
  case sink: StreamPhysicalSink =>
    val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"
    val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
    val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(
      sink.tableSink.getChangelogMode(queryModifyKindSet))
    val children = visitChildren(sink, sinkRequiredTrait, name)
    val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
    // ignore required trait from context, because sink is the true root
    sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

  case sink: StreamPhysicalLegacySink[_] => // ......

  case deduplicate: StreamPhysicalDeduplicate =>
    // deduplicate only support insert only as input
    val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
    val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {
      // only proctime first row deduplicate does not produce UPDATE changes
      ModifyKindSetTrait.INSERT_ONLY
    } else {
      // other deduplicate produce update changes
      ModifyKindSetTrait.ALL_CHANGES
    }
    createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)

  case agg: StreamPhysicalGroupAggregate =>
    // agg support all changes in input
    val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
    val inputModifyKindSet = getModifyKindSet(children.head)
    val builder = ModifyKindSet.newBuilder()
      .addContainedKind(ModifyKind.INSERT)
      .addContainedKind(ModifyKind.UPDATE)
    if (inputModifyKindSet.contains(ModifyKind.UPDATE) ||
        inputModifyKindSet.contains(ModifyKind.DELETE)) {
      builder.addContainedKind(ModifyKind.DELETE)
    }
    val providedTrait = new ModifyKindSetTrait(builder.build())
    createNewNode(agg, children, providedTrait, requiredTrait, requester)

  case tagg: StreamPhysicalGroupTableAggregateBase => // ......
  case agg: StreamPhysicalPythonGroupAggregate => // ......

  case window: StreamPhysicalGroupWindowAggregateBase =>
    // WindowAggregate and WindowTableAggregate support insert-only in input
    val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
    val builder = ModifyKindSet.newBuilder()
      .addContainedKind(ModifyKind.INSERT)
    if (window.emitStrategy.produceUpdates) {
      builder.addContainedKind(ModifyKind.UPDATE)
    }
    val providedTrait = new ModifyKindSetTrait(builder.build())
    createNewNode(window, children, providedTrait, requiredTrait, requester)

  case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank =>
    // WindowAggregate and WindowRank support insert-only in input
    val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
    val providedTrait = ModifyKindSetTrait.INSERT_ONLY
    createNewNode(rel, children, providedTrait, requiredTrait, requester)

  case limit: StreamPhysicalLimit => // ......
  case _: StreamPhysicalRank | _: StreamPhysicalSortLimit => // ......
  case sort: StreamPhysicalSort => // ......
  case cep: StreamPhysicalMatch => // ......
  case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
      _: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate => // ......
  case join: StreamPhysicalJoin => // ......
  case windowJoin: StreamPhysicalWindowJoin => // ......
  case temporalJoin: StreamPhysicalTemporalJoin => // ......

  case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
      _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange |
      _: StreamPhysicalExpand | _: StreamPhysicalMiniBatchAssigner |
      _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction =>
    // transparent forward requiredTrait to children
    val children = visitChildren(rel, requiredTrait, requester)
    val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
    // forward children mode
    createNewNode(rel, children, childrenTrait, requiredTrait, requester)

  case union: StreamPhysicalUnion => // ......
  case normalize: StreamPhysicalChangelogNormalize => // ......

  case ts: StreamPhysicalTableSourceScan =>
    // ScanTableSource supports produces updates and deletions
    val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
    createNewNode(ts, List(), providedTrait, requiredTrait, requester)

  case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
      _: StreamPhysicalValues => // ......
  case scan: StreamPhysicalIntermediateTableScan => // ......

  case _ =>
    throw new UnsupportedOperationException(
      s"Unsupported visit for ${rel.getClass.getSimpleName}")
}

As we can see, the visitor starts its ChangelogMode inference from the sink at the root. The requiredTrait parameter is the ModifyKindSetTrait that a parent node requires its children to satisfy, propagated all the way down to the sources. Nodes that have no effect on the change semantics (such as Calc and Exchange) simply forward requiredTrait to their children unchanged.

If during this process a parent's and a child's traits turn out to be incompatible, the error message quoted at the start of this article is thrown; see the createNewNode() method:

private def createNewNode(
    node: StreamPhysicalRel,
    children: List[StreamPhysicalRel],
    providedTrait: ModifyKindSetTrait,
    requiredTrait: ModifyKindSetTrait,
    requestedOwner: String): StreamPhysicalRel = {
  if (!providedTrait.satisfies(requiredTrait)) {
    val diff = providedTrait.modifyKindSet.minus(requiredTrait.modifyKindSet)
    val diffString = diff.getContainedKinds
      .toList.sorted // for deterministic error message
      .map(_.toString.toLowerCase)
      .mkString(" and ")
    // creates a new node based on the new children, to have a more correct node description
    // e.g. description of GroupAggregate is based on the ModifyKindSetTrait of children
    val tempNode = node.copy(node.getTraitSet, children).asInstanceOf[StreamPhysicalRel]
    val nodeString = tempNode.getRelDetailedDescription
    throw new TableException(
      s"$requestedOwner doesn't support consuming $diffString changes " +
        s"which is produced by node $nodeString")
  }
  val newTraitSet = node.getTraitSet.plus(providedTrait)
  node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]
}
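The error-message construction can be mimicked in a few lines of plain Java (again a model, not the planner's code; EnumSet's declaration order stands in for the sorted kind list):

```java
import java.util.EnumSet;
import java.util.stream.Collectors;

class TraitCheckSketch {
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // Models createNewNode(): if the provided kinds are not a subset of the
    // required kinds, report the offending kinds in the familiar error message.
    static String check(EnumSet<ModifyKind> provided, EnumSet<ModifyKind> required,
                        String requester, String node) {
        if (required.containsAll(provided)) {
            return "OK";
        }
        EnumSet<ModifyKind> diff = EnumSet.copyOf(provided);
        diff.removeAll(required); // diff = provided minus required
        String diffString = diff.stream()
            .map(k -> k.name().toLowerCase())
            .collect(Collectors.joining(" and "));
        return requester + " doesn't support consuming " + diffString
            + " changes which is produced by node " + node;
    }

    public static void main(String[] args) {
        System.out.println(check(EnumSet.allOf(ModifyKind.class),
            EnumSet.of(ModifyKind.INSERT), "GroupWindowAggregate", "Deduplicate"));
    }
}
```

With provided = [I,U,D] and required = [I], diff is [U,D], yielding the "update and delete changes" wording from the exception at the top of the article.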

To make this concrete, let's walk through a SQL statement that combines deduplication with a window aggregation:

SELECT userId, COUNT(DISTINCT orderId)
FROM (
  SELECT * FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY suborderid ORDER BY procTime [ASC | DESC]) AS rn
    FROM rtdw_ods.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset') */
  ) WHERE rn = 1
) GROUP BY userId, TUMBLE(procTime, INTERVAL '5' SECOND);

Experiments show that if the deduplication keeps the first row (i.e. ORDER BY procTime ASC), the statement executes normally. But if it keeps the last row (i.e. ORDER BY procTime DESC), the following exception is thrown:

Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[LastRow], key=[suborderid], order=[PROCTIME])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:166)
    ......

Referring back to the code, GroupWindowAggregate accepts only INSERT semantics from its children, while keep-last-row deduplication produces retract semantics, so the query cannot run.
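Putting the two branches together, a toy model shows why only proctime keep-first-row deduplication passes the window aggregate's check (plain Java mirroring the visit() logic above, not planner code):

```java
import java.util.EnumSet;

class DedupWindowAggDemo {
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // What the Deduplicate node provides, per the visit() branch above:
    // only proctime keep-first-row stays insert-only.
    static EnumSet<ModifyKind> dedupProvides(boolean keepLastRow, boolean isRowtime) {
        if (!keepLastRow && !isRowtime) {
            return EnumSet.of(ModifyKind.INSERT);
        }
        return EnumSet.allOf(ModifyKind.class);
    }

    // GroupWindowAggregate requires insert-only input: the provided set
    // must be a subset of {INSERT}.
    static boolean windowAggAccepts(EnumSet<ModifyKind> provided) {
        return EnumSet.of(ModifyKind.INSERT).containsAll(provided);
    }

    public static void main(String[] args) {
        System.out.println(windowAggAccepts(dedupProvides(false, false))); // keep first, proctime
        System.out.println(windowAggAccepts(dedupProvides(true, false)));  // keep last, proctime
    }
}
```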

SatisfyUpdateKindTraitVisitor works in much the same way, so I won't go through it again.

A Quick Hack

A look at the runtime GroupWindowAggregate code shows that it can in fact consume a retract stream. Three small changes to FlinkChangelogModeInferenceProgram are enough to enable this:

  • In SatisfyModifyKindSetTraitVisitor#visit():
    in the StreamPhysicalGroupWindowAggregateBase branch, change the requiredChildrenTrait argument of visitChildren from ModifyKindSetTrait.INSERT_ONLY to ModifyKindSetTrait.ALL_CHANGES, meaning it accepts all change types.
  • In SatisfyUpdateKindTraitVisitor#visit():
    • Append | _: StreamPhysicalGroupWindowAggregateBase to the condition of the third match branch, meaning it accepts UpdateKindTrait.BEFORE_AND_AFTER (for retract streams) and UpdateKindTrait.NONE (for append-only streams).
    • Correspondingly, remove _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate from the condition of the fourth match branch.

Rebuild the flink-table-planner-blink module and resubmit the keep-last-row dedup + window aggregation SQL from the previous section: it now executes normally and produces correct results.

The End

Flink Forward Asia 2021 Online takes place tomorrow and the day after~

Good night, good night.

