文章目录

  • 1. Flink 中的时间和窗口
    • 1.1 时间语义
    • 1.2 两种时间语义的对比
  • 2. 水位线(Watermark)
    • 2.1 事件时间和窗口
    • 2.2 水位线
      • 2.2.1 有序流中的水位线
      • 2.2.2 乱序流中的水位线
    • 2.3 水位线特点
  • 3. 如何生成水位线
    • 3.1 生成水位线的总体原则
    • 3.2 水位线生成策略(Watermark Strategies)
    • 3.3 Flink 内置水位线生成器
      • 3.3.1 有序流
      • 3.3.2 乱序流
    • 3.4 自定义水位线策略
  • 4. 水位线的传递

1. Flink 中的时间和窗口

1.1 时间语义

  在flink中,当希望对数据按照时间窗口来进行收集计算时,时间的衡量标准就非常重要

  如图:在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
  在这个过程中,有两个非常重要的时间点:一个是数据产生的时间,把它叫作事件时间(Event Time);另一个是数据真正被处理的时刻,叫作处理时间(Processing Time);窗口操作以那种时间作为衡量标准,就是所谓的时间语义(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后

注:
  在事件时间语义下,对于时间的衡量,不再依赖于任何机器的系统时间,而是依赖于数据本身(Timestamp作为一个属性嵌入到数据中)。简单来说,这相当于任务处理的时候自己本身是没有时钟的,所以只好来一个数据就问一下“现在几点了”;而数据本身也没有表,只有一个自带的“出厂时间”,于是任务就基于这个时间来确定自己的时钟。由于流处理中数据是源源不断产生的,一般来说,先产生的数据也会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代表时间的推进。
  当然这里有个前提,就是先产生的数据先被处理,这要求保证数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的水位线(Watermarks)。

1.2 两种时间语义的对比

  在分布式环境中,处理时间其实是不确定的,各个并行任务时钟不统一;而且由于网络延迟,导致数据到达各个算子任务的时间有快有慢,对于窗口操作就可能收集不到正确的数据了,数据处理的顺序也会被打乱。这就会影响到计算结果的正确性。所以处理时间语义,一般用在对实时性要求极高、而对计算准确性要求不太高的场景
  而在事件时间语义下,水位线成为了时钟,可以统一控制时间的进度。这就保证将数据划分到正确的窗口中,比如 8 点 59 分 59 秒产生的数据,无论网络传输的延迟是多少,它永远属于 8 点~9 点的窗口,不会错分。但数据还可能是乱序的,要想让窗口正确地收集到所有数据,就必须等这些错乱的数据都到齐,这就需要一定的等待时间。所以整体上看,事件时间语义是以一定延迟为代价,换来了处理结果的正确性。由于网络延迟一般只有毫秒级,所以即使是事件时间语义,同样可以完成低延迟实时流处理的任务。
  另外,除了事件时间和处理时间,Flink 还有一个摄入时间(Ingestion Time)的概念,它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添加到数据里。这样一来,水位线(watermark)就基于这个时间直接生成,不需要单独指定了。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事件时间非常像,可以当作特殊的事件时间来处理。
  在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12 版本开始,Flink 已经将事件时间作为了默认的时间语义


2. 水位线(Watermark)

2.1 事件时间和窗口

  事件时间语义下,基于数据的时间戳,自定义了一个逻辑时钟。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。

  如图:事件时间语义类似于物流发车,到达车上的商品,生产时间是8 点 05 分,那么当前车上的时间就是 8 点 05 分;又来了一个 8 点 10 分生产的商品,现在车上的时间就是 8 点 10 分。直接用数据的时间戳来指示当前的时间进展,窗口的关闭自然以数据的时间戳等于窗口结束时间为准,这就不受网络传输延迟的影响了。对于8 点 59 分 59 秒生产出来的商品,到车上的时候不管实际时间(系统时间)是几点,当前时间就是 8 点 59 分 59 秒,所以它总是能赶上车的;而 9 点这班车,要等到 9 点整生产的商品到来,才认为时间到了 9 点,这时才正式发车。

2.2 水位线

  在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事
件时间(Event Time)进展的标记,就被称作水位线(Watermark)

2.2.1 有序流中的水位线

  在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。
  实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳

注意:对于水位线的周期性生成,周期时间是指处理时间(系统时间),而不是事件时间

2.2.2 乱序流中的水位线

  乱序指数据的先后顺序不一致,主要是基于数据的产生时间而言的

  乱序流中的水位线,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线

  如果考虑到大量数据同时到来的处理效率,同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

  对于迟到数据,为了让窗口能够正确收集到迟到的数据,可以等上若干秒;即用当前已有数据的最大时间戳减去 若干秒,就是要插入的水位线的时间戳,如图水位线设置延迟2秒

注:
  一个窗口所收集的数据,并不是之前所有已经到达的数据。因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。如图尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果

2.3 水位线特点

(1)水位线是插入到数据流中的一个标志
(2)水位线主要内容是一个时间戳,用来表示当前事件时间的进展
(3)水位线是基于数据的时间戳产生的
(4)水位线的时间戳必须是单调递增的,来保障,以确保任务的事件时间时钟向前推进
(5)水位线可以设置延迟,来保证正确处理乱序数据
(6)一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据


3. 如何生成水位线

3.1 生成水位线的总体原则

  由于网络传输的延迟不确定,为了获取所有迟到数据,只能设置延迟时间。但等待的时间越长,处理的实时性越低。因此可以单独创建一个 Flink 作业来监控事件流,建立概率分布或者机器学习模型,学习事件的迟到规律。得到分布规律之后,就可以选择置信区间来确定延迟,作为水位线的生成策略了。如,数据的迟到时间服从μ=1,σ=1 的正态分布,那么设置水位线延迟为 3 秒,就可以保证至少 97.7%的数据可以正确处理

3.2 水位线生成策略(Watermark Strategies)

  在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

stream.assignTimestampsAndWatermarks()


  .assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所 谓 的 水 位 线 生 成 策 略 WatermarkStrategy 中 包 含 了 一 个 时 间 戳 分 配器TimestampAssigner 和一个水位线生成器WatermarkGenerator

TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础

WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()

onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作

onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms

env.getConfig().setAutoWatermarkInterval(60 * 1000L);

3.3 Flink 内置水位线生成器

3.3.1 有序流

  调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现,直接拿当前最大的时间戳作为水位线

  调用.withTimestampAssigner()方法,将数据中的 timestamp 字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略


3.3.2 乱序流

  由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示最大乱序程度,它表示数据流中乱序数据时间戳的最大差值

注:
  事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:

WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))

注:
  乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1,单位毫秒。为什么要减 1 毫秒呢?时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来

BoundedOutOfOrdernessWatermarks源码:

3.4 自定义水位线策略

(1)周期性水位线生成器(Periodic Generator)
  周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水
位线

// 自定义水位线的产生
public class CustomWatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomPeriodicGenerator();}}public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}

(2)断点式水位线生成器(Punctuated Generator)
  断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线

    public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {@Overridepublic void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (r.user.equals("Mary")) {output.emitWatermark(new Watermark(r.timestamp - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}}

4. 水位线的传递

  上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。这样,后续任务就不需要依赖原始数据中的时间戳(经过转化处理后,数据可能已经改变了),也可以知道当前事件时间

  在重分区(redistributing)的传输模式下

  当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三
个并行子任务,所以会向三个分区发出水位线。
具体过程如下:
  (1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个分区水位线(Partition Watermark),这是一个分区时钟;而当前任务的时钟,就是所有分区时钟里最小的那个
  (2)当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务
  (3)再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
  (4)同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务

总之,每个任务都以处理完之前所有数据为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的;
水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒

注意:
  在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发


flink 时间语义、水位线(Watermark)、生成水位线、水位线的传递相关推荐

  1. Flink 时间语义与水位线(Watermarks)

    文章目录 时间语义 水位线(Watermarks) 时间语义 对于流式数据处理,最大的特点就是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为如下三种时间概念 事件时间(Eve ...

  2. Flink时间语义与watermark的原理

    时间语义 我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性 val env: StreamExecutionEnvironment = ...

  3. 大数据——Flink 时间语义

    目录 一.时间语义 1.1 三种时间概念 1.1.1 ProcessTime 在代码中的使用 1.1.2 EventTime 在代码中的使用 1.1.3 关于窗口起始时间的计算值 二.对事件的处理 2 ...

  4. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  5. Flink难点解析:揭开Watermark的神秘面纱

    目录 一.时间 1.1 时间语义 1.1.1 Event Time 1.1.2 Ingestion Time 1.1.3 Processing Time 1.2 设置时间语义 二.Watermark ...

  6. 4.1.19 Flink-流处理框架-Flink中的时间语义和watermark水位线

    目录 1.Flink中的时间语义 1.1 EventTime 的代码设置 2.Watermark水位线 2.1 watermark的基本概念 2.2 watermark的特点和传递 2.3 Water ...

  7. Flink的时间语义和Watermark

    1 时间语义    数据迟到的概念是:数据先产生,但是处理的时候滞后了    在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:    Event Time:是事件创建的时间.它通常由事件 ...

  8. Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流

    一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...

  9. Flink中的时间和窗口——时间语义

    文章目录 前言 一.时间语义 1.1.Flink 中的时间语义 1.1.1.处理时间(Processing Time) 1.1.2.事件时间(Event Time) 1.2.哪种时间语义更重要 1.2 ...

  10. 大数据_Flink_Java版_数据处理_时间语义(1)_时间语义概念---Flink工作笔记0049

    然后我们来看flink中的一个重要概念,时间语义 来看一下,一般情况我们说的时间是,指的是系统的时间,比如数据是在系统时间是8点到9点到的,那么这个数据就属于8点到9点的窗口.如果这里的时间指的是,数 ...

最新文章

  1. Matlab与线性代数 -- 矩阵的连接
  2. 攻击 FreeIPA 域:对象枚举
  3. python中操作数据库中游标的使用方法
  4. 如何处理Partner function occurs less than specified in customizing error message
  5. 实验二 php基本语法1,实验二PHP基础.doc
  6. 爱数智慧荣获“阿里云2021年度优秀供应商” | 喜讯
  7. Git Windows下安装配置
  8. 解决办法:无法安装 /lib/x86_64-linux-gnu/libpng12.so.0 的新版本
  9. 利用hasOwnProperty实现的高效的javascript hashtable
  10. DWM1000DISCOVERY开发板简介
  11. h5页面生成图片分享到微信js_H5网页实现微信分享功能
  12. 052RINEX中N文件示例说明
  13. Scheduler 配置与注意事项
  14. Python OpenCV 图像平移,取经之旅第 10 天
  15. yoast seo_Yoast SEO vs All in a SEO Pack –最好的WordPress SEO插件是哪个?
  16. 前端进化史——The Evolution of Front End Development
  17. 几个重要的排列组合定理公式
  18. 理清互联网金融的脉络(一)
  19. OLED显示屏I2C接口
  20. 编译原理 - 词法分析

热门文章

  1. Meta-Tracker: Fast and Robust Online Adaptation for Visual Object Trackers
  2. 13个免费下载 SVG 图标网站
  3. 论文阅读17 | Cross-modality Person re-identification with Shared-Specific Feature Transfer
  4. 【0门槛】PR稿的自我修养
  5. Gossip协议笔记--谣言、流行病协议
  6. 51单片机DS18B20(单总线)温度读取
  7. 单片机开发怎么把杜邦线弄整齐?
  8. 使用FFMPEG5.0和SDL2.0编写视频简单播放器
  9. python实现 stft_python scipy signal.stft用法及代码示例
  10. Django默认用户模型类和父类 AbstractUser 介绍