文章目录

  • 一个示例
  • 程序架构
  • 创建表环境
  • 创建表
    • 1、连接器
    • 2、虚拟表
  • 表的查询
    • 1、执行SQL查询
    • 2、调用Table API进行查询
    • 3、两种API的结合使用
  • 输出表
  • 表和流的转换
    • 1、将表转换成流
    • 2、将流转换成表
    • 3、支持的数据类型
    • 4、综合应用示例

一个示例

import com.yingzi.chapter05.Source.Event;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class TableExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperator<Event> eventStream = env.fromElements(new Event("Alice", "./home", 1000L),new Event("Bob", "./cart", 1000L),new Event("Alice", "./prod?id=1", 5 * 1000L),new Event("Cary", "./home", 60 * 1000L),new Event("Bob", "./prod?id=3", 90 * 1000L),new Event("Alice", "./prod?id=7", 105 * 1000L));//获取表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//将数据流转换成表Table eventTable = tableEnv.fromDataStream(eventStream);//用执行SQL的方式提取数据Table visitTable1 = tableEnv.sqlQuery("select url,user,`timestamp` from " + eventTable);//基于Table直接转换Table visitTable2 = eventTable.select($("user"), $("url")).where($("user").isEqual("Alice"));//将表转换成数据流,打印输出tableEnv.toDataStream(visitTable1).print("table1");tableEnv.toDataStream(visitTable2).print("table2");env.execute();}
}

程序架构

// 创建表环境
TableEnvironment tableEnv = ...;
// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector'= ... )");
// 创建输出表,连接到外部系统输出数据
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector'= ... )");
// 执行 SQL 对表进行查询转换,得到一个新的表
Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");
// 使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);
// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");

通过执行DDL来直接创建一个表,connector指定了外部系统的连接器。

创建表环境

数据流和表在结构上还是有所区别,使用Table API和SQL需要一个特别的运行时环境——表环境(TableEnvironment)

  • 注册Catalog和表
  • 执行SQL查询
  • 注册用户自定义函数(UDF)
  • DataStream和表直接的转换

Catalog就是目录,主要用来管理所有数据库和表的元数据。通过Catalog可以方便地对数据库和表进行查询的管理。在表环境中可以由用户自定义Catalog,并在其中注册表和自定义函数,默认的Catalog叫做default_catalog

每个表和SQL的执行,都必须绑定一个表环境(TableEnvironment),其是Table API提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例,方法传入环境配置参数EnvironmentSettings,指定当前表环境的执行模式和计划器。执行模式有:批处理、流处理,默认是流处理,计划器默认是blink planner

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode() // 使用流处理模式.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

另一种更加简单的方式创建表环境,引入流式表环境(StreamTableEnvironment),继承自TableEnvironment的子接口,调用crete()方法,创建出对应的流式表环境

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

创建表

表是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。Flink中的表是由多个行数据构成,每个行又可以定义多个列。整体来看,表就是固定类型的数据组成的二维矩阵

表环境中会维护一个目录(Catalog)和表的对应关系。表都是通过Catalog来进行注册创建,在环境中有一个唯一的ID,由三部分组成:目录(catalog)名、数据库(database)名、表名。在默认情况下,目录名为default_catalog、数据库名为default_database。故我们直接创建一个MyTable的表,它的ID是:default_catalog.default_database.MyTable

具体创建表的方式:连接器(connector)、虚拟表(virtual tables)

1、连接器

通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。例如,我们可以连接到Kafka或者文件系统,将存储在这些外部的数据以表的形式定义出来,这样对表的读写就可以通过连接器转换成对外部系统的读写了。当我们在表环境中读取这张表,连接器会从外部系统读取数据并进行转换,而当我向这种表写入数据,连接器会将数据输出(Sink)到外部系统

在代码中,调用表环境的executeSql()方法,可传入一个DDL作为参数执行SQL操作。这里,我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:

tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector'= ... )");

目录(Catalog)、数据库(Database)若没有定义则默认为default_catalog.default_database。自定义目录名、库名设置如下:

tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
2、虚拟表

在环境注册之后,可在SQL中直接使用这张表进行查询转换

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");

调用表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,代表了一个Java中定义的表实例

将得到表对象注册到环境中,即可在SQL中使用:

tableEnv.createTemporaryView("NewTable", newTable);

这里的注册其实是创建了一个虚拟表(Virtual Table),这个概念与SQL语法中的视图(View)非常类似,调用的方法也叫做创建虚拟视图(createTemporaryView)。视图是虚拟的,我们不会直接保存这个表的内容,没有实体,只在用到这张表的时候,会将它对应的查询语句嵌入到Sql中

虚拟表可以让我们在Table API和SQL之间进行自由切换,一个Java中的Table对象可以直接调用Table API中定义好的查询转换方法,得到一个中间结果表,这跟对注册好的表直接执行SQL结果一样。

表的查询

Flink提供了两种查询方式:SQL和Table API

1、执行SQL查询

Flink基于Apache Calcite来提供对SQL的支持,Calcite是一个为不同的计算平台提供标准SQL查询的底层工具,很多大数据框架如 Apache Hive、Apache Kylin 中的 SQL 支持都是通过集成 Calcite 来实现的

在代码中,调用表环境的sqlQuery()方法,传入一个字符串的SQL查询语句,执行得到一个Table对象

// 创建表环境
TableEnvironment tableEnv = ...;
// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
// 查询用户 Alice 的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery("SELECT user, url " +"FROM EventTable " +"WHERE user = 'Alice' ");

我们也可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行计算

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) " +"FROM EventTable " +"GROUP BY user ");

上面得到的是一个新的Table对象,我们可以再次将它注册为虚拟表在SQL中调用。另外,也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入一个INSERT语句:

// 注册表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 将查询结果输出到 OutputTable 中
tableEnv.executeSql (
"INSERT INTO OutputTable " +"SELECT user, url " +"FROM EventTable " +"WHERE user = 'Alice' ");
2、调用Table API进行查询

嵌入于Java和Scala语言内的查询API,核心就是Table接口类,通过一步步链式调用的Table方法,就可以定义出所有的查询转换操作,每一步方法调用返回的结果都是Table

Table API是基于Table的Java实例进行调用,因此我们首先要得到表的Java对象,基于环境已注册表,可以通过表环境的from()方法得到一个Table对象:

Table eventTable = tableEnv.from("EventTable");

传入的参数就是注册好的表名(EventTable),而eventTable是一个Table对象,得到Table对象之后,就可以调用API进行各种转换操作了,得到新的Table对象:

Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"), $("user"));

$符号用来指定表中的一个字段。Table API是嵌入式编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比要麻烦

3、两种API的结合使用

调用API或执行SQL得到的结果仍是一个Table对象,故两种API的查询可以结合

  • 无论那种方式得到的Table对象,都可以继续调用Table API进行查询转换

  • 对一个表执行SQL操作(用FROM关键字引用),必须现在环境中对它进行注册,我们可以通过创建虚拟表的方式实现两者的转换:

    tableEnv.createTemporaryView("MyTable", myTable);
    

输出表

表的创建和和查询,对应流处理中的读取数据源(Source)和转换(Transform),最后的Sink也就是将结果数据输出到外部系统,对应着表的输出操作

在代码上,调用Table方法executeInsert()方法将一个Table写入到注册过表中,方法传入的参数就是注册的表名

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 经过查询转换,得到结果表
Table result = ...
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

在底层,表的输出是通过将数据写入到TableSink来实现的,TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可支持不同的文件格式(如CSV、Parquet)、存储数据库(如JDBC、HBase、Elasticsearch)和消息队列(如Kafka)

表和流的转换

在Flink中,我们可以将Table转换成DataStream,然后进行打印输出

1、将表转换成流

(1)调用toDataStream()方法

直接调用表环境的toDataStream()方法

Table aliceVisitTable = tableEnv.sqlQuery("SELECT user, url " +"FROM EventTable " +"WHERE user = 'Alice' ");
// 将表转换成数据流
tableEnv.toDataStream(aliceVisitTable).print();

(2)调用toChangelogStream()方法

对于会更新操作的表,记录它的更新日志,将表转换成更新日志流

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) " +"FROM EventTable " +"GROUP BY user ");
// 将表转换成更新日志流
tableEnv.toChangelogStream(urlCountTable).print();
2、将流转换成表

(1)调用fromDataStream()方法

调用表环境的fromDataStream()方法来实现,返回一个Table对象。例如,我们可以直接将事件流eventStream转换成一个表

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)
// 将数据流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream);

流中的数据本身就是定义好的POJO类型Event,故我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应这Event中的属性

另外,在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,可以任意指定位置:

// 提取 Event 中的 timestamp 和 url 作为表中的列
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"),$("url"));// 将 timestamp 字段重命名为 ts
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));

注意的是,timestampe本身是SQL中的关键字,我们在定义表名、列名时应尽量避免,可以通过表达式as()方法对字段进行重命名

(2)调用createTemporaryView()方法

直接在SQL中引用表,需要调用表环境的createTemporaryView()方法来创建虚拟视图。对于这种场景,直接调用createTemporaryView()方法创建虚拟表,传入两个参数,第一个是注册的表名,第二个是DataStream,之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("EventTable", eventStream,$("timestamp").as("ts"),$("url"));

接下来可以直接在SQL中引用表EventTable

(3)调用 fromChangelogStream()方法

表环境还提供了一个方法fromChangelogStream(),可以将一个更新日志流转换成表,这个方法要求流中的数据类型只能是Row,而且每一个数据都需要指定当前行的更新类型(RowKind),一般由连接器实现

3、支持的数据类型

(1)原子类型

在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(不可再拆分)统一称作原子类型。原子类型的DataStream转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出,另外,还可以在fromDataStream()方法里增加参数,用来重新命名字段

StreamTableEnvironment tableEnv = ...;
DataStream<Long> stream = ...;
// 将数据流转换成动态表,动态表只有一个字段,重命名为 myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));

(2)Tuple类型

当原子类型不做重命名时,默认字段名是f0,将原子类型当做一元组Tuple1处理。Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中的属性名f0、f1、f2…所有字段都可以被重新排序,可以提取其中一部分字段,字段还可以通过调用表达式的as()方法来进行重命名

StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...;
// 将数据流转换成只包含 f1 字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 将数据流转换成包含 f0 和 f1 字段的表,在表中 f0 和 f1 位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// 将 f1 字段命名为 myInt,f0 命名为 myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"),$("f0").as("myLong"));

(3)POJO类型

Flink也支持数据类型组合成复合类型,最典型的就是简单Java对象(POJO类型),由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。将 POJO 类型的 DataStream 转换成 Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO 中的字段同样可以被重新排序、提却和重命名

StreamTableEnvironment tableEnv = ...;
DataStream<Event> stream = ...;
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"),$("url").as("myUrl"));

(4)Row类型

Row类型是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;在创建Table时调用CREATE语句就会将所有的字段名名称和类型指定,在Flink中被称为表的模式结构(Schema)。除此之外,Row类型还附加了一个属性RowKind,用来表示当前行在更新操作中的类型,Row因此可以用来表示更新日志流(changelog stream)中的数据,从而架起了Flink中流和表的转换桥梁

在更新日志流中,元素的类型必须是 Row,而且需要调用 ofKind()方法来指定更新类型

DataStream<Row> dataStream = env.fromElements(Row.ofKind(RowKind.INSERT, "Alice", 12),Row.ofKind(RowKind.INSERT, "Bob", 5),Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// 将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);
4、综合应用示例

查询Alice点击的url列表、统计每个用户累加的点击次数

import com.yingzi.chapter05.Source.Event;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class TableToStreamExample {public static void main(String[] args) throws Exception {//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取数据源SingleOutputStreamOperator<Event> eventStream = env.fromElements(new Event("Alice", "./home", 1000L),new Event("Bob", "./cart", 1000L),new Event("Alice", "./prod?id=1", 5 * 1000L),new Event("Cary", "./home", 60 * 1000L),new Event("Bob", "./prod?id=3", 90 * 1000L),new Event("Alice", "./prod?id=7", 105 * 1000L));//获取表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//将数据流转换成表tableEnv.createTemporaryView("EventTable", eventStream);//查询Alice的访问url列表Table aliceVisitTable = tableEnv.sqlQuery("SELECT url,user FROM EventTable WHERE user = 'Alice'");//统计每个用户的点击次数Table urlCounTable = tableEnv.sqlQuery("SELECT user,COUNT(url) FROM EventTable GROUP BY user");//将表转换成数据流,在控制台打印输出tableEnv.toDataStream(aliceVisitTable).print("alic visit");tableEnv.toChangelogStream(urlCounTable).print("count");env.execute();}
}

Flink的Table和SQL的基本API相关推荐

  1. 2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API

    目录 API 获取环境 创建表 查询表 Table API SQL ​​​​​​​写出表 ​​​​​​​与DataSet/DataStream集成 ​​​​​​​TableAPI ​​​​​​​SQL ...

  2. Flink Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  3. flink的dataset/stream/sql三套API的选择以及是否应该阅读源码

    我常常在钉钉群群里面请教,群里也有阿里P7/P8的专家. 但是每当我请教dataset/datastream相关问题的时候,即使是专家也没有响应. 钉钉群里面P7的是云邪,擅长使用的也是flink s ...

  4. Flink的Table API 与SQL介绍及调用

    1 概述    DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...

  5. Flink教程(16)- Flink Table与SQL

    文章目录 01 引言 02 Table API & SQL 介绍 2.1 Flink Table模块 2.2 Table API & SQL特点 2.3 Table API& ...

  6. Flink的Table API 与SQL的流处理

    1 流处理与SQL的区别   Table API和SQL,本质上还是基于关系型表的操作方式:而关系型表.SQL本身,一般是有界的,更适合批处理的场景.所以在流处理的过程中,有一些特殊概念. SQL 流 ...

  7. 2021年大数据Flink(三十三):​​​​​​​Table与SQL相关概念

    目录 相关概念 Dynamic Tables & Continuous Queries ​​​​​​​Table to Stream Conversion ​​​​​​​ ​​​​​​​相关概 ...

  8. Flink教程(17)- Flink Table与SQL(案例与SQL算子)

    文章目录 01 引言 02 Flink Table&SQL 案例 2.1 案例1(DataStream SQL统计) 2.2 案例2(DataStream Table&SQL统计) 2 ...

  9. Flink 使用Table Api 读取文件数据并写出到文件中

    前言 在上一篇我们演示了如何使用Flink 的Table Api 读取文件数据,并过滤特定字段的数据,本篇在上一篇的基础上,将从CSV文件中读取的数据重新输出到一个新的CSV文件中: 在实际业务场景下 ...

最新文章

  1. Vmware Workstation VMX 在资源管理器中杀不掉(虚拟机繁忙导致无法关机)
  2. 网络管理员比赛回顾04-DHCP
  3. Coursera Machine Learning 作业提交问题
  4. 回顾2020,我国无人机经历了四大新变化
  5. [WinAPI] API 11 [创建目录]
  6. 把 textbox 遍历赋值为空
  7. 报告解读丨企服 9 大规模化获客标杆模型(附赠案例)
  8. boost::test模块装饰器数据测试用例测试
  9. 使用Microsoft Media Service实现网络影音多媒体应用系列第三篇---技术要点
  10. java中文件选择器JFileChooser的用法
  11. leetcode python3 简单题167. Two Sum II - Input array is sorted
  12. struts2 action 返回类型分析
  13. Buildroot make网卡interfaces文件被修改
  14. 计算机继电保护书籍,电力网络继电保护的计算机整定计算
  15. 下载Android5.1源代码
  16. SpringCloud day12
  17. HashSet模拟新浪微博用户注册
  18. mysql audit log_关于MySQL AUDIT(审计)那点事
  19. 一文读懂ICS工业控制系统架构
  20. Verilog练习:HDLBits笔记4

热门文章

  1. 创建ServiceArea
  2. RabbitMQ之路由选择
  3. Android NDK学习记录(一)
  4. 快速排序的C++实现
  5. sql server中case的简单示例
  6. 【机器学习】一些模型的位置总结
  7. 基于Cache的Fibonacci数列的计算
  8. 无类型指针、空指针和野指针
  9. RDS for MySQL 物理备份文件恢复到本地数据库(亲测)
  10. Python学习笔记: 闭包