文章目录

  • 1. flink 三种time简介
    • 1.1 Processing Time:
    • 1.2 Event time:
    • 1.3 Ingestion time:
  • 2. flink中使用event-time
    • 2.1 在stream-source中直接放入event-time
    • 2.2 使用Timestamp Assigners / Watermark Generators
      • 2.2.1 AssignerWithPeriodicWatermarks
      • 2.2.2 AssignerWithPunctuatedWatermarks
  • 3. window的两种使用方式
    • 3.1 evn设置TimeCharacteristic
    • 3.2 创建window的时候指定window的类型
  • 4. 使用kafka record 的timestamp作为event的timestamp
  • 5. watermark的注意事项
    • 5.1 . watermark要满足递增特性
    • 5.2. 多个输入流需要特别注意
  • 6. 连续多个window的计算

1. flink 三种time简介

flink中有三种时间: processing-time, event-time, ingestion-time

1.1 Processing Time:

Processing Time是指执行程序时对应的物理机系统时间。
  当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。例如:一个每小时处理的时间窗口将包括所有在系统指定的一个小时内到达指定操作的所有记录。
  处理时间是最简单的时间概念,不需要流和物理机之间的协调,它有最好的性能和最低的延迟。然而,在分布式或者异步环境中,因为受到记录到达系统时间的影响,处理时间不能够决定系统内操作之间记录流的速度。
  对于简单的总量统计模型,则可以采用processing-time 比如统计用户的发帖总量,实际上对数据是否有序并不敏感。

1.2 Event time:

Event Time是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink之前就已经嵌入到事件中了,并且事件的timestamp是可以从每一个record中抽取出来的。
  事件时间可以通过备份或者持久化日志获取无序数据、延迟事件或者重试数据的正确结果。在事件时间中,时间进度依赖于数据而不是其他形式的时钟。事件时间程序必须要指定如何产生事件时间水印(Event Time Watermarks),这是事件时间处理进度的信号机制,这个机制在下面描述。对于很多有明确时间以来的数据比较有用,比如统计用户过去5分钟内的发帖量,则是有必要使用event-time的,否则如果处理已经堆积的数据,使用proccessing-time则明显会填进去很多不是对应时间段的数据。
  使用event-time,需要我们做两个工作,
  1.为每个record提取他的timestamp,每个事件都要有一个timestamp
  2.产生watermark,这个watermark是在整个流当中都会起作用的

1.3 Ingestion time:

摄入时间(Ingestion Time)是事件进入Flink的时间,在源操作中每个记录都会获得源的当前时间作为时间戳,后续基于时间的操作(如: time window)会依赖这个时间戳
  摄入时间从概念上来讲是在event-time和processing-time之间,与处理时间相比,成本可能会高一点,但是会提供更加可预测的结果。因为摄入时间使用的是固定的时间戳(都是在源处指定的),记录中的不同窗口操作依赖同一个时间戳,而在处理时间中每个窗口操作可能将记录赋给不同的窗口(根据本地的系统时钟和传输时延)。
  与事件时间相比,摄入时间程序不能处理任何无序事件或者延迟事件,但是程序无需指定如何产生水印。

2. flink中使用event-time

flink提供了两种抽取event-time的方式

  1. 在stream-source中直接放入event-time并且产生watermark
  2. 使用Timestamp Assigners / Watermark Generators 来产生event-time和water-mark

2.1 在stream-source中直接放入event-time

DataStreamSource<MyEvent> dataStreamSource = env.addSource(new SourceFunction<MyEvent>() {@Override
public void run(SourceContext<MyType> ctx) throws Exception {while (/* condition */) {MyType next = getNext();ctx.collectWithTimestamp(next, next.getEventTimestamp());if (next.hasWatermarkTime()) {ctx.emitWatermark(new Watermark(next.getWatermarkTime()));}}
}}

这种需要自己对data-source进行封装,管理起来可能比较麻烦。

2.2 使用Timestamp Assigners / Watermark Generators

这种方式也有两种,

  1. AssignerWithPeriodicWatermarks 按照一定的时间产生waterMark
  2. AssignerWithPunctuatedWatermarks 按照特定的事件来产生waterMark

2.2.1 AssignerWithPeriodicWatermarks

实现这个类需要实现两个方法

/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/
//这个产生water-mark 对应的情况是相同event-time产生的多个event,
//最晚的到达的event只会比最早到达的晚几毫秒的情况
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;// 实现了给每个event抽取timestamp@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime();currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}// 实现了获取watermark@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
}/*** This generator generates watermarks that are lagging behind processing time by a fixed amount.* It assumes that elements arrive in Flink after a bounded delay.*/
// 这个water-mark允许元素对应当前系统时间有一定的延迟,
//感觉不好用,这种是用processing-time来度量延迟的
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);}
}

在这里 对于每个kafka元素都会调用 extractTimestamp 方法来产生 timestamp
然后再固定的时间片段会调用 getCurrentWatermark ,就像定时任务一样
具体的调用周期可以使用

ExecutionConfig.setAutoWatermarkInterval(...)

进行设置,一般都是几十毫秒就行了。

2.2.2 AssignerWithPunctuatedWatermarks

这个是针对特定的元素触发water-mark

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;}
}

3. window的两种使用方式

这个实际上说的是具体的api的使用

  1. 在env初始化的时候设置TimeCharacteristic,后面使用创建window的时候直接使用time
  2. env不指定TimeCharacteristic,创建window的时候指定window的类型
    使用这两个的前提是先要设置env的时间线类型

3.1 evn设置TimeCharacteristic

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).timeWindow(Time.seconds(10)).reduce( (a, b) -> a.add(b) ).addSink(...);

3.2 创建window的时候指定window的类型

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> input =env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<Integer> resultsPerKey = input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new Summer());

4. 使用kafka record 的timestamp作为event的timestamp

flink 对kafka consumer做了一些特殊的处理
当你在代码中进行了如下设置

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

对于每个kafka元素,在获取的时候会将kafka record的timestamp作为当前元素的timestamp
所以,如果你选择使用kafka record的timestamp作为 event-time的话,在实现AssignerWithPeriodicWatermarks 只需要重点关注产生watermark的方法就行了。
下面这个是一个使用kafka record的timestamp作为 event-time的AssignerWithPeriodicWatermarks样例

public class KafkaMark implements AssignerWithPeriodicWatermarks<ObjectNode> {private long waterMark;private long allowLate;@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(waterMark-allowLate);}/**在这里,previousElementTimestamp 是当前element 之前产生的timestamp,默认是0,但是当使用 kafak-consumer并且开启了event-time时间线的时候,这里就是kafka-record的时间,所以你就不要再进行解析了,直接使用即可*/@Overridepublic long extractTimestamp(ObjectNode element, long previousElementTimestamp) {this.waterMark=Math.max(waterMark,previousElementTimestamp);return previousElementTimestamp;}}

5. watermark的注意事项

5.1 . watermark要满足递增特性

对于任何一个AssignerWithPeriodicWatermarks ,extractTimestamp产生的是每一个元素具体的timestamp,但是getCurrentWatermark 产生的一系列watermark必须满足递增特性,因为window是根据watermark创建和触发计算以及销毁的。如果watermark不能满足递增特性的话,同一个时间段的window可能会被反复创建,导致数据统计失真。所以在实现AssignerWithPeriodicWatermarks 的extractTimestamp 的时候一定需要注意,要满足watermark的递增特性。
下面的代码保证了watermark是递增的(严格的说是非递减的)

  @Overridepublic long extractTimestamp(ObjectNode element, long previousElementTimestamp) {this.waterMark=Math.max(waterMark,previousElementTimestamp);return previousElementTimestamp;}

5.2. 多个输入流需要特别注意

并行度的引入可能导致可能有些窗口无法被触发,需要注意,在union的时候,会引入多个并行度,然后window会取每个并行度的最小值来作为窗口的最小warter-mark,这样有可能会导致water-mark一直没有办法触发。
比如,统计用户最近30天发帖量,如果使用了两个kafka-topic,一个是init-topic(存放存量数据),一个是binlog-topic(存放增量数据),使用event-time作为时间线的话,有可能导致window无法触发,因为init-topic对应的数据是存量,没有增量数据,但是window中的watermark是取两个流当中的最小值作为他的watermark,这样的话会导致window无法触发计算。

6. 连续多个window的计算

watermark会在产生的地方持续往下游流过去,下游的多个window都会接收到这些watermark,会按照规则触发计算,当一个大于等于(end-timestamp - 1)的watermark到来的时候,会触发所有的endTime 小于等于end-timestamp的窗口

可以进行两个连续的窗口计算,从第一个窗口出来的元素的timestamp都是这个window的 endTime-1.
比如下面的样例


DataStream<Integer> input = ...;DataStream<Integer> resultsPerKey = input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(60),Time.seconds(6))).reduce(new Summer());DataStream<Integer> globalResults = resultsPerKey.windowAll(TumblingEventTimeWindows.of(Time.seconds(6))).process(new TopKWindowFunction());

上面的方式可以在第一个window window01中每隔6s计算一下过去60s的各个key的统计数据,然后再后面第二个window window02会拿到window01中每隔6s输出的数据,做一次计算,求出topk。

flink event-time 和连续窗口的使用相关推荐

  1. Flink 中的时间和窗口

    时间和窗口 一.时间语义 1. Flink 中的时间语义 1.1 处理时间(Processing Time) 1.2 事件时间(Event Time) 1.3 两种时间语义的对比 二.水位线(Wate ...

  2. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  3. 【Flink系列2】时间窗口

    引出 对于流处理系统来说,流入的消息是无限的,所以对于聚合或是连接等操作,流处理系统需要对流入的消息进行分段,然后基于每一段数据进行聚合或是连接等操作.消息的分段即称为窗口,流处理系统支持的窗口有很多 ...

  4. 万字详述 Flink SQL 4 种时间窗口语义!(收藏)

    DML:窗口聚合 大家好我是老羊,由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走.思路如下: ⭐ 先介绍 Flink SQL 支持的 4 种时间窗口 ⭐ 分别 ...

  5. Flink教程(3) 大白话 时间 窗口 watermark

    大白话 时间 窗口 watermark 一.前言 二.流处理术语 1 延迟和吞吐 1.1 延迟 1.2 吞吐 2 数据流上的操作 2.1 数据接入和数据输出 2.2 转换操作 2.3 滚动聚合 2.4 ...

  6. Flink中的时间和窗口——时间语义

    文章目录 前言 一.时间语义 1.1.Flink 中的时间语义 1.1.1.处理时间(Processing Time) 1.1.2.事件时间(Event Time) 1.2.哪种时间语义更重要 1.2 ...

  7. 学习笔记Flink(二)—— Flink数据流模型、时间窗口和核心概念

    一.Flink编程数据流模型 1.1.Flink – API封装 Flink 提供不同级别的API封装来支持流/批处理应用程序. 1.2.Flink-编程数据流 Source:一个不会结束的数据记录流 ...

  8. FLINK 窗口实现原理

    FLINK 窗口原理 Flink可以将数据流切分成一个个window窗口,对窗口内的数据进行处理. Windows是Flink流计算的核心.Flink中支持多种窗口,包括时间窗口(Time-based ...

  9. 【Flink】Flink中的窗口API、窗口函数以及迟到数据处理问题

    目录 一.窗口 1.窗口的概念 2.窗口的分类 (1)按照驱动类型分类--时间窗口和计数窗口 (2)按照窗口分配数据的规则分类 3.窗口 API (1)按键分区窗口(Keyed Windows) (2 ...

最新文章

  1. CSS与HTML结合
  2. django mysql orm教程_带你了解Django ORM操作(基础篇)
  3. Apache Jackrabbit源码研究(五)
  4. 【数字信号处理】傅里叶变换性质 ( 共轭对称序列性质 | 共轭反对称序列性质 | 模偶对称 | 相角奇对称 )
  5. 这场论文复现的华山论剑,谁能拔得头筹
  6. rational rose 逆向工程
  7. 字符串String的trim()方法
  8. pb string 接收dll按值返回_JavaScript 是如何工作的:JavaScript 的共享传递和按值传递...
  9. Keil5消除未调用警告
  10. 通俗易懂——5G调制方式全面解读
  11. selenium+python自动化106 - 滑动 iframe 上的滚动条
  12. 复杂性思维中文第二版 十二、合作进化
  13. 阿里巴巴中国站搜索店铺列表 API 返回值说明
  14. html特殊符号圆点,HTML特殊符号(字符实体)大全
  15. Python-集合练习(协助学生做问卷调查)
  16. 算法——弗洛伊德算法(Floyd-Warshall)(图论)(c++)
  17. 靠着游戏收入支撑打造众多口碑产品的网易,下一个20年会如何走
  18. 自学C语言的最大难题是什么?
  19. DOM解析与DOM4J
  20. Python | 如何运行.ipynb文件?如何安装Jupyter notebook?

热门文章

  1. python获取列表list里面元素的下标
  2. 深入理解分布式消息队列
  3. 基于Huffman算法和LZ77算法的文件压缩的改进方向
  4. 【线上圆桌】视频会议下半场
  5. 【线上分享】边缘计算与云原生架构应用及实践解析
  6. WebRTC视频数据流程分析
  7. Salsify:高流畅度的实时视频传输新方式
  8. 有关/etc/resolv.conf、/etc/hosts、/etc/sysconfig/network
  9. RabbitMQ研究与应用
  10. SpringMVC Spring Mybatis Druid SpringSession集成例子