Flink - allowedLateness
WindowOperator
processElement
final Collection<W> elementWindows = windowAssigner.assignWindows( //找出该element被assign的所有windowselement.getValue(), element.getTimestamp(), windowAssignerContext);//if element is handled by none of assigned elementWindows boolean isSkippedElement = true; //element默认是会skipedfor (W window: elementWindows) {// drop if the window is already lateif (isWindowLate(window)) { //如果window是late,逻辑是window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark(),continue表示skipcontinue;}isSkippedElement = false; //只要有一个窗口非late,该element就是非late数据windowState.setCurrentNamespace(window); windowState.add(element.getValue()); //把数据加到windowState中triggerContext.key = key;triggerContext.window = window;//EventTimeTrigger,(window.maxTimestamp() <= ctx.getCurrentWatermark(),会立即fire//否则只是ctx.registerEventTimeTimer(window.maxTimestamp()),注册等待后续watermark来触发TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { //如果FireACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents); //emit window内容, 这里会调用自己定义的user function}//对于比较常用的TumblingEventTimeWindows,用EventTimeTrigger,所以是不会触发purge的if (triggerResult.isPurge()) { //如果purgewindowState.clear(); //将window的state清除掉}registerCleanupTimer(window); //window的数据也需要清除 }// side output input event if // element not handled by any window // late arriving tag has been set // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp //如果所有的assign window都是late,再判断一下element也是late if (isSkippedElement && isElementLate(element)) { //isElementLate, (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark())if (lateDataOutputTag != null){sideOutput(element); //如果定义了sideOutput,就输出late element} else {this.numLateRecordsDropped.inc(); //否则直接丢弃} } 这里currentWatermark的默认值,
private long currentWatermark = Long.MIN_VALUE; 如果定期发送watermark,那么在第一次收到watermark前,不会有late数据
继续看看,数据清除掉逻辑
protected void registerCleanupTimer(W window) {long cleanupTime = cleanupTime(window); //cleanupTime, window.maxTimestamp() + allowedLatenessif (windowAssigner.isEventTime()) {triggerContext.registerEventTimeTimer(cleanupTime); //这里只是简单的注册registerEventTimeTimer} else {triggerContext.registerProcessingTimeTimer(cleanupTime);} }
如果clear只是简单的注册EventTimeTimer,那么在onEventTime的时候一定有clear的逻辑、
WindowOperator.onEventTime
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { //time == cleanupTime(window);clearAllState(triggerContext.window, windowState, mergingWindows); }
果然,onEventTime的时候会判断,如果Timer的time等于 window的cleanup time,就把all state清除掉
所以当超过,window.maxTimestamp() + allowedLateness就会被清理掉
转载于:https://www.cnblogs.com/fxjwind/p/7760798.html
Flink - allowedLateness相关推荐
- 【Flink】Flink allowedLateness 与 watermark 的区别
文章目录 1.概述 2. 那么watermark和lateness区别在哪里呢? 1.概述 60-300-022-使用-延迟数据-Flink中allowedLateness详细介绍 Flink对于乱序 ...
- Flink中allowedLateness介绍与测试
默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除. 为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念. 简单来讲, ...
- 60-300-022-使用-延迟数据-Flink中allowedLateness详细介绍
1.视界 2.概述 当指定一个允许延迟大于0时,window以及window中的内容将会继续保持即使水印已经达到了window的最后时间.在这种情况下,当一个延迟事件到来而未丢弃时,它可能会触发w ...
- 2021年大数据Flink(二十四):Allowed Lateness案例演示
Allowed Lateness案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解 ...
- 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)
导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...
- 技术实践 | 如何基于 Flink 实现通用的聚合指标计算框架
导读:网易云信作为一个 PaaS 服务,需要对线上业务进行实时监控,实时感知服务的"心跳"."脉搏"."血压"等健康状况.通过采集服务拿到 ...
- 1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等
1.16.Flink Window和Time详解 1.16.1.Window(窗口) 1.16.2.Window的类型 1.16.3.Window类型汇总 1.16.4.TimeWindow的应用 1 ...
- Flink – window operator
参考, http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/ http://wuchong.me/blog/2016/ ...
- Flink EventTime和Watermarks原理结合代码分析(转载+解决+精简记录)
Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做 ...
最新文章
- [YTU]_2435 ( C++ 习题 输出日期时间--友元函数)
- Solr嵌套子文档的弊端以及一种替代方式
- 18、Page Object 设计模式
- 二分查找算法及其变种
- python指定文件路径_python实现指定文件夹下的指定文件移动到指定位置
- potplayer 多个进程_创建守护进程的步骤
- 【lucene】lucene 高级搜索
- java中两个xml文件内容拼接_比较Java中2个XML文档的最佳方法
- 基于多分类支持向量机和KNN分类器的大豆叶片病害检测与严重程度测量
- python实时监控redis队列_Python的Flask框架应用调用Redis队列数据
- conda install 报错PackagesNotFoundError: 以及和合理使用源的策略
- 利用kd树实现最近邻搜索
- 少儿计算机编程都学什么,少儿编程课是学什么的?
- python爬虫 | 爬取巨潮资讯上的上市公司招股说明书
- Python从入门到自闭(网络篇)
- js实现oss批量下载文件_前端实现批量打包下载文件
- 360Tray.exe是什么进程?360Tray.exe程序及常见问题介绍
- 李开复:我要找什么样的人一起创业?
- java微信小程序授权微信登录获取手机号
- 它来啦,它来啦!三子棋小游戏来啦!!!
热门文章
- Info.plist与Prefix.pch修改文件位置遇到的问题及解决方法
- Linux下profile environment bashrc的区别
- Java语言Socket接口用法详解
- 【翻译】如何获取正在运行的StreamInsight实例版本号?
- Toolbox的Ajax Extensions Tab不见了
- 【小安翻唱】Dreams-黑之契约者 双蛋快乐~顺便来拉票咯!
- 【项目合作】指甲识别与实时渲染
- DataFountain新上计算机视觉比赛-20万巨奖数钢筋
- “老年”程序员带你用Python玩街霸,你的童年用编程实现也很简单
- 嵌入式中常见的存储器总结(二)SRAM VS DRAM