本文主要研究一下flink Table的OrderBy及Limit

实例

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");Table in = tableEnv.fromDataSet(ds, "a, b, c");// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); // skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(private[flink] val tableEnv: TableEnvironment,private[flink] val logicalPlan: LogicalNode) {//......def orderBy(fields: String): Table = {val parsedFields = ExpressionParser.parseExpressionList(fields)orderBy(parsedFields: _*)}def orderBy(fields: Expression*): Table = {val order: Seq[Ordering] = fields.map {case o: Ordering => ocase e => Asc(e)}new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))}def offset(offset: Int): Table = {new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))}def fetch(fetch: Int): Table = {if (fetch < 0) {throw new ValidationException("FETCH count must be equal or larger than 0.")}this.logicalPlan match {case Limit(o, -1, c) =>// replace LIMIT without FETCH by LIMIT with FETCHnew Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))case Limit(_, _, _) =>throw new ValidationException("FETCH is already defined.")case _ =>new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))}}//......
}
  • Table的orderBy方法,支持String或Expression类型的参数,其中String类型最终是转为Expression类型;orderBy方法最后使用Sort重新创建了Table;offset及fetch方法,使用Limit重新创建了Table(offset方法创建的Limit其fetch为-1;fetch方法如果之前没有指定offset则创建的Limit的offset为0)

Sort

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Sort on stream tables is currently not supported.")}super.validate(tableEnv)}
}
  • Sort继承了UnaryNode,它的构造器接收Set类型的Ordering,其construct方法使用relBuilder.sort来构建sort条件

Ordering

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala

abstract class Ordering extends UnaryExpression {override private[flink] def validateInput(): ValidationResult = {if (!child.isInstanceOf[NamedExpression]) {ValidationFailure(s"Sort should only based on field reference")} else {ValidationSuccess}}
}case class Asc(child: Expression) extends Ordering {override def toString: String = s"($child).asc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {child.toRexNode}override private[flink] def resultType: TypeInformation[_] = child.resultType
}case class Desc(child: Expression) extends Ordering {override def toString: String = s"($child).desc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {relBuilder.desc(child.toRexNode)}override private[flink] def resultType: TypeInformation[_] = child.resultType
}
  • Ordering是一个抽象类,它有Asc及Desc两个子类

Limit

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.limit(offset, fetch)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Limit on stream tables is currently not supported.")}if (!child.isInstanceOf[Sort]) {failValidation(s"Limit operator must be preceded by an OrderBy operator.")}if (offset < 0) {failValidation(s"Offset should be greater than or equal to zero.")}super.validate(tableEnv)}
}
  • Limit继承了UnaryNode,它的构造器接收offset及fetch参数,它的construct方法通过relBuilder.limit来设置offset及fetch

小结

  • Table的orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch
  • Table的orderBy方法,支持String或Expression类型的参数,其中String类型最终是转为Expression类型;orderBy方法最后使用Sort重新创建了Table;offset及fetch方法,使用Limit重新创建了Table(offset方法创建的Limit其fetch为-1;fetch方法如果之前没有指定offset则创建的Limit的offset为0)
  • Sort继承了UnaryNode,它的构造器接收Set类型的Ordering,其construct方法使用relBuilder.sort来构建sort条件;Ordering是一个抽象类,它有Asc及Desc两个子类;Limit继承了UnaryNode,它的构造器接收offset及fetch参数,它的construct方法通过relBuilder.limit来设置offset及fetch

doc

  • OrderBy, Offset & Fetch

聊聊flink Table的OrderBy及Limit相关推荐

  1. 聊聊flink Table的groupBy操作

    序 本文主要研究一下flink Table的groupBy操作 Table.groupBy flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/t ...

  2. 聊聊flink Table的ScalarFunction

    序 本文主要研究一下flink Table的ScalarFunction 实例 public class HashCode extends ScalarFunction {private int fa ...

  3. 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  4. 使用flink Table Sql api来构建批量和流式应用(2)Table API概述

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  5. 四万字!掌握Flink Table一篇就够了

    学习工具与软件版本:开发软件IDEA.Flink1.10.2.Kafka2.0.0.Scala2.11 本章建议有一定Flink基础的伙伴学习 Apache Flink介绍.架构.原理以及实现:点击这 ...

  6. 2021年大数据Flink(三十):Flink ​​​​​​​Table API  SQL 介绍

    目录 ​​​​​​​Table API & SQL 介绍 为什么需要Table API & SQL ​​​​​​​Table API& SQL发展历程 架构升级 查询处理器的选 ...

  7. 聊聊flink的CsvTableSink

    序 本文主要研究一下flink的CsvTableSink TableSink flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/si ...

  8. 使用flink Table Sql api来构建批量和流式应用(1)Table的基本概念

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  9. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

最新文章

  1. oracle数据库按照城市分组_大数据分组怎样才会更快
  2. html5设计图的状态栏标准高度,HTML5 canvas自适应手机屏幕宽高度大小
  3. MongoDB安装与副本集配置
  4. wxWidgets随笔(1)-hello,world
  5. chrome android 远程调试,chrome 远程调试
  6. MediatR 知多少 - 简书
  7. ICCV 2021 揭榜!十大方向抢先看!(Transformer/分割/Action/插帧/超分等)
  8. Linxu 学习记录
  9. 基于设备树的TQ2440 DMA学习(2)—— 简单的DMA传输
  10. 28留数及其应用(四)
  11. idea中异常处理快捷键
  12. 九宫格日记:微博难越,前路漫漫
  13. cocos2d 屏幕適配_cocos2dx 3.2 屏幕适配的理解
  14. H5网页漫画小说苹果cms模板\支持对接公众号\支持三级分销
  15. 2022最新高级java面试题
  16. 广东医科大学计算机网络,广东海洋大学计算机网络历年考题(直接阅读版6套可编辑)课件.doc...
  17. 2021福建漳州高考成绩查询,2021年漳州高考成绩排名及成绩公布时间什么时候出来...
  18. 阿里云云数据库(RDS)的登录问题
  19. 基于android音乐播放器的设计
  20. 组合数学之隔板法:多元一次方程组解的统计

热门文章

  1. 求一个序列中最大的子序列_最大的斐波那契子序列
  2. HDFC的完整形式是什么?
  3. 如何关掉Microsoft Office Click-to-Run服务
  4. 二、规则组织数学模型的建立
  5. centos升级之内核kernel
  6. C语言解析http请求表单内容
  7. 嵌入式开发硬件知识札记
  8. C++析构函数执行顺序
  9. 探索 Java 隐藏的开销
  10. Java架构师必备框架技能核心笔记,工作感悟