1.19.5.3.时态表(Temporal Tables)
1.19.5.3.1.设计初衷
1.19.5.3.1.1.关联一张版本表
1.19.5.3.1.2.关联一张普通表
1.19.5.3.2.时态表
1.19.5.3.2.1.声明版本表
1.19.5.3.2.2.声明版本视图
1.19.5.3.2.3.声明普通表
1.19.5.3.3.时态表函数
1.19.5.3.3.1.定义时态表函数

1.19.5.3.时态表(Temporal Tables)

时态表(Temporal Table)是一张随时间变化的表 – 在Flink中称为动态表,时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动态的)。

时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。

版本: 时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种我们称之为普通表,来自数据库 或 HBASE的表可以定义成普通表。

1.19.5.3.1.设计初衷
1.19.5.3.1.1.关联一张版本表

以订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。

SELECT * FROM product_changelog;(changelog kind)  update_time  product_id product_name price
================= ===========  ========== ============ =====
+(INSERT)         00:01:00     p_001      scooter      11.11
+(INSERT)         00:02:00     p_002      basketball   23.11
-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
+(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11
+(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
-(DELETE)         18:00:00     p_001      scooter      12.99

表product_changelog表示数据库表 products不断增长的 changelog, 比如,产品 scooter 在时间点 00:01:00的初始价格是 11.11, 在 12:00:00 的时候涨价到了 12.99, 在 18:00:00 的时候这条产品价格记录被删除。

如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:

update_time  product_id product_name price
===========  ========== ============ =====
00:01:00     p_001      scooter      11.11
00:02:00     p_002      basketball   23.11

如果我们想输出 product_changelog 表在 13:00:00 对应的版本,表的内容如下所示:

update_time  product_id product_name price
===========  ========== ============ =====
12:00:00     p_001      scooter      12.99
12:00:00     p_002      basketball   19.99

上述例子中,products 表的版本是通过 update_time 和 product_id 进行追踪的,product_id 对应 product_changelog 表的主键,update_time 对应事件时间。

在 Flink 中, 这由版本表表示。

1.19.5.3.1.2.关联一张普通表

另一方面,某些用户案列需要连接变化的维表,该表是外部数据库表。
假设 LatestRates 是一个物化的最新汇率表 (比如:一张 HBase 表),LatestRates 总是表示 HBase 表 Rates 的最新内容。

我们在 10:15:00 时查询到的内容如下所示:

10:15:00 > SELECT * FROM LatestRates;currency  rate
========= ====
US Dollar 102
Euro      114
Yen       1

我们在 11:00:00 时查询到的内容如下所示:

11:00:00 > SELECT * FROM LatestRates;currency  rate
========= ====
US Dollar 102
Euro      116
Yen       1

在 Flink 中, 这由普通表表示。

1.19.5.3.2.时态表

注意:仅Blink planner支持此功能
Flink使用主键约束和事件时间来定义一张版本表和版本视图。

1.19.5.3.2.1.声明版本表

在Flink 中,定义了主键约束和事件时间属性的表就是版本表。
– 定义一张版本表

CREATE TABLE product_changelog (product_id STRING,product_name STRING,product_price DECIMAL(10, 4),update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定义主键约束WATERMARK FOR update_time AS update_time   -- (2) 通过 watermark 定义事件时间
) WITH ('connector' = 'kafka','topic' = 'products','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'localhost:9092','value.format' = 'debezium-json'
);

行(1) 为表product_changelog 定义了主键, 行(2) 把 update_time 定义为表 product_changelog 的事件时间,因此 product_changelog 是一张版本表。
注意:METADATA FROM ‘value.source.timestamp’ VIRTUAL 语法的意思是从每条 changelog 中抽取 changelog 对应的数据库表中操作的执行时间,强烈推荐使用数据库表中操作的 执行时间作为事件时间 ,否则通过时间抽取的版本可能和数据库中的版本不匹配。

1.19.5.3.2.2.声明版本视图

Flink 也支持定义版本视图只要一个视图包含主键和事件时间便是一个版本视图。
假设我们有表 RatesHistory如下所示:

-- 定义一张 append-only 表
CREATE TABLE RatesHistory (currency_time TIMESTAMP(3),currency STRING,rate DECIMAL(38, 10),WATERMARK FOR currency_time AS currency_time   -- 定义事件时间
) WITH ('connector' = 'kafka','topic' = 'rates','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'                                -- 普通的 append-only 流
)

表 RatesHistory 代表一个兑换日元货币汇率表(日元汇率为1),该表是不断增长的 append-only 表。 例如,欧元 兑换 日元 从 09:00:00 到 10:45:00 的汇率为 114。从 10:45:00 到 11:15:00 的汇率为 116。

SELECT * FROM RatesHistory;currency_time currency  rate
============= ========= ====
09:00:00      US Dollar 102
09:00:00      Euro      114
09:00:00      Yen       1
10:45:00      Euro      116
11:15:00      Euro      119
11:49:00      Pounds    108

为了在RatesHistory上定义版本表,Flink支持通过去重查询(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D)定义版本视图。

CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件时间FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作为主键ORDER BY currency_time DESC) AS rowNum FROM RatesHistory )
WHERE rowNum = 1; -- 视图 `versioned_rates` 将会产出如下的 changelog:(changelog kind) currency_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      US Dollar  102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      Yen        1
+(UPDATE_AFTER)  10:45:00      Euro       116
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:49:00      Pounds     108

行 (1) 保留了事件时间作为视图 versioned_rates 的事件时间,行 (2) 使得视图 versioned_rates 有了主键, 因此视图 versioned_rates 是一个版本视图。

视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。

如果我们想输出 versioned_rates 表在 11:00:00 对应的版本,表的内容如下所示:

currency_time currency   rate
============= ========== ====
09:00:00      US Dollar  102
09:00:00      Yen        1
10:45:00      Euro       116

如果我们想输出versioned_rates表在12:00:00对应的版本,表的内容如下所示:

currency_time currency   rate
============= ========== ====
09:00:00      US Dollar  102
09:00:00      Yen        1
10:45:00      Euro       119
11:49:00      Pounds     108
1.19.5.3.2.3.声明普通表

普通表的声明和 Flink 建表 DDL 一致,参考 create table 页面获取更多如何建表的信息。
– 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
– ‘currency’ 列是 HBase 表中的 rowKey

 CREATE TABLE LatestRates (   currency STRING,   fam1 ROW<rate DOUBLE>   ) WITH (   'connector' = 'hbase-1.4',   'table-name' = 'rates',   'zookeeper.quorum' = 'localhost:2181'   );

注意:理论上讲任意都能用作时态表并在基于处理时间的时态表 Join 中使用,但当前支持作为时态表的普通表必须实现接口 LookupableTableSource。接口 LookupableTableSource 的实例只能作为时态表用于基于处理时间的时态 Join 。

通过 LookupableTableSource 定义的表意味着该表具备了在运行时通过一个或多个 key 去查询外部存储系统的能力,当前支持在 基于处理时间的时态表 join 中使用的表包括 JDBC, HBase 和 Hive。

另请参阅 LookupableTableSource页面了解更多信息。

在基于处理时间的时态表 Join 中支持任意表作为时态表会在不远的将来支持。

1.19.5.3.3.时态表函数

时态表函数是一种过时的方式去定义时态表并关联时态表的数据,现在我们可以用时态表 DDL 去定义时态表,用时态表 Join 语法去关联时态表。

时态表函数和时态表 DDL 最大的区别在于,时态表 DDL 可以在纯 SQL 环境中使用但是时态表函数不支持,用时态表 DDL 声明的时态表支持 changelog 流和 append-only 流但时态表函数仅支持 append-only 流。

为了访问时态表中的数据,必须传递一个时间属性,该属性确定将要返回的表的版本。 Flink 使用表函数的 SQL 语法提供一种表达它的方法。

定义后,时态表函数将使用单个时间参数 timeAttribute 并返回一个行集合。 该集合包含相对于给定时间属性的所有现有主键的行的最新版本。

假设我们基于 RatesHistory 表定义了一个时态表函数,我们可以通过以下方式查询该函数 Rates(timeAttribute):

SELECT * FROM Rates('10:15:00');rowtime  currency  rate
=======  ========= ====
09:00:00 US Dollar 102
09:00:00 Euro      114
09:00:00 Yen       1SELECT * FROM Rates('11:00:00');rowtime  currency  rate
======== ========= ====
09:00:00 US Dollar 102
10:45:00 Euro      116
09:00:00 Yen       1

对 Rates(timeAttribute) 的每个查询都将返回给定 timeAttribute 的 Rates 状态。

注意:当前 Flink 不支持使用常量时间属性参数直接查询时态表函数。目前,时态表函数只能在 join 中使用。上面的示例用于为函数 Rates(timeAttribute) 返回内容提供直观信息。

另请参阅有关用于持续查询的 join 页面,以获取有关如何与时态表 join 的更多信息。

1.19.5.3.3.1.定义时态表函数

以下代码段说明了如何从 append-only 表中创建时态表函数。
Java代码:

package com.toto.demo.sql;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;import java.util.ArrayList;
import java.util.List;import static org.apache.flink.table.api.Expressions.$;public class Demo {public static void main(String[] args) {// 获取 stream 和 table 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 提供一个汇率历史记录表静态数据集List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();ratesHistoryData.add(Tuple2.of("US Dollar", 102L));ratesHistoryData.add(Tuple2.of("Euro", 114L));ratesHistoryData.add(Tuple2.of("Yen", 1L));ratesHistoryData.add(Tuple2.of("Euro", 116L));ratesHistoryData.add(Tuple2.of("Euro", 119L));// 用上面的数据集创建并注册一个示例表// 在实际设置中,应使用自己的表替换它DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());tEnv.createTemporaryView("RatesHistory", ratesHistory);// 创建和注册时态表函数// 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)tEnv.registerFunction("Rates", rates);}}

Scala代码:

// 获取 stream 和 table 环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)// 提供一个汇率历史记录表静态数据集
val ratesHistoryData = new mutable.MutableList[(String, Long)]
ratesHistoryData.+=(("US Dollar", 102L))
ratesHistoryData.+=(("Euro", 114L))
ratesHistoryData.+=(("Yen", 1L))
ratesHistoryData.+=(("Euro", 116L))
ratesHistoryData.+=(("Euro", 119L))// 用上面的数据集创建并注册一个示例表
// 在实际设置中,应使用自己的表替换它
val ratesHistory = env.fromCollection(ratesHistoryData).toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)tEnv.createTemporaryView("RatesHistory", ratesHistory)// 创建和注册时态表函数
// 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
val rates = ratesHistory.createTemporalTableFunction($"r_proctime", $"r_currency") // <==== (1)
tEnv.registerFunction("Rates", rates)

行(1)创建了一个rates时态表函数,这使我们可以在Table API中使用rates函数。
行(2)在表环境中注册名称为Rates的函数,这使我们可以在SQL中使用Rates函数。

1.19.5.3.时态表、关联一张版本表、关联一张普通表、时态表、声明版本表、声明版本视图、声明普通表、时态表函数等相关推荐

  1. HubbleDotNet 开源全文搜索数据库项目--为数据库现有表或视图建立全文索引(三) 多表关联全文索引模式...

    关系型数据库中,多表关联是很常见的事情,HubbleDotNet 可以对部分情况的多表关联形式建立关联的全文索引,这样用户就不需要专门建一个大表 来解决多表关联时的全文索引问题. 下面以 为数据库现有 ...

  2. Windows核心编程_注册表操作和小练习程序关联

    大家有没有见过就是当我们下载一个软件比如视频播放器 下载之后我们电脑上的视频文件图标都变成了这个视频播放器的图标,然后打开时也是默认调用此视频播放器来播放 下面就给大家介绍如何在Windows平台上实 ...

  3. 零售商贩mysql表设计:banner+banner_item+image关联表(轮播图表)

    作者:陈业贵 华为云享专家 51cto(专家博主 明日之星 TOP红人) 阿里云专家博主 文章目录 banner管理表(轮播图管理表) 解析banner管理表(轮播图管理表) id解析: 数据 (ba ...

  4. Hibernate 关联映射 之 多对多 关联(二) 之拆分

    1.由问题引出一个多对多拆分成两个多对一 问题:Hibernate 关联映射 之 多对多 关联(一)中中间表只是一个存放用户和角色的表,并无其他作用,如果客户有其他的需求,该表就无法扩展. 2.问题解 ...

  5. mysql创建全外连接的视图_关系型数据库 MySQL 表索引和视图详解

    原创: JiekeXu JiekeXu之路 一.索引 数据库索引通俗的讲就是和书本的目录一样,主要就是为了提高查询数据的效率.由于数据存储在数据库表中,所以索引是创建在数据库表对象上,由表中的一个字段 ...

  6. (转)MyBatis框架的学习(五)——一对一关联映射和一对多关联映射

    http://blog.csdn.net/yerenyuan_pku/article/details/71894172 在实际开发中我们不可能只是对单表进行操作,必然要操作多表,本文就来讲解多表操作中 ...

  7. mysql查找两表中不同的数据库表_各位大侠怎样查找两张表的里面的数据(这两张表在不同的数据库)...

    Oracle: 一.同一个实例中的两个表 相应有两个用户a(表t1).b(表t2) 将用户a的t1查询权限授予b用户 grant select on t1 to b; 之后,b就可以查询t1表了 se ...

  8. mysql将查询结果写入另一张表_将一张表的查询结果插入到另一张表(转)

    将一张表的查询结果插入到另一张表 方法一: 代码 1 select into 和 insert into select 两种表复制语句 2 select * into destTbl from src ...

  9. mysql 关联 update_关于SQL UPDATE关联更新

    展开全部 分析如下 1.[如果存在测试临时表,则先e69da5e887aa62616964757a686964616f31333365666166删除,便于重复执行SQL]: IF OBJECT_ID ...

  10. oracle删除所有触发器的命令,Oracle删除当前用户的所有对象(表、视图、触发器、存储过程、函数)...

    Oracle删除当前用户的所有对象(表.视图.触发器.存储过程.函数) 1. DECLARE TYPE name_list IS TABLE OF VARCHAR2(40); TYPE type_li ...

最新文章

  1. Android ListView item设置分割线以及分割线宽度
  2. UVA10780:Again Prime? No Time(数论)
  3. macos 全局快捷键 打开 iterm_在 macOS 上实用的十大软件!你get了吗?
  4. python循环输出00-59
  5. ActiveMq消费端实现集群部署
  6. 单片机人流统计装置的程序_单片机其实不难
  7. SpringBoot如何实现自动配置
  8. myeclipse(eclipse)IDE配置
  9. installshield使用教程
  10. [-Flutter 自组篇-] 圆形进度条
  11. 寻找与黄金分割点最近的商
  12. 【组合】BZOJ3505(Cqoi2014)[数三角形]题解
  13. 如何提升w ndows10系统网速,win10系统网速提高50倍加快上网速度的图文办法
  14. HOJ1056 Fishermen(区间问题、思维)
  15. torch.masked_select和torch.masked_scatter
  16. linux 临时文件 锁,Linux上开启强制性记录锁
  17. 登不上http://www.veryyx.com/的问题
  18. 十大经典排序算法(图解与代码)——冒泡排序、选择排序、插入排序、希尔排序、归并排序、快速排序、堆排序、计数排序、桶排序、基数排序(Python and Java)
  19. Python paromiko每日生活学习感悟(第一次写,紧张hahaha)
  20. linux单独编译内核的驱动

热门文章

  1. 机械工业品电商平台后台开发(一):项目简介及SpringMVC工作原理(工作流程)介绍
  2. 全球与中国无线视频门铃对讲机市场深度研究分析报告
  3. excel清单数据导入到开票软件中进行开票
  4. 老照片瞬间修复神器!快帮你家的长辈恢复照片去吧
  5. react大数据量渲染_React大量数据渲染的绝佳解决方案——React虚拟化组件
  6. 2023浙江工商大学计算机考研信息汇总
  7. 完美解释:wenet-流式与非流式语音识别统一模型
  8. 深入浅出SSD 学习笔记整理——Johnathan Sung
  9. 【图像增广库imgaug】官方文档翻译(一):加载并增强图片
  10. 均方距离计算公式_均方末端距的统计计算法.ppt