文章目录

  • 将迟到的数据放入侧输出流

Lambda架构:用一个流处理器,先快速的得到一个正确,近似正确的结果,然后在另外一层是一个批处理器,然后在它是一直等着的,等所有数据都到齐了,计算出一个最终准确的结果,更新最终的结果,得到一个最终正确的结果。

将迟到的数据放入侧输出流

我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1)))     .sideOutputLateData(outputTag).aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData()将迟到数据写入侧输出流,这些都是可选的 API,一般不需要实现。而如果定义了侧输出流,可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流

stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(10)) //允许10秒钟数据迟到延迟.sideOutputLateData(late).aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());//计算后的数据result.print("result");//侧输出流的数据result.getSideOutput(late).print("late");

类比班车的例子,我们可以这样理解:大多数人是在发车时刻前后到达的,所以我们只要把表调慢,稍微等一会儿,绝大部分人就都上车了,这个把表调慢的时间就是水位线的延迟;到点之后,班车就准时出发了,不过可能还有该来的人没赶上。于是我们就先慢慢往前开,这段时间内,如果迟到的人抓点紧还是可以追上的;如果有人追上来了,就停车开门让他上来,然后车继续向前开。当然我们的车不能一直慢慢开,需要有一个时间限制,这就是窗口的允许延迟时间。一旦超过了这个时间,班车就不再停留,开上高速疾驰而去了。

代码如下:Gitee上的源代码

public class LateDataTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线DataStream<Event> stream = env.socketTextStream("hadoop102",8888).map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] fields = value.split(" ");return new Event(fields[0].trim(),fields[1].trim(),Long.valueOf(fields[2].trim()));}})//允许数据延迟一秒触发计算.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));//每一条的数据stream.print("input");//定义一个输出标签OutputTag<Event> late = new OutputTag<Event>("late"){};//统计每个url的访问量SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(10)) //允许10秒钟数据迟到延迟.sideOutputLateData(late).aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());//计算后的数据result.print("result");//侧输出流的数据result.getSideOutput(late).print("late");env.execute();}
}

我们还是先启动 nc –lk 8888,然后依次输入以下数据:

Bob ./home 1000
Bob ./home 2000
Bob ./home 3000
Bob ./home 4000
Bob ./home 5000
Bob ./home 6000
Bob ./home 7000
Bob ./home 8000
Bob ./home 9000
Bob ./home 11000
Bob ./home 13000
Bob ./home 15000
Bob ./home 16000
Bob ./home 4000

我们来分析一下程序的运行过程。当输入数据到[Bob ./home 4000]时,时间戳为4000,由于设置了 1 秒钟的水位线延迟时间,所以此时水位线到达了 4 秒(事实上是 3999毫秒,这里不再追究减 1 的细节),并没有触发 [0, 5s) 窗口的计算;所以接下来时间戳为 5000的数据到来,同样可以直接进入窗口做增量聚合。当时间戳为 6000 的数据到来时(无所谓url 是什么,所有数据都可以推动水位线前进),水位线到达了 6000 – 1 * 1000 = 5000,所以触发了[0, 5s) 窗口的计算,第一次输出了窗口统计结果,如下所示:

输入的每一条数据是input>
计算的结果是result>
迟到的侧输出流的数据是late>

这里 count 值为 4,就包括了之前输入的时间戳为 1000、2000、3000、4000 的四条数据。不过窗口触发计算之后并没有关闭销毁,而是继续等待迟到数据。之后时间戳为 7000的数据继续推进水位线,此时时钟已经进展到了 6000ms;此时再来一条时间戳为 4000 的数据,我们会发现立即输出了一条统计结果:

很明显,这仍然是[0, 5s) 的窗口,在之前计数值 4 的基础上继续叠加,更新统计结果为5。所以允许窗口处理迟到数据之后,相当于窗口有了一段等待时间,在这期间所有的迟到数据都会立即触发窗口计算,更新之前的结果。因此,之后时间戳为 3000 的数据到来,同样会立即输出:


我们设置窗口等待的时间为 10 秒中,所以当时间推进到 5000 + 10 * 1000 = 15000 时,窗口就会真正被销毁。此前的所有迟到数据可以直接更新窗口的计算结果,而之后的迟到数据已经无法整合进窗口,就只能用侧输出流来捕获了。需要注意的是,这里的“时间”依然是由水位线来指示的,所以时间戳为 15000 的数据到来,并不会触发窗口的销毁;当时间戳为 16000的数据到来,水位线推进到了 16000 – 1 * 1000 = 15000,此时窗口真正销毁关闭,之后再来的迟到数据就会输出到侧输出流了:

Flink 窗口函数(Window Functions)处理迟到数据相关推荐

  1. flink笔记8(接笔记7——窗口(Window),迟到数据的处理)

    flink 3. 窗口(Window) (1)窗口的概念 (2)窗口的分类 (3)窗口 API 概览 (4)窗口分配器(Window Assigners) (5)窗口函数(Window Functio ...

  2. mysql 窗口函数最新一条_MySQL 8.0 窗口函数(window functions)

    窗口函数(window functions)是数据库的标准功能之一,主流的数据库比如Oracle,PostgreSQL都支持窗口函数功能,MySQL 直到 8.0 版本才开始支持窗口函数. 窗口函数, ...

  3. Flink中迟到数据的处理

    目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结:         我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...

  4. Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流

    一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...

  5. Flink-时间和窗口(水位线、窗口、迟到数据的处理等)

    文章目录 时间和窗口 时间 水位线(Watermark) 时间和窗口 水位线 有序和无序流的插入 水位线生成策略(Watermark Strategies) 水位线的传递 窗口(Window) 窗口 ...

  6. Flink 窗口函数(Window Functions)增量聚合函数

    文章目录 增量聚合函数(incremental aggregation functions) 归约函数(ReduceFunction) 聚合函数(AggregateFunction) 定义了窗口分配器 ...

  7. flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)

    文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...

  8. Flink对迟到数据的处理的三种方式

    ** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...

  9. Flink迟到数据输出到测输出流

    一.迟到的数据如何处理? Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Fl ...

最新文章

  1. oracle time格式化比较,ORACLE DATE和TIMESTAMP数据类型的比较(二) (转)
  2. Js 向json对象中添加新元素
  3. 参加51CTO学院软考培训,我通过啦!
  4. UITableView cell自定义视图中插入Table实现复杂界面
  5. linux kernel defconfig和.config
  6. kafka是如何解决粘包拆包的
  7. spring配置文件最全约束
  8. Linux进阶之进程与线程
  9. 深度可分离卷积结构(depthwise separable convolution)计算复杂度分析
  10. wps计算机打印双面输出,如何在电脑wps软件内设置双面打印
  11. 西瓜视频(头条)解析并利用IDM工具下载
  12. 爬网易云音乐动态的坑
  13. 中望3d快捷键命令大全_CAD常用快捷键命令大全:335个cad快捷键
  14. 【计算机网络】宽带接入技术
  15. ES集群状态一直yellow状态引发的思考
  16. 网络操作系统 Linux配置与管理,网络操作系统—Linux配置与管理
  17. 科目二考试的只言片语
  18. 抖音国庆小游戏是如何实现的?带你走近 Cocos
  19. 人工智能是什么,机器学习就是人工智能吗?
  20. Tiled有java版本吗_【Cocos2d-X开发学习笔记】开发工具之Tiled地图编辑器的使用

热门文章

  1. 计算机硬件加速怎么开,启用硬件加速是什么 是如何进行的【详解】
  2. msxml4.dll加载失败、动态链接库例程失败
  3. SAP中MD04中交货计划行例外信息30和再计划日期的分析
  4. 上海亚商投顾:沪指探底回升 供销社、新冠检测概念领涨
  5. python爬取收费素材_基于Python爬取素材网站音频文件
  6. 从魅族的成功总结的几条经验?
  7. 攻击JavaWeb应用————2、CS交互安全
  8. php 带http的域名,php提取URL中的域名部分
  9. web安全攻防学习之1-渗透测试信息收集
  10. C#基于Emgucv的圆形识别定位方法