问题排查-Flink session窗口最后一个不过期问题
问题描述
线上使用flink进行session窗口分析,分析结果写入db。在使用分析结果时发现实际统计数据缺少最后一条窗口的数据。
排查过程
查看flink写入db日志,发现没有最后一个session窗口统计数据写入的日志。猜测是flink没有触发session窗口过期,遂采取以下步骤进行问题复现和定位。
联系业务系统,梳理session窗口数据来源
经反馈数据来源为用户xx操作。
线上环境模拟用户操作,触发session窗口数据写入
业务系统有写入session窗口数据日志。
查看flink写入db日志
一段时间后,上一次未触发过期的session窗口统计数据写入db,但是没有最后一次的session窗口统计数据。
上网百度、查看官网session窗口功能文档
重点了解timestamp、水印、打点功能。关注WatermarkStrategy、WatermarkGenerator、TimestampAssigner及其实现类的功能,了解了flink内部时间戳生成逻辑、周期性发送水印逻辑。定位到问题在周期性发送水印上。
水印生成类及相关方法 BoundedOutOfOrdernessTimestampExtractor.class
public final Watermark getCurrentWatermark() {long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;if (potentialWM >= this.lastEmittedWatermark) {this.lastEmittedWatermark = potentialWM;}return new Watermark(this.lastEmittedWatermark);}public final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp = this.extractTimestamp(element);if (timestamp > this.currentMaxTimestamp) {this.currentMaxTimestamp = timestamp;}return timestamp;}
这里可以知道发送的水印依赖于currentMaxTimestamp,而currentMaxTimestamp依赖于接收元素element触发extractTimestamp计算,也就是水印的时间更新必须要接收元素才行!!
问题解决、测试上线
使用自己的WatermarkStrategy和WatermarkGenerator实现水印超过一段时间没有接收到元素后就使用当前时间做为水印时间戳。代码如下:
MyWatermarkStrategy.class
public class MyWatermarkStrategy<T> implements WatermarkStrategy<T> {private static final long serialVersionUID = 1L;private final AssignerWithPeriodicWatermarks<T> wms;private final Long autoWatermarkInterval;public MyWatermarkStrategy(AssignerWithPeriodicWatermarks<T> wms, Long autoWatermarkInterval) {this.wms = (AssignerWithPeriodicWatermarks)Preconditions.checkNotNull(wms);this.autoWatermarkInterval = autoWatermarkInterval;}@Overridepublic TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return this.wms;}@Overridepublic WatermarkGenerator<T> createWatermarkGenerator(org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier.Context context) {return new MyAssignerWithPeriodicWatermarksAdapter(this.wms, this.autoWatermarkInterval);}}
MyAssignerWithPeriodicWatermarksAdapter.class
public static class MyAssignerWithPeriodicWatermarksAdapter<T> implements WatermarkGenerator<T>{private final AssignerWithPeriodicWatermarks<T> wms;private final Long autoWatermarkInterval;public MyAssignerWithPeriodicWatermarksAdapter(AssignerWithPeriodicWatermarks<T> wms, Long autoWatermarkInterval) {this.wms = (AssignerWithPeriodicWatermarks)Preconditions.checkNotNull(wms);this.autoWatermarkInterval = autoWatermarkInterval;}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {}@Overridepublic void onPeriodicEmit(org.apache.flink.api.common.eventtime.WatermarkOutput output) {Watermark next = this.wms.getCurrentWatermark();if (next != null) {Long emitWatermarkTimestamp = (autoWatermarkInterval != null && autoWatermarkInterval.longValue() != 0L && (System.currentTimeMillis() - next.getTimestamp() < autoWatermarkInterval)) ? next.getTimestamp() : System.currentTimeMillis();output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(emitWatermarkTimestamp));}}}
经测试,线上环境最后一个session窗口在没有数据超过autoWatermarkInterval时间间隔后会触发过期。
问题排查-Flink session窗口最后一个不过期问题相关推荐
- Flink的窗口聚合操作(Time\Count Window)
窗口基本概念:Flink中的窗口是左闭右开的窗口 Flink认为批处理是流处理的一个特例,而窗口window就是从流处理到批处理的一个桥梁,通常来讲窗口就是用来将无线数据流转换为优先数据集,从而在优先 ...
- Flink之窗口 (Window) 下篇
窗口函数(Window Functions) 定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了:至于收集起来到底要做什么,其实还完全没有头绪.所以在窗口分配器之后,必须再接上一个 ...
- Flink专题四:Flink DataStream 窗口介绍及使用
由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...
- Flink之窗口 (Window) 上篇
我们已经了解了 Flink 中事件时间和水位线的概念,那它们有什么具体应用呢?当然是做基于时间的处理计算了.其中最常见的场景,就是窗口聚合计算. 之前我们已经了解了 Flink 中基本的聚合操作.在流 ...
- flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...
本文阐述 Flink 的事件时间和 Watermark 机制,剖析 Watermark 产生和传递的流程. 1 Event time 和 Watermark 的关系 1.1 Event time 和 ...
- flink 空闲窗口-withIdleness
flink 空闲窗口 flink多并行时,如果有窗口中没数据,那么有数据的窗口即使watermark到达了触发边界,barren没对齐,窗口也不会触发计算.这样的空窗口即空闲窗口.可通过设置空闲时间( ...
- Flink的窗口计算案例
窗口的分类 按照时间生成Window,为TimeWindow,根据窗口实现原理可分为三类: 滚动窗口(Tumbling Window):将数据依据固定的窗口长度对数据进行分片.滚动窗口分配器将每个元素 ...
- 【随记】Flink 时间窗口的起始时间
话不多说,直接上手今天的主题,探索一个容易让人忽略和困惑的问题:Flink 时间窗口的起始时间 就以最简单的demo为例: timeWindow(Time.seconds(5)) 上述定义一个步长为5 ...
- 【奇葩问题】每次打开excel文件都会出现两个窗口,一个是空白的sheet1,另一个是自己的文档
程序员的奇葩问题又增加了,问题如题:每次打开excel文件都会出现两个窗口,一个是空白的sheet1,另一个是自己的文档 一切的源头在于前段时间装了MySQL数据库,他就自动加载了两者之间的关联启动项 ...
最新文章
- Git使用4:Git分支
- mysql 4.0.21 下载_W2K下安装 MYSQL 4.0.21 手记
- 如何更改OST、OAB文件的默认路径?
- 智能家居数据库设计_设计更智能的数据表
- Java wait notify
- 第十一期:数据挖掘其实就是为了干这四种事?
- 华为云MySQL数据库外网使用
- 华为Mate 30系列或下血本采用双主摄方案:CMOS尺寸破纪录
- swagger3 设置值可以为空_swagger3.0使用及https问题处理
- 关于Linux内核vmlinuz、initrd.img和System.map
- android 鼠标大小设置,BlueStacks安卓模拟器不能调整屏幕窗口大小用鼠标拖拽也不能...
- APP推广运营手册全集
- SpringBoot-Learning-作者:翟永超
- boost斩波电路控制系统C语言,Boost升压斩波电路[精华]
- 深度学习21_李宏毅_04_Local Minimum And Saddle Point
- iOS 渲染原理解析
- Neno和OpenMP的性能提升验证
- 洛谷 P1879 [USACO06NOV]玉米田Corn Fields
- 开关电源matlab仿真设计报告,MATLAB非隔离式开关电源仿真分析+源代码
- 大商创是用哪种php柜架写的,大商创商家入驻入口去除说明简述