flink watermark 生成机制与总结

  • watermark 介绍
    • flink时间概念
      • 1. 分配器(Window Assinger)
      • 2. 触发器(Trigger)
      • 3.驱逐器(Evictor)
  • watermark生成方式
  • 迟到时间处理
  • FlinkSql 中的watermark
  • 引出问题与源码分析

watermark 介绍

本质上watermark是flink为了处理eventTime窗口计算提出的一种机制,本质上也是一种时间戳,由flink souce或者自定义的watermark生成器按照需求定期或者按条件生成一种系统event,与普通数据流event一样流转到对应的下游operations,接收到watermark数据的operator以此不断调整自己管理的window event time clock。

首先,eventTime计算意味着flink必须有一个地方用于抽取每条消息中自带的时间戳,所以TimestampAssigner的实现类都要具体实现
long extractTimestamp(T element, long previousElementTimestamp);方法用来抽取当前元素的eventTime,这个eventTime会用来决定元素落到下游的哪个或者哪几个window中进行计算。

其次,在数据进入window前,需要有一个Watermarker生成当前的event time对应的水位线,flink支持两种后置的Watermarker:Periodic和Punctuated,一种是定期产生watermark(即使没有消息产生),一种是在满足特定情况的前提下触发。两种Watermark分别需要实现接口为
Watermark getCurrentWatermark()和Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);

flink时间概念

Flink 在流应⽤程序中三种 Time 概念

Time 类型 解释
Processing Time 事件被机器处理的系统时间,提供最好的性能和最低的延迟。分支式异步环境下,容易受到事件到达系统的速度,事件在系统内操作流动速度以及中断的影响。
Event Time 一般指数据本身携带的时间戳,能够满足在特定场景下数据准确性的需求。一般而言与 Processing Time 有时间延迟,需要引入水印机制处理事件乱序和时间乱序问题。
Ingestion Time 事件进入 Flink 的时间。一般在 Flink Source 定义,提供给下游窗口计算的触发计算。

⼀般来说,在⽣产环境中 Event Time 与 Processing Time 是常用的策略。


watermark最重要的就是与window配合来实现实时数据的计算,对于window的组成主要有三个成员,也是写代码时经常写缺不知道具体含义的三个方法。

1. 分配器(Window Assinger)

窗口分配器定义了数据流中的元素如何分配到窗口中,通过在分组数据流中调用 .window(…) 或者非分组数据流中调用 .windowAll(…) 时指定窗口分配器(WindowAssigner)来实现。WindowAssigner 负责将每一个到来的元素分配给一个或者多个窗口(window), Flink 提供了一些常用的预定义的窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。你也可以通过继承 WindowAssigner 类来自定义自己的分配器。

Assinger 备注
GlobalWindows 所有的数据都分配到同一个窗口。
MergingWindowAssigner 可 Merge 的窗口分配处理。
SlidingProcessingTimeWindows 基于 Processing Time 的滚动窗口分配处理。
SlidingEventTimeWindows 基于 Event Time 的滚动窗口分配处理。
TumblingProcessingTimeWindows 基于 Processing Time 的滑动窗口分配处理。
TumblingEventTimeWindows 基于 Event Time 的滑动窗口分配处理。
ProcessingTimeSessionWindows 基于 Processing Time 且可 merge 的会话窗口分配处理。
EventTimeSessionWindows 基于 Event Time 且可 merge 会话窗口分配处理。

2. 触发器(Trigger)

触发器决定了一个窗口何时可以被窗口函数处理,每一个窗口分配器都有一个默认的触发器,该触发器决定合适计算和清除窗口。如果默认的触发器不能满足你的需要,你可以通过调用 trigger(…)来指定一个自定义的触发器。触发器的接口有5个方法来允许触发器处理不同的事件:

  • onElement()方法,每个元素被添加到窗口时调用
  • onEventTime()方法,当一个已注册的事件时间计时器启动时调用
  • onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
  • onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。

每个触发动作的返回结果⽤ TriggerResult 定。TriggerResult 有四种状态:

  • CONTINUE:什么也不做
  • FIRE:触发计算
  • PUGE:清除窗口中的数据
  • FIRE_AND_PURGE:触发计算并清除窗口中的数据
Trigger 备注
EventTimeTrigger 当水印通过窗口末尾时触发的触发器。
ProcessingTimeTrigger 当系统时间通过窗口末尾时触发的触发器。
CountTrigger 窗口元素达到阈值触发的触发器。
PurgingTrigger 作为参数,使其成为带有清除功能触发器。
DeltaTrigger 基于 DeltaFunction 和一个阈值的触发器。

3.驱逐器(Evictor)

Flink 的窗口模型允许指定一个除了 WindowAssigner 和 Trigger 之外的可选参数 Evitor,这个可以通过调用 evitor(…) 方法来实现。这个驱逐器(evitor)可以在触发器触发之前或者之后,或者窗口函数被应用之前清理窗口中的元素。如果没有定义 Evictor,触发器直接将所有窗⼝元素交给计算函数。

Evictor 备注
TimeEvitor 清除时间戳小于窗口元素中的最大时间戳 - interval的元素。
CountEvitor 只保存指定数量的数据。
DeltaEvitor 通过一个 DeltaFunction 和一个阈值,计算窗口缓存中最近的一个元素和剩余的所有元素的 delta 值,并清除 delta 值大于或者等于阈值的元素。

watermark生成方式

在 Flink 中,数据处理中需要通过调⽤ DataStream 中的 assignTimestampsAndWatermarks ⽅法来分配时间和⽔印,该⽅法可以传⼊两种参数,⼀个是 AssignerWithPeriodicWatermarks,另⼀个是 AssignerWithPunctuatedWatermarks,通常建议在数据源(source)之后就进⾏⽣成⽔印,或者做些简单操作⽐如 filter/map/flatMap 之 后再⽣成⽔印,越早⽣成⽔印的效果会更好,也可以直接在数据源头就做⽣成⽔印。

  • With Periodic Watermarks(常用):周期性(一定时间间隔或者达到一定的记录条数)生成watermark

    • 需要实现AssignerWithPeriodicWatermarks接口
    • 默认周期是200ms,可通过env.getConfig.setAutoWatermarkInterval进行修改
    • 实际生产环境用得多,但必须结合时间或者累计条数两个维度,否则在极端情况下会有很大的延时
  • With Punctuated Watermarks(不常用):在满足自定义条件时生成watermark,每一个元素都有机会判断是否生成一个watermark。

    • 需要实现AssignerWithPunctuatedWatermarks接口
    • 在TPS很高的生产环境下会产生大量的 Watermark,可能在一定程度上对下游算子造成一定的压力,只有在实时性很高的场景才会选择这种方式来进行生成水印
    • 新版 Flink 源码中已经标记为 @Deprecated

迟到时间处理

  • 直接丢弃:将迟到事件视为错误消息并丢弃(flink默认处理方式)。
  • Side Output机制:可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
  • Allowed Lateness机制:允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

FlinkSql 中的watermark

在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 Watermark 生成表达式,同时标记这个已有字段为时间属性字段。

CREATE TABLE user_actions (
user_name STRING,data STRING,
user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果源中的时间戳数据表示为一个 epoch time,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列

CREATE TABLE user_actions (
user_name STRING,data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

引出问题与源码分析

我们这里来引出一个问题,不能排除有同学就是想用ProcessTime。

那么问题来了,EventTime 情况的watermark 很好理解,
可是ProcessTime的watermark到底做了什么,也不需要用它来过滤数据,本来就没有用数据内的时间,根本就不知道数据的顺序,更谈不上乱序了,那ProcessTime起了什么用呢?

我们首先来看下env.setStreamTimeCharacteristic() 这个方法

/*** Sets the time characteristic for all streams create from this environment, e.g., processing* time, event time, or ingestion time.** <p>If you set the characteristic to IngestionTime of EventTime this will set a default* watermark update interval of 200 ms. If this is not applicable for your application* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.** @param characteristic The time characteristic.*/@PublicEvolvingpublic void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}}

这个方法设置用户使用的是eventtime还是processtime

由源码可以看到 ,如果设置的是ProcessingTime ,会把autoWatermarkInterval这个属性值设为0,如果是EventTime,会设置为 200,我们追踪这个值发现,用户自定义的watermark类,需要注册在assignTimestampsAndWatermarks中,而在assignTimestampsAndWatermarks类中能够找到TimestampsAndPeriodicWatermarksOperator,
TimestampsAndPeriodicWatermarksOperator的open方法中有autoWatermarkInterval这个属性值。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism = getTransformation().getParallelism();final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator<T> operator =new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator).setParallelism(inputParallelism);}
@Overridepublic void open() throws Exception {super.open();currentWatermark = Long.MIN_VALUE;watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();if (watermarkInterval > 0) {long now = getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval, this);}}

我们来看这个open的初始方法,这个open是TimestampsAndPeriodicWatermarksOperator的初始方法,其实也是assignTimestampsAndWatermarks启动的条件,这个open给定了watermark的初始值。

这里初始化了两个值,

  • 一个是其实的watermark的初始值,最小的long值,-9223372036854775808
  • 另一个是初始的 watermark的间隔 如果是 EventTime就是当前时间加200ms,如果是ProcessTime就是当前时间。

再来看下面的定时任务

 @Overridepublic void onProcessingTime(long timestamp) throws Exception {// register next timerWatermark newWatermark = userFunction.getCurrentWatermark();if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {currentWatermark = newWatermark.getTimestamp();// emit watermarkoutput.emitWatermark(newWatermark);}long now = getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval, this);}

再来看下我们自定义注册的watermark方法

     @Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}

autoWatermarkInterval为0的话 super.open() 不会被调用

这里面now,就是System.currentTimeMillis(); 所以如果时间间隔不为0,那么下一次调用的时间就是 当前时间 + 方法运行的时间 + 时间间隔,由于方法运行的时间约等于0ms,所以基本就是每个时间间隔(默认200ms),运行一次获取wakermark的方法。

所以如果是ProcessingTime,那么默认时间间隔是0,所以matermarks时间就是一直-9223372036854775808,所以就一直不会过滤时间。
所以想要启动ProcessingTime 来做 时间戳 ,就一定要设置
env.getConfig().setAutoWatermarkInterval(200);

flink watermark 生成机制与总结相关推荐

  1. Flink自定义生成 Watermark

    Watermark 策略简介 # 为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳.其通常通过使用 TimestampAss ...

  2. 【Flink】Flink 多并行度下的 watermark触发机制

    1.案例 /*** 测试点:测试多 多并行度下的 watermark触发机制* 参考:链接:https://juejin.im/post/5bf95810e51d452d705fef33** @thr ...

  3. Flink1.11 intervalJoin watermark生成,状态清理机制源码理解Demo分析

    参考博客 https://cloud.tencent.com/developer/article/1738836 数据类型为左流 FlinkClick(userid=gk01, click=Pay, ...

  4. Flink Watermark 源码分析

    随着 flink 的快速发展与 API 的迭代导致新老版本差别巨大遂重拾 flink,在回顾到时间语义时对 watermark 有了不一样的理解. 一.如何生成 在 flink 1.12(第一次学习的 ...

  5. Flink WaterMark 详解

    摘录仅供学习使用,原文来自: Flink详解系列之五--水位线(watermark) - 简书 1.概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的 ...

  6. 【Flink】 Flink JobManager HA 机制的扩展与实现

    1.概述 转载:Flink 源码阅读笔记(21)- Flink JobManager HA 机制的扩展与实现 在 Flink 1.12 中,Flink on Kubernetes 的 Native 部 ...

  7. Flink的CheckPoint机制

    这里已经是Flink的第三篇原创啦.第一篇:Flink入门讲解了Flink的基础和相关概念,第二篇:压背原理,讲解了什么是背压,在Flink背压大概的流程是怎么样的. 这篇来讲Flink另一个比较重要 ...

  8. [源码解析] 从TimeoutException看Flink的心跳机制

    [源码解析] 从TimeoutException看Flink的心跳机制 文章目录 [源码解析] 从TimeoutException看Flink的心跳机制 0x00 摘要 0x01 缘由 0x02 背景 ...

  9. Flink中容错机制 完整使用 (第十章)

    Flink中容错机制 完整使用 一.容错机制 1.检查点(Checkpoint) 1. 检查点的保存 1. 周期性的触发保存 2. 保存的时间点 3. 保存的具体流程 2.从检查点恢复状态 (1)重启 ...

  10. JBPM对象主键生成机制

       什么是主键 我们在建立数据库的时候,需要为每张表指定一个主键,所谓主键就是能够唯一标识表中某一行的属性或属性组,一个表只能有一个主键,但可以有多个候选索引.因为主键可以唯一标识某一行记录,所以可 ...

最新文章

  1. 7 Papers Radios | NeurIPS 2020最佳论文;全卷积网络实现E2E目标检测
  2. 如何关注掘金的所有小伙伴
  3. Vue 中的作用域插槽
  4. 修改echarts环形图的牵引线及文字位置
  5. 如何对计算属性进行修改_「计算摄影」计算机如何学会自动地进行图像美学增强?...
  6. 3、Eternal框架-控制器
  7. NB朴素贝叶斯进行中文文本分类
  8. dell 戴尔电脑官网保修期查询或驱动下载安装
  9. laravel添加语言包
  10. ANSYS 有限元分析 加载/求解/输出
  11. numpy.pad对图片进行填充
  12. 吐槽智能手机上那些不爽的事
  13. NSMutableAttributedString
  14. 游戏c是什么网络语言,游戏cpdd网络用语是什么意思 王者荣耀里很常见
  15. Linux下优秀的音频编辑软件
  16. Android - 制作聊天气泡.9格式
  17. 2021-11-4 socket的通信过程
  18. 流媒体服务器主板型号怎么看,玩转NAS 篇五:双2.5G接口+J4125处理器,对于NAS意味着什么?威联通453Dmini对比453Bmini...
  19. 关于单边账的解释及解决(收单行业)
  20. Mybatis-Plus 条件构造器Wrapper常用方法

热门文章

  1. 《光剑文集》拾叶: 24首
  2. 软件测试常问面试题,你真的会搭建测试环境吗?
  3. windows操作快捷键
  4. python usb摄像头 截图_python实现摄像头远程截图功能
  5. 土方工程量计算表格excel_市政道路土方excel计算表(含公式)
  6. 贪心算法设计作业调度c语言,c语言贪心算法
  7. 长沙理工大学第十二届ACM大赛【9/12】
  8. WPS Word为PDF签名
  9. Gossip 协议详解
  10. Docker快速入门-腾讯云