WaterMark 和 Window 机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理。

Event Time语义下我们使用Watermark来判断数据是否迟到。一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个

窗口,但由于延迟,窗口已经触发计算。目前Flink有三种处理迟到数据的方式:

  • 直接将迟到数据丢弃
  • 将迟到数据发送到另一个流
  • 重新执行一次计算,将迟到数据考虑进来,更新计算结果

将迟到数据丢弃

如果不做其他操作,默认情况下迟到数据会被直接丢弃。

将迟到数据发送到另外一个流

如果想对这些迟到数据处理,我们可以使用Flink的侧输出(Side Output)功能,将迟到数据发到某个特定的流上。后续我们可以根

据业务逻辑的要求,对迟到的数据流进行处理。

假设输入的数据格式如下

String : timestamp

如 hello:1559207589000

代码示例如下

DataStream<Tuple2<String, Long>> dataStream = env.socketTextStream("10.0.2.11", 10000, "\n").map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String s) throws Exception {String[] arr = s.split(":");return new Tuple2<String, Long>(arr[0], Long.valueOf(arr[1]));}}).filter(new FilterFunction<Tuple2<String, Long>>() {@Overridepublic boolean filter(Tuple2<String, Long> tuple2) throws Exception {return !tuple2.f0.equals("0") && tuple2.f1 != 0L;}})
;SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");DataStream waterStream = dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;Long maxOutOfOrderness = 3_000L;Long lastEmittedWatermark = Long.MIN_VALUE;@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {// 将元素的时间字段值作为该数据的timestampLong time = element.f1;if (time > currentMaxTimestamp) {currentMaxTimestamp = time;}String outData = String.format("key: %s    EventTime: %s    waterMark:  %s", element.f0, sdf.format(time),sdf.format(getCurrentWatermark().getTimestamp()));System.out.println(outData);return time;}@Nullable@Overridepublic Watermark getCurrentWatermark() {// 允许延迟三秒Long potentialWM = currentMaxTimestamp - maxOutOfOrderness;// 保证水印能依次递增if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}
});OutputTag<Tuple2<String, Long>> lateData = new OutputTag<Tuple2<String, Long>>("late"){};
// 根据 name 进行分组
DataStream result = waterStream.keyBy(0).window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(5L))).sideOutputLateData(lateData).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {@Overridepublic void apply(Tuple s, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {System.out.println("trigger window [" + sdf.format(new Date(timeWindow.getStart())) + "," + sdf.format(new Date(timeWindow.getEnd())) + "), " + s + ", " + JSON.toJSONString(iterable));}});((SingleOutputStreamOperator<String>) result).getSideOutput(lateData).print("late");

上面的代码将迟到的内容写进名为“late”的OutputTag下,之后使用getSideOutput获取这些迟到的数据。

更新计算结果

对于迟到数据,使用上面两种方法,都对计算结果的正确性有影响。如果将数据流发送到单独的侧输出,我们仍然需要完成单独

的处理逻辑,相对比较复杂。更理想的情况是,将迟到数据重新进行一次触发,得到一个更新的结果。 allowedLateness允许用户在

Event Time下对某个窗口先得到一个结果,如果在一定时间内有迟到数据,迟到数据会和之前的数据一起重新被计算,以得到一

个更准确的结果。使用这个功能时需要注意,原来窗口中的状态数据在窗口已经触发的情况下仍然会被保留,否则迟到数据到来

后也无法与之前数据融合。另一方面,更新的结果要以一种合适的形式输出到外部系统,或者将原来结果覆盖,或者同时保存且

有时间戳以表明来自更新后的计算。比如,我们的计算结果是一个键值对(Key-Value),我们可以把这个结果输出到Redis这样

的KV数据库中,使用某些Reids命令,对于同一个Key下,旧的结果被新的结果所覆盖。

如果不明确调用allowedLateness,默认的允许延迟的参数是0。如果对一个Processing Time下的程序使用allowedLateness,将

引发异常。

DataStream<Tuple2<String, Long>> dataStream = env.socketTextStream("10.0.2.11", 10000, "\n").map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String s) throws Exception {String[] arr = s.split(":");return new Tuple2<String, Long>(arr[0], Long.valueOf(arr[1]));}}).filter(new FilterFunction<Tuple2<String, Long>>() {@Overridepublic boolean filter(Tuple2<String, Long> tuple2) throws Exception {return !tuple2.f0.equals("0") && tuple2.f1 != 0L;}})
;SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");DataStream waterStream = dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;Long maxOutOfOrderness = 3_000L;Long lastEmittedWatermark = Long.MIN_VALUE;@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {// 将元素的时间字段值作为该数据的timestampLong time = element.f1;if (time > currentMaxTimestamp) {currentMaxTimestamp = time;}String outData = String.format("key: %s    EventTime: %s    waterMark:  %s", element.f0, sdf.format(time),sdf.format(getCurrentWatermark().getTimestamp()));System.out.println(outData);return time;}@Nullable@Overridepublic Watermark getCurrentWatermark() {// 允许延迟三秒Long potentialWM = currentMaxTimestamp - maxOutOfOrderness;// 保证水印能依次递增if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}
});OutputTag<Tuple2<String, Long>> lateData = new OutputTag<Tuple2<String, Long>>("late"){};
// 根据 name 进行分组
DataStream result = waterStream.keyBy(0).window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(5L))).allowedLateness(org.apache.flink.streaming.api.windowing.time.Time.seconds(2L)).sideOutputLateData(lateData).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {@Overridepublic void apply(Tuple s, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {System.out.println("trigger window [" + sdf.format(new Date(timeWindow.getStart())) + "," + sdf.format(new Date(timeWindow.getEnd())) + "), " + s + ", " + JSON.toJSONString(iterable));}});((SingleOutputStreamOperator<String>) result).getSideOutput(lateData).print("late");

在上面的代码中,我们设置的窗口为5秒,5秒结束后,窗口计算会被触发,生成第一个计算结果。allowedLateness设置窗口结束后

还要等待长为lateness的时间。如果某个迟到元素归属窗口的结束时间 + lateness > watermark 时间,该元素仍然会被加入到该窗

口中。每新到一个迟到数据,迟到数据被加入WindowFunction的缓存中,窗口的Trigger会触发一次FIRE,窗口函数被重新调用一

次,计算结果得到一次更新。否则会被计入迟到元素。

需要注意的是,使用了allowedLateness可能会导致两个窗口合并成一个窗口。

Flink 对于迟到数据的处理相关推荐

  1. 【学习笔记 — Flink 处理迟到数据(★)】

    Flink 处理迟到数据(★) 处理迟到数据之前首先了解Lambda架构 Lambda架构的实现是:一个批处理器.一个流处理器.流处理器首先实时输出近似正确的结果(因为乱序流,可能导致流处理结果不准确 ...

  2. Flink中迟到数据的处理

    目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结:         我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...

  3. 【Flink】迟到数据的处理

    窗口 在流上的工作方式与批处理不同,因为流通常是无限的,所以不可能计算流中的所有元素,流上的聚合事件则由窗口限定,例如"过去 5 分钟的计数"或"最后 100 个元素的总 ...

  4. Flink对迟到数据的处理的三种方式

    ** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...

  5. flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)

    文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...

  6. Flink处理迟到数据的几种方式

    方式1:设置水位线延迟时间 水位线延迟设置,一般设置为毫秒到秒级别. SingleOutputStreamOperator<Event> watermarks = streamOperat ...

  7. flink笔记8(接笔记7——窗口(Window),迟到数据的处理)

    flink 3. 窗口(Window) (1)窗口的概念 (2)窗口的分类 (3)窗口 API 概览 (4)窗口分配器(Window Assigners) (5)窗口函数(Window Functio ...

  8. Flink-时间和窗口(水位线、窗口、迟到数据的处理等)

    文章目录 时间和窗口 时间 水位线(Watermark) 时间和窗口 水位线 有序和无序流的插入 水位线生成策略(Watermark Strategies) 水位线的传递 窗口(Window) 窗口 ...

  9. flink 三种时间机制_Flink时间系列:Event Time下如何处理迟到数据

    Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Flink有三种处理迟到数据的 ...

最新文章

  1. Virtura box 构建一个简单局域网并联入外网
  2. django oracle数据库配置,django连接oracle时setting 配置方法
  3. artificial intelligence courses
  4. 基于react开发package.json的配置
  5. java 看书浏览器官_JAVA读取文件流,设置浏览器下载或直接预览操作
  6. gulp如何保存后自动刷新?看这里就够了
  7. XP没有IIS服务组组件
  8. ORACLE 12c RAC的常用管理命令
  9. 微信小程序onReachBottom不触发
  10. Springer期刊LaTeX模板的一些问题
  11. 两个三维向量叉积_俩个三维向量叉乘怎么算啊?
  12. 北大计算机复试被刷经历,为什么那么多高分被刷?复试真的有黑幕吗?
  13. JS splice的用法
  14. android 联系人操作: ContentProvider往通讯录添加联系人和获取联系人
  15. 【动网论坛7.1 sp1 修改】-会员信息修改方案
  16. 微信公众号会替代手机APP吗?
  17. 通过picgo+gitee搭建图床
  18. 【openWrt】随身wifi装openWrt的linux内存不够用?开启swap分区吧
  19. HDFS的fsimage和edits是什么、有什么作用
  20. memcpy和strcpy实现

热门文章

  1. 初次进入职场如何工作与学习
  2. EMC实验实战案例-ESD静电实验
  3. mobx 的autorun和reaction使用,监听mobx值变化
  4. 【SLM6550】 2A同步降压型锂电池充电电路
  5. 直流电机位置控制matlab仿真,利用Simulink仿真直流伺服电机的闭环位置控制系统...
  6. 安卓桌面软件_装bi小神器,让你的手机变成电脑桌面
  7. Python 实现自动打开电脑程序进行操作
  8. 输入存款金额m、存期(年)y和年利率r,计算并输出到期的利息p
  9. 2023年企业如何改善员工体验?为什么员工体验很重要?
  10. 【Java案例】超市购物