Flink 窗口函数(Window Functions)处理迟到数据
文章目录
- 将迟到的数据放入侧输出流
Lambda架构:用一个流处理器,先快速的得到一个正确,近似正确的结果,然后在另外一层是一个批处理器,然后在它是一直等着的,等所有数据都到齐了,计算出一个最终准确的结果,更新最终的结果,得到一个最终正确的结果。
将迟到的数据放入侧输出流
我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。
DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag).aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。
调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData()将迟到数据写入侧输出流,这些都是可选的 API,一般不需要实现。而如果定义了侧输出流,可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流
stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(10)) //允许10秒钟数据迟到延迟.sideOutputLateData(late).aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());//计算后的数据result.print("result");//侧输出流的数据result.getSideOutput(late).print("late");
类比班车的例子,我们可以这样理解:大多数人是在发车时刻前后到达的,所以我们只要把表调慢,稍微等一会儿,绝大部分人就都上车了,这个把表调慢的时间就是水位线的延迟;到点之后,班车就准时出发了,不过可能还有该来的人没赶上。于是我们就先慢慢往前开,这段时间内,如果迟到的人抓点紧还是可以追上的;如果有人追上来了,就停车开门让他上来,然后车继续向前开。当然我们的车不能一直慢慢开,需要有一个时间限制,这就是窗口的允许延迟时间。一旦超过了这个时间,班车就不再停留,开上高速疾驰而去了。
代码如下:Gitee上的源代码
public class LateDataTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线DataStream<Event> stream = env.socketTextStream("hadoop102",8888).map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] fields = value.split(" ");return new Event(fields[0].trim(),fields[1].trim(),Long.valueOf(fields[2].trim()));}})//允许数据延迟一秒触发计算.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));//每一条的数据stream.print("input");//定义一个输出标签OutputTag<Event> late = new OutputTag<Event>("late"){};//统计每个url的访问量SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(10)) //允许10秒钟数据迟到延迟.sideOutputLateData(late).aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());//计算后的数据result.print("result");//侧输出流的数据result.getSideOutput(late).print("late");env.execute();}
}
我们还是先启动 nc –lk 8888,然后依次输入以下数据:
Bob ./home 1000
Bob ./home 2000
Bob ./home 3000
Bob ./home 4000
Bob ./home 5000
Bob ./home 6000
Bob ./home 7000
Bob ./home 8000
Bob ./home 9000
Bob ./home 11000
Bob ./home 13000
Bob ./home 15000
Bob ./home 16000
Bob ./home 4000
我们来分析一下程序的运行过程。当输入数据到[Bob ./home 4000]时,时间戳为4000,由于设置了 1 秒钟的水位线延迟时间,所以此时水位线到达了 4 秒(事实上是 3999毫秒,这里不再追究减 1 的细节),并没有触发 [0, 5s) 窗口的计算;所以接下来时间戳为 5000的数据到来,同样可以直接进入窗口做增量聚合。当时间戳为 6000 的数据到来时(无所谓url 是什么,所有数据都可以推动水位线前进),水位线到达了 6000 – 1 * 1000 = 5000,所以触发了[0, 5s) 窗口的计算,第一次输出了窗口统计结果,如下所示:
输入的每一条数据是input>
计算的结果是result>
迟到的侧输出流的数据是late>
这里 count 值为 4,就包括了之前输入的时间戳为 1000、2000、3000、4000 的四条数据。不过窗口触发计算之后并没有关闭销毁,而是继续等待迟到数据。之后时间戳为 7000的数据继续推进水位线,此时时钟已经进展到了 6000ms;此时再来一条时间戳为 4000 的数据,我们会发现立即输出了一条统计结果:
很明显,这仍然是[0, 5s) 的窗口,在之前计数值 4 的基础上继续叠加,更新统计结果为5。所以允许窗口处理迟到数据之后,相当于窗口有了一段等待时间,在这期间所有的迟到数据都会立即触发窗口计算,更新之前的结果。因此,之后时间戳为 3000 的数据到来,同样会立即输出:
我们设置窗口等待的时间为 10 秒中,所以当时间推进到 5000 + 10 * 1000 = 15000 时,窗口就会真正被销毁。此前的所有迟到数据可以直接更新窗口的计算结果,而之后的迟到数据已经无法整合进窗口,就只能用侧输出流来捕获了。需要注意的是,这里的“时间”依然是由水位线来指示的,所以时间戳为 15000 的数据到来,并不会触发窗口的销毁;当时间戳为 16000的数据到来,水位线推进到了 16000 – 1 * 1000 = 15000,此时窗口真正销毁关闭,之后再来的迟到数据就会输出到侧输出流了:
Flink 窗口函数(Window Functions)处理迟到数据相关推荐
- flink笔记8(接笔记7——窗口(Window),迟到数据的处理)
flink 3. 窗口(Window) (1)窗口的概念 (2)窗口的分类 (3)窗口 API 概览 (4)窗口分配器(Window Assigners) (5)窗口函数(Window Functio ...
- mysql 窗口函数最新一条_MySQL 8.0 窗口函数(window functions)
窗口函数(window functions)是数据库的标准功能之一,主流的数据库比如Oracle,PostgreSQL都支持窗口函数功能,MySQL 直到 8.0 版本才开始支持窗口函数. 窗口函数, ...
- Flink中迟到数据的处理
目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结: 我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...
- Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流
一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...
- Flink-时间和窗口(水位线、窗口、迟到数据的处理等)
文章目录 时间和窗口 时间 水位线(Watermark) 时间和窗口 水位线 有序和无序流的插入 水位线生成策略(Watermark Strategies) 水位线的传递 窗口(Window) 窗口 ...
- Flink 窗口函数(Window Functions)增量聚合函数
文章目录 增量聚合函数(incremental aggregation functions) 归约函数(ReduceFunction) 聚合函数(AggregateFunction) 定义了窗口分配器 ...
- flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)
文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...
- Flink对迟到数据的处理的三种方式
** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...
- Flink迟到数据输出到测输出流
一.迟到的数据如何处理? Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Fl ...
最新文章
- oracle time格式化比较,ORACLE DATE和TIMESTAMP数据类型的比较(二) (转)
- Js 向json对象中添加新元素
- 参加51CTO学院软考培训,我通过啦!
- UITableView cell自定义视图中插入Table实现复杂界面
- linux kernel defconfig和.config
- kafka是如何解决粘包拆包的
- spring配置文件最全约束
- Linux进阶之进程与线程
- 深度可分离卷积结构(depthwise separable convolution)计算复杂度分析
- wps计算机打印双面输出,如何在电脑wps软件内设置双面打印
- 西瓜视频(头条)解析并利用IDM工具下载
- 爬网易云音乐动态的坑
- 中望3d快捷键命令大全_CAD常用快捷键命令大全:335个cad快捷键
- 【计算机网络】宽带接入技术
- ES集群状态一直yellow状态引发的思考
- 网络操作系统 Linux配置与管理,网络操作系统—Linux配置与管理
- 科目二考试的只言片语
- 抖音国庆小游戏是如何实现的?带你走近 Cocos
- 人工智能是什么,机器学习就是人工智能吗?
- Tiled有java版本吗_【Cocos2d-X开发学习笔记】开发工具之Tiled地图编辑器的使用