WindowOperator

processElement

final Collection<W> elementWindows = windowAssigner.assignWindows(   //找出该element被assign的所有windowselement.getValue(), element.getTimestamp(), windowAssignerContext);//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;  //element默认是会skipedfor (W window: elementWindows) {// drop if the window is already lateif (isWindowLate(window)) { //如果window是late,逻辑是window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark(),continue表示skipcontinue;}isSkippedElement = false; //只要有一个窗口非late,该element就是非late数据windowState.setCurrentNamespace(window); windowState.add(element.getValue()); //把数据加到windowState中triggerContext.key = key;triggerContext.window = window;//EventTimeTrigger,(window.maxTimestamp() <= ctx.getCurrentWatermark(),会立即fire//否则只是ctx.registerEventTimeTimer(window.maxTimestamp()),注册等待后续watermark来触发TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { //如果FireACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents); //emit window内容, 这里会调用自己定义的user function}//对于比较常用的TumblingEventTimeWindows,用EventTimeTrigger,所以是不会触发purge的if (triggerResult.isPurge()) { //如果purgewindowState.clear();  //将window的state清除掉}registerCleanupTimer(window); //window的数据也需要清除
}// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
//如果所有的assign window都是late,再判断一下element也是late
if (isSkippedElement && isElementLate(element)) { //isElementLate, (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark())if (lateDataOutputTag != null){sideOutput(element); //如果定义了sideOutput,就输出late element} else {this.numLateRecordsDropped.inc(); //否则直接丢弃}
}

这里currentWatermark的默认值,
private long currentWatermark = Long.MIN_VALUE;

如果定期发送watermark,那么在第一次收到watermark前,不会有late数据
继续看看,数据清除掉逻辑
protected void registerCleanupTimer(W window) {long cleanupTime = cleanupTime(window); //cleanupTime, window.maxTimestamp() + allowedLatenessif (windowAssigner.isEventTime()) {triggerContext.registerEventTimeTimer(cleanupTime); //这里只是简单的注册registerEventTimeTimer} else {triggerContext.registerProcessingTimeTimer(cleanupTime);}
}

如果clear只是简单的注册EventTimeTimer,那么在onEventTime的时候一定有clear的逻辑、

WindowOperator.onEventTime

if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {  //time == cleanupTime(window);clearAllState(triggerContext.window, windowState, mergingWindows);
}

果然,onEventTime的时候会判断,如果Timer的time等于 window的cleanup time,就把all state清除掉

所以当超过,window.maxTimestamp() + allowedLateness就会被清理掉

转载于:https://www.cnblogs.com/fxjwind/p/7760798.html

Flink - allowedLateness相关推荐

  1. 【Flink】Flink allowedLateness 与 watermark 的区别

    文章目录 1.概述 2. 那么watermark和lateness区别在哪里呢? 1.概述 60-300-022-使用-延迟数据-Flink中allowedLateness详细介绍 Flink对于乱序 ...

  2. Flink中allowedLateness介绍与测试

    默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除. 为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念. 简单来讲, ...

  3. 60-300-022-使用-延迟数据-Flink中allowedLateness详细介绍

    1.视界 2.概述 ​ 当指定一个允许延迟大于0时,window以及window中的内容将会继续保持即使水印已经达到了window的最后时间.在这种情况下,当一个延迟事件到来而未丢弃时,它可能会触发w ...

  4. 2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示

    Allowed Lateness案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解 ...

  5. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  6. 技术实践 | 如何基于 Flink 实现通用的聚合指标计算框架

    导读:网易云信作为一个 PaaS 服务,需要对线上业务进行实时监控,实时感知服务的"心跳"."脉搏"."血压"等健康状况.通过采集服务拿到 ...

  7. 1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等

    1.16.Flink Window和Time详解 1.16.1.Window(窗口) 1.16.2.Window的类型 1.16.3.Window类型汇总 1.16.4.TimeWindow的应用 1 ...

  8. Flink – window operator

    参考, http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/ http://wuchong.me/blog/2016/ ...

  9. Flink EventTime和Watermarks原理结合代码分析(转载+解决+精简记录)

    Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做 ...

最新文章

  1. [YTU]_2435 ( C++ 习题 输出日期时间--友元函数)
  2. Solr嵌套子文档的弊端以及一种替代方式
  3. 18、Page Object 设计模式
  4. 二分查找算法及其变种
  5. python指定文件路径_python实现指定文件夹下的指定文件移动到指定位置
  6. potplayer 多个进程_创建守护进程的步骤
  7. 【lucene】lucene 高级搜索
  8. java中两个xml文件内容拼接_比较Java中2个XML文档的最佳方法
  9. 基于多分类支持向量机和KNN分类器的大豆叶片病害检测与严重程度测量
  10. python实时监控redis队列_Python的Flask框架应用调用Redis队列数据
  11. conda install 报错PackagesNotFoundError: 以及和合理使用源的策略
  12. 利用kd树实现最近邻搜索
  13. 少儿计算机编程都学什么,少儿编程课是学什么的?
  14. python爬虫 | 爬取巨潮资讯上的上市公司招股说明书
  15. Python从入门到自闭(网络篇)
  16. js实现oss批量下载文件_前端实现批量打包下载文件
  17. 360Tray.exe是什么进程?360Tray.exe程序及常见问题介绍
  18. 李开复:我要找什么样的人一起创业?
  19. java微信小程序授权微信登录获取手机号
  20. 它来啦,它来啦!三子棋小游戏来啦!!!

热门文章

  1. Info.plist与Prefix.pch修改文件位置遇到的问题及解决方法
  2. Linux下profile environment bashrc的区别
  3. Java语言Socket接口用法详解
  4. 【翻译】如何获取正在运行的StreamInsight实例版本号?
  5. Toolbox的Ajax Extensions Tab不见了
  6. 【小安翻唱】Dreams-黑之契约者 双蛋快乐~顺便来拉票咯!
  7. 【项目合作】指甲识别与实时渲染
  8. DataFountain新上计算机视觉比赛-20万巨奖数钢筋
  9. “老年”程序员带你用Python玩街霸,你的童年用编程实现也很简单
  10. 嵌入式中常见的存储器总结(二)SRAM VS DRAM