目录

Table API 和 Flink SQL

1 Table API 和 Flink SQL 是什么

2 基本程序结构

3 创建 TableEnvironment

4 表(Table)

4.1 创建表

4.2 输出到文件

4.3 更新模式

4.4 输出到 Kafka

4.5 输出到 ES

4.6 输出到 MySql

5 将 Table 转换成 DataStream

5.1 将 Table 转换成 DataStream

5.2 将 DataStream 转换成表

5.3 创建临时视图(Temporary View)

6 查看执行计划

7 流处理和关系代数的区别

7.1 动态表(Dynamic Tables)

7.2 动态表和持续查询

7.3

7.3.1 将流转换成动态表

7.3.2 持续查询

7.3.3 将动态表转换成 DataStream

8 时间特性(Time Attributes)

8.1 定义处理时间(Processing Time)

8.2 定义事件时间(Event Time)

9 窗口

9.1 Group Windows

9.1.1 滚动窗口(Tumbling windows)

9.1.2 滑动窗口(Sliding windows)

9.1.3 会话窗口(Session windows)

9.2 SQL 中的 Group Windows

9.3 Over Windows

9.3.1 无界 Over Windows

9.3.2 有界 Over Windows

9.4 SQL 中的 Over Windows

10 函数(Functions)

10.1 Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数

10.2 用户自定义函数(UDF)

10.2.1 标量函数(Scalar Functions)

10.2.2 表函数(Table Functions)

10.2.3 聚合函数(Aggregate Functions)

10.2.4 表聚合函数(Table Aggregate Functions)


Table API 和 Flink SQL

1 Table API 和 Flink SQL 是什么

Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者 批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将 查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义 的,具有 IDE 支持如:自动完成和语法检测。

Flink 对批处理和流处理,提供了统一的上层 API

Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直 观的方式组合来自一些关系运算符的查询

Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite

写一个小示例来入门TableAPI和Flink SQL

首先要引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.10.1</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.10.1</version>
</dependency>

代码示例:

package com.dongda.tableapi;import com.dongda.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class Example {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,将全局并行度设置为1//从文件里面读取数据DataStream<String> inputStream = env.readTextFile("/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt");//转换成POJODataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//3.创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//4.基于数据流创建一张表Table dataTable = tableEnv.fromDataStream(dataStream);//5.调用table API进行转换操作Table resultTable = dataTable.select("id,temperature").where("id='sensor_1'");//6.执行SQL,我们必须专门注册一张表tableEnv.createTemporaryView("sensor",dataTable);String sql="select id,temperature from sensor where id='sensor_1'";Table resultSqlTable = tableEnv.sqlQuery(sql);//我们可以看到tableAPI和直接写SQL是等价的tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");env.execute();}
}

2 基本程序结构

3 创建 TableEnvironment

创建表的执行环境,需要将 flink 流处理的执行环境传入

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有对 表的操作都基于 TableEnvironment

  • –  注册 Catalog

  • –  在 Catalog 中注册表

  • –  执行 SQL 查询

  • – 注册用户自定义函数(UDF)

配置 TableEnvironment

package com.dongda.tableapi;import com.dongda.beans.SensorReading;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class Example {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,将全局并行度设置为1//1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);//1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);//1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);//1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchtableEnv = TableEnvironment.create(blinkBatchSettings);}
}

4 表(Table)

  • TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表

  • 表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成: Catalog名、数据库(database)名和对象名

  • 表可以是常规的,也可以是虚拟的(视图,View)

  • 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来

  • 视图(View)可以从现有的表中创建,通常是 table API 或者 SQL 查询的 一个结果集

4.1 创建表

TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表

package com.dongda.tableapi;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class Example {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.表的创建:连接外部系统,读取数据//2.1 读取文件String filePath="/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt";//读取文件 组册表tableEnv.connect(new FileSystem().path(filePath)).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp",DataTypes.BIGINT()).field("temperature",DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");//3.查询转换//3.1 Table API//简单转换Table resultTable = inputTable.select("id,temperature").filter("id ==='sensor_6'");//聚合统计Table aggTable = inputTable.groupBy("id").select("id ,id.count as count, temperature.avg as avgTemp");//3.2 SQLtableEnv.sqlQuery("select id, temperature from inputTable where id = 'sensor_6'");Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");//打印输出tableEnv.toAppendStream(resultTable,Row.class).print("result");tableEnv.toRetractStream(aggTable,Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable,Row.class).print("sqlagg");env.execute();}
}

4.2 输出到文件

package com.dongda.tableapi;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class Example {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.表的创建:连接外部系统,读取数据//2.1 读取文件String filePath="/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt";//读取文件 组册表tableEnv.connect(new FileSystem().path(filePath)).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp",DataTypes.BIGINT()).field("temperature",DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");//3.查询转换//3.1 Table API//简单转换Table resultTable = inputTable.select("id,temperature").filter("id ==='sensor_6'");//聚合统计Table aggTable = inputTable.groupBy("id").select("id ,id.count as count, temperature.avg as avgTemp");//3.2 SQLtableEnv.sqlQuery("select id, temperature from inputTable where id = 'sensor_6'");Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");//4.输出到文件//连接外部文件注册输出表String outputPath="/Users/haitaoyou/developer/flink/src/main/resources/out.txt";tableEnv.connect(new FileSystem().path(outputPath)).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("temperature",DataTypes.DOUBLE())).createTemporaryTable("outputTable");resultTable.insertInto("outputTable");env.execute();}
}

4.3 更新模式

• 对于流式查询,需要声明如何在表和外部连接器之间执行转换

• 与外部系统交换的消息类型,由更新模式(Update Mode)指定

➢ 追加(Append)模式
     – 表只做插入操作,和外部连接器只交换插入(Insert)消息

➢ 撤回(Retract)模式
     – 表和外部连接器交换添加(Add)和撤回(Retract)消息

– 插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update) 编码为上一条的 Retract 和下一条的 Add 消息

➢ 更新插入(Upsert)模式
     – 更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息

4.4 输出到 Kafka

可以创建 Table 来描述 kafka 中的数据,作为输入或输出的 TableSink

package com.dongda.tableapi;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;public class Example {public static void main(String[] args) throws Exception {//1.创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.连接kafka,读取数据tableEnv.connect(new Kafka().version("0.11").topic("sensor").property("zookeeper.connect","localhost:2181").property("bootstrap.servers","localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id",DataTypes.STRING()).field("timestamp",DataTypes.BIGINT()).field("temp",DataTypes.DOUBLE())).createTemporaryTable("inputTable");//3.简单转换Table sensorTable = tableEnv.from("inputTable");//3.1 Table API//简单转换Table resultTable = sensorTable.select("id,temperature").filter("id ==='sensor_6'");//聚合统计Table aggTable = sensorTable.groupBy("id").select("id ,id.count as count, temperature.avg as avgTemp");//4.kafka进kafka出,建立kafka连接,输出到不同的topic下tableEnv.connect(new Kafka().version("0.11").topic("sinkTest").property("zookeeper.connect","localhost:2181").property("bootstrap.servers","localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id",DataTypes.STRING())
//                        .field("timestamp",DataTypes.BIGINT()).field("temp",DataTypes.DOUBLE())).createTemporaryTable("outputTable");resultTable.insertInto("outputTable");env.execute();}
}

4.5 输出到 ES

可以创建 Table 来描述 ES 中的数据,作为输出的 TableSink

4.6 输出到 MySql

可以创建 Table 来描述 MySql 中的数据,作为输入和输出

5 将 Table 转换成 DataStream

  • 表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就 可以继续在 Table API 或 SQL 查询的结果上运行了

  • 将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将 表的每一行转换成的数据类型

  • 表作为流式查询的结果,是动态更新的

  • 转换有两种转换模式:追加(Append)模式和撤回(Retract)模式

5.1 将 Table 转换成 DataStream

➢ 追加模式(Append Mode)

– 用于表只会被插入(Insert)操作更改的场景

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

➢ 撤回模式(Retract Mode)

–  用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。

–  得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是 新增的数据(Insert),还是被删除的数据(Delete)

DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv .toRetractStream(aggResultTable , Row.class);

5.2 将 DataStream 转换成表

• 对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API 做转换操作

DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream);

• 默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来

DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");

5.3 创建临时视图(Temporary View)

• 基于 DataStream 创建临时视图

tableEnv.createTemporaryView("sensorView", dataStream);

tableEnv.createTemporaryView("sensorView",
dataStream, "id, temperature, timestamp as ts");

• 基于 Table 创建临时视图

tableEnv.createTemporaryView("sensorView", sensorTable);

6 查看执行计划

Table API 提供了一种机制来解释计算表的逻辑和优化查询计划

查看执行计划,可以通过 TableEnvironment.explain(table) 方法或 TableEnvironment.explain() 方法完成,返回一个字符串,描述三个计划

➢ 优化的逻辑查询计划

➢ 优化后的逻辑查询计划

➢ 实际执行计划

String explaination = tableEnv.explain(resultTable);

System.out.println(explaination);

7 流处理和关系代数的区别

7.1 动态表(Dynamic Tables)

• 动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念

• 与表示批处理数据的静态表不同,动态表是随时间变化的

➢ 持续查询(Continuous Query)

  • 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)

  • 连续查询永远不会终止,并会生成另一个动态表

  • 查询会不断更新其动态结果表,以反映其动态输入表上的更改

7.2 动态表和持续查询

➢ 流式表查询的处理过程:
1. 流被转换为动态表
2. 对动态表计算连续查询,生成新的动态表

3. 生成的动态表被转换回流

7.3

7.3.1 将流转换成动态表

• 为了处理带有关系查询的流,必须先将其转换为表

• 从概念上讲,流的每个数据记录,都被解释为对结果表的插入 (Insert)修改操作

7.3.2 持续查询

• 持续查询会在动态表上做计算处理,并作为结果生成新的动态表

7.3.3 将动态表转换成 DataStream

  • 与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改

  • 将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码

  • ➢  仅追加(Append-only)流

  • – 仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流

  • ➢  撤回(Retract)流

    – 撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息

  • ➢  Upsert(更新插入)流

– Upsert 流也包含两种类型的消息:Upsert 消息和删除(Delete)消息。

8 时间特性(Time Attributes)

  • 基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时 间语义和时间数据来源的信息

  • Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳

  • 时间属性,可以是每个表schema的一部分。一旦定义了时间属性,它就可以 作为一个字段引用,并且可以在基于时间的操作中使用

  • 时间属性的行为类似于常规时间戳,可以访问,并且进行计算

8.1 定义处理时间(Processing Time)

• 处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间 的最简单概念。它既不需要提取时间戳,也不需要生成 watermark

➢ 由 DataStream 转换成表时指定

• 在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段

• 这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只 能在schema定义的末尾定义它

Table sensorTable = tableEnv.fromDataStream(dataStream,"id, temperature, timestamp, pt.proctime");

➢ 定义 Table Schema 时指定

➢ 在创建表的 DDL 中定义​​​​​​​

8.2 定义事件时间(Event Time)

  • 事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样 即使在有乱序事件或者延迟事件时,也可以获得正确的结果。

  • 为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中, 提取时间戳,并用来推进事件时间的进展

  • 定义事件时间,同样有三种方法:

    ➢ 由 DataStream 转换成表时指定

➢ 定义 Table Schema 时指定

➢ 在创建表的 DDL 中定义

➢ 由 DataStream 转换成表时指定
• 在 DataStream 转换成 Table,使用 .rowtime 可以定义事件时间属性

➢ 定义 Table Schema 时指定

➢ 在创建表的 DDL 中定义

9 窗口

• 时间语义,要配合窗口操作才能发挥作用

• 在 Table API 和 SQL 中,主要有两种窗口

➢ Group Windows(分组窗口)

– 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

➢ Over Windows
    – 针对每个输入行,计算相邻行范围内的聚合

9.1 Group Windows

Group Windows 是使用 window(w:GroupWindow)子句定义的,并且 必须由as子句指定一个别名。

为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用

Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换 为底层 DataStream 或 DataSet 的窗口操作

9.1.1 滚动窗口(Tumbling windows)

滚动窗口要用 Tumble 类来定义

9.1.2 滑动窗口(Sliding windows)

滑动窗口要用 Slide 类来定义

9.1.3 会话窗口(Session windows)

会话窗口要用 Session 类来定义

9.2 SQL 中的 Group Windows

• Group Windows 定义在 SQL 查询的 Group By 子句中

➢ TUMBLE(time_attr, interval)

• 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度

➢ HOP(time_attr, interval, interval)

• 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是 窗口长度

➢ SESSION(time_attr, interval)
• 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔

9.3 Over Windows

Over window 聚合是标准 SQL 中已有的(over 子句),可以在查询的 SELECT 子句中定义

Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合

Over windows 使用 window(w:overwindows*)子句定义,并在 select ()方法中通过别名来引用

Table API 提供了 Over 类,来配置 Over 窗口的属性

9.3.1 无界 Over Windows

• 可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义 Over windows

• 无界的 over window 是使用常量指定的

9.3.2 有界 Over Windows

有界的 over window 是用间隔的大小指定的

9.4 SQL 中的 Over Windows

• 用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是 相同的分区、排序和范围

• 目前仅支持在当前行范围之前的窗口
• ORDER BY 必须在单一的时间属性上指定

10 函数(Functions)

10.1 Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数

SQL 中支持的很多函数,Table API 和 SQL 都已经做了实现

10.2 用户自定义函数(UDF)

• 用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力

• 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用

• 函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当 用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中, 这样Table API 或 SQL 解析器就可以识别并正确地解释它

10.2.1 标量函数(Scalar Functions)

1)用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值 (一进一出)

2)为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function,并实现(一个或多个)求值(eval)方法

3)标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval

package com.dongda;import com.dongda.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;public class UdfTest1_ScalarFunction {public static void main(String[] args) throws Exception {//1.创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,//2.创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//从文件里面读取数据DataStream<String> inputStream = env.readTextFile("/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt");//lamda表达式写法DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//3.将流转换成表Table sensorTable = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature as temp");//4.自定义标量函数,实现求id的hash值//4.1 table APIHashCode hashCode = new HashCode(23);//需要在环境中注册UDFtableEnv.registerFunction("hashCode",hashCode);Table resultTable = sensorTable.select("id,ts,hashCode(id)");//4.2 SQLtableEnv.createTemporaryView("sensor",sensorTable);Table resultSqlTable = tableEnv.sqlQuery("select id,ts,hashCode(id) from sensor");//打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");env.execute();}public static class HashCode extends ScalarFunction{private int factor = 13;public HashCode(int factor) {this.factor = factor;}public int eval(String str){return str.hashCode()*factor;}}
}

10.2.2 表函数(Table Functions)

1)用户定义的表函数,也可以将0、1或多个标量值作为输入参数;与标量函数不同 的是,它可以返回任意数量的行作为输出,而不是单个值 (一进多出)

2)为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFunction 并实现(一个或多个)求值方法

3)表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval

10.2.3 聚合函数(Aggregate Functions)

• 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs) 可以把一个表中的数据,聚合成一个标量值 (多进一出)

• 用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的

• AggregationFunction要求必须实现的方法:

– createAccumulator()

– accumulate() – getValue()

• AggregateFunction 的工作原理如下:

–  首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用 createAccumulator() 方法创建空累加器

–  随后,对每个输入行调用函数的 accumulate() 方法来更新累加器

–  处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果

package com.dongda;import com.dongda.beans.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;public class UdfTest1_ScalarFunction {public static void main(String[] args) throws Exception {//1.创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//为了方便观察打印出来的结果,//2.创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//从文件里面读取数据DataStream<String> inputStream = env.readTextFile("/Users/haitaoyou/developer/flink/src/main/resources/sensor.txt");//lamda表达式写法DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//3.将流转换成表Table sensorTable = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature as temp");//4.自定义聚合函数,求当前传感器的平均温度值//4.1 table APIAvgTemp avgTemp = new AvgTemp();//需要在环境中注册UDFtableEnv.registerFunction("avgTemp", avgTemp);Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgTemp").select("id,avgTemp");//4.2 SQLtableEnv.createTemporaryView("sensor", sensorTable);Table resultSqlTable = tableEnv.sqlQuery("select id,avgTemp(temp) from sensor group by id");//打印输出tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//实现自定义的AggregateFunctionpublic static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须实现一个accumulate方法,来数据之后更新状态public void accumulate(Tuple2<Double, Integer> accumulator, Double temp) { //必须前面是状态,后面是传进来的值accumulator.f0 += temp;accumulator.f1 += 1;  //当前对于状态的改变}}
}

10.2.4 表聚合函数(Table Aggregate Functions)

• 用户定义的表聚合函数(User-Defined Table Aggregate Functions, UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表

• 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的

表聚合函数(Table Aggregate Functions)

AggregationFunction 要求必须实现的方法:

– createAccumulator()

– accumulate()

– emitValue()

TableAggregateFunction 的工作原理如下:

  • –  首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据 结构。通过调用 createAccumulator() 方法可以创建空累加器。

  • –  随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。

  • –  处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。

大话Flink之十一Table API 和 Flink SQL相关推荐

  1. flink中的table api中的CloseableIterator是什么意思?

    在官网玩耍table api的时候看一个 table api的example的时候看到这么一段代码: try (CloseableIterator<Row> iterator = youn ...

  2. Flink学习笔记(十一)Table API 和 SQL

    文章目录 11. Table API 和 SQL 11.1 快速上手 11.1.1 需要依赖 11.1.2 示例 11.2 基本 API 11.2.1 程序架构 11.2.2 创建表环境 11.2.3 ...

  3. Flink的Table API 与SQL的流处理

    1 流处理与SQL的区别   Table API和SQL,本质上还是基于关系型表的操作方式:而关系型表.SQL本身,一般是有界的,更适合批处理的场景.所以在流处理的过程中,有一些特殊概念. SQL 流 ...

  4. Flink的Table API 与SQL介绍及调用

    1 概述    DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...

  5. flink Table Api 理论篇

    Table API 和 Flink SQL 是什么 1.Flink 对批处理和流处理,提供了统一的上层 API: 2.Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它 ...

  6. Flink Table API和SQL(下)

    传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...

  7. 如何在 Apache Flink 中使用 Python API?

    本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink PMC,阿里巴巴高级技术专家 孙金城 分享.重点为大家介绍 Flink Python API 的现状及未来规划, ...

  8. python flink_如何在 Apache Flink 中使用 Python API?

    原标题:如何在 Apache Flink 中使用 Python API? 导读:本文重点为大家介绍 Flink Python API 的现状及未来规划,主要内容包括:Apache Flink Pyth ...

  9. 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL

    第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 文章目录 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 第一节 Fink SQL快速上手 1. ...

最新文章

  1. 生产环境实用的小脚本
  2. php 数组 随机选择_php中通过数组进行高效随机抽取指定条记录的算法
  3. To handle Unhandled Exception
  4. vs2005 + ASP.NET 页面布局应注意问题及方法步骤
  5. 数据中心系统管理员基础知识培训
  6. Python解析json字符串,json字符串用法
  7. Java多线程安全问题解决的两种方式代码案例
  8. flask tutorial = make a blog :) flask 搭建博客系统从零开始!
  9. linux下如何做ghost,又简单又方便,很实用的方法!!!
  10. java中类似webapi,在.net框架应用程序中包含.net核心WebAPI?
  11. Django 聚合(译)
  12. 给定一个字符串,求第一个不重复的字符
  13. 瑞星杀毒软件2007 / 瑞星个人防火墙2007 - 免费试用
  14. DEA模型(数据包络分析)deap2.1操作方法
  15. OpenCV获取不规则区域的最大内切圆(附Python / C++源码)
  16. 微信分享本地视频到朋友圈,收藏或者对话
  17. 台式计算机关闭屏幕快捷键,多种电脑屏幕关闭方法推荐
  18. 2022-03-03 北京 计算机知识。字符编码,ppt
  19. 基于Kivy的HDR拍摄软件案例分享
  20. 英语语法回顾5——状语和状语从句

热门文章

  1. 一款非常好玩的小程序游戏推荐给大家,基于cocos creator引擎开发的
  2. 热锅热油,热锅冷油,热锅宽油,锅上火滑油,留底油区别
  3. 细品RibbonX(25):使用自定义图片和库
  4. Google Adsense付款方式添加西联付款
  5. 近期学到的css样式
  6. PHP加速和部署多主机LAMP架构
  7. 隐藏控制面板或360软件管家中的部分程序
  8. ARM版本Cortex-M/R/A 芯片内核架构
  9. HTTP为什么不安全?HTTP安全漏洞 Why is HTTP not secure? HTTP Security Gaps
  10. 搜狗浏览器F5,ctrl+F等快捷键不起作用的解决