flink watermark 生成机制与总结
flink watermark 生成机制与总结
- watermark 介绍
- flink时间概念
- 1. 分配器(Window Assinger)
- 2. 触发器(Trigger)
- 3.驱逐器(Evictor)
- watermark生成方式
- 迟到时间处理
- FlinkSql 中的watermark
- 引出问题与源码分析
watermark 介绍
本质上watermark是flink为了处理eventTime窗口计算提出的一种机制,本质上也是一种时间戳,由flink souce或者自定义的watermark生成器按照需求定期或者按条件生成一种系统event,与普通数据流event一样流转到对应的下游operations,接收到watermark数据的operator以此不断调整自己管理的window event time clock。
首先,eventTime计算意味着flink必须有一个地方用于抽取每条消息中自带的时间戳,所以TimestampAssigner的实现类都要具体实现
long extractTimestamp(T element, long previousElementTimestamp);方法用来抽取当前元素的eventTime,这个eventTime会用来决定元素落到下游的哪个或者哪几个window中进行计算。
其次,在数据进入window前,需要有一个Watermarker生成当前的event time对应的水位线,flink支持两种后置的Watermarker:Periodic和Punctuated,一种是定期产生watermark(即使没有消息产生),一种是在满足特定情况的前提下触发。两种Watermark分别需要实现接口为
Watermark getCurrentWatermark()和Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
flink时间概念
Flink 在流应⽤程序中三种 Time 概念
Time 类型 | 解释 |
---|---|
Processing Time | 事件被机器处理的系统时间,提供最好的性能和最低的延迟。分支式异步环境下,容易受到事件到达系统的速度,事件在系统内操作流动速度以及中断的影响。 |
Event Time | 一般指数据本身携带的时间戳,能够满足在特定场景下数据准确性的需求。一般而言与 Processing Time 有时间延迟,需要引入水印机制处理事件乱序和时间乱序问题。 |
Ingestion Time | 事件进入 Flink 的时间。一般在 Flink Source 定义,提供给下游窗口计算的触发计算。 |
⼀般来说,在⽣产环境中 Event Time 与 Processing Time 是常用的策略。
watermark最重要的就是与window配合来实现实时数据的计算,对于window的组成主要有三个成员,也是写代码时经常写缺不知道具体含义的三个方法。
1. 分配器(Window Assinger)
窗口分配器定义了数据流中的元素如何分配到窗口中,通过在分组数据流中调用 .window(…) 或者非分组数据流中调用 .windowAll(…) 时指定窗口分配器(WindowAssigner)来实现。WindowAssigner 负责将每一个到来的元素分配给一个或者多个窗口(window), Flink 提供了一些常用的预定义的窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。你也可以通过继承 WindowAssigner 类来自定义自己的分配器。
Assinger | 备注 |
---|---|
GlobalWindows | 所有的数据都分配到同一个窗口。 |
MergingWindowAssigner | 可 Merge 的窗口分配处理。 |
SlidingProcessingTimeWindows | 基于 Processing Time 的滚动窗口分配处理。 |
SlidingEventTimeWindows | 基于 Event Time 的滚动窗口分配处理。 |
TumblingProcessingTimeWindows | 基于 Processing Time 的滑动窗口分配处理。 |
TumblingEventTimeWindows | 基于 Event Time 的滑动窗口分配处理。 |
ProcessingTimeSessionWindows | 基于 Processing Time 且可 merge 的会话窗口分配处理。 |
EventTimeSessionWindows | 基于 Event Time 且可 merge 会话窗口分配处理。 |
2. 触发器(Trigger)
触发器决定了一个窗口何时可以被窗口函数处理,每一个窗口分配器都有一个默认的触发器,该触发器决定合适计算和清除窗口。如果默认的触发器不能满足你的需要,你可以通过调用 trigger(…)来指定一个自定义的触发器。触发器的接口有5个方法来允许触发器处理不同的事件:
- onElement()方法,每个元素被添加到窗口时调用
- onEventTime()方法,当一个已注册的事件时间计时器启动时调用
- onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
- onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
每个触发动作的返回结果⽤ TriggerResult 定。TriggerResult 有四种状态:
- CONTINUE:什么也不做
- FIRE:触发计算
- PUGE:清除窗口中的数据
- FIRE_AND_PURGE:触发计算并清除窗口中的数据
Trigger | 备注 |
---|---|
EventTimeTrigger | 当水印通过窗口末尾时触发的触发器。 |
ProcessingTimeTrigger | 当系统时间通过窗口末尾时触发的触发器。 |
CountTrigger | 窗口元素达到阈值触发的触发器。 |
PurgingTrigger | 作为参数,使其成为带有清除功能触发器。 |
DeltaTrigger | 基于 DeltaFunction 和一个阈值的触发器。 |
3.驱逐器(Evictor)
Flink 的窗口模型允许指定一个除了 WindowAssigner 和 Trigger 之外的可选参数 Evitor,这个可以通过调用 evitor(…) 方法来实现。这个驱逐器(evitor)可以在触发器触发之前或者之后,或者窗口函数被应用之前清理窗口中的元素。如果没有定义 Evictor,触发器直接将所有窗⼝元素交给计算函数。
Evictor | 备注 |
---|---|
TimeEvitor | 清除时间戳小于窗口元素中的最大时间戳 - interval的元素。 |
CountEvitor | 只保存指定数量的数据。 |
DeltaEvitor | 通过一个 DeltaFunction 和一个阈值,计算窗口缓存中最近的一个元素和剩余的所有元素的 delta 值,并清除 delta 值大于或者等于阈值的元素。 |
watermark生成方式
在 Flink 中,数据处理中需要通过调⽤ DataStream 中的 assignTimestampsAndWatermarks ⽅法来分配时间和⽔印,该⽅法可以传⼊两种参数,⼀个是 AssignerWithPeriodicWatermarks,另⼀个是 AssignerWithPunctuatedWatermarks,通常建议在数据源(source)之后就进⾏⽣成⽔印,或者做些简单操作⽐如 filter/map/flatMap 之 后再⽣成⽔印,越早⽣成⽔印的效果会更好,也可以直接在数据源头就做⽣成⽔印。
With Periodic Watermarks(常用):周期性(一定时间间隔或者达到一定的记录条数)生成watermark
- 需要实现AssignerWithPeriodicWatermarks接口
- 默认周期是200ms,可通过env.getConfig.setAutoWatermarkInterval进行修改
- 实际生产环境用得多,但必须结合时间或者累计条数两个维度,否则在极端情况下会有很大的延时
With Punctuated Watermarks(不常用):在满足自定义条件时生成watermark,每一个元素都有机会判断是否生成一个watermark。
- 需要实现AssignerWithPunctuatedWatermarks接口
- 在TPS很高的生产环境下会产生大量的 Watermark,可能在一定程度上对下游算子造成一定的压力,只有在实时性很高的场景才会选择这种方式来进行生成水印
- 新版 Flink 源码中已经标记为 @Deprecated
迟到时间处理
- 直接丢弃:将迟到事件视为错误消息并丢弃(flink默认处理方式)。
- Side Output机制:可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
- Allowed Lateness机制:允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。
FlinkSql 中的watermark
在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 Watermark 生成表达式,同时标记这个已有字段为时间属性字段。
CREATE TABLE user_actions (
user_name STRING,data STRING,
user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
如果源中的时间戳数据表示为一个 epoch time,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列
CREATE TABLE user_actions (
user_name STRING,data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
引出问题与源码分析
我们这里来引出一个问题,不能排除有同学就是想用ProcessTime。
那么问题来了,EventTime 情况的watermark 很好理解,
可是ProcessTime的watermark到底做了什么,也不需要用它来过滤数据,本来就没有用数据内的时间,根本就不知道数据的顺序,更谈不上乱序了,那ProcessTime起了什么用呢?
我们首先来看下env.setStreamTimeCharacteristic() 这个方法
/*** Sets the time characteristic for all streams create from this environment, e.g., processing* time, event time, or ingestion time.** <p>If you set the characteristic to IngestionTime of EventTime this will set a default* watermark update interval of 200 ms. If this is not applicable for your application* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.** @param characteristic The time characteristic.*/@PublicEvolvingpublic void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}}
这个方法设置用户使用的是eventtime还是processtime
由源码可以看到 ,如果设置的是ProcessingTime ,会把autoWatermarkInterval这个属性值设为0,如果是EventTime,会设置为 200,我们追踪这个值发现,用户自定义的watermark类,需要注册在assignTimestampsAndWatermarks中,而在assignTimestampsAndWatermarks类中能够找到TimestampsAndPeriodicWatermarksOperator,
TimestampsAndPeriodicWatermarksOperator的open方法中有autoWatermarkInterval这个属性值。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism = getTransformation().getParallelism();final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator<T> operator =new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator).setParallelism(inputParallelism);}
@Overridepublic void open() throws Exception {super.open();currentWatermark = Long.MIN_VALUE;watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();if (watermarkInterval > 0) {long now = getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval, this);}}
我们来看这个open的初始方法,这个open是TimestampsAndPeriodicWatermarksOperator的初始方法,其实也是assignTimestampsAndWatermarks启动的条件,这个open给定了watermark的初始值。
这里初始化了两个值,
- 一个是其实的watermark的初始值,最小的long值,-9223372036854775808
- 另一个是初始的 watermark的间隔 如果是 EventTime就是当前时间加200ms,如果是ProcessTime就是当前时间。
再来看下面的定时任务
@Overridepublic void onProcessingTime(long timestamp) throws Exception {// register next timerWatermark newWatermark = userFunction.getCurrentWatermark();if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {currentWatermark = newWatermark.getTimestamp();// emit watermarkoutput.emitWatermark(newWatermark);}long now = getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval, this);}
再来看下我们自定义注册的watermark方法
@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
autoWatermarkInterval为0的话 super.open() 不会被调用
这里面now,就是System.currentTimeMillis(); 所以如果时间间隔不为0,那么下一次调用的时间就是 当前时间 + 方法运行的时间 + 时间间隔,由于方法运行的时间约等于0ms,所以基本就是每个时间间隔(默认200ms),运行一次获取wakermark的方法。
所以如果是ProcessingTime,那么默认时间间隔是0,所以matermarks时间就是一直-9223372036854775808,所以就一直不会过滤时间。
所以想要启动ProcessingTime 来做 时间戳 ,就一定要设置
env.getConfig().setAutoWatermarkInterval(200);
flink watermark 生成机制与总结相关推荐
- Flink自定义生成 Watermark
Watermark 策略简介 # 为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳.其通常通过使用 TimestampAss ...
- 【Flink】Flink 多并行度下的 watermark触发机制
1.案例 /*** 测试点:测试多 多并行度下的 watermark触发机制* 参考:链接:https://juejin.im/post/5bf95810e51d452d705fef33** @thr ...
- Flink1.11 intervalJoin watermark生成,状态清理机制源码理解Demo分析
参考博客 https://cloud.tencent.com/developer/article/1738836 数据类型为左流 FlinkClick(userid=gk01, click=Pay, ...
- Flink Watermark 源码分析
随着 flink 的快速发展与 API 的迭代导致新老版本差别巨大遂重拾 flink,在回顾到时间语义时对 watermark 有了不一样的理解. 一.如何生成 在 flink 1.12(第一次学习的 ...
- Flink WaterMark 详解
摘录仅供学习使用,原文来自: Flink详解系列之五--水位线(watermark) - 简书 1.概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的 ...
- 【Flink】 Flink JobManager HA 机制的扩展与实现
1.概述 转载:Flink 源码阅读笔记(21)- Flink JobManager HA 机制的扩展与实现 在 Flink 1.12 中,Flink on Kubernetes 的 Native 部 ...
- Flink的CheckPoint机制
这里已经是Flink的第三篇原创啦.第一篇:Flink入门讲解了Flink的基础和相关概念,第二篇:压背原理,讲解了什么是背压,在Flink背压大概的流程是怎么样的. 这篇来讲Flink另一个比较重要 ...
- [源码解析] 从TimeoutException看Flink的心跳机制
[源码解析] 从TimeoutException看Flink的心跳机制 文章目录 [源码解析] 从TimeoutException看Flink的心跳机制 0x00 摘要 0x01 缘由 0x02 背景 ...
- Flink中容错机制 完整使用 (第十章)
Flink中容错机制 完整使用 一.容错机制 1.检查点(Checkpoint) 1. 检查点的保存 1. 周期性的触发保存 2. 保存的时间点 3. 保存的具体流程 2.从检查点恢复状态 (1)重启 ...
- JBPM对象主键生成机制
什么是主键 我们在建立数据库的时候,需要为每张表指定一个主键,所谓主键就是能够唯一标识表中某一行的属性或属性组,一个表只能有一个主键,但可以有多个候选索引.因为主键可以唯一标识某一行记录,所以可 ...
最新文章
- 7 Papers Radios | NeurIPS 2020最佳论文;全卷积网络实现E2E目标检测
- 如何关注掘金的所有小伙伴
- Vue 中的作用域插槽
- 修改echarts环形图的牵引线及文字位置
- 如何对计算属性进行修改_「计算摄影」计算机如何学会自动地进行图像美学增强?...
- 3、Eternal框架-控制器
- NB朴素贝叶斯进行中文文本分类
- dell 戴尔电脑官网保修期查询或驱动下载安装
- laravel添加语言包
- ANSYS 有限元分析 加载/求解/输出
- numpy.pad对图片进行填充
- 吐槽智能手机上那些不爽的事
- NSMutableAttributedString
- 游戏c是什么网络语言,游戏cpdd网络用语是什么意思 王者荣耀里很常见
- Linux下优秀的音频编辑软件
- Android - 制作聊天气泡.9格式
- 2021-11-4 socket的通信过程
- 流媒体服务器主板型号怎么看,玩转NAS 篇五:双2.5G接口+J4125处理器,对于NAS意味着什么?威联通453Dmini对比453Bmini...
- 关于单边账的解释及解决(收单行业)
- Mybatis-Plus 条件构造器Wrapper常用方法