1 概述

   DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层。对于新版本的Blink在DateStream基础上又包了一层实现了批流统一,上层执行环境都是基于流处理,做批流统一的查询。Table API是流处理和批处理通用的关系型API,与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来定义。 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询。

  从Flink 1.9开始,Flink为Table 和SQL API程序提供了两种不同的planner :Blink planner 和the old planner。Blink planner 和the old planner的区别如下:

  (1)批流统一:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。

  (2)因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替

  (3)Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog

  (4)old planner和Blink planner的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推

  (5)基于字符串的键值配置选项仅适用于Blink planner

  (6)PlannerConfig在两个planner中的实现不同

  (7) Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧planner的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立

  (8)旧的planner不支持目录统计,而Blink planner支持

  需要添加的依赖

  根据目标编程语言的不同,您需要将Java或ScalaAPI添加到项目中,以便使用TableAPI&SQL来定义管道:

<!-- Either... -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>
<!-- or... -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>

  您想在IDE中本地运行TableAPI&SQL程序,则必须添加以下一组模块,具体取决于使用的planner:

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>

   在内部,表生态系统的一部分是在Scala中实现的。因此确保为批处理和流应用程序添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>

2 Table API 和 SQL 的程序结构

  所有用于批处理和流处理的Table API和SQL程序都遵循相同的结构,与流式处理的程序结构类似

//1 创建表的执行环境
// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section//2 创建表,读取数据
tableEnv.connect(...).createTemporaryTable("table1")//3 注册表,用于输出计算结果
tableEnv.connect(...).createTemporaryTable("outputTable")//4.1 通过Table API查询得到结果表
val tapiResult = tableEnv.from("table1").select(...)//4.2 通过SQL查询得到结果表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")//5 将结果表写入到输出表
tapiResult.insertInto("outputTable")// execute
tableEnv.execute("scala_job")

3 创建TableEnvironment

  创建表的执行环境,需要将 flink 流处理的执行环境传入

val tableEnv = StreamTableEnvironment.create(env)

  TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有对表的操作都基于 TableEnvironment :注册 Catalog(可以认为是对表的管理的结构);在 Catalog 中注册表;执行 SQL 查询;注册用户自定义函数(UDF);转换DataStream或DataSet变成Table;保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

  在老版本Table总是绑定到特定的TableEnvironment。不可能在同一个查询中组合不同TableEnvironment的表,例如加入或合并它们。

  TableEnvironment创建通过调用静态的BatchTableEnvironment.create()或StreamTableEnvironment.create()方法的StreamExecutionEnvironment或者ExecutionEnvironment还有一个可选的TableConfig。这个TableConfig可用于配置TableEnvironment或自定义查询优化和转换过程

  如果两个planner JAR都位于类路径(默认行为)上,则应显式设置在当前程序中使用的planner。

// **********************
// FLINK STREAMING QUERY 配置老版本 planner 的流式查询
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironmentval fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)// ******************
// FLINK BATCH QUERY  配置老版本 planner 的批式查询
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironmentval fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)// **********************
// BLINK STREAMING QUERY  配置 blink planner 的流式查询
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironmentval bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)// ******************
// BLINK BATCH QUERY  配置 blink planner 的批式查询
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

  注意:老版本可以在代码里面定义不同的环境,老版本处理过程中可以有批处理和流处理环境。一张批处理环境,一张流处理环境是不能做join查询的,所有针对表的操作,必须是基于同一个执行环境的。

  如果只有一个Planner/lib目录,可以使用useAnyPlanner (use_any_planner(用于python)创建特定的EnvironmentSettings.

4 在Catalog中创建表

  一个TableEnvironment维护使用标识符创建的表的Catalog的map,也就是TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名

  表可以是常规的,也可以是虚拟的视图,View。常规表(Table):一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。视图(View):可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果集

  表可以是临时的,并且与单个Flink会话的生命周期相关联,或者是永久的,并且跨多个Flink会话和集群可见。永久表:需要Catalog(如Hive Metastore)来维护有关表的元数据。一旦创建了永久表,它对连接到目录的任何Flink会话都是可见的,并且将继续存在,直到该表显式删除为止。临时表:总是存储在内存中,并且仅在Flink会话期间才存在。这些表在其他会话中不可见。它们不绑定到任何目录或数据库,但可以在其中的名称空间中创建。如果删除临时表的相应数据库,则不会删除它们。

4.1 创建表

4.1.1 Virtual Tables

  Table API对应于虚拟的表,它封装了一个逻辑查询计划。它可以在Catalog中创建,如下所示:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)

  注意:Table对象类似于关系数据库系统的VIEW,即定义Table未优化,当另一个查询引用已注册的Table查询时,将内联。如果多个查询引用同一个已注册的Table,它将为每个引用查询内联并多次执行,即注册的结果Table不被分享。

4.1.2 Connector Tables

  可以创建一个TABLE从关系数据库中所知道的连接器申报。连接器描述存储表数据的外部系统,可以在这里声明诸如Kafka之类的存储系统或常规文件系统。其实就是TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表

tableEnvironment.connect(...)          // 定义表的数据来源,与外部系统建立连接.withFormat(...)       // 定义数据格式化方法.withSchema(...)       // 定义表结构.inAppendMode().createTemporaryTable("MyTable")      // 创建临时表

  创建 Table 从文件中读取如下:

tableEnv.connect(new FileSystem().path(“YOUR_FILE_PATH”))    // 定义到文件系统的连接.withFormat(new Csv())    // 定义以csv格式进行数据格式化.withSchema(new Schema()    // 定义表结构.field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE()))    .createTemporaryTable("sensorTable")    // 创建临时表

4.2 表的标识符identifier

  Flink中表由一个identifier指定,identifier由Catalog名、数据库(database)名和对象名(表名)组成。用户可以将其中的一Catalog和一个database设置为“当前目录”和“当前数据库”。使用它们,上面提到的3部分标识符中的前两部分可以是可选的,如果不提供它们,则将引用当前目录和当前数据库。用户可以通过表API或SQL切换当前目录和当前数据库。

  标识符遵循SQL要求,这意味着可以使用回勾字符(`)。此外,必须转义所有SQL保留关键字。

Scala
// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")val table: Table = ...;// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)// register the view named 'View' in the catalog named 'custom_catalog' in the
// database named 'custom_database'. 'View' is a reserved keyword and must be escaped.
tableEnv.createTemporaryView("`View`", table)// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)

5 表的查询

5.1 Table API

  Table API 是集成在 Scala 和 Java 语言内的查询 API。

  Table API 基于代表“表”的 Table 类,并提供一整套操作处理的方法 API;这些方法会返回一个新的 Table 对象,表示对输入表应用转换操作的结果,有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

  Table API 文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders.filter('cCountry === "FRANCE").groupBy('cID, 'cName).select('cID, 'cName, 'revenue.sum AS 'revSum)// emit or convert Table
// execute query

注意:Scala Table API使用Scala符号,前面加了一个单引号’,这是Table API中定义的Expression类型的写法,可以很方便地表示一个表中的字段。字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。Table API使用Scala实现。确保导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._为了使用Scala隐式转换。

5.2 SQL

  Flink 的 SQL 集成,基于实现 了SQL 标准的 Apache Calcite,在 Flink 中,用常规字符串来定义 SQL 查询语句,SQL 查询的结果,也是一个新的 Table。官方Flink对流表和批处理表的SQL支持地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/index.html

  指定查询并将结果作为Table如下:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// emit or convert Table
// execute query

  指定将其结果插入到已注册表中的更新查询如下:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register "Orders" table
// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""|INSERT INTO RevenueFrance|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// execute query

  Table API和SQL查询可以很容易地混合在一起使用,因为两者都返回Table对象:Table API查询可以基于SQL查询返回的Table对象;可以根据Table API查询的结果定义SQL查询注册结果表在TableEnvironment并在FROM子句的SQL查询。

6 输出表

6.1 表的输出

  表的输出,是通过将数据写入 TableSink 来实现的,TableSink 是一个通用接口,可以支持不同的文件格式(如CSV、Apache Parquet、Apache Avro)、存储数据库(如如JDBC、Apache HBASE、Apache Cassandra、Elasticsearch)和消息队列(如如Apache Kafka、RabbitMQ)。

  一个batch Table只能写入BatchTableSink,而Streaming Table需要一个AppendStreamTableSink或RetractStreamTableSink或UpsertStreamTableSink。

  输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// create an output Table
val schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.LONG())tableEnv.connect(new FileSystem("/path/to/file")).withFormat(new Csv().fieldDelimiter('|').deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable")// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")// execute the program

6.2 更新模式

  对于流式查询,需要声明如何在表和外部连接器之间执行转换,与外部系统交换的消息类型,由更新模式(Update Mode)指定。有追加(Append),撤回(Retract),更新插入(Upsert)三种模式

  追加(Append)模式:表只做插入操作,和外部连接器只交换插入(Insert)消息。以前发出的结果永远不会更新,如果更新或删除操作使用追加模式会失败报错。

  撤回(Retract)模式:表和外部连接器交换添加(Add)和撤回(Retract)消息,插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update)编码为上一条的 Retract 和下一条的 Add 消息。返回值是boolean类型。它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回。

  更新插入(Upsert)模式:更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息

  撤回(Retract)和更新插入(Upsert)的区别:

  输出到文件如下:Retract不能定义key,这一点跟upsert模式完全不同;Update操作需要一个唯一的key,通过这个key可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一key的属性,是用单个消息编码的,所以效率会更高。

6.3 输出到文件

tableEnv.connect(new FileSystem().path("/path/to/file") // 定义到文件系统的连接.withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.Double())) .createTemporaryTable("outputTable")     // 创建临时表resultTable.insertInto("outputTable")    // 输出表

6.4 输出到Kafka

  可以创建 Table 来描述 kafka 中的数据,作为输入或输出的 TableSink

tableEnv.connect(new Kafka().version("0.11").topic("sinkTest").property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")
).withFormat( new Csv() ).withSchema( new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("kafkaOutputTable")resultTable.insertInto("kafkaOutputTable")

6.5 输出到ElasticSearch

  ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,这样就可以使用Query定义的键(key)与外部系统交换UPSERT/DELETE消息。对于“仅追加”(append-only)的查询,connector还可以在append 模式下操作,这样就可以与外部系统只交换insert消息。es目前支持的数据格式,只有Json,而flink本身并没有对应的支持,所以还需要引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.10.0</version>
</dependency>
tableEnv.connect(new Elasticsearch().version("6").host("localhost", 9200, "http").index("test").documentType("temp")
).inUpsertMode()           // 指定是 Upsert 模式.withFormat(new Json()).withSchema( new Schema().field("id", DataTypes.STRING()).field("count", DataTypes.BIGINT())).createTemporaryTable("esOutputTable")aggResultTable.insertInto("esOutputTable")

6.6 输出到MySql

  Flink专门为Table API的jdbc连接提供了flink-jdbc连接器,我们需要先引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version>
</dependency>

  jdbc连接的代码实现比较特殊,因为没有对应的java/scala类实现ConnectorDescriptor,所以不能直接tableEnv.connect()。不过Flink SQL留下了执行DDL的接口:tableEnv.sqlUpdate()

val sinkDDL: String ="""|create table jdbcOutputTable (|  id varchar(20) not null,|  cnt bigint not null|) with (|  'connector.type' = 'jdbc',|  'connector.url' = 'jdbc:mysql://localhost:3306/test',|  'connector.table' = 'sensor_count',|  'connector.driver' = 'com.mysql.jdbc.Driver',|  'connector.username' = 'root',|  'connector.password' = '123456'|)""".stripMargintableEnv.sqlUpdate(sinkDDL)
aggResultSqlTable.insertInto("jdbcOutputTable")

7 Query的解释和执行

  Table API提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。

  explain方法会返回一个字符串,描述三个计划:①未优化的逻辑查询计划②优化后的逻辑查询计划③实际执行计划

  查看执行计划如下

val explaination: String = tableEnv.explain(resultTable)
println(explaination)

  Query的解释和执行过程,老planner和blink planner是不一样的。整体来讲,Query都会表示成一个逻辑查询计划,然后分两步解释:①优化查询计划②解释成 DataStream 或者 DataSet程序

  而Blink版本是批流统一的,所以所有的Query,只会被解释成DataStream程序;另外在批处理环境TableEnvironment下,Blink版本要到tableEnv.execute()执行调用才开始解释。

8 Table与DataStream,DataSet的集成

  两种planners都可以与DataStream API集成,只有老的planner才能与DataSet API集成。Scala Table API提供了DataSet、DataStream和Table的隐式转换,通过导入org.apache.flink.table.api.scala._

8.1 从DataStream或DataSet创建视图

  DataStream或DataSet可以在TableEnvironment作为一个视图,只能注册为临时视图

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)

8.2 将DataStream或DataSet转换成表

  DataStream或DataSet可以直接转换为Table,而不是注册在TableEnvironment。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

  可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再定义schema。

  代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。

  这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)

case class WC(id: String, timestamp: Long, count: Double)val inputStream: DataStream[String] = env.readTextFile("/path/to/file")
val dataStream: DataStream[WC] = inputStream.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)})val wcTable: Table = tableEnv.fromDataStream(dataStream)

  数据类型与 Table schema的对应:上面DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。

  基于名称的对应:

val wcTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature)

  基于位置的对应:

val wcTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)

  Flink的DataStream和 DataSet API支持多种类型。组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。

  元组类型和原子类型,一般用位置对应会好一些;用名称对应的话:元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。

8.3 将表转换成DataStream

  表可以转换为DataStream或DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API或SQL查询的结果上运行了。

  将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。

  表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。Table API中表到DataStream有两种模式:①追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景②撤回模式(Retract Mode):用于任何场景。有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作。得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据, Delete)。

// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section// Table with two fields (String name, Integer age)
val table: Table = ...// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table)// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)].
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

  注意:一般没有经过groupby之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream

Flink的Table API 与SQL介绍及调用相关推荐

  1. Flink的Table API 与SQL的流处理

    1 流处理与SQL的区别   Table API和SQL,本质上还是基于关系型表的操作方式:而关系型表.SQL本身,一般是有界的,更适合批处理的场景.所以在流处理的过程中,有一些特殊概念. SQL 流 ...

  2. Flink Table API和SQL(下)

    传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...

  3. Flink学习笔记(十一)Table API 和 SQL

    文章目录 11. Table API 和 SQL 11.1 快速上手 11.1.1 需要依赖 11.1.2 示例 11.2 基本 API 11.2.1 程序架构 11.2.2 创建表环境 11.2.3 ...

  4. 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL

    第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 文章目录 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 第一节 Fink SQL快速上手 1. ...

  5. flink Table API 与SQL入门实战

    流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...

  6. 1.18.2.5.Table APISQL(查询表、Table API、SQL、混用Table API和SQL、输出表、翻译与执行查询、Blink planner、Old planner)等

    1.18.2.5.查询表 1.18.2.5.1.Table API 1.18.2.5.2.SQL 1.18.2.5.3.混用Table API和SQL 1.18.2.6.输出表 1.18.2.7.翻译 ...

  7. Flink 使用Table Api 读取文件数据并写出到文件中

    前言 在上一篇我们演示了如何使用Flink 的Table Api 读取文件数据,并过滤特定字段的数据,本篇在上一篇的基础上,将从CSV文件中读取的数据重新输出到一个新的CSV文件中: 在实际业务场景下 ...

  8. 1.19.7.Table API、SQL、数据类型、保留关键字、查询语句、指定查询、执行查询、语法、操作符、无排名输出优化、去重、分组窗口、时间属性、选择分组窗口的开始和结束时间戳、模式匹配

    1.19.7.Table API 1.19.8.SQL 1.19.8.1.概述 1.19.8.1.1.SQL 1.19.8.1.2.数据类型 1.19.8.1.3.保留关键字 1.19.8.2.查询语 ...

  9. Flink temporal table join研究

    作者:王东阳 前言 ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准.Temporal Table记录了历史上 ...

最新文章

  1. java web 哪些方法不能被从写_JAVA_WEB面试题
  2. Velocity——模板中转换字符串首字母小写解决方案
  3. ACCESS中的Update语句不支持Select的解决办法
  4. 嵌入式必会!C语言最常用的贪心算法就这么被攻略了
  5. 通过回调函数阻止进程创建(验证结束,方案完全可行)
  6. 2020年7大技术趋势
  7. java types.varchar_java statement.registerOutParameter(5, java.sql.Types.VARCHAR)返回值长度限制...
  8. 学习C#面向对象设计模式纵横谈---笔记
  9. linux学习之lvm2逻辑卷管理
  10. 2015 ACM Syrian Collegiate Programming Contest
  11. OpenCV_cv::Mat初始化
  12. html怎么隐藏信息,3.2.5 在HTML文件中隐藏信息
  13. 《Linux/UNIX系统编程手册(上、下册)》
  14. 水晶头做网线颜色排列
  15. “汇新杯”新兴科技+互联网创新大赛正式启动
  16. c语言 英文歌曲大赛,英文歌曲大赛活动方案
  17. AndrOid系统亭子运行,细讲Android系统下的Preference
  18. AutoML系列 | 04-AutoML系统中的元知识迁移应用
  19. mac系统下启用root用户
  20. windows下安装VMware Workstation14.0Pro(VMware系列一)

热门文章

  1. JVM自动内存管理机制——Java内存区域(下)
  2. PHP页面显示中文字符出现乱码
  3. linux相关命令介绍
  4. C#中用委托实现C++的回调函数
  5. Linux shell脚本 遍历带空格的文件名
  6. 技术演讲的技巧和经验
  7. C#.net同步异步SOCKET通讯和多线程总结(转)
  8. 辉哥给rockchip修复了一个内存溢出问题
  9. 纪念音视频界前辈-雷霄骅
  10. dedecms怎么改php版本_PHP程序员进阶之路