1.18.2.5.查询表
1.18.2.5.1.Table API
1.18.2.5.2.SQL
1.18.2.5.3.混用Table API和SQL
1.18.2.6.输出表
1.18.2.7.翻译与执行查询
1.18.2.7.1.Blink planner
1.18.2.7.2.Old planner

1.18.2.5.查询表

1.18.2.5.1.Table API

Table API 是关于 Scala 和 Java 的集成语言式查询 API。与 SQL 相反,Table API 的查询不是由字符串指定,而是在宿主语言中逐步构建。

Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成,例如 table.groupBy(…).select(),其中 groupBy(…) 指定 table 的分组,而 select(…) 在 table 分组上的投影。
package com.toto.demo.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register Orders table// scan registered Orders tableTable orders = tableEnv.from("Orders");// compute revenue for all customers from FranceTable revenue = orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"), $("cName").select($("cID"), $("cName"), $("revenue").sum().as("revSum"));// emit or convert Table// execute query}}
1.18.2.5.2.SQL

Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。
文档 SQL 描述了Flink对流处理和批处理表的SQL支持。
下面的示例演示了如何指定查询并将结果作为 Table 对象返回。

Java代码版本:

package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register Orders table// compute revenue for all customers from FranceTable revenue = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");// emit or convert Table// execute query}}

Scala版本

package com.toto.learn.sqlimport org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}object Demo {def main(args: Array[String]): Unit = {val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val tableEnv: TableEnvironment = TableEnvironment.create(bbSettings)val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID,cName""".stripMargin)// emit or convert Table// execute query}}

如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中。

package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register "Orders" table// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"tableEnv.executeSql("INSERT INTO RevenueFrance " +"SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");}}
1.18.2.5.3.混用Table API和SQL

Table API和SQL查询的混用非常简单因为它们都返回 Table 对象:
可以在SQL查询返回的Table对象上定义Table API查询。
在TableEnvironment中注册的结果表可以在 SQL查询的FROM子句中引用,通过这种方法就可以在 Table API 查询的结果上定义SQL查询。

1.18.2.6.输出表

Table通过写入 TableSink 输出。TableSink是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如Apache Kafka、RabbitMQ)。

批处理Table只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSink,RetractStreamTableSink 或者 UpsertStreamTableSink。

请参考文档 Table Sources & Sinks (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html)以获取更多关于可用 Sink 的信息以及如何自定义 TableSink。

方法Table.executeInsert(String tableName) 将Table发送至已注册的TableSink。该方法通过名称在catalog中查找TableSink并确认Table schema和TableSink schema一致。

下面的示例演示如何输出 Table:

package com.toto.demo.sql;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// create an output Tablefinal Schema schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.BIGINT());tableEnv.connect(new FileSystem().path("/path/to/file")).withFormat(new OldCsv().fieldDelimiter("|").deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable");// compute a result Table using Table API operators and/or SQL queriesTable result = ...// emit the result Table to the registered TableSinkresult.executeInsert("CsvSinkTable");}}

1.18.2.7.翻译与执行查询

两种计划器翻译和执行查询的方式是不同的。

1.18.2.7.1.Blink planner

不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
1.优化逻辑执行计划
2.翻译成DataStream程序
当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
当 Table 被转换成 DataStream 时(参阅与 DataStream 和 DataSet API 结合)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEnvironment.execute() 时被执行。

注意:从1.11版本开始,sqlUpdate方法和insertInfo方法被废弃,从这两个方法构建的Table程序必须通过StreamTableEnvironment.execute()方法执行,而不能通过StreamExecutionEnvironment.execute()方法来执行。

1.18.2.7.2.Old planner

Table API 和 SQL 查询会被翻译成 DataStream 或者 DataSet 程序, 这取决于它们的输入数据源是流式的还是批式的。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
1.优化逻辑执行计划
2.翻译成DataStream或DataSet程序。

Table API 或者 SQL 查询在下列情况下会被翻译:
当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
对于 Streaming 而言,当Table 被转换成 DataStream 时(参阅与 DataStream 和 DataSet API 结合)触发翻译。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEnvironment.execute() 时被执行。对于 Batch 而言,Table 被转换成 DataSet 时(参阅与 DataStream 和 DataSet API 结合)触发翻译。转换完成后,它就成为一个普通的 DataSet 程序,并会在调用 ExecutionEnvironment.execute() 时被执行。

注意 :从 1.11 版本开始,sqlUpdate 方法 和 insertInto 方法被废弃。对于 Streaming 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 StreamTableEnvironment.execute() 方法执行,而不能通过 StreamExecutionEnvironment.execute() 方法执行;对于 Batch 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 BatchTableEnvironment.execute() 方法执行,而不能通过 ExecutionEnvironment.execute() 方法执行。

1.18.2.5.Table APISQL(查询表、Table API、SQL、混用Table API和SQL、输出表、翻译与执行查询、Blink planner、Old planner)等相关推荐

  1. Flink Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  2. Flink的Table和SQL的基本API

    文章目录 一个示例 程序架构 创建表环境 创建表 1.连接器 2.虚拟表 表的查询 1.执行SQL查询 2.调用Table API进行查询 3.两种API的结合使用 输出表 表和流的转换 1.将表转换 ...

  3. Fink SQL和Table API

    文章目录 Fink SQL和Table API 示例 基本API 依赖 程序架构 创建表环境 创建表 连接器表 虚拟表 表的查询 执行SQL进行查询 调用Table API进行查询 输出表 表和流的转 ...

  4. 1.18.2.Table APISQL(概念与通用API、两种计划器(Planner)的主要区别、创建 TableEnvironment、临时表、永久表、创建表、虚拟表、Connector 等)

    1.18.2.概念与通用API 1.18.2.1.两种计划器(Planner)的主要区别: 1.18.2.2.Table API和SQL程序的结构 1.18.2.3.创建 TableEnvironme ...

  5. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  6. SQL ALTER TABLE 语句在项目中的使用

    1.在实际的项目开发过程中,之前已经创建好的实体类可能需要增加/删除字段,亦或是更改已有字段的属性,比如主键的增长策略从自增型改为UUID型,那么就会涉及到 SQL 中 alter table 语句的 ...

  7. asp 设置table 间距_B端后台表格(table)如何设计

    嗨小伙伴们大家好-☀️今天小编给大家带来的文章是B端后台表格(table)如何设计,本篇文章干货较多且长,记得先?再看哦-? (全文共计6908字,阅读约需要17分钟) 引言 目前在一家制造业软件提供 ...

  8. SQL CREATE TABLE 语句(转)

    CREATE TABLE 语句 CREATE TABLE 语句用于创建数据库中的表. SQL CREATE TABLE 语法 CREATE TABLE 表名称 ( 列名称1 数据类型, 列名称2 数据 ...

  9. SQL ALTER TABLE 语句

    SQL ALTER TABLE 语句 ALTER TABLE 语句 ALTER TABLE 语句用于在已有的表中添加.删除或修改列. SQL ALTER TABLE 语法 如需在表中添加列,请使用下面 ...

最新文章

  1. android 自定义键盘_Android自定义输入车牌号键盘、车牌简称,数字 ,字母键盘...
  2. 人脸相关2020eccv
  3. 003_Servlet生命周期
  4. 用反向传导做分子模拟:苯胺(C6H5NH2)和硝基苯(C6H5NO2)
  5. 题解 UVA10298 【Power Strings】
  6. springMVC rest风格
  7. Python绘制傅里叶变换、反变换与带通滤波图像
  8. Class com.fasterxml.jackson.databind.ser.BasicSerializerFactory can not access a member of class com
  9. 仿微信添加触摸图片阴影效果
  10. Python绘制简单漂亮好玩的散点图
  11. nmon和nmon analyser使用方法
  12. 数值分析·学习 | 平方根法和追赶法matlab实现
  13. 运维工程师常见软件故障_软件故障分类| 软件工程师
  14. 中国古代衣食住行 3
  15. 微信小程序入门:和风天气小程序
  16. 区块宝周报:区块链一周大事排行榜10.16
  17. HDOJ Saving HDU JAVA 2111
  18. linu修改open files无效_雷电模拟器修改教程
  19. 微巴士阳光出行---竞品分析
  20. “3+3+N”之下,神州数码在云时代进击

热门文章

  1. IDEA 点击进入方法内部_【推荐收藏】IDEA的Debug调试,你全会用么?
  2. ie 不执行回调函时_「Excel VBA操作IE篇」10分钟内设置完成,3句代码打开IE浏览器
  3. linux 命令行 java_在Linux上讲Java命令行的作为服务运行
  4. Python并发编程之多进程(一)
  5. python+opencv 给女朋友照片加上个性相框,学会等着她夸你。
  6. boost::mpl::aux::largest_int相关用法的测试程序
  7. hana::detail::variadic::foldl1用法的测试程序
  8. boost::function_types::is_callable_builtin用法的测试程序
  9. DCMTK:TLS测试DcmSCP和DcmSCPPool类
  10. VTK:可视化之HideAllActors