flink 不设置水印_区分理解Flink水印延迟与窗口允许延迟的概念
link 在开窗处理事件时间(Event Time) 数据时,可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性。这两者因都是设置延迟时间所以刚接触时容易混淆。本文接下将展开讨论分析“水印延迟”与“窗口允许延迟”概念及区别。
水印延迟(WaterMark)
(1) 水印
由于采用了事件时间,脱离了物理挂钟。窗口不知道什么时候需要关闭并进行计算,这个时候需要借助水印来解决该问题。当窗口遇到水位标识时就默认是窗口时间段内的数据都到齐了,可以触发窗口计算。
(2) 水印延迟
设置水印延迟时间的目的是让水印延迟到达,从而可以解决乱序问题。通过水印延迟到达让在延迟时间范围内到达的迟到数据可以加入到窗口计算中,保证了数据的完整性。当水印到达后就会触发窗口计算,在水印之后到达的迟到数据则会被丢弃。
窗口允许延迟(allowedLateness)
使用 StreamAPI 时,在进行开窗后可设置 allowedLateness 窗口延迟。官网中对其解释如下:
默认情况下,当水印到达窗口末端时,迟到元素将会被删除。但Flink允许为window operators指定允许的最大延迟。允许延迟指定元素在被删除之前延迟的时间,默认值为0。当元素在水印经过窗口末端后到达,且它的到达时间在窗口末端加上运行延迟的时间之内,其仍会被添加到窗口中。根据所使用的触发器,延迟但未被丢弃的元素可能会再次触发窗口计算。EventTimeTrigger就是这种情况。为了做到这一点,Flink保持窗口的状态,直到它们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,正如窗口生命周期部分中所描述的那样。
简单理解:通常在水印到达之后迟到数据将会被删除,而窗口的延迟则是指数据在被删除之前的允许保留时间。也就是说,在水印达到之后迟到数据本该被删除,但是如果设置了窗口延迟,那么在水印之后到窗口延迟时间段内到达的迟到数据还是会被加入到窗口计算中,并再次触发窗口计算。
一个Demo 两个猜想
下面我用一个 Demo 和两个猜想来帮助大家加深理解这两个概念。
例子:接收 Kafka 数据,数据为 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我们开一个 5 秒的 tumbling windows 滚动窗口,以 word 作为 key 在窗口内对 count 值进行累加。同时设置水印延迟 2 秒,窗口延迟 2 秒。代码如下:
public class MyExample {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置时间特性为
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 水印策略,其需要注入Timestamp Assigner(描述了如何访问事件时间戳)和 Watermark Generator (事件流显示的超出正常范围的程度)
WatermarkStrategywatermarkStrategy=WatermarkStrategy
// forBoundedOutOfOrderness 属于(periodic周期性),周期生成器通常通过onEvent()观察传入的事件,然后在框架调用onPeriodicEmit()时发出水印。
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(WC wc, long l) {
return wc.getEventTime() * 1000;
}
});
// Kafka 配置
Properties properties=newProperties();
properties.setProperty("bootstrap.servers", "Kafka地址:9092");
properties.setProperty("group.id", "test");
// Flink 需要知道如何转换Kafka消息为Java对象(反序列化),默认提供了 KafkaDeserializationSchema(序列化需要自己编写)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema
env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())
// map 构建 WC 对象
.map(new MapFunction() {
@Override
public WC map(ObjectNode jsonNode) throws Exception {
JsonNode valueNode=jsonNode.get("value");
WC wc=newWC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());
return wc;
}
})
// 设定水印策略
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(WC::getWord)
// 窗口设置,这里设置为滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 设置窗口延迟
.allowedLateness(Time.seconds(2))
.reduce(new ReduceFunction() {
@Override
public WC reduce(WC wc, WC t1) throws Exception {
return new WC(wc.getWord(), wc.getCount() + t1.getCount());
}
})
.print();
env.execute();
}
static class WC {
public String word;
public int count;
public long eventTime;
public long getEventTime() {
return eventTime;
}
public void setEventTime(long eventTime) {
this.eventTime= eventTime;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word= word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count= count;
}
public WC(String word, int count) {
this.word= word;
this.count= count;
}
public WC(String word, int count,long eventTime) {
this.word= word;
this.count= count;
this.eventTime= eventTime;
}
@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
猜想1:
水印延迟 2s 达到,所以会在第 5 + 2 = 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐,并触发窗口计算。
// 往 Kafka 中写入数据
{"word":"a","count":1,"time":1604286560} //2020-11-02 11:09:20
{"word":"a","count":1,"time":1604286561} //2020-11-02 11:09:21
{"word":"a","count":1,"time":1604286562} //2020-11-02 11:09:22
{"word":"a","count":1,"time":1604286566} //2020-11-02 11:09:26
{"word":"a","count":1,"time":1604286567} //2020-11-02 11:09:27 (触发了窗口计算)
控制台输出
分析:通过测试发现最后在第 7s 也就是 11:09:27 时触发了窗口计算,这符合了我们的猜想一。水印延迟 2s 达到,所以会在第 5 + 2 = 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐,并触发窗口计算。计算结果为3,这是因为只有最前面的3条数据属于 [0,5) 窗口计算范围之内。
猜想2:
设置了窗口延迟2秒,那么只要在水印之后到窗口允许延迟的时间范围内达到且属于 [ 0,5) 窗口的迟到数据会被加入到窗口中,且再次触发窗口运算:
// 继续往 Kafka 中写入数据
{"word":"a","count":1,"time":1604286568} //2020-11-02 11:09:28 时间到达了第 8 秒
{"word":"a","count":1,"time":1604286563} //2020-11-02 11:09:23 模拟一个在水印之后、在窗口允许延迟范围内、且属于[0,5) 窗口的迟到数据,该数据还是会触发并参与到[0,5) 窗口的计算
控制台输出新增了一行
// 我们再继续往 Kafka 中写入数据
{"word":"a","count":1,"time":1604286569} //2020-11-02 11:09:29 时间到达第9秒
{"word":"a","count":1,"time":1604286563} //2020-11-02 11:09:23 模拟一个在水印之后且超出窗口允许延迟范围、且属于[0,5) 窗口的迟到数据,该数据不会参与和触发[0,5)窗口计算
查看控制台并没有发现新的输出打印。
解析:水印因延迟在第 7s 到达之后会触发[0,5) 窗口计算,如果没有设置窗口延迟的情况下,水印之后迟到且属于 [0,5) 窗口的数据会被丢弃。上面我们实验设置窗口延迟 2s,实现的效果就是在水印之后,窗口允许延迟时间之内(7 + 2 = 9s 之间),迟到且属于 [0,5) 窗口的数据还是会触发一次窗口计算,并参与到窗口计算中。而在 9s 之后,也就是超过窗口允许延时时间,那么迟到且属于[0,5)的数据就会被丢弃。
总结
WaterMark 到达之前,窗口在攒数据,不会触发计算。
WaterMark 等于 windowEndTime 时,第一次触发窗口计算。
WaterMark 到达之后,allowlateness之前,如果来了数据,每条数据都会触发窗口计算。
超过了allowlateness之后到达的迟到数据会丢弃。
水印用于解决乱序问题保证数据的完整性。而之所以有allowlateness的出现是因为如果WaterMark 加大会导致窗口计算延迟。WaterMark 设定的时间,是第一次触发窗口计算的时间。allowlateness 表示,WaterMark 触发窗口计算以后,还可以再等多久的迟到数据,每次符合条件的数据到达都会再次触发一次窗口计算。allowlateness 是在 Watermark 基础上再做了一层迟到数据的保证。
【责任编辑:赵宁宁 TEL:(010)68476606】
点赞 0
flink 不设置水印_区分理解Flink水印延迟与窗口允许延迟的概念相关推荐
- flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Acto ...
- react加水印_给网页增加水印的方法,react
function addWaterMarker(str){ var can = document.createElement('canvas'); var body = document.body; ...
- lodop直接打印怎么去除水印_手机视频有水印怎么办?一招教你轻松去除,不学真是可惜了...
随着短视频的兴起,现在我们很多地方都需要用到短视频.但是大部分视频都有水印,这是比较烦人的,而且视频上的水印还不像图片那么容易去掉,今天教分享一个非常简单的视频去水印的办法. 由于我们手机是没有自带这 ...
- flink 不设置水印_Flink基础:时间和水印
往期推荐: 本篇终于到了Flink的核心内容:时间与水印.最初接触这个概念是在Spark Structured Streaming中,一直无法理解水印的作用.直到使用了一段时间Flink之后,对实 ...
- flink sql设置并行度_Flink原理——任务调度原理
本文主要从以下几个方面介绍Flink的任务调度原理 一.Flink运行时的组件 二.TaskManger与Slots 三.程序与数据流 四.Flink的执行图 五.Flink程序执行的并行度 六.Fl ...
- 深入理解 Flink 容错机制
本文作者:Paul Lin 本文链接: 2019/07/28/深入理解-Flink-容错机制/ 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议.转载请注 ...
- 初学Flink,对Watermarks的一些理解和感悟(透彻2)
2019独角兽企业重金招聘Python工程师标准>>> 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1. ...
- flink运行原理_浅谈Flink分布式运行时和数据流图的并行化
本文将以WordCount的案例为主线,主要介绍Flink的设计和运行原理.关于Flink WordCount程序可以参考我之前的文章:读取Kafka实时数据流,实现Flink WordCount.阅 ...
- 一文理解Flink 水位线(Flink Watermark)
文章目录 Flink 中的时间语义 `处理时间` `事件时间` 水位线(Watermark) `事件时间和窗口` `什么是水位线` 有序流中的水位线 乱序流中的水位线 `水位线的特性` `如何生成水位 ...
最新文章
- gcc与__cplusplus宏
- oracle datafile损坏,史上最全Oracle文件损坏处理办法(附实验步骤)
- JQuery简介选择器
- WebRTC 2021 流行趋势大赏
- Fiddler进行模拟Post提交json数据,总为null解决方式
- 上传图片和音频到Server,再转存到DB
- pooleddb mysql_使用dbutils的PooledDB连接池,操作数据库
- 45本数据分析、Python的书籍,包邮送到家
- 查看svn服务器上的文件,如何在命令行通过SVN命令筛选出修改过的文件并递交
- The RSpec Book笔记《四》Describing Code with RSpec用RSpec描述代码
- 收集整理的一些windows好用的工具(持续更新)
- java emun ordinal_关于Java:JPA枚举ORDINAL与STRING
- Spring和SpringMVC配置中父子WebApplicationContext的关系
- plex插件显示无服务器,deepin 15.11 安装plex和插件
- 通过WIFI信号跟踪三维人体姿态的新方法
- 【PHP】保留两位小数并向上取整
- ubuntu20.04启动时黑屏
- ubuntu16.04无法联网
- TA游戏推荐:Android益智游戏《戳青蛙》
- 培训班出来的人后来都怎么样了?(六)
热门文章
- linux CentOS7最小化安装环境静默安装Oracle11GR2数据库(安装常用工具_02)
- IntelliJ IDEA 2019.1 windows找不到文件‘chrome’
- 实战03_SSM整合ActiveMQ支持多种类型消息
- 企业实战_02_Redis基础
- Java-if选择结构
- qt自定义窗口添加父窗口后,显示不出来
- android内置t卡中预制资源,[FAQ17514][Recovery]Recovery mode FAQ搜寻指南
- java测试用例编写_TestNG测试用例编写和执行
- HTML用css让input无法使用,html – 是否可以使用CSS设置一个禁用的INPUT元素?
- linux 进程调度源码分析,Linux调度器源码分析