文章目录

  • Fink SQL和Table API
    • 示例
    • 基本API
      • 依赖
      • 程序架构
      • 创建表环境
      • 创建表
        • 连接器表
        • 虚拟表
      • 表的查询
        • 执行SQL进行查询
        • 调用Table API进行查询
      • 输出表
      • 表和流的转换
        • 表转换为流
        • 流转换为表
        • 支持的数据类型
          • 原子类型
          • Tuple类型
          • POJO类型
          • ROW类型
    • 流处理中的表
      • 动态表和持续查询
        • 动态表
        • 持续查询
      • 将流转换为动态表
      • 使用SQL持续查询
        • 更新查询
        • 追加查询
        • 查询限制
      • 将动态表转换为流
        • 仅追加流
        • 撤回流
        • 更新插入流
      • 时间属性和窗口
        • 事件时间
          • 在创建的DDL中定义
          • 在数据流转换为表时定义
        • 处理时间
          • 在创建表的DDL中定义
        • 窗口
          • 分组窗口
          • 窗口表值函数
    • 聚合查询
      • 分组聚合
      • 窗口聚合
      • 开窗聚合
    • 联结join查询
      • 常规联结查询
        • 等值内联结
        • 等值外联结
      • 间隔联结查询
    • 函数
      • 系统函数
        • 标量函数
          • 比较函数(Comparison Functions)
          • 逻辑函数(Logical Functions)
          • 算术函数(Arithmetic Functions)
          • 字符串函数(String Functions)
          • 时间函数(Temporal Functions)
        • 聚合函数
      • 自定义函数
        • 标量函数
        • 表函数
        • 聚合函数
        • 表聚合函数
    • SQL客户端
    • 连接到外部系统
      • kafka
        • Upsert Kafka
      • 文件系统
      • JDBC
      • Elasticsearch
      • HBase
      • Hive
        • SQL方言

Fink SQL和Table API

Flink提供的多层级API中,核心是DataStream API,使我们开发的基本途径;底层则是处理函数,可以访问事件时间信息,内存状态,窗口信息,定时器等。

在企业应用中,会有大量类似的处理逻辑,所以一般会将底层API包装成具体的应用级接口,也就是SQL,flink提供了Table API和SQL处理表结构数据。

spark,mapreduce,flink这些框架的流批处理功能和接口:

mapreduce:基础批处理编程接口

spark:基础批处理编程接口,spark streaming DStream流处理接口,spark sql结构化数据处理接口

flink:DataStream流批一体接口,Table API和SQL 流批一体接口

离线数仓的分层结构,不可能达到实时的处理速度,实时要求都是1秒甚至毫秒为单位,离线数仓的分层结构导致有大量的落盘操作,而且它是为单次大批量输入设计的,以天等时间单位为粒度。

实时数仓只有处理的数据流,在最后阶段进行数据的sink。中间步骤一般使用kafka进行临时存储,统计的一般是当天的数据,用于一些实时的指标。

示例

public class TableExample {public static void main(String[] args) throws Exception {// 获取流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperator<Event> eventStream = env.fromElements(new Event("Alice", "./home", 1000L),new Event("Bob", "./cart", 1000L),new Event("Alice", "./prod?id=1", 5 * 1000L),new Event("Cary", "./home", 60 * 1000L),new Event("Bob", "./prod?id=3", 90 * 1000L),new Event("Alice", "./prod?id=7", 105 * 1000L));// 获取表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 将数据流转换成表Table eventTable = tableEnv.fromDataStream(eventStream);// 用执行 SQL 的方式提取数据Table visitTable = tableEnv.sqlQuery("select url, user from " + eventTable);// 将表转换成数据流,打印输出tableEnv.toDataStream(visitTable).print();// 执行程序env.execute();}
}

基本API

依赖

<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
#如果想实现自定义的数据格式来做序列化,可以引入下面的依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version>
</dependency>

程序架构

// 创建表环境
TableEnvironment tableEnv = ...;
// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector'
= ... )");
// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector'
= ... )");
// 执行 SQL 对表进行查询转换,得到一个新的表
Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");
// 使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);
// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");

创建表环境

 (1)注册catalog和表
(2)执行 SQL 查询;
(3)注册用户自定义函数(UDF);
(4)DataStream 和表之间的转换。#指定配置创建
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode() // 使用流处理模式.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

创建表

表在环境中有一个唯一的ID,由三部分组成:目录catalog,数据库database,以及表名

默认目录为default_catalog,数据库名为default_database,所以如果创建一个MyTable表,其ID为

default_catalog.default_database.MyTable

创建表的方式,有通过连接器和虚拟表两种

连接器表

连接到外部系统,我们可以从连接器的表中读取数据,他就会从外部系统读取数据并转换。而当我们向这张表写入数据,连接器就会sink到外部系统中。

tableEnv.executeSql("CRATE [TEMPORY] TABLE MyTable ... WITH ( 'connector'
= ... )");
#设定catalog和database
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
虚拟表

tableEnv.createTemporaryView(“NewTable”, newTable);

上边将中间表注册到环境中,这样才能对其使用sql语句。虚拟表和视图非常相似。

表的查询

执行SQL进行查询
#使用sqlQuery传入sql字符串
TableEnvironment tableEnv = ...;
// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
// 查询用户 Alice 的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery("SELECT user, url " +"FROM EventTable " +"WHERE user = 'Alice' ");#将中间表手动注册为虚拟表,或者直接在sql中插入新表,如下
// 将查询结果输出到 OutputTable 中
tableEnv.executeSql ("INSERT INTO OutputTable " +"SELECT user, url " +"FROM EventTable " +"WHERE user = 'Alice' ");
调用Table API进行查询
#获取表
Table eventTable = tableEnv.from("EventTable");
#
Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"), $("user"));

输出表

#调用executeInsert方法将一个table写入注册过的表中
// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 经过查询转换,得到结果表
Table result = ...
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

表和流的转换

表转换为流
#将表转换为流
Table aliceVisitTable = tableEnv.sqlQuery("SELECT user, url " +"FROM EventTable " +"WHERE user = 'Alice' ");
// 将表转换成数据流
tableEnv.toDataStream(aliceVisitTable).print();
#调用toChangelogStream()方法,对应聚合结果的表,无法直接调用toDataStream方法,因为它不是单纯的插入新的数据,而是当新数据到来后重新计算的动态表。
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) " +"FROM EventTable " +"GROUP BY user ");tableEnv.toChangelogStream(urlCountTable).print()
流转换为表
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)
// 将数据流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream);// 提取 Event 中的 timestamp 和 url 作为表中的列
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"),
$("url"));// 将 timestamp 字段重命名为 ts
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),
$("url"));#注册表名到环境中,方便使用sql
tableEnv.createTemporaryView("EventTable", eventStream,
$("timestamp").as("ts"),$("url"));
支持的数据类型
原子类型

1原子类型:基础数据类型(Integer,Double,String)和通用数据类型(不可拆分的)

Tuple类型

2Tuple类型:将tuple分别解析为列名为f0,f1,f2… …的表。

StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...;
// 将数据流转换成只包含 f1 字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 将数据流转换成包含 f0 和 f1 字段的表,在表中 f0 和 f1 位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// 将 f1 字段命名为 myInt,f0 命名为 myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"),
$("f0").as("myLong"));
POJO类型

默认映射POJO的默认字段名

StreamTableEnvironment tableEnv = ...;
DataStream<Event> stream = ...;
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"),
$("url").as("myUrl"));
ROW类型

Row在使用时必须指明具体的类型,Row类型长度固定,而且无法推断每个字段的类型。

DataStream<Row> dataStream =env.fromElements(Row.ofKind(RowKind.INSERT, "Alice", 12),Row.ofKind(RowKind.INSERT, "Bob", 5),Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// 将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);

流处理中的表

一般来说,sql适合批处理的场景,如hive,msyql。流处理的概念域sql概念有些不符,这里对两者的概念进行对比。

动态表和持续查询

动态表

由于流数据不断增加,那么表也是动态变化的。

实际上,传统关系型数据库中,表中的数据也是一系列INSERT,UPDATE,DELETE语句执行的结果,在关系型数据库中,一般称为更新日志流。Flink中的动态表,就类似用日志流形式维持动态表,在流和表之间搭建桥梁。

持续查询

每当有新的数据到来,就对表中的所有数据进行全体批处理查询,这种连续查询也会生成动态表。

这么说,这是一种批处理了,岂不是和流处理相悖了。

将流转换为动态表

#借助只有插入操作的更新日志流维持一个动态表,每当新数据到来,生成一个插入日志流。
tableEnv.createTemporaryView("EventTable", eventStream, $("user"), $("url"),
$("timestamp").as("ts"));

使用SQL持续查询

更新查询
#这个表的日志流包含update的insert两种,所以这个查询结果也必须使用toChangelogStream方法转为DataStream
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM
EventTable GROUP BY user");
追加查询
#如果我们执行简单的查询操作,就不会由update流操作,这种就被称为追加查询,因为结果表中只有INSERT操作,结果表只通过INSERT日志流就可以构建,所以能直接调用toDataStream()方法。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE
user = 'Cary'")
#是不是所有的聚合操作都是更新查询呢,不是,如果是窗口聚合,那么窗口计算完成后就不会有update,只有insert。这就对应流处理中的全体聚合、窗口聚合,可以体会其中的差别。
// 设置 1 小时滚动窗口,执行 SQL 统计查询Table result = tableEnv.sqlQuery("SELECT " +"user, " +"window_end AS endT, " + // 窗口结束时间"COUNT(url) AS cnt " + // 统计 url 访问次数"FROM TABLE( " +"TUMBLE( TABLE EventTable, " + // 1 小时滚动窗口"DESCRIPTOR(ts), " +"INTERVAL '1' HOUR)) " +"GROUP BY user, window_start, window_end ");
查询限制

1数据量变大,内存状态无法维持

2每次计算复杂度很高,每次需要计算全量数据,例如RANK函数。这样的操作显然不适合在流处理中执行。

将动态表转换为流

仅追加流

仅通过插入更改来修改的动态表,可以转换为仅追加流。

撤回流

撤回流包含两类信息的流,添加消息和撤回消息。

INSERT 插入操作编码为add消息;DELETE删除操作编码为retract消息。而UPDATE更新操作则编码为更改行的retract消息,和新行的add消息。

更新插入流

更新插入流只包含两种类型的信息:更新插入消息和删除消息

更新插入upsert就是update和insert的合成词,所以对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一编码为upsert消息;而DELETE删除操作被编码为delete消息。

那么如何区分的insert和update呢,实际上是通过key值确定的,如果key已存在就执行update操作。所以更新插入流需要有对应的key值支持。

时间属性和窗口

事件时间
在创建的DDL中定义

在创建表的DDL中,可以增加一个字段,通过WATERMARK语句来定义事件时间属性。WATERMARK语句主要用来定义水位线的生成表达式。

CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH (...
);

一般情况下如果数据中的时间戳是“年-月-日-时-分-秒”的形式,那就是不带时区信 息的,可以将事件时间属性定义为 TIMESTAMP 类型。 而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件 时间属性,类型定义为 TIMESTAMP_LTZ 会更方便:

CREATE TABLE events (user STRING,url STRING,ts BIGINT,ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND) WITH (...
);
在数据流转换为表时定义

可以在将DataStream转换为表的时候来定义,通过追加参数来定义表中的字段结构。这时可以给某个字段加上rowtime()后缀,就相当于将其指定为事件时间属性。这个字段可以是数据中本不存在、额外追加上去的逻辑字段,就像之前DDL中定义的第二种情况。也可以是本身固有的字段,那么这个字段会被时间属性覆盖,类型转换为TIMESTAMP。

注意这种方式只是生成时间属性,而水位线的生成应该在DataStream上定义。

// 方法一:
// 流中数据类型为二元组 Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream =
inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),
$("ts").rowtime());// 方法二:
// 流中数据类型为三元组 Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream =
inputStream.assignTimestampsAndWatermarks(...);
// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),
$("ts").rowtime());
处理时间
在创建表的DDL中定义
CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()
) WITH (...
);#转换时指定
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),
$("ts").proctime());
窗口
分组窗口

包括常用的滚动窗口、滑动窗口、会话窗口

具体调用TUMBLE()、HOP()、SESSION() ,传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:

TUMBLE(ts,INTERVAL '1' HOUR)
#ts是定义好的时间属性字段,窗口大小用‘时间间隔’INTERVAL定义
在进行窗口计算时,分组窗口是将窗口本身作为一个字段对数据进行分组的,可以对组内的数据进行聚合。
Table result = tableEnv.sqlQuery("SELECT " +"user, " +"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +"COUNT(url) AS cnt " +"FROM EventTable " +"GROUP BY " + // 使用窗口和用户名进行分组"user, " +"TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口);
窗口表值函数

窗口表值函数式(Windowing table-valued functions, Windowing TVFs)是flink定义的多态表函数PTF,可以将表进行扩展后返回。表函数可以看做返回一个表的函数

有如下几个TVF

滚动窗口(Tumbling Windows);

滑动窗口(Hop Windows,跳跃窗口);

累积窗口(Cumulate Windows);

会话窗口(Session Windows,目前尚未完全支持)。

在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列: “窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。

#滚动窗口 TUMBLE
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)#滑动窗口 HOP
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在SQL中通过调用HOP()来声明滑动窗口。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));#累积窗口(CUMULATE)
#比如我需要,当天累计至目前时刻的交易量,这就是累计窗口。如果用滑动和滚动只能计算固定窗口大小。
#两个关键参数,最大窗口长度max window size和累计步长step
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

聚合查询

分组聚合

内置聚合函数:SUM()、MAX()、MIN()、AVG()以及 COUNT()

在流处理中,分组聚合同样是一个持续查询,而且是 一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更 新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream) 或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成 DataStream 打印输出, 需要调用 toChangelogStream()。

TableEnvironment tableEnv = ...
// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));或者也可以直接设置配置项 table.exec.state.ttl:
TableEnvironment tableEnv = ...
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "60 min");

窗口聚合

Table result = tableEnv.sqlQuery("SELECT " +"user, " +"window_end AS endT, " +"COUNT(url) AS cnt " +"FROM TABLE( " +"TUMBLE( TABLE EventTable, " +"DESCRIPTOR(ts), " +"INTERVAL '1' HOUR)) " +"GROUP BY user, window_start, window_end ");

开窗聚合

开窗函数就是对每一行数据进行处理,over开窗可以进行分组,但是不进行聚合操作,借助rank,lag等函数可以操作这个窗口内的数据。开窗相当于为每一行开启了一个特定的窗口,每一行的窗口都可能不同。
PARTITION BY(可选)
ORDER BY:flink中必须是定义好的时间属性
BETWEEN ... PRECEDING AND CURRENT ROW
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW#使用实例
SELECT user, ts,COUNT(url) OVER (PARTITION BY userORDER BY tsRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS cnt
FROM EventTable#在外部定义窗口
SELECT user, ts,COUNT(url) OVER w AS cnt,MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (PARTITION BY userORDER BY tsROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

联结join查询

常规联结查询

等值内联结

INNER JOIN返回两表中符合联接条件的所有行的组合,也就是笛卡尔积。

SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id
等值外联结

找不到匹配行的记录也进行返回

SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.idSELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.idSELECT *
FROM Order
FULL OUTER JOIN Product
ON Order.product_id = Product.id

间隔联结查询

SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

函数

flink函数主要分为两类,一类是系统内置的函数,COUNT(),CHAR_LENGTH(),UPPER()等等。另一类则是用户自定义函数UDF。

系统函数

标量函数
比较函数(Comparison Functions)
(1)value1 = value2 判断两个值相等;
(2)value1 <> value2 判断两个值不相等
(3)value IS NOT NULL 判断 value 不为空
逻辑函数(Logical Functions)
(1)boolean1 OR boolean2 布尔值 boolean1 与布尔值 boolean2 取逻辑或
(2)boolean IS FALSE 判断布尔值 boolean 是否为 false
(3)NOT boolean 布尔值 boolean 取逻辑非
算术函数(Arithmetic Functions)
进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:
(1)numeric1 + numeric2 两数相加
(2)POWER(numeric1, numeric2) 幂运算,取数 numeric1 的 numeric2 次方
(3)RAND() 返回(0.0, 1.0)区间内的一个 double 类型的伪随机数
字符串函数(String Functions)
(1)string1 || string2 两个字符串的连接
(2)UPPER(string) 将字符串 string 转为全部大写
(3)CHAR_LENGTH(string) 计算字符串 string 的长度
时间函数(Temporal Functions)
(1)DATE string 按格式"yyyy-MM-dd"解析字符串 string,返回类型为 SQL Date
(2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为 SQL
timestamp
(3)CURRENT_TIME 返回本地时区的当前时间,类型为 SQL time(与 LOCALTIME
等价)
(4)INTERVAL string range 返回一个时间间隔。string 表示数值;range 可以是 DAY,
MINUTE,DAT TO HOUR 等单位,也可以是 YEAR TO MONTH 这样的复合单位。如“2 年
10 个月”可以写成:INTERVAL '2-10' YEAR TO MONTH
聚合函数

COUNT,SUM,RANK,ROW_NUMBER

自定义函数

当前 UDF 主要有以下几类:

1.标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;

2.表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是 扩展成一个表;

3.聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标 量值;

4.表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一 个或多个新的行数据。

要想在代码中使用自定义的函数,我们需要首先自定义对应 UDF 抽象类的实现,并在表 环境中注册这个函数,然后就可以在 Table API 和 SQL 中调用了。

tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
标量函数
public static class HashFunction extends ScalarFunction {// 接受任意类型输入,返回 INT 型输出public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}
}
// 注册函数
tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);
// 在 SQL 里调用注册好的函数
tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
表函数

与hive的lateral view类似,使用lateral table来生成扩展的侧向表。

// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {// 使用 collect()方法发送一行数据collect(Row.of(s, s.length()));}}
}
// 注册函数
tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
355
// 在 SQL 里调用注册好的函数
// 1. 交叉联结
tableEnv.sqlQuery("SELECT myField, word, length " +"FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
// 2. 带 ON TRUE 条件的左联结
tableEnv.sqlQuery("SELECT myField, word, length " +"FROM MyTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
// 重命名侧向表中的字段
tableEnv.sqlQuery("SELECT myField, newWord, newLength " +"FROM MyTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON
TRUE");
聚合函数

必须实现如下方法:

createAccumulator:返回类型为累加器类型ACC

accumulate:每来一行数据就调用,第一个参数位累加器,类型为ACC,表示当前聚合的中间状态。后面的参数则是聚合函数调用时传入的参数,可以有多个,类型可以不同

getValue:得到最终返回值,输入参数是 ACC 类型的累加器,输出类型为 T

当需要对会话窗口聚合时,必须实现merger方法,定义累加器合并操作

当需要在OVER窗口聚合时,必须实现retract方法,保证数据可以撤回

resetAccumulator也是可选方法,有些场景用于重置累加器

// 累加器类型定义
public static class WeightedAvgAccumulator {public long sum = 0; // 加权和public int count = 0; // 数据个数
}
// 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator
public static class WeightedAvg extends AggregateFunction<Long,
WeightedAvgAccumulator> {@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator(); // 创建累加器}@Overridepublic Long getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null; // 防止除数为 0} else {return acc.sum / acc.count; // 计算平均值并返回}}// 累加计算方法,每来一行数据都会调用public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}
}
// 注册自定义聚合函数
tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
// 调用函数计算加权平均值
Table result = tableEnv.sqlQuery("SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY
student"
);
表聚合函数

createAccumulator() 创建累加器的方法,与 AggregateFunction 中用法相同。

accumulate() 聚合计算的核心方法,与 AggregateFunction 中用法相同。

emitValue():输出最终计算结果的方法,没有输出类型,通过out的收集器,发送多行数据。

SQL客户端

./bin/sql-client.sh#设置运行模式
SET 'execution.runtime-mode' = 'streaming';#执行结果模式,table、changelog、tableau模式
SET 'sql-client.execution.result-mode' = 'table';#执行 SQL 查询

连接到外部系统

#连接到控制台
CREATE TABLE ResultTable (user STRING,cnt BIGINTWITH ('connector' = 'print'
);

kafka

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
#根据连接器配置的格式,引入对应的格式
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version>
</dependency>#创建链接
CREATE TABLE KafkaTable (
`user` STRING,`url` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'events','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)
Upsert Kafka

正常情况下,kafka作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对应在表中就是仅追加模式。如果我们想要将有更新操作的结果表写入Kafka,就会因为Kafka无法识别撤回或更新插入消息而导致异常。

将表格转换为流时,如果有删除,就插入null消息。如果碰到INSERT和UPDATE_AFTER就直接add操作。

CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (user_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = '...','key.format' = 'avro','value.format' = 'avro'
);
CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH ('connector' = 'kafka','topic' = 'pageviews','properties.bootstrap.servers' = '...','format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

文件系统

CREATE TABLE MyTable (column_name1 INT,column_name2 STRING,...part_name1 INT,part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH ('connector' = 'filesystem', -- 连接器类型'path' = '...', -- 文件路径'format' = '...' -- 文件格式
)

JDBC

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>#此外,为了连接到特定的数据库,我们还用引入相关的驱动器依赖,比如 MySQL:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version>
</dependency>-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;

Elasticsearch

<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifact
Id>
<version>${flink.version}</version>
</dependency>
对于 Elasticsearch 7 以上的版本,引入的依赖则是:
<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifact
Id>
<version>${flink.version}</version>
</dependency>-- 创建一张连接到 Elasticsearch 的 表
CREATE TABLE MyTable (user_id STRING,user_name STRINGuv BIGINT,pv BIGINT,PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'users'
);

HBase

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-1.4_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
对于 HBase 2.2 版本,引入的依赖则是:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>-- 创建一张连接到 HBase 的 表
CREATE TABLE MyTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);-- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

Hive

<!-- Flink 的 Hive 连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<!-- Hive 依赖 -->
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version>
</dependency>EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
// 创建一个 HiveCatalog,并在表环境中注册
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 使用 HiveCatalog 作为当前会话的 catalog
tableEnv.useCatalog("myhive");#sql客户端
create catalog myhive with ('type' = 'hive', 'hive-conf-dir' =
'/opt/hive-conf');
use catalog myhive;
SQL方言
set table.sql-dialect=hive;#通过配置文件配置
execution:planner: blinktype: batchresult-mode: table
configuration:table.sql-dialect: hive// 配置 hive 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 配置 default 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-- 设置 SQL 方言为 hive,创建 Hive 表
SET table.sql-dialect=hive;
CREATE TABLE hive_table (user_id STRING,order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);
-- 设置 SQL 方言为 default,创建 Kafka 表
SET table.sql-dialect=default;
CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND – 定义水位线
) WITH (...);
-- 将 Kafka 中读取的数据经转换后写入 Hive
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

Fink SQL和Table API相关推荐

  1. 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL

    第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 文章目录 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 第一节 Fink SQL快速上手 1. ...

  2. 2021年大数据Flink(三十):Flink ​​​​​​​Table API  SQL 介绍

    目录 ​​​​​​​Table API & SQL 介绍 为什么需要Table API & SQL ​​​​​​​Table API& SQL发展历程 架构升级 查询处理器的选 ...

  3. 1.18.Table API SQL(概念、依赖图、Table程序依赖、扩展依赖)

    1.18.Table API & SQL 1.18.1.概念 1.18.1.1.依赖图 1.18.1.2.Table程序依赖 1.18.1.3.扩展依赖 1.18.Table API & ...

  4. Flink Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  5. Flink的Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  6. 1.19.7.Table API、SQL、数据类型、保留关键字、查询语句、指定查询、执行查询、语法、操作符、无排名输出优化、去重、分组窗口、时间属性、选择分组窗口的开始和结束时间戳、模式匹配

    1.19.7.Table API 1.19.8.SQL 1.19.8.1.概述 1.19.8.1.1.SQL 1.19.8.1.2.数据类型 1.19.8.1.3.保留关键字 1.19.8.2.查询语 ...

  7. 大话Flink之十一Table API 和 Flink SQL

    目录 Table API 和 Flink SQL 1 Table API 和 Flink SQL 是什么 2 基本程序结构 3 创建 TableEnvironment 4 表(Table) 4.1 创 ...

  8. Flink学习笔记(十一)Table API 和 SQL

    文章目录 11. Table API 和 SQL 11.1 快速上手 11.1.1 需要依赖 11.1.2 示例 11.2 基本 API 11.2.1 程序架构 11.2.2 创建表环境 11.2.3 ...

  9. Apache Flink 漫谈系列(13) - Table API 概述

    什么是Table API 在<Apache Flink 漫谈系列(08) - SQL概览>中我们概要的向大家介绍了什么是好SQL,SQL和Table API是Apache Flink中的同 ...

最新文章

  1. 如何理解*(int *)0x12ff7c = 0x100;?
  2. Vue.js安装使用教程
  3. undefined reference to
  4. android view滑动到顶部并固定在顶部
  5. python第四章单元测试_Python 单元测试
  6. win32开发(绘制bitmap)
  7. 使用Builder模式创建复杂可选参数对象
  8. Latch free等待事件
  9. 剑指offer面试题[34]丑数
  10. UVA489 Hangman Judge【模拟】
  11. DWR入门教程(http://www.cnblogs.com/cyjch/archive/2012/02/16/2353758.html)
  12. 全网首发:org.xml.sax.SAXNotRecognizedException: unrecognized features nonvalidating/load-external-dtd
  13. Ant Design Upload 文件上传功能
  14. 多一份感动,多一份行动[动物园的猪 发表于 2004-11-15]
  15. php.exe不是 32位有效应用程序,XP系统打开程序时提示“不是有效的Win32应用程序”怎么办?...
  16. python中私有属性无法访问的原理_python私有属性访问不到吗?
  17. 手把手教你用java发送邮件
  18. 计算机视频剪辑教程,VLOG视频剪辑教程
  19. 差分隐私-整理-知乎
  20. DIY 大型FDM 3D打印机中遇到的问题和经验分享(1)

热门文章

  1. 获取经纬度 +经纬度转换成中文地址
  2. java178-终篇?静态代理?动态代理?
  3. lammps剪切模拟关键技术讲解
  4. 赵艳丽谈下自己的一些经历和一个项目
  5. CRECT,RECT区别GetWindowRect GetClientRect
  6. 两步处理字符串正则匹配得到JSON列表
  7. Ubuntu常用快捷键总结
  8. matlab程序循环,matlab循环程序只得到一个结果
  9. 计算机专业的双证在职研究生,计算机双证在职研究生可以拿到双证吗招生专业多吗...
  10. Android内核编译(支持netfilter/iptalbes)(can't initialize iptables table `filter': iptables who)