Flink总共有三种时间语义:Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释,可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API & SQL中基于时间的算子如何定义时间语义。通过本文你可以了解到:

  • 时间属性的简介
  • 处理时间
  • 事件时间

时间属性简介

Flink TableAPI&SQL中的基于时间的操作(如window),需要指定时间语义,表可以根据指定的时间戳提供一个逻辑时间属性。

时间属性是表schama的一部分,当使用DDL创建表时、DataStream转为表时或者使用TableSource时,会定义时间属性。一旦时间属性被定义完成,该时间属性可以看做是一个字段的引用,从而在基于时间的操作中使用该字段。

时间属性像一个时间戳,可以被访问并参与计算,如果一个时间属性参与计算,那么该时间属性会被雾化成一个常规的时间戳,常规的时间戳不能与Flink的时间与水位线兼容,不能被基于时间的操作所使用。

Flink TableAPI & SQL所需要的时间属性可以通过Datastream程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认// 可以选择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

处理时间

基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性,使用该时间语义不需要提取时间戳和生成水位线。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定义处理时间

处理时间的属性可以在DDL语句中被定义为一个计算列,需要使用PROCTIME()函数,如下所示:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口

DataStream转为Table的过程中定义处理时间

在将DataStream转为表时,在schema定义中可以通过.proctime属性指定时间属性,并将其放在其他schema字段的最后面,具体如下:

DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外逻辑字段作为处理时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

自定义TableSource并实现DefinedProctimeAttribute 接口,如下:

// 定义个带有处理时间属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 创建streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 该字段会追加到schema中,作为第三个字段return "user_action_time";}
}// 注册table source
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件时间

基于记录的具体时间戳,即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定事件时间

事件时间属性可以通过 WATERMARK语句进行定义,如下:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明user_action_time作为事件时间属性,并允许5S的延迟  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream转为Table的过程中定义事件时间

当定义Schema时通过.rowtime属性指定事件时间属性,必须在DataStream中指定时间戳与水位线。例如在数据集中,事件时间属性为event_time,此时Table中的事件时间字段中可以通过’event_time. rowtime‘来指定。

目前Flink支持两种方式定义EventTime字段,如下:

// 方式1:
// 提取timestamp并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 声明一个额外逻辑字段作为事件时间属性
// 在table schema的末尾使用user_action_time.rowtime定义事件时间属性
// 系统会在TableEnvironment中获取事件时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");// 方式2:// 从第一个字段提取timestamp并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一个字段已经用来提取时间戳,可以直接使用对应的字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");// 使用:WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

另外也可以在创建TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中需要实现getRowtimeAttributeDescriptors方法,创建基于EventTime的时间属性信息。

// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 创建流,基于user_action_time属性分配水位线DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 标记user_action_time字段作为事件时间属性// 创建user_action_time描述符,用来标识时间属性字段RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;}
}// register表
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小结

本文主要介绍了如何在Flink Table API和SQL中使用时间语义,可以使用两种时间语义:处理时间和事件时间。分别对每种的时间语义的使用方式进行了详细解释。

往期精彩回顾

Flink Table API & SQL编程指南(1)

Flink Table API & SQL编程指南之动态表(2)

duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)相关推荐

  1. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  2. Flink Table API SQL编程指南(自定义Sources Sinks)

    TableSource TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问的通用接口.在TableEnvironment中注册TableSource后,可以通 ...

  3. Spark SQL编程指南-收费版

    Spark SQL 编程指南 Spark SQL是用于结构化数据处理的一个模块.同Spark RDD 不同地方在于Spark SQL的API可以给Spark计算引擎提供更多地 信息,例如:数据结构.计 ...

  4. hive编程指南电子版_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  5. spark-sql建表语句限制_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  6. hive编程指南_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  7. oracle查询姓王的学生,Oracle之SQL 和 PL/SQL编程指南

    1.定义表的结构: sql> create  table stu ( student_id  number(5) constrant  student_pk  primary  key,  -- ...

  8. C#编程指南:使用属性

    属性可以放置在几乎所有的声明中(但特定的属性可能限制在其上有效的声明类型). 在语法上,属性的指定方法为:将括在方括号中的属性名置于其适用的实体声明之前.例如,具有 DllImport 属性的方法将声 ...

  9. Apache Spark 3.0 结构化Streaming流编程指南

    目录 总览 快速范例 Scala语言 Java语言 Python语言 R语言 程式设计模型 基本概念 处理事件时间和延迟数据 容错语义 使用数据集和数据帧的API 创建流数据框架和流数据集 流数据帧/ ...

最新文章

  1. OpenGL进阶(十三) - GLSL光照(Lighting)
  2. Ext2.0 form使用实例
  3. Cisco路由器操作命令汇总,看完全学会
  4. lintcode:排颜色 II
  5. python3 urlencode_Python2和Python3中urllib库中urlencode的使用注意事项
  6. 2019秋季PAT甲级考试心得
  7. 安装VS2008错误解决
  8. hive join on 条件 与 where 条件区别
  9. gulp前端自动化构建工具使用
  10. BRVAH官方使用指南(持续更新)
  11. 教你轻松玩转天线效应(Process Antenna Effect)
  12. SIFT/SURF算法
  13. layui 导航栏设置无鼠标停留特效_五款最受欢迎的热门wordpress开源主题 - 博客、导航...
  14. (转)前端文摘:深入解析浏览器的幕后工作原理
  15. win10文件资源管理器默认打开我的电脑及左侧导航设置
  16. 电子计算机X线体层摄影,X线计算机体层摄影.pdf
  17. Kafka删除历史消息的策略
  18. 我如何使用smartwatch传感器限制covid 19感染
  19. vue样式操作与事件绑定
  20. uniapp里css不是识别*,报 error at token “*“

热门文章

  1. Oracle SQL语句执行步骤
  2. GTK+开发环境搭建(Centos+Netbeans)
  3. PHP学习——定界符格式引起的错误
  4. 电力自动化及继电保护实验室规章制度
  5. 让sky Driver成为你的可见硬盘
  6. springboot controller 访问 404
  7. 自己从零安装hadoop-HA集群
  8. java 注释快捷打出时间_Java快捷---自动注释时间作者。。。
  9. php什么情况下使用静态属性,oop-做php项目什么时候该使用静态属性呢
  10. python独立log示例_带有Python示例的math.log1p()方法