1.18.2.5.Table APISQL(查询表、Table API、SQL、混用Table API和SQL、输出表、翻译与执行查询、Blink planner、Old planner)等
1.18.2.5.查询表
1.18.2.5.1.Table API
1.18.2.5.2.SQL
1.18.2.5.3.混用Table API和SQL
1.18.2.6.输出表
1.18.2.7.翻译与执行查询
1.18.2.7.1.Blink planner
1.18.2.7.2.Old planner
1.18.2.5.查询表
1.18.2.5.1.Table API
Table API 是关于 Scala 和 Java 的集成语言式查询 API。与 SQL 相反,Table API 的查询不是由字符串指定,而是在宿主语言中逐步构建。
Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成,例如 table.groupBy(…).select(),其中 groupBy(…) 指定 table 的分组,而 select(…) 在 table 分组上的投影。
package com.toto.demo.sql;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register Orders table// scan registered Orders tableTable orders = tableEnv.from("Orders");// compute revenue for all customers from FranceTable revenue = orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"), $("cName").select($("cID"), $("cName"), $("revenue").sum().as("revSum"));// emit or convert Table// execute query}}
1.18.2.5.2.SQL
Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。
文档 SQL 描述了Flink对流处理和批处理表的SQL支持。
下面的示例演示了如何指定查询并将结果作为 Table 对象返回。
Java代码版本:
package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register Orders table// compute revenue for all customers from FranceTable revenue = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");// emit or convert Table// execute query}}
Scala版本
package com.toto.learn.sqlimport org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}object Demo {def main(args: Array[String]): Unit = {val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val tableEnv: TableEnvironment = TableEnvironment.create(bbSettings)val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID,cName""".stripMargin)// emit or convert Table// execute query}}
如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中。
package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register "Orders" table// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"tableEnv.executeSql("INSERT INTO RevenueFrance " +"SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");}}
1.18.2.5.3.混用Table API和SQL
Table API和SQL查询的混用非常简单因为它们都返回 Table 对象:
可以在SQL查询返回的Table对象上定义Table API查询。
在TableEnvironment中注册的结果表可以在 SQL查询的FROM子句中引用,通过这种方法就可以在 Table API 查询的结果上定义SQL查询。
1.18.2.6.输出表
Table通过写入 TableSink 输出。TableSink是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如Apache Kafka、RabbitMQ)。
批处理Table只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSink,RetractStreamTableSink 或者 UpsertStreamTableSink。
请参考文档 Table Sources & Sinks (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html)以获取更多关于可用 Sink 的信息以及如何自定义 TableSink。
方法Table.executeInsert(String tableName) 将Table发送至已注册的TableSink。该方法通过名称在catalog中查找TableSink并确认Table schema和TableSink schema一致。
下面的示例演示如何输出 Table:
package com.toto.demo.sql;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// create an output Tablefinal Schema schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.BIGINT());tableEnv.connect(new FileSystem().path("/path/to/file")).withFormat(new OldCsv().fieldDelimiter("|").deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable");// compute a result Table using Table API operators and/or SQL queriesTable result = ...// emit the result Table to the registered TableSinkresult.executeInsert("CsvSinkTable");}}
1.18.2.7.翻译与执行查询
两种计划器翻译和执行查询的方式是不同的。
1.18.2.7.1.Blink planner
不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
1.优化逻辑执行计划
2.翻译成DataStream程序
当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
当 Table 被转换成 DataStream 时(参阅与 DataStream 和 DataSet API 结合)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEnvironment.execute() 时被执行。
注意:从1.11版本开始,sqlUpdate方法和insertInfo方法被废弃,从这两个方法构建的Table程序必须通过StreamTableEnvironment.execute()方法执行,而不能通过StreamExecutionEnvironment.execute()方法来执行。
1.18.2.7.2.Old planner
Table API 和 SQL 查询会被翻译成 DataStream 或者 DataSet 程序, 这取决于它们的输入数据源是流式的还是批式的。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
1.优化逻辑执行计划
2.翻译成DataStream或DataSet程序。
Table API 或者 SQL 查询在下列情况下会被翻译:
当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
对于 Streaming 而言,当Table 被转换成 DataStream 时(参阅与 DataStream 和 DataSet API 结合)触发翻译。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEnvironment.execute() 时被执行。对于 Batch 而言,Table 被转换成 DataSet 时(参阅与 DataStream 和 DataSet API 结合)触发翻译。转换完成后,它就成为一个普通的 DataSet 程序,并会在调用 ExecutionEnvironment.execute() 时被执行。
注意 :从 1.11 版本开始,sqlUpdate 方法 和 insertInto 方法被废弃。对于 Streaming 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 StreamTableEnvironment.execute() 方法执行,而不能通过 StreamExecutionEnvironment.execute() 方法执行;对于 Batch 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 BatchTableEnvironment.execute() 方法执行,而不能通过 ExecutionEnvironment.execute() 方法执行。
1.18.2.5.Table APISQL(查询表、Table API、SQL、混用Table API和SQL、输出表、翻译与执行查询、Blink planner、Old planner)等相关推荐
- Flink Table和SQL的基本API
文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...
- Flink的Table和SQL的基本API
文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...
- Fink SQL和Table API
文章目录 Fink SQL和Table API 示例 基本API 依赖 程序架构 创建表环境 创建表 连接器表 虚拟表 表的查询 执行SQL进行查询 调用Table API进行查询 输出表 表和流的转 ...
- 1.18.2.Table APISQL(概念与通用API、两种计划器(Planner)的主要区别、创建 TableEnvironment、临时表、永久表、创建表、虚拟表、Connector 等)
1.18.2.概念与通用API 1.18.2.1.两种计划器(Planner)的主要区别: 1.18.2.2.Table API和SQL程序的结构 1.18.2.3.创建 TableEnvironme ...
- Apache Flink 零基础入门(十八)Flink Table APISQL
什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...
- SQL ALTER TABLE 语句在项目中的使用
1.在实际的项目开发过程中,之前已经创建好的实体类可能需要增加/删除字段,亦或是更改已有字段的属性,比如主键的增长策略从自增型改为UUID型,那么就会涉及到 SQL 中 alter table 语句的 ...
- asp 设置table 间距_B端后台表格(table)如何设计
嗨小伙伴们大家好-☀️今天小编给大家带来的文章是B端后台表格(table)如何设计,本篇文章干货较多且长,记得先?再看哦-? (全文共计6908字,阅读约需要17分钟) 引言 目前在一家制造业软件提供 ...
- SQL CREATE TABLE 语句(转)
CREATE TABLE 语句 CREATE TABLE 语句用于创建数据库中的表. SQL CREATE TABLE 语法 CREATE TABLE 表名称 ( 列名称1 数据类型, 列名称2 数据 ...
- SQL ALTER TABLE 语句
SQL ALTER TABLE 语句 ALTER TABLE 语句 ALTER TABLE 语句用于在已有的表中添加.删除或修改列. SQL ALTER TABLE 语法 如需在表中添加列,请使用下面 ...
最新文章
- android 自定义键盘_Android自定义输入车牌号键盘、车牌简称,数字 ,字母键盘...
- 人脸相关2020eccv
- 003_Servlet生命周期
- 用反向传导做分子模拟:苯胺(C6H5NH2)和硝基苯(C6H5NO2)
- 题解 UVA10298 【Power Strings】
- springMVC rest风格
- Python绘制傅里叶变换、反变换与带通滤波图像
- Class com.fasterxml.jackson.databind.ser.BasicSerializerFactory can not access a member of class com
- 仿微信添加触摸图片阴影效果
- Python绘制简单漂亮好玩的散点图
- nmon和nmon analyser使用方法
- 数值分析·学习 | 平方根法和追赶法matlab实现
- 运维工程师常见软件故障_软件故障分类| 软件工程师
- 中国古代衣食住行 3
- 微信小程序入门:和风天气小程序
- 区块宝周报:区块链一周大事排行榜10.16
- HDOJ Saving HDU JAVA 2111
- linu修改open files无效_雷电模拟器修改教程
- 微巴士阳光出行---竞品分析
- “3+3+N”之下,神州数码在云时代进击
热门文章
- IDEA 点击进入方法内部_【推荐收藏】IDEA的Debug调试,你全会用么?
- ie 不执行回调函时_「Excel VBA操作IE篇」10分钟内设置完成,3句代码打开IE浏览器
- linux 命令行 java_在Linux上讲Java命令行的作为服务运行
- Python并发编程之多进程(一)
- python+opencv 给女朋友照片加上个性相框,学会等着她夸你。
- boost::mpl::aux::largest_int相关用法的测试程序
- hana::detail::variadic::foldl1用法的测试程序
- boost::function_types::is_callable_builtin用法的测试程序
- DCMTK:TLS测试DcmSCP和DcmSCPPool类
- VTK:可视化之HideAllActors