文章目录

  • 生成水印
    • 水印策略介绍
    • 使用水印策略
    • 处理空闲源
    • 编写 `WatermarkGenerators`
      • 编写周期 WatermarkGenerator
      • 编写标点WatermarkGenerator
    • 水印策略和 Kafka 连接
    • 操作者如何处理水印
    • 已弃用的 AssignerWithPeriodicWatermarks 和 ssignerWithPunctuatedWatermarks

生成水印

在本节中,您将了解Flink为处理事件时间戳和水印提供的API。有关事件时间、处理时间和摄取时间的介绍,请参阅introduction to event time的相关介绍。

水印策略介绍

为了处理事件时间,flink需要知道每个事件的时间戳,这就意味着,流中的没个元素都需要有它自己的事件时间戳。通常是通过使用TimestampAssigner从元素的某些字段中提取时间戳来实现的。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进度。您可以通过指定水印生成器来配置它。

Flink API需要一个同时包含TimestampAssignerWatermarkGeneratorWatermarkStrategy。许多常见的策略都是现成的,作为水印策略的静态方法,但用户也可以在需要时构建自己的策略。许多常用策略作为 WatermarkStrategy 中的静态方法开箱即用,但用户也可以在需要时构建自己的策略。

为了完整性起见,下面是接口:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{/*** Instantiates a {@link TimestampAssigner} for assigning timestamps according to this* strategy.*/@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** Instantiates a WatermarkGenerator that generates watermarks according to this strategy.*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

如前所述,您通常不会自己实现这个接口,而是使用 WatermarkStrategy 上的静态帮助器方法来实现常见的水印策略。例如,通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);

指定一个TimestampAssigner是可选的,在大多数情况下,实际上您并不希望指定一个TimestampAssigner。例如,当使用Kafka或Kinesis时,你会直接从Kafka/Kinesis记录中获得时间戳。

注意:时间戳和水印都指定为自 Java 纪元 1970-01-01T00:00:00Z 以来的毫秒数。

使用水印策略

在 Flink 应用中有两个地方可以使用 WatermarkStrategy

  • 直接在源上使用
  • 在非源操作之后使用。

第一个选项更可取,因为它允许源利用水印中的分片/分区/分割的逻辑。然后源可以生成更加准确的时间水印。

直接在源上指定 WatermarkStrategy 通常意味着您必须使用特定源。请参阅Watermark Strategies and the Kafka Connector 以了解其在 Kafka 连接器上的工作方式以及有关每个分区水印如何在那里工作的更多详细信息。

第二个选项(在任意操作后设置 WatermarkStrategy)仅在您无法直接在源上设置策略时使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...);

以这种方式使用 WatermarkStrategy 会获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经有时间戳或水印,时间戳分配器会覆盖它们。

处理空闲源

如果其中一个输入的分割/分区/分片在一段时间内没有任何事件,这意味着水印生成器不会获取任何新的水印信息。我们称之为空闲输入或空闲源。这是一个问题,因为您的其他分区可能仍然存在事件。在这种情况下,水印将被阻止向下游继续传播,因为向下游传播的水印是所有并行水印中的最小值,由于上游水印被阻止了,所以当前阶段永远就不会获取所有水印,这样最小水印就不会被计算出来,所以flink程序就不会继续向下执行。

要处理这个问题,您可以使用WatermarkStrategy来检测空闲状态并将输入标记为空闲状态。WatermarkStrategy为此提供了一个方便的解决策略:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

编写 WatermarkGenerators

TimestampAssigner 是一个从事件中提取字段值的简单函数,因此我们不需要详细查看它们。另一方面,WatermarkGenerator 的编写有点复杂,我们将在接下来的两节中介绍如何做到这一点。以下是 WatermarkGenerator 接口:

/*** The {@code WatermarkGenerator} generates watermarks either based on events or* periodically (in a fixed interval).** <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/
@Public
public interface WatermarkGenerator<T> {/*** Called for every event, allows the watermark generator to examine * and remember the event timestamps, or to emit a watermark based on* the event itself.*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** Called periodically, and might emit a new watermark, or not.** <p>The interval in which this method is called and Watermarks * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.*/void onPeriodicEmit(WatermarkOutput output);
}

有两种不同风格的水印生成方式:周期生成和标点生成。

周期生成器通常通过onEvent()监视传入事件,然后在框架调用onPeriodicEmit()时发出水印。

标点生成器通常通过onEvent()监视传入事件,并等待流中带有特殊标记或者标记符号的事件,当监视到这些事件之一,它会立即生成水印,通常,标点生成器不会从 onPeriodicEmit() 发出水印。

接下来,我们将看看如何为每种风格实现生成器。

编写周期 WatermarkGenerator

周期生成器观察流事件并周期性的生成时间水印。

生成水印的间隔(每 n 毫秒)通过 ExecutionConfig.setAutoWatermarkInterval(...) 定义。每次都会调用生成器的 onPeriodicEmit() 方法,如果返回的水印为非空且大于前一个水印,则会发出新的水印。

在这里,我们展示了两个使用周期性水印生成器生成水印的简单示例。注意,Flink附带了BoundedOutOfOrdernessWatermarks,这是一个与下面所示的BoundedOutOfOrdernessGenerator工作类似的水印生成器。你可以在这里读到如何使用它。

/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// emit the watermark as current highest timestamp minus the out-of-orderness boundoutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}}/*** This generator generates watermarks that are lagging behind processing time * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// don't need to do anything because we work on processing time}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}

编写标点WatermarkGenerator

标点水印生成器将观察事件流并在看到带有水印信息的特殊元素时发出水印

这就是如何实现一个加标点的生成器,每当事件表明它带有某个标记时,它就会发出水印:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {if (event.hasWatermarkMarker()) {output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// don't need to do anything because we emit in reaction to events above}
}

注意:可以在每个单独的事件上生成水印。但是,由于每个水印都会引起下游的一些计算,过多的水印会降低性能。

水印策略和 Kafka 连接

当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(升序时间戳或有界无序)。然而,当从 Kafka 消费流时,多个分区通常会被并行消费,从而将来自分区的事件交织在一起,并破坏了每个分区的模式(这是Kafka的客户端工作方式中固有的)

在这种情况下,您可以使用 Flink 的 Kafka 分区感知水印生成。使用该功能,每个消费 Kafka 分区的消费者在其内部生成水印,并且每个分区水印的合并方式与水印在shuffle时的合并方式相同。

例如,如果每个 Kafka 分区的事件时间戳严格递增,则使用递增时间戳水印生成器生成每个分区的水印将产生完美的整体水印。

下面的插图展示了如何使用每个kafka分区的水印生成,以及在这种情况下水印如何通过流数据传播。

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy..forBoundedOutOfOrderness(Duration.ofSeconds(20)));DataStream<MyType> stream = env.addSource(kafkaSource);

操作者如何处理水印

一般来说,操作员在将给定的水印转发给下游之前,需要对其进行处理。例如,WindowOperator 将首先评估所有应该被触发的窗口,由水印触发所产生的所有事件全部被输出后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前发送。

同样的规则也适用于TwoInputStreamOperator,但是,在这种情况下,操作符的当前水印被定义为其两个输入的最小值。

已弃用的 AssignerWithPeriodicWatermarks 和 ssignerWithPunctuatedWatermarks

在介绍 WatermarkStrategyTimestampAssignerWatermarkGenerator 的抽象之前,Flink 使用了 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks。您仍然会在 API 中看到它们,但建议使用新接口,因为它们提供了更清晰的关注点分离,并且统一了水印生成的周期性和标点样式。

flink DataStream API(三)事件时间-生成水印相关推荐

  1. Flink事件时间和水印详解

    前言 Flink使用版本:1.12.1.   水印是一个标记的时间戳,是一个标记:意味着水印代表时间前的数据均已到达(人为的设定--开发人员可以控制延迟和完整性之间的权衡),这一点水印保障了乱序问题的 ...

  2. Flink事件时间、水印以及迟到数据处理的个人理解

    Flink中的时间概念 Flink在流式传输程序中支持不同的时间概念: ProcessingTime: 处理时间,正在执行操作的机器的时间 EventTime: 事件时间,事件发生的时间 Ingest ...

  3. flink设置watermark以及事件时间字段源码分析

    flink设置watermark以及事件时间字段源码分析 背景 1.1.提取时间戳字段,用于事件时间语义处理数据 1.2.设置水位线(水印)watermark TimestampAssigner 核心 ...

  4. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  5. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  6. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  7. flink DataStream API(三)事件时间-内置水印生成器

    文章目录 内置水印生成器 单调递增的时间戳 固定的延迟时间 内置水印生成器 如生成水印中所述,Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印.更具体地说,可以通过实现Wate ...

  8. flink中的事件时间和水印

    引言 在Flink流式引擎消费平台的项目中遇到数据处理顺序错乱的问题,导致项目处于一个不可用状态.本文记录了流式引擎中处理乱序方案中的基础知识点. 正文 一 事件时间(EventTime)和水印(Wa ...

  9. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

最新文章

  1. 初步学习TypeScript
  2. 12无法使用otg_ios设备该如何选择U盘,以及U盘日常使用技巧
  3. 麦肯锡发布《中国互联网公司崛起报告》
  4. 配置MySQL8.0 环境变量
  5. JAXB自定义绑定– Java.util.Date / Spring 3序列化
  6. matlab求傅里叶级数展开式_连续时间的傅里叶级数
  7. oracle中的日期查询在mybatis中写法
  8. c语言-基本计算 pm2.5,C语言程序设计题(A卷).doc
  9. HDU-4618 Palindrome Sub-Array 暴力枚举
  10. 力扣904-水果成篮(C++,总结别人的思路)
  11. 深入理解JVM的内存区域划分
  12. Element UI的表格table列的宽度自适应设置
  13. 【Django 2021年最新版教程9】数据库查询操作
  14. 主板怎么开启csm_主板的Launch CSM的开启和关闭是什么作用?会有哪些影响?
  15. [希腊神话--英语]另类重看英语词汇---序言
  16. 港澳台、大陆身份证正则表达式
  17. 6-32 表头插入法构造链表
  18. 游戏‘微信打飞机’ 第二课
  19. Android 麦克风录音动画
  20. 51nod 1299 监狱逃离 树形dp

热门文章

  1. vs2019工程中打开ui文件就卡死
  2. 一行代码解决各种IE兼容问题
  3. RANSAC 算法拟合平面
  4. 显示控件——字符显示之滚动文本
  5. 工会大数据:推进工会“大系统”整合步伐
  6. 【MATLAB】求导数
  7. 【打CF,学算法】CodeForces网站简介
  8. 为什么很多程序员宁可打游戏也不追女生
  9. 编译vc10版(VS2010)opencv2/3
  10. TEC1303.Form个性化技术总结 - 第一部分 Form个性化技术讲解