Top-N是我们应用Flink进行业务开发时的常见场景,传统的DataStream API已经有了非常成熟的实现方案,如果换成Flink SQL,又该怎样操作?好在Flink SQL官方文档已经给出了标准答案,我们只需要照抄就行,参考链接:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/overview/

其语法如下:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]

看官可能已经能够在日常工作中熟练应用这种查询风格了。那么,Flink内部是如何将它转化成高效的执行方案的呢?接下来基于最新的Flink 1.12版本稍微探究一下。

Logical Plan

使用EXPLAIN语句观察示例查询的执行计划(部分)如下:

EXPLAIN PLAN FOR SELECT * FROM (SELECT *,row_number() OVER(PARTITION BY merchandiseId ORDER BY totalQuantity DESC) AS rownumFROM (SELECT merchandiseId, sum(quantity) AS totalQuantityFROM rtdw_dwd.kafka_order_done_logGROUP BY merchandiseId)
) WHERE rownum <= 10== Abstract Syntax Tree ==
LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[$2])
+- LogicalFilter(condition=[<=($2, 10)])+- LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])+- ...== Optimized Logical Plan ==
Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])
+- Exchange(distribution=[hash[merchandiseId]])+- ...== Physical Execution Plan ==
Stage 1 : Data Source...Stage 2 : Operator...Stage 4 : Operator...Stage 6 : Operatorcontent : Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])ship_strategy : HASH

由执行计划可知,row_number() OVER(PARTITION BY ...)子句在逻辑计划阶段被优化成了名为Rank的RelNode(看官可参见Calcite的相关资料了解RelNode),可以用如下的简图说明。

负责这个优化的RelOptRule在Flink项目中名为FlinkLogicalRankRule。它将符合规则的开窗聚合操作(FlinkLogicalOverAggregate RelNode)和对排名的过滤操作(FlinkLogicalCalc RelNode)合并为FlinkLogicalRank。也就是说,只有严格符合上一节所述语法的查询才能得到优化。

FlinkLogicalRank节点会记录以下主要信息:

  • partitionKey:分组键。

  • orderKey:排序键与排序规则。

  • rankType:排名函数的类型,即ROW_NUMBER、RANK或者DENSE_RANK。

  • rankRange:排名区间(即Top-N一词中的N)。

  • strategy:Top-N结果的更新策略,目前有3种:

    • AppendFast:结果只追加,不更新;

    • Retract:类似于回撤流,结果会更新,前提是输入数据没有主键,或者主键与partitionKey不同;

    • UpdateFast:快速更新,前提是输入数据有主键,且结果单调递增/递减,还要求orderKey的排序规则与结果的单调性相反(例:ORDER BY sum(quantity) DESC)。可见它的效率最高,但是也最苛刻。

  • outputRankNumber:是否输出排名的序号,即在外层查询中是否有SELECT rownum子句。显然,如果不输出序号,在排名发生变化时可以大大减少回撤输出的数据量,降低Flink端的压力,具体可参见官方文档"No Ranking Output Optimization"一节。

Physical Plan

在流处理环境下,StreamPhysicalRankRule规则负责将FlinkLogicalRank逻辑节点转换成StreamPhysicalRankRule物理节点,并翻译成物理执行节点StreamExecRank。注意如果是分组Top-N(即有PARTITION BY子句),就会按照partitionKey的hash值分发到各个sub-task,否则会将并行度强制设为1,计算全局Top-N。另外从代码可以读出,Top-N语法目前仅支持ROW_NUMBER,暂时还不支持RANK和DENSE_RANK排名。

根据上文所述更新策略的不同,实际执行时采用的ProcessFunction也不同,如下类图所示。其中CleanupState接口表示支持空闲状态保留时间(idle state retention time)特性。

以最常用到的RetractableTopNFunction为例,当有一条累加数据到来时,处理流程可以用如下的简图来说明。

其中,dataState是MapState< RowData, List< RowData>>类型的状态,保存partitionKey与该key下面的流数据,用于容错。而treeMap是ValueState< SortedMap< RowData, Long>>类型的状态,顾名思义,它其中维护了一个TreeMap,用于计数及输出Top-N结果。至于这里为什么用了红黑树(TreeMap)而不是传统的最大/最小堆(PriorityQueue),自然是因为红黑树是对数复杂度的,相较于堆来说更适合Flink这种对时间敏感而对空间较不敏感的执行环境。

另外,我们一定要记得启用空闲状态保留时间,这样dataState和treeMap中的数据才不会永远积攒下去。不过空闲状态的清理并非确定性的,所以如果要计算有时间维度的排行榜(如按天、按小时等),需要把时间维度也加入PARTITION BY子句,而不是将保留时间设为对应的长度。

最后,在StreamExecRank中还提供了一个可配置的参数table.exec.topn.cache-size(默认值10000),即Top-N缓存的大小。如果Top-N的规模比较大,适当增加此值可以避免频繁访问状态,提高执行效率。

八千里路云和月 | 从零到大数据专家学习路径指南

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

本文发自《大数据技术与架构》公众号,微信搜索:import_bigdata,欢迎关注。

Flink SQL高效Top-N方案的实现原理相关推荐

  1. 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  2. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  3. 最佳实践|如何写出简单高效的 Flink SQL?

    摘要:本文整理自阿里巴巴高级技术专家.Apache Flink PMC 贺小令,在Flink Forward Asia 2022 生产实践专场的分享.本篇内容主要分为三个部分: Flink SQL I ...

  4. cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案

    作者:伍翀 (云邪) 整理:陈政羽(Flink 社区志愿者) Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink P ...

  5. Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现

    TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜.流式的 TopN 不同于批处理的 TopN,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,然后当排行榜 ...

  6. 大数据分析实战之项目实践:使用DLI Flink SQL进行电商实时业务数据分析

    使用 DLI Flink SQL 进行电商实时业务数据分析 业务场景介绍 场景描述 场景方案 场景任务 数据说明 数据源表:电商业务订单详情宽表 结果表:各渠道的销售总额实时统计表 操作过程 实操过程 ...

  7. Flink SQL 1.11 新功能与最佳实践

    #2020云栖大会#阿里云海量offer来啦!投简历.赢阿里云限量礼品及阿里云ACA认证免费考试资格!>>> 整理者:陈婧敏(清樾) 本文整理自 Apache Flink PMC,阿 ...

  8. Flink SQL CDC 上线!我们总结了 13 条生产实践经验

    #2020云栖大会#阿里云海量offer来啦!投简历.赢阿里云限量礼品及阿里云ACA认证免费考试资格!>>> 摘要: 7月,Flink 1.11 新版发布,在生态及易用性上有大幅提升 ...

  9. 一站式实时数仓开发:当FLINK SQL遇见ULTRON

    女主宣言 FLINK是被称为第四代大数据处理引擎的开源利器,近年来在国内各大厂的加持下更是成为了实时计算领域的标准,而ULTRON是360商业化近一年多来在总结自身实时计算场景应用和特点的基础上打造的 ...

最新文章

  1. CNDO-INTGRL-SS-BINTGS-斯莱特轨道指数---递推方法
  2. 计算机研究生上课时间自由吗,计算机在职研究生面授班主要的上课时间安排是怎样的呢...
  3. 前端学习(3067):vue+element今日头条管理-element里面的image组件
  4. STL源码剖析 数值算法 accumulate | adjacent_difference | inner_product | partial_sum | power | itoa
  5. 什么是真正的高清,你知道吗?
  6. 【TensorFlow】TensorFlow从浅入深系列之九 -- 教你认识图像识别中经典数据集
  7. Tomcat中的Out Of Memory错误
  8. 基于JAVA+SpringMVC+Mybatis+MYSQL的漫画社区
  9. 泰勒·斯威夫特(Taylor Swift)最好听歌曲专辑,喜欢的可以下载保存
  10. python儿童编程入门-一款儿童编程入门的理想工具——PythonTurtle
  11. 影响ae渲染时间的计算机配置,分享两套影视后期电脑配置2019 能流畅使用ae和pr的电脑主机推荐...
  12. 【调剂】中科院天津工业生物技术所与天津科技大学联合培养硕士招生2023
  13. (附源码)基于java的校园二手书籍交易平台 毕业设计131558
  14. PHP服务器获取客户端IP地址
  15. 实现 防止视频被下载功能
  16. sex 无需下载_elipse FTP插件 - Sexftp 支持FTP上传、下载、比较等功能
  17. 秘密, 维基百科可以这样访问...(不包括中文版)
  18. vue中使用指令给按钮添加节流
  19. Python 生成器(generator)详细总结+示例
  20. 浅谈自定义类型-枚举

热门文章

  1. 开发中使用到的56个民族的数据
  2. NYOJ611 练练
  3. python爬虫书籍-Python爬虫入门看什么书好?
  4. npm-whoami
  5. Java中计算日期间隔
  6. 老鱼Python数据分析——篇十:使用selenium获取历史数据并保存
  7. 2022-2028全球与中国内存测试设备市场现状及未来发展趋势
  8. (SCI论文写作)使用Visio导出图片时,边角有黑色框线和大块留白的解决办法(PDF,jpg,png等多格式通用)
  9. 永磁同步电机矢量控制算法梳理
  10. linux没有应用程序,Ninite为Linux安装多个应用程序没有任何麻烦 | MOS86