Flink学习4-流式SQL

Flink系列文章

  • 更多Flink系列文章请点击Flink系列文章

  • 更多大数据文章请点击大数据好文推荐

摘要

介绍Flink Table Sql API相关概念,还会提供一些例子。

本文大部分内容已经更新到Flink 1.11。

1 基本概念

1.1 概述

Flink高层API有两种:Table级别和SQL级别。两种API都是统一的处理批和流数据,也就是说对于无界、实时的流或者有界、记录型的流有着同样的处理语义,产生同样的结果。

Table和SQL API采用了Apache Calcite进行语句解析、验证和查询调优。
他们可以和DataStream及DataSet API无缝集成,并支持用户自定义的标量,聚合和表值函数。

Flink的关系型API旨在简化数据分析,数据管道和ETL应用程序。

下面这个示例功能和DataStream API中的相同,也是展示一个SQL查询将一个点击流session化,然后对每个session中的点击数计数:

SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId

这个SQL就是个流式处理SQL,简洁,高效。

1.2 限制

虽然flink 1.9.0支持DDL,但是尚不支持Time相关的元素。

可见:

  • FLIP-66: Support Time Attribute in SQL DDL
  • JIRA-Support Time Attribute in SQL DDL

注意,1.10开始版本中DDL已经支持Time相关内容!

1.3 动态表(Dynamic Tables)

1.3.1 概念

  • 动态表主要是对应了传统有界数据概念上的表,对比如下:

    / 关系代数/SQL 流处理
    数据量 有界 无界
    访问区域 查询时可访问完整数据 流式查询时不能访问完整数据,而是不断对新输入的数据访问并计算
    执行周期 执行一段时间后完毕 用不完结,持续执行,并根据收到的数据不断更新计算结果
  • 物化视图(Materialized Views)与流SQL
    高级数据库用来应对流数据场景,他被定义为一条SQL查询,会缓存查询结果用来查询访问(与虚拟视图需要执行SQL不同,无需每次使用都执行查询计算)。有一种Eager View Maintenance技术被用来在基表(定义视图时的查询语句的原表)更新时更新物化视图。

    我们可以想象,数据库表一般拥有一个变化的由增删改组成的数据流(Changelog Stream),那就需要持续查询处理该数据流来不断更新物化视图(物化视图本身就由SQL查询定义),也就是说物化视图就是流式SQL查询的结果。

  • 动态表和持续查询(Continuous Query)
    有了以上物化视图的相关概念,这里我们引出动态表。动态表是Flink 流式Table和SQL的核心概念(一般是逻辑概念,并不一定要在查询执行中物化),会持续更新。

    需要注意的是,Flink可以像查询静态表一样查询它们。

    动态表可像传统表一样执行查询(称为持续查询-Continuous Query),这种持续查询的结果形成动态表,并且不断更新该动态表。这跟更新物化视图差不多,持续查询在语义上也和批量模式中查询任意时刻的输入表的快照相同。这个过程如下图:

    1. Input Stream被转为动态表
    2. 持续查询在动态表上执行,生成新的动态表
    3. 生成的新动态表又被转回Output Stream
  • 动态表的存储
    Flink中动态表只是一个逻辑概念,Table和SQL API用它来统一处理有界和无界数据,注意Flink是不会保存动态表数据的,而是存储在外部系统之中(如数据库、MQ、文件系统等)。

1.3.2 例子

1.3.2.1 背景

比如一个点击流事件数据流,schema如下:

[user:  VARCHAR,   // 用户名cTime: TIMESTAMP, // 访问 URL 的时间url:   VARCHAR    // 用户访问的 URL
]

1.3.2.2 Group Aggregation

点击事件流随着事件不断流入,其实就相当于不断地对结果表的INSERT操作,可在逻辑上构成动态表(这种在流上定会以的表,其实在Flink内没有物化)。也就是说,本质上Flink使用仅有INSERT的Chanelog输入流来构建动态表。

下图左侧为点击流事件,右侧为转换后的动态表,随着点击流持续Insert而不断增长

首先考虑以下sql,计算用户维度的url访问总量。

select user,count(url) as cnt from clicks group by user;

此时每条数据都会触发聚合计算,一般需要接入能更新的数据源,如果是不能更新的如kafka就是一条条单独数据,持续查询原表生成动态表如下图:

  1. 查询刚开始时,输入表(左表)为空
  2. 当第一行[Mary, ./home]输入clicks原表时,输出了首个结果(INSERT)
  3. 第二行输入时,计算后INSERT结果到动态表
  4. 第三行又来一个Mary相关的,所以对动态表中的[Mary, 1]执行了UPDATE操作,更新为[Mary, 2]
  5. 第四行记录也是新人,所以也是INSERT到动态表

1.3.2.3 Window Aggregation

再来一个复杂一点的sql,计算每个1小时的翻滚窗口内(以事件时间计算)的用户维度的url访问总量。

select user,TUMBLE_END(cTime, INTERVAL '1' HOURS) as endT,count(ur) as cnt from clicks
group by user,TUMBLE(cTime, INTERVAL '1' HOURS);

此时数据会累积,直到水位到达窗口边界才会触发上一窗口计算,持续查询原表生成动态表如下图,这里颜色相同的为同一个user:

  • 根据cTime列不同,划分为不同窗口,分别进行计算。
  • 每次计算后将结果INSERT到动态表

1.3.2.4 Window Aggregation对比Group Aggregation


Group Aggregation如果不配置会导致结果表无限增大,最终会出问题。所以一般使用该模式需要清理掉过期数据并设置最小、最大时间。

StreamTableEnvironment.create(bsEnv, bsSettings).getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(2))

这个用来指定空闲State被保留的的最小和最大时长,差值至少5分钟。超过最大时长未更新的State会被移除。

State清理后,关于该已经被清理的State的新来的数据会被当做首条数据处理,可能导致覆盖之前的结果。

1.3.3 Update / Append

虽然以上例子都是聚合运算,但有很大不同:

  • 普通聚合运算,是不断UPDATE结果表,用来定义结果表的Changelog流中包含INSERTUPDATE
  • 窗口聚合运算的Changelog流仅包含INSERT,只能Append到结果表!

一个查询是append-only还是update的影响:

  • update表通常需要维护更多状态,用来更新
  • 两种表转为Stream的方式不同

1.3.4 表到流的转换

1.3.4.1 概述

动态表支持INSERTUPDATEDELETE操作来不断修改。所以动态表可能是:

  • 只有不断被修改的单行数据
  • 一个insert-only表,没有UPDATE / DELETE来修改表
  • 或者介于以上两者之间

当要将动态表转为流或写入外部系统时,需要将将这三种操作编码。

需要注意的是,只有AppendRetract流模式下,才可以将动态表转为DataStream。详见table2datastream

使用TableSink接口来讲动态表发送到外部系统相关内容可见TableSources and TableSinks

还可参考

  • Flink Table 的三种 Sink 模式

1.3.4.2 Append Only Mode(增)

仅交互INSERT操作数据。

此时可通过将insert的新记录发送到下游来转换为流。

比如我们前面的例子2,翻滚窗口计算后的变化数据流中仅有INSERT变化,全部append到结果动态表中。

1.3.4.3 Retract Mode (增删改)

编码:
交互INSERT(编码为ADD Message)和DELETE(编码为RETRACT Message)、UPDATE(对于修改前的行来说编码为RETRACT Message,对于正在修改的行来说编码为ADD Message)操作数据。

特点:

  • 与Upsert Mode相反,Retract Mode不需要定义key。
  • 每个UPDATE操作由两条消息(RETRACT和ADD)组成,效率较低。

将动态表转为Retract Stream的例子:

上图解读:

  • 数据到来的顺序从上往下
  • 第三行的Mary是第二次读到的Mary,此时由于第一行已经读到过Mary(+ Mary,1),所以需要更新。
    此时需要先DELETE之前的- Mary,1,再INSERT + Mary,2,即upsert更新由两个操作实现。

1.3.4.4 Upsert Mode(增删改)

编码:
交互INSERTUPDATE(编码为UPSERT Message)和DELETE(编码为Delete Message)操作数据。

特点:

  • Upsert Mode需要一个唯一的key(可能是组合的),用来传播UPDATE事件。
    具体来说,外部连接器需要了解该唯一key属性,才能正确处理消息。
  • Upsert Mode和Retract Mode都支持增删改但编码不同
    Upsert Mode中的UPDATE事件使用单条UPSERT消息进行编码,而Retract Mode中的UPDATE由两条消息(RETRACT和ADD)组成,因此Upsert模式效率更高。

例子:
比如我们前面的例子1,计算后的变化数据流中有INSERT和UPDATE变化,会持续更新动态表中之前的结果。此时比起Append Only Mode来说会需要维护更多的状态信息,用来更新。但相比Retract Mode,本模式在修改数据时可直接Upsert,无需两个操作即可完成。

将动态表转为Upsert Stream的示意图如下:

  • 比如第三行的第二次遇到Mary,就直接修改即完成。
  • 注意看,上面的user列就是唯一KEY!

1.3.5 动态表查询限制

许多(但不是全部)语义上有效的查询可以作为流上持续查询进行代价评估。有些查询代价太高而无法计算,这可能是由于它们需要维护的状态大小,也可能是由于计算以及更新代价太高:

  • 状态大小
    持续运行的流式应用,有的需要更新之前输出的结果,那就必须维护所有输出行。

    比如前面的例子1就需要一直维护每个用户的URL总和以便每次计算时更新。随着时间流逝,注册用户越来越多,有的网站甚至会为未注册用户也绑定一个用户名,导致维护的状态会越来越大,最终可能导致这个持续查询失败!

  • 计算和更新
    一些持续查询需要重新计算和更新大量结果行,甚至即使只添加或更新了一行数据。这种情况显然不适合持续查询。

    比如以下SQL,以每个用户最后一次点击时间来为每个用户计算RANK:

        SELECT user, RANK() OVER (ORDER BY lastLogin)FROM (SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user);```
    此时一旦clicks表接收到一行新数据,则用户的`lastAction`字段就会更新,而新的rank也需要被计算。这会导致所有人的排名都重新计算!有一些参数可以控制维持状态大小和结果准确度之间的权衡:[Query Configuration](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html)
    

1.4 Catalog

可参考

  • Catalogs
  • HiveCatalog

1.4.1 简介

Catalog用来存储元数据,包括数据库、表、分区、视图、函数以及需要访问数据库或外部系统数据的信息。

Catalog提供的元数据包括两方面:

  • 临时元数据
    如临时表、通过 TableEnvironment 注册的临时UDF等
  • 持久化元数据
    如存在HiveMetastore中的数据

Catalog提供统一的Table API和SQL来管理、访问元数据。

1.4.2 元数据映射

Catalog使得用户可直引用其他数据系统的元数据,映射到Flink。比如直接将JDBC表映射到Flink表,不需要重写FlinkSql DDL就能直接查询外部表了,十分方便。

1.4.3 可用的Catalog

1.4.3.1 概述

目前有

  • GenericInMemoryCatalog
    基于内存,所有元数据只在 session 的生命周期内可用。

    注意,大小写敏感。

  • JdbcCatalog
    使用JDBC协议连接管理元数据,目前只有PostgresCatalog实现。

    相见JdbcCatalog documentation

  • HiveCatalog
    可用来拿出Flink元数据、作为接口读写Hive现有元数据。

    注意,存储时都是小写。

  • 自定义Catalog

1.4.3.2 HiveCatalog

可参考

  • HiveCatalog
  • Hive集成方法

可以让Flink使用Hive Catalog存储Flink SQL 元数据。

可以在Hive命令行中使用DESCRIBE FORMATTED命令查看表的元数据,如果是is_generic=true代表是Flink专用表,这种表只能由Flink读写使用,不要用Hive去读写。

也可以直接使用Flink读写Hive表数据。

需要将以下包放入$FLINK_HOME/lib:

  • flink-connector-hive_2.11-1.11.0.jar
  • hive-exec-2.3.3.jar

使用hive存储flink sql元数据的时候,还可以做一些高级配置,可参考Flink-hive_catalog

简单来说,可以修改$FLINK_HOME/conf/sql-client-defaults.yaml:

execution:planner: blinktype: streaming...current-catalog: zeppelin-hive  # set the HiveCatalog as the current catalog of the sessioncurrent-database: flink_metacatalogs:- name: myhivetype: hivehive-conf-dir: /opt/hive-conf  # contains hive-site.xmlhive-version: 2.3.3

然后可以启动sql-client,会默认读取$FLINK_HOME/conf/sql-client-defaults.yaml

sql-client.sh embedded

如果报错flink sql-client ClassNotFoundException: LogFactory,则还需要设置HADOOP_CLASSPATH

出现如下界面表示sql-client启动成功:

关于sql-client,可参考

  • Flink 与 Hive 的磨合期
  • SQL Client Beta
  • Flink SQL Client初探

1.4.4 Catalog注册和创建

1.4.4.1 SQL

  • 创建和注册
    默认有内存中的default_catalog,包括一个default_database
// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")// Register the catalog
tableEnv.registerCatalog("myhive", catalog)// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
  • 切换当前catalog和数据库
    Flink总是通过当前catalog和数据库来搜索表、视图、UDF,可切换使用的catalog和数据库
USE CATALOG myCatalog;
USE myDB;
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
  • 查询非当前catalog的表
SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
  • 创建表
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
  • 展示catalog
show catalogs;
tableEnv.listCatalogs();
  • 展示数据库
show databases;
tableEnv.listDatabases();
  • 展示表
show tables;
// should return the tables in current catalog and database.
tableEnv.listTables()

也可以用yaml来配置catalog,再执行sql.

1.4.4.2 TableAPI

import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors.Kafkaval tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")// Register the catalog
tableEnv.registerCatalog("myhive", catalog)// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))// Create a catalog table
val schema = TableSchema.builder().field("name", DataTypes.STRING()).field("age", DataTypes.INT()).build()catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(schema,new Kafka().version("0.11").....startFromEarlist().toProperties(),"my comment"),false)val tables = catalog.listTables("mydb") // tables should contain "mytable"

2 Table API

参考:

  • Table API

Table&Sql API可参考Concepts & Common API

2.1 概述

Table&Sql API核心在于Table,作为输入和输出。

2.2 Scala Api示例

2.2.1 基本代码结构

// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section// create a Table
tableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// emit a Table API result Table to a TableSink, same for SQL result
val tableResult = tapiResult.executeInsert("outputTable")
tableResult...

2.2.2 Blink例子

批流统一的代码结构。

以下是使用Blink时的scala Streaming示例:

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment// create a TableEnvironment for blink planner streaming
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
// 每个表都绑定到一个特定的TableEnvironment,不能跨TableEnvironment连接表,如join/union
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)// 1. create a TableEnvironment for blink planner batch
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)// 2. create TabletableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// emit a Table API result Table to a TableSink, same for SQL result
val tableResult = tapiResult.executeInsert("outputTable")
tableResult...

使用Blink时的scala Batch示例:

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

2.3 TableEnvironment

2.3.1 概述

TableEnvironment职责如下:

  • 注册table到内部catalog
  • 注册catalog
  • 加载插件化的module
  • 执行sql查询
  • 注册UDF
  • 将DtaStream/DataSet转为Table
  • 持有指向ExecutionEnvironment或StreamExecutionEnvironment的引用

每个Table都会绑定到某个TableEnvironment,不能在同一个查询里跨TableEnvironment执行连接表操作(如join/union)。

2.3.2 创建和配置

// create a TableEnvironment for blink planner streaming
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
// TableConfig,可用来配置TableEnvironment、优化查询
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
// TableEnvironment
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

2.4 与DataStream/DataSet集成

2.4.1 概述

参见Integration with DataStream and DataSet API

  • 在流处理方面,两种Planner都可以与 DataStream API 集成使用。
  • 只有旧planner可以与 DataSet API 结合。
  • 在批处理方面,Blink Planner不能同两种DataStream或DataSet集成。

2.4.2 隐式转换

Scala Table API含有对DataSet、DataStream、 Table的隐式转换,DataStream API需要导入:

import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.api.scala._

2.4.3 通过 DataSet / DataStream创建临时视图

视图的schema依赖于DataStream / DataSet注册的数据类型,详见mapping of data types to table schema。

注意: 通过DataStream或DataSet创建的视图只能注册为临时视图。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)

2.4.4 将 DataStream / DataSet 转换成Table

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// convert the DataStream into a Table with default fields "_1", "_2"
val table1: Table = tableEnv.fromDataStream(stream)// convert the DataStream into a Table with fields "myLong", "myString"
val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")val stream2: DataStream[(String, Int)] = ...
// 如果没指定,则field从DataStream自动获取
// DataStreamConversions#toTable,
// DataSet时使用DataSetConversions#toTable
val table = stream2.toTable(tEnv, 'name, 'amount)

2.4.5 将Table转换成DataStream / DataSet

2.4.5.1 类型转换

详细请参考Mapping of Data Types to Table Schema

Table可转为DataStream/DataSet,这样可以使自定义DataStream/DataSet来使用Table&SQL 查询结果。

转变时,需要指定生成的DataStream/DataSet的数据类型,通常来说最方便的转换类型是Row,所有选项如下:

2.4.5.2 Table转为DataStream

这里说的Table就是流式查询的结果,会随着新数据流入输入流中执行查询被不断地动态更新。因此,这种由动态查询转换成的DataStream需要对表的UPDATE进行编码。Table转为DataStream有两种模式:

  • Append Mode
    仅当动态表只被INSERT操作改动被使用,是append-only的,而且旧的转换结果不会再被更新。
  • Retract Mode
    任何情况下都可以使用,使用 boolean 值对INSERTDELETE操作的改动进行标记,True表示 INSERT, False 表示DELETE.。

示例代码:

// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section// Table with two fields (String name, Integer age)
val table: Table = ...// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table)// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)].
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Table转为DataStream后,必须使用StreamExecutionEnvironment.execute()来运行该DataStream程序。

2.4.5.3 Table转为DataSet

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)// Table with two fields (String name, Integer age)
val table: Table = ...// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

Table转为DataSet后,必须使用ExecutionEnvironment.execute()来运行该DataSet程序。

2.5 创建表

2.5.1 视图-虚拟表

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)

内部其实就是封装了逻辑查询计划,但如果使用了Blink Planner,则在被多表引用时也只会被执行一次,结果可被多表共享。

2.5.2 创建connector表

其实就是外表,描述了外部存储系统

tableEnvironment.connect(...).withFormat(...).withSchema(...).inAppendMode().createTemporaryTable("MyTable")

2.5.3 identifier

包括 catalog/database/table,可直接指定前两者,则使用可省略

// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")val table: Table = ...;// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)

2.6 查询

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders.filter($"cCountry" === "FRANCE").groupBy($"cID", $"cName").select($"cID", $"cName", $"revenue".sum AS "revSum")// emit or convert Table
// execute query

2.7 Emit输出表

Table最终被输出到TableSink,他是一个通用接口,支持多种文件format、存储系统、MQ等。

  • 流Table可写入AppendStreamTableSink / RetractStreamTableSink / UpsertStreamTableSink
  • 批Table只能写入BatchTableSink。
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// create an output Table
val schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.LONG())tableEnv.connect(new FileSystem("/path/to/file")).withFormat(new Csv().fieldDelimiter('|').deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable")// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable")

3 SQL API

参考

  • SQL API
  • 依赖介绍

3.1 Table API & SQL 架构

3.2 执行原理

  • Flink 原理与实现:Table & SQL API
    阿里-伍翀
  • 基于Flink1.8 深入理解Flink Sql执行流程 + Flink Sql语法扩展
  • Flink | 使用Calcite做Sql语法解析

在1.11的Flink中,Table API不再被翻译为DataStream API,而是翻译为Transformation。

Flink 1.11 Sql执行流程如下:

  1. 使用Calcite解析SQL,这个过程会用到Catalog(表名、列信息、类型信息、PK信息等,可用于逻辑计划的后续优化),翻译为Logic Plan
  2. 将Logic Plan按一系列规则(如谓词下推、投影下推、Join重排等)翻译为Physical Plan
  3. 通过Code Generation将Physical Plan转为Transformation DAG 图
    这一阶段也有许多优化
  4. Transformation转为StreamGraph->JobGraph提交到Flink Cluster
  5. JobGraph生成带并行的信息的ExecutionGraph,最后生成虚拟的物理执行图运行

3.3 Blink和Old Planner

  • Blink是阿里贡献的
  • Blink将batch job也作为特殊的流来处理(会被转为DataStream程序处理),所以不能转为DataSet。
  • Blink不支持BatchTableSource,而是使用StreamTableSource
  • Sink处理不同:
    • Old Planner将Job中的多个Sink各自单独优化为一个DAG,每个DAG相互独立;
    • Blink将多个Sink优化为一个DAG。
  • Table&Sql翻译执行不同
    • Blink将多个他们全部转为DataStream程序,不论输入是流还是批。一个查询在Flink内部被描述为逻辑查询计划,并被翻译为两个阶段:

      • 逻辑执行计划优化
      • 转换为一个DataStream程序
        不同语句的Blink翻译时机转自官网:
  • 我们实践中现在都用Blink了

3.4 示例

3.4.1 基本结构

批流统一的代码结构,以下是使用Blink时的scala示例:

// create a TableEnvironment for blink planner streaming
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
// 每个表都绑定到一个特定的TableEnvironment,不能夸TableEnvironment操作表
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)// 1. create a TableEnvironment for blink planner batch
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)// 2. create TabletableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// emit a Table API result Table to a TableSink, same for SQL result
val tableResult = tapiResult.executeInsert("outputTable")
tableResult...

参见Integration with DataStream and DataSet API可查看Table API和DataStream、DataSet整合、转换的例子。

3.4.2 创建connector表

其实就是外表,描述了外部存储系统

tableEnvironment.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

3.4.3 SQL

可参考FlinkSQL查询

查询指定表示例:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// emit or convert Table
// execute query

更新指定表示例:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register "Orders" table
// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql("""|INSERT INTO RevenueFrance|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)

3.5 配置

  • Table-Configuration
  • 通用配置

3.6 Catalog和Identifier标识符

3.6.1 概述

TableEnvironment维护着一个由标识符(identifier)创建的表 catalog 的映射,标识符由catalogdatabasetable三部分组成。

用户可以指定默认的catalog和database

可以通过sql创建catalog

CREATE CATALOG catalog_nameWITH (key1=val1, key2=val2, ...)

创建DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1=val1, key2=val2, ...)

Catalog可参考catalog

3.7 Table和View的创建

参考

  • CREATE Statements

  • Flink Table描述了外部数据,比如文件、数据库、消息队列等。

    CREATE TABLE [catalog_name.][db_name.]table_name({ <column_definition> | <computed_column_definition> }[ , ...n][ <watermark_definition> ][ <table_constraint> ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] ]<column_definition>:column_name column_type [ <column_constraint> ] [COMMENT column_comment]<column_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED<table_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED<computed_column_definition>:column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>:WATERMARK FOR rowtime_column_name AS watermark_strategy_expression<like_options>:
    {{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
    }[, ...]
    
  • Table也可以是虚表即view视图,可由对现存Table操作Table Api和SQL查询来创建。

    CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name[{columnName [, columnName ]* }] [COMMENT view_comment]AS query_expression
    

3.8 临时表和永久表

  • 临时表的生命周期绑定到Flink Session
    临时表存于内存,对其他Session不可见,也没有绑定catalog或数据库,但可以被创建到namespace中,但即使删除对应的namespace database也不会drop临时表!

  • 永久表一直存在直到被Drop,跨Flink Session和集群可见
    永久表需要catalog(如HiveCatalog)来存放表的元数据,对连接到该catalog的FlinkSession可见。

  • Shadowing
    如果注册和永久表同identifier的临时表,则临时表会掩盖永久表,此时永久表不可访问,所有对该标识符的表的访问都会路由到临时表。

    这个特性用来进行试验,一旦测试通过可以drop该临时表。

3.9 insert

参考

  • INSERT Statement

3.10 Explain

参考

  • EXPLAIN Statements

  • Explaining a Table

  • sql方式

EXPLAIN PLAN FOR SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' UNION ALL SELECT count, word FROM MyTable2;

还可以通过TableEnvironment.explainSql()TableEnvironment.executeSql()执行

  • Table.explain()
    得到表的查询计划

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
    val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
    val table = table1.where($"word".like("F%")).unionAll(table2)
    println(table.explain())
    
  • StatementSet.explain()
    得到多个sink的查询计划

    val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val tEnv = TableEnvironment.create(settings)val schema = new Schema().field("count", DataTypes.INT()).field("word", DataTypes.STRING())tEnv.connect(new FileSystem("/source/path1")).withFormat(new Csv().deriveSchema()).withSchema(schema).createTemporaryTable("MySource1")
    tEnv.connect(new FileSystem("/source/path2")).withFormat(new Csv().deriveSchema()).withSchema(schema).createTemporaryTable("MySource2")
    tEnv.connect(new FileSystem("/sink/path1")).withFormat(new Csv().deriveSchema()).withSchema(schema).createTemporaryTable("MySink1")
    tEnv.connect(new FileSystem("/sink/path2")).withFormat(new Csv().deriveSchema()).withSchema(schema).createTemporaryTable("MySink2")val stmtSet = tEnv.createStatementSet()val table1 = tEnv.from("MySource1").where($"word".like("F%"))
    stmtSet.addInsert("MySink1", table1)val table2 = table1.unionAll(tEnv.from("MySource2"))
    stmtSet.addInsert("MySink2", table2)val explanation = stmtSet.explain()
    println(explanation)
    

返回结果:

  • 查询转换后得到的抽象语法树AST,他是尚未优化的逻辑查询计划
  • 优化后的逻辑查询计划
  • 物理查询计划

3.11 TableSink

TableSink指输出表:

  • 可支持各种文件格式,如CSV, Apache Parquet, Apache Avro
  • 可支持各种存储系统,如JDBC、Apache HBase、Apache Cassandra、Elasticsearch
  • 可支持消息队列MQ,如 Apache Kafka、RabbitMQ

3.12 Blink对查询的优化

Flink利用并扩展Calcite来执行查询优化,有基于规则和基于代价的优化:

  • 子查询去相关性
  • 投影裁剪
    投影解释:投影运算也是一个单目运算,它是从一个关系R中选取所需要的列组成一个新关系,就是select a,b,c这样的语句,让结果集仅包含指定列,这种操作称为投影查询。
  • 分区裁剪
  • 过滤器下推
    否则会收集大量数据后再过滤
  • 子计划去重以避免重复计算
  • 特殊子查询重写,包括两部分:
    • INEXISTS转为left semi-joins
    • NOT INNOT EXISTS 转换为 left anti-join
  • 可选的join重排序,通过table.optimizer.join-reorder-enabled开启

注意: 当前仅在子查询重写的结合条件下支持 IN / EXISTS / NOT IN / NOT EXISTS。

Flink优化器不仅基于查询计划,还基于从数据源获得的统计信息以及每个算子的细粒度成本(如 io,cpu,网络和内存)来做最佳决策。

高级用户还可以通过CalciteConfig提供自定义优化,可以通过调用 TableEnvironment#getConfig#setPlannerConfig 将该对象传递给 TableEnvironment。

3.13 SQL Hints

3.13.1 概述

Flink 1.11加入的新特性,可用来改变SQL的查询计划。

  • 指定planner
    Blink和默认的Planner都有各自最佳场景,所以可指定Planner
  • 追加元数据和统计数据
    table index for scanskew info of some shuffle keys是动态变化的
  • 算子资源限制
    多数情况给与算子默认资源配置(最小并行度或managed memory或指定的资源需求(GPC/SSD)等)。可通过Hints在Query级别控制资源。

3.13.2 Dynamic Table Options

可在每次Query时动态指定或覆写表的option。需要加上配置set table.dynamic-table-options.enabled=true来开启,默认关闭。

否则在查询该表时使用hints会报错:

Fail to run sql command: select * from xxx /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
org.apache.flink.table.api.ValidationException: OPTIONS hint is allowed only when table.dynamic-table-options.enabled is set to true

3.13.3 hints语法

table_path /*+ OPTIONS(key=val [, key=val]*) */
  • key:
    字符串类型的配置名
  • val:
    字符串类型的配置值

3.13.4 例子

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- override table options in join
select * fromkafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1joinkafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2on t1.id = t2.id;-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

3.14 时态表 Temporal Table

3.14.1 概述

注意和临时表(Temporary Table)区分!完全是不同概念。

时态表表示一个参数化的视图View,运行在一个变动中的Table上,返回指定时间点的该表的内容。

变动Table包括两类:

  • 数据库表的changelog构成的changelog历史表
    Flink可追踪变动,允许查询特定时刻表的内容。在Flink里,这种表由Temporal Table Function表示。
  • 物化变动构成的维度表
    允许查询特定ProcessingTime时的表内容。在Flink里,这种表由Temporal Table表示。

3.14.2 目标

3.14.2.1 关联changing history table

Flink可追踪变动,允许查询特定时刻表的内容。在Flink里,这种表由Temporal Table Function表示。

可将append-only表的行解释为表的ChangeLog,返回指定时间点的该表的内容。将 append-only 的表解释为ChangeLog需要指定主键以及时间戳属性:

  • 主键用来确定哪些行将被覆盖
  • 时间戳用来确定行有效的时间

时态表主要目标是简化此类查询,加速执行,减少State用量。

比如有下表

SELECT * FROM RatesHistory;rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

RatesHistory表表示其他币种兑换日元Yen的汇率,append-only

如果要输出所有10:58时刻的汇率,SQL如下:

SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (SELECT MAX(rowtime)FROM RatesHistory AS r2WHERE r2.currency = r.currencyAND r2.rowtime <= TIME '10:58');

结果:

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
10:45   Euro        116

可以看到确实是10:58时刻的汇率,也就是该表的快照。

该例子中,currency即为主键,rowtime为时间戳属性列。

该表在Flink由Temporal Table Function表示。

3.14.2.2 关联changing dimension table

允许查询特定ProcessingTime时的表内容。在Flink里,这种表由Temporal Table表示。

现有表LatestRates,内容是最新的汇率,也就是说他是物化的RatesHistory历史的物化终态表。

如果我们10:58(处理时间)的时候查询该表:

10:58> SELECT * FROM LatestRates;
currency   rate
======== ======
US Dollar   102
Yen           1
Euro        116

如果我们12:00(处理时间)的时候查询该表:

12:00> SELECT * FROM LatestRates;
currency   rate
======== ======
US Dollar   102
Yen           1
Euro        119
Pounds      108

3.14.3 Temporal Table Function

使用时态表,要访问数据时,必须传递一个时间属性,该属性用来确定将要返回的表的版本。

定义后,Temporal Table Function可用一个时间参数timeAttribute产生一些列行,该集合包含相对于给定时间属性的所有现有主键的行的最新版本。

比如在RatesHistory表上定义了一个Temporal Table Function:Rates(timeAttribute),可返回给定时间属性列值时的Rates状态,则查询如下:

SELECT * FROM Rates('10:15');rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1SELECT * FROM Rates('11:00');rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
10:45   Euro        116
09:00   Yen           1

注意,其实目前Flink不支持直接传入常量到Temporal Table Function,目前只能应用在join中,上例只是为了让我们更好理解而已!

对append-only table定义Temporal Table Function示例代码:

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)// Provide a static data set of the rates history table.
val ratesHistoryData = new mutable.MutableList[(String, Long)]
ratesHistoryData.+=(("US Dollar", 102L))
ratesHistoryData.+=(("Euro", 114L))
ratesHistoryData.+=(("Yen", 1L))
ratesHistoryData.+=(("Euro", 116L))
ratesHistoryData.+=(("Euro", 119L))// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
val ratesHistory = env.fromCollection(ratesHistoryData).toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)tEnv.createTemporaryView("RatesHistory", ratesHistory)// Create and register TemporalTableFunction.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
// TemporalTableFunction内容
val rates = ratesHistory.createTemporalTableFunction($"r_proctime", $"r_currency")
// 以Rates为名字注册该函数
tEnv.registerFunction("Rates", rates)

3.14.4 Temporal Table

仅支持Blink planner。

为了访问时态表,当前必须使用 LookupableTableSource 定义一个 TableSource。可使用FOR SYSTEM_TIME AS OF语法来查询时态表。

查询时态表LatestRates:

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';currency   rate
======== ======
US Dollar   102
Euro        116
Yen           1

结果和上面使用Temporal Table Function的示例相同。

注意,其实目前Flink不支持直接传入常量来查询Temporal Table,目前只能将时态表应用在join中,上例只是为了让我们更好理解而已!

定义时态表:

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().build()
val tEnv = StreamTableEnvironment.create(env, settings)
// or val tEnv = TableEnvironment.create(settings)// Define an HBase table with DDL, then we can use it as a temporal table in sql
// Column 'currency' is the rowKey in HBase table
tEnv.executeSql(s"""|CREATE TABLE LatestRates (|    currency STRING,|    fam1 ROW<rate DOUBLE>|) WITH (|    'connector' = 'hbase-1.4',|    'table-name' = 'Rates',|    'zookeeper.quorum' = 'localhost:2181'|)|""".stripMargin)

LookupableTableSource定义请参考这里

3.15 Join

3.15.1 概述

动态表的join概念比普通静态表要难理解一些。

关于FlinkSql支持的Join可参照这里

以下开始讲不同Join场景。

3.15.2 Regular Join

指两个流的join。

这种场景下,两个表中的新数据、旧数据改动都会造成新的join计算。比如左表来了条新数据,则会和右表的旧的、未来的数据全部计算join。

这个join语法适用于存在任意修改类型(insert, update, delete)的输入表。

但这样Join需要被join的两个表的输入永久保存在Flink的State里。因此,使用的资源量也会随着输入表的增长而无限增长。所以需要配置数据状态驻留期限。参阅Query Configuration

  • Inner Equi-join
    目前只支持等值连接,即join条件至少包含一个=的维持,不支持任何 cross join 和 theta join。

    还要注意,Join的顺序未优化,目前会按照sql中from后表定义的顺序依次执行join,需要确保该顺序不会导致cross join(笛卡儿积),否则会导致失败。

    最后需要注意的是需要配置State过期时间,否则会无限增长。参阅Query Configuration

    正例如下:

    SELECT * FROM Orders
    INNER JOIN Product
    ON Orders.productId = Product.id
    
  • Outer Equi-join
    目前只支持等值连接,还要注意,Join的顺序未优化,也需要配置State过期时间,同Inner Equi-join

    正例如下:

    SELECT *
    FROM Orders LEFT JOIN Product ON Orders.productId = Product.idSELECT *
    FROM Orders RIGHT JOIN Product ON Orders.productId = Product.idSELECT *
    FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
    

3.15.3 Interval Join(时间区间关联)

Regular Join的子集,可以使用流的方式进行处理。

不同的是,Interval Join只支持带有时间属性的append-only表,不支持删改。

而且,由于时间属性是准单调递增的,因而Flink可以从State移除旧的值,而不会影响结果正确性。

执行时,Interval join需要至少一个等值连接谓词和一个限制了两表的时间的 join 条件。如使用两个适当的Range谓词(<, <=, >=, >),一个 BETWEEN 谓词或单个比较两个输入表中相同事件类型(process/event time)的时间属性的等值谓词。

正例如下

ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

将所有收到订单后4小时内发货的订单表和他们对应的发货单进行join的例子:

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId ANDo.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

3.15.4 时态表函数 Join

3.15.4.1 概述

这是将一个append-only table作为左表和时态表作为右表进行join。

以下例子展示了一个join的例子。

  • Orders
    append-only table
SELECT * FROM Orders;rowtime amount currency
======= ====== =========
10:15        2 Euro
10:30        1 US Dollar
10:32       50 Yen
10:52        3 Euro
11:04        5 US Dollar
  • RatesHistory
    时态表,具体是个append-only table类型的各币种对日元(Yen)汇率变动。该表在Flink由Temporal Table Function表示。
SELECT * FROM RatesHistory;rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

如果要将Orders表的金额转为日元,一般会这么做

SELECTSUM(o.amount * r.rate) AS amount
FROM Orders AS o,RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (SELECT MAX(rowtime)FROM RatesHistory AS r2WHERE r2.currency = o.currencyAND r2.rowtime <= o.rowtime);

而如果使用 Temporal Table Function ,注册一个在时态表RatesHistory上的函数Rates,可大大简化:

SELECTo.amount * r.rate AS amount
FROMOrders AS o,LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

每条probe左表的数据会和build右表的对应时间属性版本的所有数据行做jon。也就是说,上例中Orders表的每条记录会和右表Rates在对应的o.rowtime的版本的数据做join。

右表需要主键以修改,这里是currency列。

如果使用ProcessingTime来查询,则新append的order总是会join最新版本的Rates数据。

可以发现,时态表Join和常规Join最大不同就是,右表(时态表)有新数据也不会影响之前的join结果。这可限制Flink在State中保存元素的数量。也就是说,随着时间推移,之前的不再需要的版本的数据会从State中移除。

3.15.4.2 使用

注意,目前时态表join时的状态过期尚未实现(Fink1.11),可能导致查询使用的状态无限增长。

关于ProcessingTime和EventTime时的时态表join可参考这里

  • ProcessingTime
    此时不能传递过去的时间属性给temporal table function,而只能总是当前时间。所以这种场景下每次调用temporal table function总是返回最新的已知版本的表数据。

    注意,此时仅build表的最新版本(相对于定义的主键)的数据会被保存在状态中。

    build表的更新不会影响先前发出的join结果,因为只会让左表和build表最新版本数据join。

    可以将这种join想象为HashMap<K, V>,存有所有build表的行。如果新数据有着和旧数据重复key,此时直接覆盖该缓存即可。此时每个probe左表总是使用最新的HashMap。

  • EventTime
    此时可传递过去的时间给temporal table function,进行join。

    • EventTime
      此时可传递过去的时间给temporal table function,进行join。

    此场景下,会将自从上次水位依赖所有版本的行保存到State中。水位的作用是推进join算子进程,丢弃不需要的版本的build table数据。

    举个例子,比如拥有12:30:00的事件时间戳的行append到probe表,此时就会和时态表12:30:00版本的数据(即时间戳小于等于该时间的数据)进行join,并会不断更新主键,并根据主键应用更新直到该时间点。

3.15.5 时态表 Join

3.15.4.1 概述

这是将一个包含任意操作的表作为probe左表和时态表(必须由LookupableTableSource支撑)作为build右表进行join。

比如我们有一个不断物化变化的当前汇率表LatestRates作为时态表:

10:15> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        114
Yen           110:30> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        114
Yen           110:52> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

另有一个append-only表Orders:

SELECT * FROM Orders;amount currency
====== =========2 Euro             <== arrived at time 10:151 US Dollar        <== arrived at time 10:302 Euro             <== arrived at time 10:52

以下例子展示了普通标Orders和时态表LatestRates join以计算订单金额转换为当时汇率对应的日元的例子。

期待的结果如下:

amount currency     rate   amout*rate
====== ========= ======= ============2 Euro          114          228    <== arrived at time 10:151 US Dollar     102          102    <== arrived at time 10:302 Euro          116          232    <== arrived at time 10:52

joinSQL:

SELECTo.amout, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency

每条probe左表的数据会和build右表的当前版本的数据行做join。也就是说,上例中Orders表的每条新记录会和右表LatestRates最新版本的数据做join,因为这里使用的是proctime

可以发现,时态表Join和常规Join最大不同就是,右表(时态表)有新数据也不会影响之前的join结果。同时,时态表join算子十分轻量级,不需要保存任何State!

3.15.5.2 使用

时态表join语法:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.column-name1
  • 目前仅支持Blink planner
  • 目前仅支持SQL API,不支持Table API
  • 目前仅支持ProcessingTime 时态表join,不支持EventTime
  • 目前仅支持INNER JOINLEFT JOIN
  • FOR SYSTEM_TIME AS OF必须跟在时态表之后
  • proctime是probe表table1的处理时间属性,含义是join时,会获取时态表在处理时间的快照进行join

实例:

SELECTSUM(o_amount * r_rate) AS amount
FROMOrdersJOIN LatestRates FOR SYSTEM_TIME AS OF o_proctimeON r_currency = o_currency

3.15.6 小结

  • Temporal Tables 是跟随时间变化而变化的表。
  • Temporal Table Function 提供访问 Temporal Tables 在某一时间点的状态的能力
  • Join Temporal Table Function 的语法与 Join Table Function 一致
  • 目前仅支持在 Temporal Tables 上的 inner join

temporal table function join 对比 temporal table join:

  • 相同:

    • 目标相同
  • 不同:
    • SQL语法不同
      前者使用join UDTF,后者使用标准的时态表语法(SQL:2011)
    • State保存不同
      前者将join的两个流保存在State,后者仅接受输入流,并在外部数据库查找对应key的记录
    • 前者一般用来和changelog stream做join,后者一般和外部表(维表)做join

3.15.7 Join with Table Function (UDTF)

将表与另一个表的函数执行的结果进行 join 操作。左表(outer)中的每一行将会与右表的函数执行所产生的所有结果中相关联的行进行 join 。

当然,要先注册UDTF。

  • Inner Join
    此时如果函数调用结果为空,则对应的左表行会被丢弃
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
  • Left Outer Join
    此时如果函数调用结果为空,则对应的左表行会被保留,使用null填充
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

4 Connector的前置概念

4.1 概述

可参考:

  • Connect to External Systems

DDL不行,我们可以用Connect to External Systems,直接读写外部数据源流批数据:

用来定义外部数据源连接。不是所有都支持流/批,支持批的Connector支持的Update Mode也不尽相同。

支持的输出:

  • 可支持各种文件格式,如CSV, Apache Parquet, Apache Avro
  • 可支持各种存储系统,如JDBC、Apache HBase、Apache Cassandra、Elasticsearch、Hive
  • 可支持消息队列MQ,如 Apache Kafka、RabbitMQ

注意,从1.11.0开始相关API有巨大变化,要用老的请查看legacy documentation

  • Table Schema
    定义表的schema,描述了怎么将Table Source的数据格式映射到Table API的schema,以及Table映射到Sink的方式。可暴露给SQL查询。
  • 支持Time属性
    可以使用一个或多个字段来提取或插入时间属性到Table Schema。

Flink连接外部系统可通过以下两种方式指定:

  • 使用 Table & SQL API,搭配org.apache.flink.table.descriptors下的内容
  • 通过SQL客户端的YAML配置文件声明

一个Table & SQL API中连接外部数据源语句基本结构:

tableEnvironment
// 定义连接外部数据源.connect(...)// 定义解析外部数据源中数据格式.withFormat(...)// 定义流式数据表的schema.withSchema(...)// 定义输出表的更新模式(update modes).inAppendMode()// 注册Source表到flink.registerTableSource("MyTable")// 注册Sink表到flink.registerTableSink// 使用相同名字注册Source和Sink表.registerTableSourceAndSink

一个从Kafka中读Avro格式存储的数据的例子:

tableEnvironment
// 定义连接外部Kafka数据源的配置
.connect(new Kafka().version("0.10").topic("test-input").startFromEarliest().property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")
)// 定义解析外部kafka数据源中数据格式
.withFormat(new Avro().avroSchema("{" +"  \"namespace\": \"org.myorganization\"," +"  \"type\": \"record\"," +"  \"name\": \"UserMessage\"," +"    \"fields\": [" +"      {\"name\": \"timestamp\", \"type\": \"string\"}," +"      {\"name\": \"user\", \"type\": \"long\"}," +"      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +"    ]" +"}")
)// 定义流式数据表的schema
.withSchema(new Schema().field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestamp").watermarksPeriodicBounded(60000)).field("user", Types.LONG).field("message", Types.STRING)
)// 定义输出表的更新方式为append
.inAppendMode()// 同时注册source和sink
.registerTableSource("MyUserSourceTable");
//registerTableSink可参考《深入理解flink》243页
.registerTableSink("MyUserSinkTable");

配置的连接属性会被转换为标准化的、基于String的key-value键值对。会基于Java SPI机制搜索唯一匹配的Table Factory来创建Table Source、Table Sink以及相应的format。

4.2 示例

我们需要使用FlinkSql定义表名、表的Schema以及表的属性,以便连接外部系统。

以下注册Kafka数据源+读取json数据的例子:

CREATE TABLE MyUserTable (-- 定义流式数据表的schema,包括字段、水位等,其实就是flink内部和外部系统的映射关系`user` BIGINT,message STRING,ts TIMESTAMP,proctime AS PROCTIME(), -- use computed column to define proctime attributeWATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to define rowtime attribute
) WITH (-- declare the external system to connect to'connector' = 'kafka','topic' = 'topic_name','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'   -- declare a format for this system
)

由dynamic-table-factories(基于java SPI查找)来根据sql创建TableSource、TableSink以及对应的format,。

4.3 主键

1.11.0以前靠的是group by后的关键字进行推断,1.11.0以后可以指定。

主键约束表名一个或若干列是一个表内的唯一一行,且不包含null值。

在SinkTable内使用主键,一般被用来做upsert操作。

由用户来确保查询强制执行键完整性。

CREATE TABLE MyTable (MyField1 INT,MyField2 STRING,MyField3 BOOLEAN,PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- defines a primary key on columns
) WITH (...
)

4.4 时间属性

4.4.1 Processing time

即执行相应算子所在机器系统时间(wall-clock time墙上时间),最简单的时间属性,但有不确定性。

该属性不需要时间戳提取器(timestamp extract)和水位生成器。

  • DDL

    CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 定义user_action_time列,作为processingTime属性列
    ) WITH (...
    );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
    
  • DataStream-to-Table转换时定义
    必须在schema最后声明processingTime属性列

    val stream: DataStream[(String, String)] = ...// 声明一个额外的逻辑字段$"user_action_time"作为processingTime属性列
    // 所以,必须在schema最后声明processingTime属性列
    val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
    
  • TableSource
// define a table source with a processing attribute
// 注意TableSource需要实现DefinedProctimeAttribute接口
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {override def getReturnType = {val names = Array[String]("user_name" , "data")val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)Types.ROW(names, types)}override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {// create streamval stream = ...stream}// 定义逻辑processingTime属性列override def getProctimeAttribute = {// field with this name will be appended as a third field"user_action_time"}
}// register table source
tEnv.registerTableSource("user_actions", new UserActionSource)val windowedTable = tEnv.from("user_actions").window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

4.4.2 Event time

指用户附加给每行数据的时间戳,Flink基于该时间戳作为时间基准进行处理。

特点是可以处理一定范围内的乱序事件和迟到时间(一致性保证),可保证从同一个存储层内反复读取同样一批数据时结果是可重放的。

将Event time设为时间属性时,Flink需要从数据记录中提取时间戳以及生成水位,以便处理乱序和迟到事件。

  • DDL

    CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategyWATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
    ) WITH (...
    );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
    
  • DataStream-to-Table转换时定义
    待转换的DataStream必须已经定义时间戳和水位。

    有两种方法定义DataStream中的EventTime 时间戳:

    • x.rowtime时x字段不存在
      在 schema 的结尾追加一个新的字段
    • x.rowtime时x字段存在
      替换已经存在的x字段
    // Option 1:// extract timestamp and assign watermarks based on knowledge of the stream
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)// declare an additional logical field as an event time attribute
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)// Option 2:// extract timestamp from first field, and assign watermarks based on knowledge of the stream
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)// the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")// 使用:
    val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")
    
  • TableSource

// 定义一个有EventTime属性的 TableSource
// 必须实现DefinedRowTimeAttributes接口
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {override def getReturnType = {val names = Array[String]("user_name" , "data", "user_action_time")val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)Types.ROW(names, types)}// 需要确保此方法返回的DataStream 已经定义好了时间属性。// 1.定义StreamRecordTimestamp,从DataStream提取时间戳// 2.定义PreserveWatermarks,以保留DataStream的水位// 否则,只有TableSource的 rowtime属性有效override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {// 构造 DataStream// ...// 基于 "user_action_time" 定义 watermarkval stream = inputStream.assignTimestampsAndWatermarks(...)stream}// 本方法用来返回RowtimeAttributeDescriptor列表// 内部包含了EventTime属性字段的名字、计算方法、水位生成策略等信息override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {// 标记 "user_action_time" 字段是事件时间字段// 计算方法是使用 "user_action_time" 字段构建// 水位生成策略为AscendingTimestamps周期性递增式水位val rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps)val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)listRowtimeAttrDescr}
}// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource)val windowedTable = tEnv.from("user_actions").window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

4.4.3 Ingestion time

数据到达Flink Source的时间,内部处理方式类似Event time

注意:只要时间属性没有被修改过,而是仅仅被传递,那就一直保持为合法时间属性,可以和普通时间戳一样被访问计算,但只要被计算就会被物化、称为常规时间戳,就不能再作为Flink时间属性了,也不能在水位系统中起作用了,更不能作用到时间相关计算!

4.4.4 时间属性使用

  • 时间属性种类指定
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default// 可选
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  • 时间属性字段指定
    如Window等基于时间的算子,就需要使用时间属性,所以应该在建表时指定时间属性字段。指定时机如下:

    • DDL CREATE TABLE
    • DataStream
    • TableSource
  • 时间属性字段传递
    时间属性可被传递,只要没有修改,就可以在一个表简单传递到另一个表后继续使用。

  • 时间属性字段物化
    当时间属性字段被用在计算中时,就会被物化为普通时间戳,该时间戳不能配合Flink时间和水位使用了,也就无法用在基于时间的算子中

4.5 Connector原理

4.5.1 概述

DynamicSourceDynamicSink可被用来从外部系统读或写入数据到外部系统,他们也可以被称为Connector

下图展示了在翻译过程中,从一个stage到下一个stage时,Object如何被转为其他Object:

4.5.2 Metadata

动态表的元数据(包括DDL和由Catalog提供的)由CatalogTable实例表示。

对应上图的Catalog->CatalogTable。

4.5.3 Planning

4.5.3.1 概述

利用CatalogTable来生成具体Connector相关的逻辑执行计划。

当开始为Table生成执行计划和优化,CatalogTable被解析为DynamicTableSource(用在SELECT语句中读取)和DynamicTableSink(用在INSERT INTO语句中写入)。

解析过程中DynamicTableSourceFactory(生成DynamicTableSource具体实现类,如JdbcDynamicTableSource)和DynamicTableSinkFactory(生成DynamicTableSink具体实现类,如JdbcDynamicTableSink)接口的具体实现类就会根据指定的Connector逻辑来翻译CatalogTable元数据。Dynamic Factory被用来根据catalog和session信息中为外部存储系统配置动态表Connector。这些工厂类的目的是验证DDL选项(如图中的'port' = '5022')、配置编解码格式等,最后创建参数化的Table Connector实例。

4.5.3.2 SPI

这两个工厂类接口的实现类查找和初始化是通过java SPI机制实现,也就是说DDL中的'connector' = 'custom'必须能找到对应的工厂类。SPI将检查唯一匹配的工厂类,该工厂类由工厂标识符和请求的父类名字(例如DynamicTableSourceFactory)唯一标识。

比如可以看看flink-connector-jdbc相关SPI定义文件:

  • META-INF/services/org.apache.flink.table.factories.Factory
    org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
  • META-INF/services/org.apache.flink.table.factories.TableFactory
    org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory
    org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory

更多关于SPI内容可参考:

  • Java SPI 机制在 Flink SQL 中的应用
  • Flink1.10基于工厂模式的任务提交与SPI机制

如果需要,Catalog也可以绕过这个工厂SPI发现机制,那就需要 org.apache.flink.table.catalog.Catalog#getFactory返回一个实现了所请求的父类的实现类的实例。

FlinkSql Planner会使用Source和Sink实例来执行指定Connector相关的相互通信,直到发现最佳逻辑执行计划。

依赖于可选的接口(如SupportsProjectionPushDown或SupportsOverwrite等),planner也许会将修改应用到生成的实例上,并修改运行时实现。

4.5.3.3 DynamicTableSource

套路为DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider,工厂类负责翻译DDL option,Source负责创建运行时逻辑。

当读取动态表时,读到的数据可能被视为以下两个:

  • ScanTableSource
    ChangeLog,Source在batch场景发送有界的、insert-only流;Streaming场景发送无界的、insert-only流;CDC(Change Data Capture)场景发送无界/有界的、可包含增删改的记录的流。

    从外部数据中扫描整张表的所有行。可包含增删改的行,所以可被用来读取changelog。

    getChangelogMode返回ChangelogMode,表示运行时可能返回的Changelog类型集合。

    如果Source有一些额外的能力,可以实现org.apache.flink.table.connector.source.abilities包内的接口,如SupportsProjectionPushDown投影下推。

    ScanTableSource生产的记录必须是org.apache.flink.table.data.RowData,用来在Flink内部传递。

  • LookupTableSource
    一个持续变化或非常大的外部表,这个表的内容一般不会被整个读取,而是在需要时单独读取。

    在运行时,通过一个或多个Key来从外部系统中查找数据行。不需要读取整张表,可以懒惰地获取单独的数据。

    注意,LookupTableSource目前仅支持insert-only变更;不支持abilities包内的接口。

    LookupTableSource的运行时实现是TableFunctionAsyncTableFunction。 在运行期间,将使用给定查找key的值来调用该函数。

Class可以同时实现以上两个接口,由Flink Planner来根据具体的查询内容来决定使用的方法。

4.5.3.4 DynamicTableSink

当写入动态表时,数据可被视为ChangeLog。getChangelogMode方法返回ChangelogMode,表示运行时可能返回的Changelog类型集合。

  • Sink在batch场景只能接收insert-only行,输出有界流;
  • Sink在Streaming场景只能接收insert-only行,输出无界流;
  • Sink在CDC(Change Data Capture)场景可接收增删改的行,输出无界/有界流。

可实现额外功能,详见org.apache.flink.table.connector.sink.abilities包,如SupportsOverwrite

DynamicTableSource输出org.apache.flink.table.data.RowData对应的,DynamicTableSink必须消费该类结构的数据。

4.5.3.5 Encoding / Decoding Format

一些Table Connector接收不同的format以编解码key和value。Format实现层次类似DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider,发现机制也是用的SPI。

4.5.4 Runtime

逻辑执行计划 -> 物理执行计划。

一旦逻辑执行计划生成,Planner会从具体的TableConnector中生成运行时实现逻辑,这些运行逻辑是Flink Connector的接口如InputFormatSourceFunction等的实现类实现的。

这些接口按其他层级的抽象,分组为ScanRuntimeProviderLookupRuntimeProviderSinkRuntimeProvider的子类。

比如OutputFormatProvider(提供org.apache.flink.api.common.io.OutputFormat)和SinkFunctionProvider (提供 org.apache.flink.streaming.api.functions.sink.SinkFunction)都是SinkRuntimeProvider的子类。

4.6 Table Schema

4.6.1 概述

Table Schema定义表的每个列的名字和类型,类似于SQL create table语句那样,用来暴露给SQL查询。此外,还可以指定如何将列与表数据编码schema的字段进行映射。当输入列无序时,Tabel Schema可清晰地定义列名、顺序和来源。Table Schema会和Table Format匹配来在Table数据输入和输出的过程中完成Schema转换。

此外, Table Schema还可指定Time属性提取器。

4.6.2 例子

简单例子:

.withSchema(new Schema()// 必填。和数据源中列顺序一致来指定Flink数据表的列.field("MyField1", Types.INT).field("MyField2", Types.STRING).field("MyField3", Types.BOOLEAN)
)

复杂例子:

.withSchema(new Schema().field("MyField1", Types.SQL_TIMESTAMP)// 可选的,指定该列为processing-time.proctime()     .field("MyField2", Types.SQL_TIMESTAMP)// 可选的,指定该列为event-time(rowtime).rowtime(...)   .field("MyField3", Types.BOOLEAN)// 可选的,指定该列的原始来源列为mf3.from("mf3")

4.6.3 Rowtime

上述的.rowtime(...)本小节详细说下。

rowtime在flink里用来处理事件时间event-time。

采用Rowtime时,总是需要设置timestamp提取策略和watermark策略。

timestamp提取为rowtime例子如下:

.rowtime(new Rowtime()// 转换input中的某个LONG或SQL_TIMESTAMP类型的列为rowtime.timestampsFromField("ts_field")
).rowtime(new Rowtime()// 使用input数据中的timestamp属性来转为rowtime,需要数据源支持,如Kafka 0.10+版本.timestampsFromSource()
).rowtime(new Rowtime()// 为rowtime设置一个自定义的timestamp转换器// ,该转换器必须实现自 org.apache.flink.table.sources.tsextractors.TimestampExtractor.timestampsFromExtractor(...)
)

水位策略例子:

.rowtime(new Rowtime()// 为升序的rowtime设置watermark。// 发出截止目前观察到的最大timestamp-1的watermark。// timestamp等于最大timestamp的行不算迟到。.watermarksPeriodicAscending()
).rowtime(new Rowtime()// 为rowtime设置一个内嵌的watermark,该rowtime属性在有限的时间间隔内是乱序的// 发出截止目前观察到的最大timestamp减去指定延迟(毫秒)的watermark。.watermarksPeriodicBounded(2000)
).rowtime(new Rowtime()// 设置一个内置水印策略,该策略指示应从DataStream API中保留水印,从而保留数据源中分配的水印。.watermarksFromSource()
)

4.7 Table Formats

4.7.1 概述

一些外部数据系统支持不同的Table Formats,比如kafka或文件就支持其内存储的表的行使用CSV、JSON、Avro进行编码,所以需要指定Table Format来阐明外部数据源解析方式。

4.7.2 JSON Table Format

JSON格式允许读取和写入与给定的format schema相对应的JSON数据。format schema可用Flink type(SQl-like,映射到对应的SQL数据类型)、 JSON schema(适合复杂的嵌套数据结构)或目标表的schema(适合format schema等于table schema的场景,可自动派生出schema)来定义。

目前支持的JSON schema类型和Flink SQL类型如下:

Missing Field Handling: By default, a missing JSON field is set to null. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.

Make sure to add the JSON format as a dependency.

需要在项目中添加JSON依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version>
</dependency>

实例:

.withFormat(new Json()// 可选。当某个field缺失的时候,是否导致失败,默认false。.failOnMissingField(true) // 可选方式1。使用Flink数据类型定义,然后mapping映射解析为对应的type的类型信息来定义schema// flink的ROW对应JSON的object结构,String对应VARCHAR等.schema(Type.ROW(...))// 可选方式2。使用JSON schema来定义,可支持非常复杂和嵌套的数据结构.jsonSchema("{" +"  type: 'object'," +"  properties: {" +"    lon: {" +"      type: 'number'" +"    }," +"    rideTime: {" +"      type: 'string'," +// 指定时间格式"      format: 'date-time'" +"    }" +"  }" +"}")// 可选方式3。可直接使用表的schema来解析// 适用于Flink Table Schema和JSON Schema一致时// 此时只需要定义Table Schema,就能确定字段名称、类型、位置顺序等.deriveSchema()
)

4.8 Update Modes

4.8.1 概述

流式查询中,需要声明怎么执行动态表和外部Connector之间的数据交换,有以下模式:

  • Append Mode(增)
    仅交互INSERT操作数据

  • Retract Mode (增删改)
    交互INSERT(编码为ADD)和DELETE(编码为RETRACT)、UPDATE(对于修改前的行来说编码为RETRACT,对于修改后的新行来说编码为ADD)操作数据。

    与Upsert Mode相反,Retract Mode不能定义key。

    每个UPDATE操作由两条消息(RETRACT和ADD)组成,效率较低。

  • Upsert Mode(增删改)
    交互UPSERT(可编码INSERTUPDATE)和DELETE操作数据。

    该模式需要一个唯一的key(可能是组合的),用来传播update事件。具体来说,外部连接器需要了解该唯一key属性,才能正确处理消息。

    Upsert Mode和Retract Mode都支持增删改,但不同是,Upsert Mode中的UPDATE事件使用单条UPSERT消息进行编码,而Retract Mode中的UPDATE由两条消息(RETRACT和ADD)组成,因此本模式效率更高。

4.8.2 例子

.connect(...).inAppendMode()    // otherwise: inUpsertMode() or inRetractMode()

每个connector支持哪些update mode,请参阅具体connector文档。

4.9 更多例子

  • Flink官方-table_api
    展示了如何实现一个对batch source的简单的Table API 查询,以及如何拓展为streaming source的持续查询。
  • Flink学习5-使用rowtime且分窗,Connector读取Kafka写入MySQL例子
  • Apache Flink 各类关键数据格式读取/SQL支持
  • 如何在 Flink 1.9 中使用 Hive?
  • Flink SQL 解析复杂(嵌套)JSON

5 Connector

5.1 File System Connector

请点击File System Connector

5.2 Kafka Connector

5.2.1 概述

Kafka Connector使得Flink可从Kafka中消费、写入数据。

  • 依赖
    Kafka 0.11以上版本后可采用统一的flink-sql-connector-kafka_2.11-1.11.0.jar

  • 关于Flink分区和Kafka分区关系
    可通过connector参数sink.partitioner指定:

    • fixed
      默认情况下,KafkaSink最多可以写入与其自身并行性(parallelism)一样多的Kafka分区,即每个并行的KafkaSink实例都固定写入一个Kafka分区。 如果实例数大于Kafka分区数,则空闲。

    即Flink生产者发送消息到Kafka时默认用的FlinkFixedPartitioner,如下图

    这种方法有个明显的问题,即partition数大于task数时,多出来的partition不会有数据写入。partition扩容时,也需要重启Flinik程序。

    • round-robin
      循环分区器对于避免不平衡分区很有用, 但是,这将导致所有Flink实例与所有Kafka Broker节点之间的大量网络连接。
    • 自定义
      为了将写操作分配到更多分区或自定义每行数据到分区的路由,可以提供自定义接收器分区程序,继承FlinkKafkaPartitioner
  • 一致性保证
    默认情况下,如果在启用检查点的时执行Flink,则KafkaSink会将具有至少一次(at least once)保证的数据提取到Kafka中。

    当使用Checkpoint时,提供exactly-once精准一次语义,具体原理是两阶段提交。

    除了使用Checkpoint,还可以通过设置参数sink.semantic来调节语义:

    • at-least-once
      默认值。保证至少一次,可能重复
    • none
      没有任何保障,可能多或少
    • exactly-once
      使用Kafka生产者事务来保证。注意此时必须同时设定Kafka Consumer事务隔离等级isolation.levelread_committed才会 读不到未提交的事务数据
  • Kafka 0.10+的Timestamp属性
    Kafka0.10开始,数据就带了一个timestamp作为元数据的一部分,该字段含义是数据写入Kafka的时间。该字段可用作Flink rowtime,请参考Java/Scala的timestampsFromSource方法。

  • Kafka 0.11+版本
    因为Flink1.7开始,Kafka Connector的定义就应该是独立于硬编码的Kafka version了,所以使用.version("universal")作为Kafka0.11开始的所有版本Kafka的通配符。

  • Commit offset 策略

    • 不使用Checkpoint时,由enable.auto.commit, auto.commit.interval.ms一起决定 commit 行为,每隔一段时间向 kafka commit 一次 offset。
    • 使用Checkpoint时,kafka consumer 将 offset commit 到 checkpoint state 中。
  • Consumer 分配策略

    如上图,不同 topic 之间的 startIndex 是随机的,故解决了多 topic 负载均衡问题。

    注意,Consumer 无法感应获取 Kafka topic 新增的Partition!也就是说,某topic扩容Partition后必须重启Flink程序。

  • Changelog Source
    Flink支持将Kafka作为Changelog Source

    只要使用CDC工具从其他数据库捕获变动event放入Kafka,则可以使用cdc format来解释kafka中数据,转为INSERT/UPDATE/DELETE来放入Flink SQL系统中。目前支持debezium-json 和 canal-json。

  • 数据类型映射
    Kafka按format将数据序列化为字节,没有数据类型和schema。反序列化也是根据format.

  • 元数据
    可直接通过SQL获取Kafka元数据,应用到定义的列中

    以上R表示可读,W表示可写。只读的字段必须在定义时加上VIRTUAL,以在INSERT INTO时排除这些字段。

5.2.2 CDC - Changelog Source

如果Kafka数据来源于使用CDC工具从其他数据库捕获的,则你可以使用CDC format来将数据解析为Flink SQL中的INSERT/UPDATE/DELETE,然后继续处理,如同步数据到其他目的地等。

目前支持CDC格式如下:

  • debezium-json
  • canal-json

相关文档:

  • 阿里伍翀讲解CDC
  • Flink SQL CDC 上线!我们总结了 13 条生产实践经验

5.2.3 配置

配置项 Required Default Type 描述
connector required (none) String ‘kafka’, ‘kafka-0.11’, ‘kafka-0.10’.
topic required none) String Topic name from which the table is read.
properties.bootstrap.servers required (none) String Kafka brokers,逗号分隔.
properties.group.id required by source (none) String Kafka source consumer id
optional for Kafka sink.
format required (none) String deserialize/serialize Kafka messages的format. 可填 ‘csv’, ‘json’, ‘avro’, ‘debezium-json’ , ‘canal-json’.
scan.startup.mode optional group-offsets String Kafka consumer Startup mode, valid values:
‘earliest-offset’
‘latest-offset’
‘group-offsets’(从zk/kafka里保存的指定consumer group的offset开始消费)
‘timestamp’(从用户定义的scan.startup.timestamp-millis中时间戳开始消费)
‘specific-offsets’(从用户定义的scan.startup.specific-offsets开始消费)
scan.startup.specific-offsets optional (none) String ‘specific-offsets’ startup mode时指定每个分区开始消费的offset, 如’partition:0,offset:42;partition:1,offset:300’.
scan.startup.timestamp-millis optional (none) Long ‘timestamp’ startup mode时指定消费起始毫秒级时间戳
sink.partitioner optional (none) String Flink’s partitions 映射到输出的 Kafka’s partitions方式. 合法值如下:
fixed: 每个 Flink partition 最多输入一个Kafka partition
round-robin: 每个Flink partition轮询方式映射到所有Kafka partitions
自定义 FlinkKafkaPartitioner subclass: 如 ‘org.mycompany.MyPartitioner’.

5.2.4 例子

  • 1.11.0之前的例子,比较复杂:

    .connect(new Kafka()// 必填。版本号("0.8", "0.9", "0.10", "0.11", "universal").version("0.11")   // 必填。订阅的topic.topic("student_info")  // Kafka连接属性.property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092").property("group.id", "testGroup")// 可选如下。指定无法找到group对应的offset时,从哪儿开始消费。.startFromEarliest().startFromLatest().startFromSpecificOffsets(...)// 当Flink分区往kafka分区写入数据时需要配置。// 默认。每个Flink分区最多被分配到一个Kafka分区.sinkPartitionerFixed()// 一个Flink分区会以轮询方式发送到Kafka各个分区.sinkPartitionerRoundRobin()    // 自定义FlinkKafkaPartitioner.sinkPartitionerCustom(MyCustom.class)
    )
    
  • 1.11.0的例子,简化了很多:
    CREATE TABLE kafka_source_table (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,message ROW<log_create_time BIGINT>,ts_field AS TO_TIMESTAMP(FROM_UNIXTIME(message.log_create_time)),
    ) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'json','scan.startup.mode' = 'group-offsets'
    )
    
  • 1.12.0的例子,支持获取Kafka元数据
    CREATE TABLE KafkaTable (`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector`offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING
    ) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','value.format' = 'debezium-json'
    );
    

5.3 Upsert Kafka

Flink 1.12以前版本仅支持append only元素写入,而当我们产生聚合元素的时候无法使用kafka sink.

Flink 1.12特地新增Upsert Kafka SQL Connector解决此问题。

作为Source时:

  • 生产changelog

    • 有value的表示UPDATE
    • value为null的表示DELETE
  • 作为Sink时,可消费changelog
    • INSERT/UPDATE_AFTER作为普通消息输出到Kafka
    • DELETE会作为null值数据以示这是该key对应数据的墓碑
    • 由于Flink按照用户定义的主键列值进行消息分区,可保证该分区数据有序,同Key数据也会发到相同分区

官方示例:

CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = '...','key.format' = 'avro','value.format' = 'avro'
);CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH ('connector' = 'kafka','topic' = 'pageviews','properties.bootstrap.servers' = '...','format' = 'json'
);-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

5.4 Elasticsearch Connector

5.5 JDBC Connector

5.5.1 概述


可以通过 JDBC Connector读写JDBC数据源。

目前支持Mysql、PostgreSQL、Derby等,需要下载对应Jar包放入$FLINK_HOME/lib

需要添加的依赖为

  • flink-connector-jdbc_2.11-1.11.0.jar
  • 特定的数据库所需的driver
    比如mysql-connector-java-5.1.47.jar

更多配置及数据库和Flink的类型映射请参考官网。

5.5.2 Streaming Mode

在流场景可支持

  • Append(适用于只交互INSERT信息场景)
    DDL不提供主键时
  • Upsert(使用唯一key,适用于交互增删改信息场景)模式。
    DDL提供主键时

Flink自动从流式查询中提取有效key。 例如,查询SELECT a, b, c FROM t GROUP BY a, b定义了字段a和b的组合键。

如果将JDBC表用作upsert sink,请确保查询的key是数据库的唯一键之一或是主键。 这样才可以保证输出结果符合预期。

5.5.3 Temporary Join与查询缓存

JDBC Connector可以被视为lookup source来做临时join。

当前,只支持同步模式的`lookup source。

可使用loopup source cache来提高临时join的性能(默认不开启缓存),具体原理是每个TaskManager先查询本节点持有的缓存而不是直接查询数据库本身,当目标不在缓存时再请求远程数据库,并将返回的结果更新到缓存。

但务必注意,使用缓存时数据可能不是最新,这需要使用者进行权衡

loopup source cache相关选项有:

  • lookup.cache.max-rows
    lookup source cache缓存的最大行数,超过时会删除最旧的缓存行。

    注意设置了本项就必须同时设置lookup.cache.ttl

  • lookup.cache.ttl
    缓存内每行的最大有效时间,超过就删除最旧的行

    设置太小会导致数据库请求负载过大,所以需要权衡性能和时效性。一般可以用在缓慢变换的维表中。

    填写格式为Time interval unit label recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)

  • connector.lookup.max-retries
    可选的,当查询数据库表失败时的最大重试次数。

当数据作为维表时,如果使用缓存,需要注意:

  • 指定缓存后Join维表时拿到的可能是过期数据,其实已经被改变。但设置缓存过期时间太短会造成频繁查库增加性能开销。而将缓存条数设置过大可能打爆内存(可以适当调大内存,但不是根本方式)。所以,缓存的大小和时间配置需要权衡和实验。不过通常来说,维表中的数据应该缓慢变化的。
  • Flink Sql 1.9 原生支持的维表关联只支持同步模式,如果需要异步模式或者想用其他的第三方存储只能够自己去实现。

5.5.4 数据写入缓存与Flush

Flink JDBC Connector会将数据先放入内存,到达触发条件时再flush到jdbc数据库中。相关配置如下:

  • sink.buffer-flush.interval
    默认1秒,可选的。当flush间隔时间超过此值时,会有一个异步线程将数据刷入数据库表。

    可设为0禁用。

    可将sink.buffer-flush.max-rows设为0 ,本值设为大于0的数,以实现完全异步的缓冲处理。

  • sink.buffer-flush.max-rows
    默认100行,可选的。当数据条数(包括增、删、改)累积到该值时,才会将数据输入数据库表.

    可设为0禁用。

  • sink.max-retries
    默认3,可选的。当某批数据写入数据表失败时的最大重试次数。

5.5.5 主键

1.11可使用DDL来定义主键了,定义后采用UPSERT流模式,否则为APPEND模式。

  • 采用UPSERT流模式时,是根据主键来插入或更新记录,由此来保证幂等性。

    推荐将设置主键,且需要确保主键是该数据表的唯一键或主键。

  • APPEND模式下,所有数据都作为INSERT消息处理,当底层数据库表的主键或唯一键冲突时会导致插入失败。

5.5.6 幂等写入

JDBCSink在DDL中定义了主键列的情况,会使用UPSERT语义而不是INSERT语义。

UPSERT语义可使得当底层数据库存在唯一键约束时,原子性得插入一行或更新已有行,提供了幂等性。

应用场景:

  • 这样一来,即使发生失败需要从最后成功的Checkpoint重启导致重复处理数据,也能保证幂等,不会出现数据重复。
  • 数据源头生成相同主键数据,需要更新

没有标准upsert语法,根据底层数据库不同而不同,比如MySQL语法如下:

INSERT .. ON DUPLICATE KEY UPDATE ..

5.5.7 Partitioned Scan

目的是加速从JDBC数据表读取数据,手段是使用并行Source task多实例。

以下任一scan partition选项被指定,则所有scan partition选项都必须被指定。作用是在并行读取数据时指定将表分区的方式。以下这些选项是当从表读数据时使用:

  • scan.partition.column
    线程读取表时分区的列,只能是数值、日期或timestamp
  • scan.partition.num
    partitions数,决定了多线程并行读取表时指定怎么对表分区.
  • scan.partition.lower-bound
    用于确定分区步幅下界(不是用于过滤表中的行)
  • scan.partition.upper-bound
    用于确定分区步幅上界

5.5.8 例子

  • 1.11以前

    CREATE TABLE MyUserTable (...
    ) WITH ('connector.type' = 'jdbc', -- 必填: 指定动态表类型为jdbc'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必填: JDBC DB url'connector.table' = 'jdbc_table_name',  -- 必填: jdbc table name-- 可选的: JDBC driver ,不填时从url中派生'connector.driver' = 'com.mysql.jdbc.Driver',-- 可选的: jdbc user name and password'connector.username' = 'name','connector.password' = 'password',-- **以下这些选项是当从表读数据时使用,是一些可选的浏览选项**-- 注意,只要以下任何一项值被填写,那就需要填写所有选项值'connector.read.partition.column' = 'column_name', -- optional: 多线程读取表时分区的列,只能是数值、日期或timestamp.'connector.read.partition.num' = '50', -- optional: partitions数,多线程并行读取表时指定怎么对表分区.'connector.read.partition.lower-bound' = '500', -- optional: 用于确定分区步幅下界(不是用于过滤表中的行)'connector.read.partition.upper-bound' = '1000', -- optional: 用于确定分区步幅上界-- optional, 默认0,给读取者一个关于每轮读取数据行数的提示,如果设为0则提示被忽略'connector.read.fetch-size' = '100',-- **以下为temporary join时的lookup选项**'connector.lookup.cache.max-rows' = '5000', -- optional'connector.lookup.cache.ttl' = '10s', -- optional'connector.lookup.max-retries' = '3', -- optional-- **以下为写入jdbc表时使用的sink选项**'connector.write.flush.max-rows' = '5000', -- optional'connector.write.flush.interval' = '2s', -- optional'connector.write.max-retries' = '3' -- optional
    )
    
  • 1.11
    -- register a MySQL table 'users' in Flink SQL
    CREATE TABLE MyUserTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'users'
    );-- write data into the JDBC table from the other table "T"
    INSERT INTO MyUserTable
    SELECT id, name, age, status FROM T;-- scan data from the JDBC table
    SELECT id, name, age, status FROM MyUserTable;-- temporal join the JDBC table as a dimension table
    SELECT * FROM myTopic
    LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
    ON myTopic.key = MyUserTable.id;
    

5.6 Hive

请点击HiveConnector

5.7 自定义Source Sink

5.7.1 概述

请参考User-defined Sources & Sinks

5.7.2 自定义Source

5.7.2.1 ScanTableSource

5.7.2.1.1 概述

ChangeLog,Source在batch场景发送有界的、insert-only流;Streaming场景发送无界的、insert-only流;CDC(Change Data Capture)场景发送无界/有界的、可包含增删改的记录的流。

从外部数据中扫描整张表的所有行。可包含增删改的行,所以可被用来读取changelog。

getChangelogMode返回ChangelogMode,表示运行时可能返回的Changelog类型集合。

如果Source有一些额外的能力,可以实现org.apache.flink.table.connector.source.abilities包内的接口,如SupportsProjectionPushDown投影下推。

ScanTableSource生产的记录必须是org.apache.flink.table.data.RowData,用来在Flink内部传递

其中定义了一个重要的方法getScanRuntimeProvider,需要得到一个ScanRuntimeProvider,可通过InputFormatProvider.of(InputFormat<RowData, ?> inputFormat)得到,如HBaseDynamicTableSource

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {return InputFormatProvider.of(new HBaseRowDataInputFormat(conf, tableName, hbaseSchema, nullStringLiteral));
}

也可通过SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded)得到,如KafkaDynamicSourceBase

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {DeserializationSchema<RowData> deserializationSchema =this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.outputDataType);// Version-specific Kafka consumerFlinkKafkaConsumerBase<RowData> kafkaConsumer =getKafkaConsumer(topic, properties, deserializationSchema);return SourceFunctionProvider.of(kafkaConsumer, false);
}
5.7.2.1.2 HBaseDynamicTableSource
public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {public HBaseRowDataInputFormat(org.apache.hadoop.conf.Configuration conf,String tableName,HBaseTableSchema schema,String nullStringLiteral) {super(conf);this.tableName = tableName;this.schema = schema;this.nullStringLiteral = nullStringLiteral;}
}
abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {serializedConfig = HBaseConfigurationUtil.serializeConfiguration(hConf);}
}
  1. HBaseRowDataInputFormat#configure
    创建了serde、Connection、HTable、Scan(此时无startRow和stopRow)
  2. createInputSplits
    获取该HTable的每个Region Start和End Key,为每个Region创建一个TableInputSplit(包含spilitId, hosts, tableName(), splitStart, splitStop),构成TableInputSplit[]
  3. 每个并行的input task创建一个实例,并调用open(T split)方法为该并行实例负责的spilit初始化。
    // set scan range
    currentRow = split.getStartRow();
    scan.setStartRow(currentRow);
    scan.setStopRow(split.getEndRow());
    // 准备scanner进行scan
    resultScanner = table.getScanner(scan);
    // reachedEnd方法会通过该变量判断是否input读取完成
    endReached = false;
    scannedRows = 0;
    
  4. 读取数据
    // 这里T就是RowData
    public T nextRecord(T reuse) throws IOException {Result res;try {// scan出的一条数据res = resultScanner.next();} catch (Exception e) {resultScanner.close();//workaround for timeout on scanLOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);// 从当前已读位置的下一个位置继续读scan.withStartRow(currentRow, false);resultScanner = table.getScanner(scan);res = resultScanner.next();}if (res != null) {scannedRows++;// 记录已读位置currentRow = res.getRow();// 组装为RowData返回return mapResultToOutType(res);}// 没读到数据时就说明已经读取完inputendReached = true;return null;
    }
    
  5. close
    reachedEnd后说明 一个input split读取完成后调用本方法,被用来关闭channel和流,释放资源。

    currentRow = null;
    try {if (resultScanner != null) {resultScanner.close();}
    } finally {resultScanner = null;
    }
    
  6. 可能会再次open来读取下一个split

5.7.3 自定义Sink

5.7.4 自定义TableFactory

5.8 DataGen

5.8.1 概述

DataGen是一个调试用的Connector,可以配合使用Computed Column syntax来生成简单类型数据。

5.8.2 数据生成器

目前有两类:

  • RandomGenerator
    默认选项,是一个无界数据生成器。

    可指定随机值的最大和最小区间。

    对于字符串类型数据(char/varchar/string),可指定长度。

  • SequenceGenerator
    是一个有界数据生成器。

    可指定序列的开始和结束值,当Sequence抵达结束值时,读取结束。

5.8.3 配置

转自官网:

5.8.4 例子

CREATE TABLE datagen (f_sequence INT,f_random INT,f_random_str STRING,-- 以当前本地时区获取时间戳ts AS localtimestamp,WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen',-- optional options ---- task每秒生成5条数据'rows-per-second'='5',-- f_sequence字段使用SequenceGenerator,从1到100顺序生成'fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000',-- f_random字段使用RandomGenerator,从1到1000随机生成'fields.f_random.min'='1','fields.f_random.max'='1000',-- f_random_str使用RandomGenerator,指定字符串长度为10'fields.f_random_str.length'='10'
)

5.9 HBase Connector

6 Function

参考Functions

6.1 内置函数

6.2 UDF

7 通用配置

参考:Configuration

7.1 checkpoint

pipeline.time-characteristic EventTime
execution.checkpointing.interval 120000
execution.checkpointing.min-pause 60000
execution.checkpointing.timeout 60000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
execution.savepoint.path hdfs:/tmp/flink-checkpoints/xxx/chk-yyy

8 调优

8.1 概述

目前官网只有流式聚合调优一项。

8.2 流式聚合调优调优

参考Streaming Aggregation

8.2.1 概述

Flink SQL允许用户直接用SQL来定义效率高的流式分析程序,已经做了大量工作来做查询优化和算子调优,但有一些选项需要手动开启进行调优。

注意:

  • 当前这里的调优只支持Blink Planner。
  • 当前流式聚合调优只支持无界聚合,未来会支持对窗口聚合的调优。

默认状况下,无界聚合算子逐条处理输入记录:

  1. 从状态读取累加器accumulator
  2. 从累加器中累加/撤回记录
  3. 将累加器写回位于StateBackend的状态中
  4. 下一条记录又从步骤1到3进行处理

通过了解无界聚合处理步骤,可以看到这样的处理模式会增加StateBackend(尤其是硬盘上的RocksDBStatebackend)的开销。此外,数据倾斜在生产中非常常见,更会加重此问题,使得job容易发生反压。

8.2.2 MiniBatch聚合

8.2.2.1 概述

MiniBatch聚合核心思想是在聚合算子内部缓冲区缓存一组输入的数据,当触发计算时,同一个key的所有记录只需要一个操作即可访问状态了。

8.2.2.2 特点

  • 这和传统无界聚合方法有很大不同,这样做能很大程度减少状态开销,提升吞吐。
  • 但同时也会因为需要缓存输入到聚合算自内部而带来额外开销和延时,而不是立刻处理每条输入记录,其实就是吞吐和延时之间的一个折中

8.2.2.3 配置

MiniBatch聚合默认关闭。

要开启,需要配置:

  • table.exec.mini-batch.enabled
    是否允许MiniBatch聚合。一旦开启,必须配置一下两个选项。
  • table.exec.mini-batch.allow-latency
    最大允许的MiniBatch聚合延迟时间
  • table.exec.mini-batch.size
    最大允许的MiniBatch聚合缓存数据大小,级别是每个聚合算子task

配置方法:

// instantiate table environment
val tEnv: TableEnvironment = ...// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task

8.2.3 LocalGlobalBatch聚合

8.2.3.1 概述

LocalGlobalBatch聚合主要目的是解决数据倾斜,方法是将group by聚合拆解为两个阶段:

  1. 先在上游做本地聚合
  2. 在下游做全局聚合

这个思路类似于MR中的MapCombine + Reduce Combine两个阶段。

比如以下SQL

SELECT color, sum(id)
FROM T
GROUP BY color

有可能上游color字段数据已经倾斜,因此下游的某几个聚合算子实例必须处理比其他实例更多的数据,这就造成了数据热点,即数据倾斜。

此时就可使用 LocalGlobalBatch聚合,在上游算子将同key数据进行本地聚合,放入同一个累加器,只将聚合后的累加器发送到下游聚合算子,而不是原始的所有记录,这样大大减少网络shuffle时的开销以及状态访问开销。

每次本地聚合时累加的输入记录数目取决于mini-batch interval,也就是说LocalGlobalBatch聚合前提是mini-batch优化必须已经开启。

8.2.3.2 特点

大大减少网络shuffle时的开销以及状态访问开销

8.2.3.3 配置

LocalGlobalBatch聚合默认关闭。

要开启,需要配置:

  • table.exec.mini-batch.enabled
    是否允许MiniBatch聚合。一旦开启,必须配置一下两个选项。

  • table.optimizer.agg-phase-strategy
    TWO_PHASE

    默认AUTO(根据cost来决定采用两阶段还是一阶聚段合),还可填TWO_PHASE(两阶段聚合)、ONE_PHASE(一阶段聚合)

配置方法:

// instantiate table environment
val tEnv: TableEnvironment = ...// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation

8.2.4 拆分DistinctBatch聚合

8.2.4.1 概述

LocalGlobalBatch聚合优化可在常规聚合场景有效消除数据倾斜,如SUM, COUNT, MAX, MIN, AVG,但不适合处理DISTINCT聚合!

比如计算每天的用户UV:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

当distinct key(比如这里的user_id,因为distinct的是user_id,而不是day)稀疏时,COUNT DISTINCT一般不能有效聚合而减少记录数,即使开启了LocalGlobalBatch聚合提升也不大。因为key稀疏场景下在COUNT DISTINCT后累加器依然持有大部分原始记录(比如这里在本地聚合后,只是得到了去重后的user_id而已),所以全局聚合阶段依然存在瓶颈。比如以上例子中,大多数负载高的累加器由一个task处理,也就是属于同一天的被一个task处理。

针对COUNT DISTINCT以上问题,就有了拆分DistinctBatch聚合优化思想,将COUNT DISTINCT拆分为两个层级:

  1. 首次聚合先通过group key和额外自定义的bucket key进行shuffle,并进行聚合计算。
    bucket key是通过HASH_CODE(distinct_key) % BUCKET_NUM计算,BUCKET_NUM默认1024,可通过table.optimizer.distinct-agg.split.bucket-num调整。
  2. 第二次聚合通过原始group key进行shuffle,并使用SUM聚合来自不同 bucket 的 COUNT DISTINCT

这个过程如下图,这里不同颜色表示热点group key,类似我们例子的day;字母表示user_id:左图为本地全局聚合,右图为拆分DistinctBatch聚合:

可以看到:

  • 开启本地全局聚合计算COUNT DISTINCT时,本地聚合阶段其实并没有减少太多数据量,导致传输到右上聚合算子时的数据过多,造成数据热点和倾斜,成为瓶颈。

  • 开启拆分DistinctBatch聚合计算COUNT DISTINCT时,首先根据group key即day和distinct key即user_id计算hash并对bucket取余后得到BucketKey联合进行shuffle,这个时候就已经将热点数据打散到各个第一阶段聚合算子上了。这一阶段就会计算COUNT DISTINCT,输出就是唯一值的数量;

    随后根据group key即day进行第二阶段聚合,此时由于第一阶段已经将相同group by数据聚合计算,所以第二阶段做的工作就少了很多了。此时只需要将第一阶段的唯一值SUM求和即可。

相同distinct key的行会在同一个Bucket内计算,所以能这样分阶段聚合。第一阶段聚合的意义是分担热点的group key,比如以上例子时可将最新一天的数据shuffle到不同task计算,有效解决 COUNT DISTINCT 时的数据倾斜和数据热点问题。

在拆分DistinctBatch聚合优化后,以上查询会被自动重写:

SELECT day, SUM(cnt)
FROM (SELECT day, COUNT(DISTINCT user_id) as cntFROM TGROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

以上为做简单的例子,但其实Flink支持拆分更复杂的聚合查询,比如,多个具有不同 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,可以与其他非 distinct 聚合(例如 SUM、MAX、MIN、COUNT )配合使用。

需要注意的是,目前聚合拆分优化不支持包含用户定义的 AggregateFunction 的聚合运算。

8.2.4.2 特点

适用于distinct聚合时出现数据倾斜的场景。

8.2.4.3 配置

配置方法:

// instantiate table environment
val tEnv: TableEnvironment = ...tEnv.getConfig         // access high-level configuration.getConfiguration    // set low-level key-value options.setString("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split

相关选项

  • table.optimizer.distinct-agg.split.enabled
    默认false,是否开启拆分distinct聚合
  • table.optimizer.distinct-agg.split.bucket-num
    默认1024,开启聚合拆分后的Bucket数量

8.2.5 在Distinct聚合上使用Filter修饰符替换CASE WHEN

有时用户可能需要从不同维度计算 UV 的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。一般会选择使用 CASE WHEN,例如:

SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

在这种情况下,Flink官方建议使用 FILTER 语法而不是 CASE WHEN,因为 FILTER 更符合 SQL 标准,并且能获得更多的性能提升。

FILTER 是用于限制聚合中使用的value的聚合函数修饰符。

将上面的示例替换为 FILTER后如下所示:

SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

Flink SQL 优化器可以识别在相同 distinct key上的不同Filter参数。例如,在上面的示例中,这三个 COUNT DISTINCT 都针对 user_id列。随后,Flink 可以只使用一个共享的State实例而不是三个State实例,这样可以减少State访问开销和State大小。在某些工作负载下,这样做可获得显著的性能提升。

8.3 Query Configuration

8.3.1 概述

有时需要限制State的大小,避免在消费无界流时状态无限增大。需要根据时间语义、查询本身场景业务要求、对计算结果精度影响来综合评估。

主要就是通过TableConfig来设定相关运行参数:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)// obtain query configuration from TableEnvironment
val tConfig: TableConfig = tableEnv.getConfig
// set query parameters
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

8.3.2 空闲状态保留时长控制

8.3.2.1 背景

Flink需要为那些在一个或多个key上进行聚合或Join查询搜集数据或维护每个key的部分结果。随着更多唯一key流入,持续查询累积的State会不断增大。然而,通常经过一段时间后,一些key就不再活跃,对应的状态也随之变得无用了。

如现有一个统计每个session的点击事件:

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

此时Flink就会为每个GROUP BY KEY sessionId维护一个count值。

这种场景下,在客户端session结束后sessionId就变得不再活跃了。但Flink的sessionId State不能感知到并自动清理,会认为这些sessionId将来还会出现并实现。这样就导致该查询程序的整个State大小不断增加。

此时可配置Idle State Retention Time参数,他定义了每个key的状态在最后一次被修改后的保留时长。比如可谓上例的sessionId配置一个过期时间。

8.3.2.2 注意事项

但需要注意,如果一个key的状态被移除,则该持续查询程序会彻底遗忘是否见过该key。如果该key后面再次出现,则会被认为是首次出现!比如上例中,假设sessionId abc的count状态被移除,但一段时间后该abc的session再次出现,则会被给与一个初始化的count = 0 的状态!这可导致之前的结果被覆盖!

8.3.2.3 配置

val tConfig: TableConfig = ???// set idle state retention time: min = 12 hours, max = 24 hours
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
  • min
    表示不活跃的空闲状态的最小保留时间
  • max
    表示不活跃的空闲状态的最大保留时间

注意,他们之间的差值至少为5分钟。如果两值都设为0,表示永不清理状态。

这两个值可保证,不活跃的空闲状态至少保留min,但也绝不会超过max

8.3.2.4 源码分析

FlinkSql 关于State的相关函数有一个重要的基类KeyedProcessFunction,内部定义了两个重要的方法

  • public abstract void processElement(I value, Context ctx, Collector out) throws Exception;
    处理每条输入元素
  • public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {}
    当使用TimerService注册的Timer触发时调用

我们看一个用于表的group by聚合的类GroupTableAggFunction,它继承自KeyedProcessFunctionWithCleanupState,同是也是KeyedProcessFunction的子孙类:

// 初始调用
@Override
public void open(Configuration parameters) throws Exception {...RowDataTypeInfo accTypeInfo = new RowDataTypeInfo(accTypes);ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);accState = getRuntimeContext().getState(accDesc);initCleanupTimeState("GroupTableAggregateCleanupTime");
}// KeyedProcessFunctionWithCleanupState#initCleanupTimeState
protected void initCleanupTimeState(String stateName) {// this.stateCleaningEnabled = minRetentionTime > 1if (stateCleaningEnabled) {ValueStateDescriptor<Long> inputCntDescriptor = new ValueStateDescriptor<>(stateName, Types.LONG);// 利用状态,创建cleanupTimeStatecleanupTimeState = getRuntimeContext().getState(inputCntDescriptor);}
}

处理每条记录:

@Override
public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {// 当前系统处理时间long currentTime = ctx.timerService().currentProcessingTime();// 注册 state-cleanup timerregisterProcessingCleanupTimer(ctx, currentTime);// 当前元素的keyRowData currentKey = ctx.getCurrentKey();boolean firstRow;// 拿出状态中的累加器RowData accumulators = accState.value();// 如果累加器为空,就创建一个if (null == accumulators) {firstRow = true;accumulators = function.createAccumulators();} else {firstRow = false;}// set accumulators to handler firstfunction.setAccumulators(accumulators);// 如果需要更新旧值,就更新if (!firstRow && generateUpdateBefore) {function.emitValue(out, currentKey, true);}// update aggregate result and set to the newRowif (isAccumulateMsg(input)) {// INSERT或UPDATE_AFTER// accumulate input to the accumulators.function.accumulate(input);} else {// retract input from the accumulators.function.retract(input);}// get accumulatoraccumulators = function.getAccumulators();if (!recordCounter.recordCountIsZero(accumulators)) {// 累加器有内容// 发送聚合结果function.emitValue(out, currentKey, false);// 更新状态中的累加器accState.update(accumulators);} else {// and clear all stateaccState.clear();// cleanup dataview under current keyfunction.cleanup();}
}

KeyedProcessFunctionWithCleanupState#registerProcessingCleanupTimer

protected void registerProcessingCleanupTimer(Context ctx, long currentTime) throws Exception {if (stateCleaningEnabled) {registerProcessingCleanupTimer(cleanupTimeState,// 当前系统处理时间currentTime,minRetentionTime,maxRetentionTime,ctx.timerService());}
}

CleanupState#registerProcessingCleanupTimer
其实这是一个接口,定义了default方法

default void registerProcessingCleanupTimer(ValueState<Long> cleanupTimeState,long currentTime,long minRetentionTime,long maxRetentionTime,TimerService timerService) throws Exception {// last registered timerLong curCleanupTime = cleanupTimeState.value();// check if a cleanup timer is registered and// that the current cleanup timer won't delete state we need to keep// 首次进来肯定是null// 假设 minRetentionTime = 12小时,maxRetentionTime=24小时// 第一次该元素A来的时间为20200827 00:00,则curCleanupTime=20200828 00:00// A 第二次出现是20200827 11:00,则 12:00+12小时 = 20200828 00:00 不大于 curCleanupTime,此时还是什么都不做。也就是说如果在20200828 00:00前 A不再出现,则会在0点被清理,虽然在12小时前出现过一次,这也就是最小保留时间的含义!// A 第三次出现是20200827 13:00,则 13:00+12小时 = 20200828 01:00 大于 curCleanupTime,此时就会更新curCleanupTime = 20200827 13:00+24小时= 20200828 13:00// 如果 A第四次出现是 20200829 14:00,此时已经距离上次出现超过24小时,则已经被清理,会被当做首次出现,覆盖以前的结果if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {// 触发时间为当前时间+state最大空闲时间long cleanupTime = currentTime + maxRetentionTime;// 以新的触发时间注册一个新的cleanupTime timertimerService.registerProcessingTimeTimer(cleanupTime);if (curCleanupTime != null) {// 说明此时为更新timer,需要删除过期的旧timertimerService.deleteProcessingTimeTimer(curCleanupTime);}// 更新cleanupTime状态cleanupTimeState.update(cleanupTime);}
}

onTimer
定时清理状态

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowData> out) throws Exception {if (stateCleaningEnabled) {// 清理所有累积的状态cleanupState(accState);function.cleanup();}
}// KeyedProcessFunctionWithCleanupState#cleanupState
protected void cleanupState(State... states) {for (State state : states) {state.clear();}this.cleanupTimeState.clear();
}

8.3.2.5 小结

  • 假设 minRetentionTime = 12小时,maxRetentionTime=24小时

  • 第一次该元素A来的时间为20200827 00:00,则curCleanupTime=20200828 00:00,并将A元素放入累加器,并放入State保存

  • A 第二次出现是20200827 11:00,则 12:00+12小时 = 20200828 00:00 不大于 curCleanupTime,此时还是什么都不做。当然,还是需要将A放入累加器累加,并放入State保存。

    也就是说如果在20200828 00:00前 A不再出现,则对应的累加器状态会在0点被清理,虽然在12小时前出现过一次,**这也就是状态的最小保留时间的含义!**也就是说虽然A最后一次出现是12小时以前,但在此12小时后就被清理了,符合状态最小保留时间。

  • A 第三次出现是20200827 13:00,则 13:00+12小时 = 20200828 01:00 大于 curCleanupTime,此时就会更新curCleanupTime = 20200827 13:00+24小时= 20200828 13:00

    随后将A放入累加器累加,并放入State保存。

  • 如果 A第四次出现是 20200829 14:00,此时已经距离上次出现超过24小时,则Key A对应的累加器状态已经被清理!

    此时新来的A会被当做首次出现,创建新的累加器并放入State。

9 示例

9.1 维表Join

本小节转自[Flink Sql教程(3)- 维表Join](https://blog.csdn.net/weixin_47482194/article/details/106672613),作者Flink-狄杰

  • 什么是维表
    维表,维度表的简称,来源于数据仓库,一般用来给事实数据补充信息。假设现在有一张销售记录表。销售记录表里面的一条销售记录就是一条事实数据,而这条销售记录中的地区字段就是一个维度。通常销售记录表里面的地区字段是地区表的主键,地区表就是一张维表。更多的细节可以面向百度/谷歌编程。

  • 为什么Flink中需要维表
    以流计算为例,一般情况下,消费的消息中间件中的消息,是事实表中的数据,我们需要把数据补全,不然谁也不知道字段地区对应的值01、02是个什么东西。所以,我们通常会在计算过程中,通过Join维表来补全数据。

维表相关配置可见join cache

Flink Sql教程(3)- 维表Join

参考文档

  • Flink官网
    部分内容翻译或转自官网
  • 《Stream Processing with Apache Flink》
    出处:Flink极客训练营
    作者:崔星灿
  • 《Flink SQL _ Table 介绍与实战》
    出处:Flink极客训练营
    作者:伍翀

Flink学习4-流式SQL相关推荐

  1. 为什么阿里会选择 Flink 作为新一代流式计算引擎?

    本文由 [AI前线]原创,ID:ai-front,原文链接:t.cn/ROISIr3 [AI前线导读]2017 年 10 月 19日,阿里巴巴的高级技术专家王绍翾(花名"大沙")将 ...

  2. flink大数据处理流式计算详解

    flink大数据处理 文章目录 flink大数据处理 二.WebUI可视化界面(测试用) 三.Flink部署 3.1 JobManager 3.2 TaskManager 3.3 并行度的调整配置 3 ...

  3. StreamDM:基于Spark Streaming、支持在线学习的流式分析算法引擎

    StreamDM:基于Spark Streaming.支持在线学习的流式分析算法引擎 streamDM:Data Mining for Spark Streaming,华为诺亚方舟实验室开源了业界第一 ...

  4. 使用 Flink Hudi 构建流式数据湖

    简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进. 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增 ...

  5. Apache Griffin+Flink+Kafka实现流式数据质量监控实战

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...

  6. 移动web学习(一) --- 流式布局, 视口viewpoint ,2倍图和3倍图, less

    从这章开始,学习移动web 移动web开发指的是需要适配移动设备的网页开发,和pc端没有本质的区别,还是使用html,css,js 一.学习移动web的原因 1. 使用移动设备的用户越来越多 2. 一 ...

  7. KSQL:Apache Kafka的流式SQL

    更新:KSQL  现在可作为Confluent Platform的一个组件提供. 我很高兴地宣布KSQL,为Apache kafka流SQL引擎®.KSQL降低了流处理世界的入口,提供了一个简单而完全 ...

  8. Apache Kafka的流式SQL引擎——KSQL

    1. KSQL 介绍 KSQL 引擎--一个基于流的 SQL.推出 KSQL 是为了降低流式处理的门槛,为处理 Kafka 数据提供简单而完整的可交互式 SQL 接口.KSQL 目前可以支持多种流式操 ...

  9. 专访阿里云高级技术专家吴威:Kafka、Spark和Flink类支持流式计算的软件会越来越流行...

    杭州·云栖大会将于2016年10月13-16日在云栖小镇举办,在这场标签为互联网.创新.创业的云计算盛宴上,众多行业精英都将在这几天里分享超过450个演讲主题. 为了帮助大家进一步了解这场全球前言技术 ...

  10. Flink - 批量、流式计算和离线、实时计算

    在了解Flink之前,我们需要先简单了解批量.流式计算和离线.实时计算. 首先需要明确的一点是,批量.流式计算和离线.实时计算是按照不同维度划分的两套数据处理方式. (1)批量.流式计算体现在数据计算 ...

最新文章

  1. VUE v-if 和 v-for 的使用示例 VUE根据下标改变图片路径
  2. Android Studio 更换国内源下载依赖库
  3. JS详细入门教程(上)
  4. http,session,cookie
  5. [蓝桥杯][2019年第十届真题]修改数组(并查集)
  6. 前端学习(2033)vue之电商管理系统电商系统之通过路由加载报表
  7. 不等待输入_明明显示“对方正在输入”却总等不来回复,其实是你误解了
  8. 红芯事件追踪:官方致歉承认基于开源架构;创始人履历被指夸大
  9. php 等比例缩略图,PHP等比例生成缩略图
  10. E-Prime 3 安装
  11. 485通讯温湿度传感器工作原理
  12. python爬取人口数据_爬取人口数据
  13. 三角形和矩形傅里叶变换_第3章傅立叶变换.doc
  14. HR 必知的 360 评估
  15. 百度地图加载过慢问题
  16. 利用梆梆加固逻辑漏洞取巧脱壳
  17. 使用Webpack构建SPA模式的多页面应用(基于Vue 2)
  18. 国际移动设备识别码IMEI
  19. DSP 中的基础算法和模型的详细解析
  20. 192.168.0.1/27 表示什么

热门文章

  1. linux下如何拷贝软链接
  2. 【舒适区如何影响着我的生活】
  3. 使用Roslyn动态编译和执行
  4. 彩色激光同轴位移计在点胶行业的应用(胶水测量)
  5. 百度地图API基础操作--百度鹰眼篇
  6. 共享单车、公交车辆位置、地铁等50+个交通数据集
  7. /id_xndu5otm2mdq0.html,index.html
  8. 装逼第二弹——Laplace变换的前世今生
  9. python水位传感器输出水位_水位传感器坏了的表现_判断水位传感器好坏方法
  10. 互联网行业应届生年薪35W,倒挂老员工,这是逼老人离职吗?