一.理解Flink的乱序问题

理解Flink的乱序问题,的先理解Flink的时间语义.
Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的时间,后续版本好像这个时间语义.也就不讨论了.Processing Time:执行操作算子的本地系统时间,与机器相关.(Event Time的使用,必须配合WaterMark)Flink的时间语义的使用,需要搭配window机制.没有window开窗也就不存在乱序问题.反正所有数据最终都会被处理. 只有开窗了,窗口关闭了,有的数据没进来才会导致数据丢失,进而导致计算结果错误.那么思考一个问题:什么时间语义才会导致乱序呢?
答:Event Time.
解释:ProcessingTime以算子处理时间为准,就不存在乱序问题.
但是Event Time以数据产生时间为准.当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子.由于网络、分布式等原因,会导致乱序数据的产生.乱序数据会让窗口计算不准确总结:Flink的乱序,需要基于Event Time 并且 后续有开窗操作

二.Flink的乱序问题处理

1.WaterMark

1.1WaterMark的理解

1.在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关.  事件时间程序必须制定如何产生Event Time Watermarks(水印) . 在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟).
2.数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的(但是这种事理想情况,所谓的理想情况就是设置乱序程度刚好满足最大乱序时间差)
3.是一个特殊的时间戳,生成之后随着流的流动而向后传递。
4.单调递增的(时间不能倒退)。

==我的理解:==WaterMark主要用来控制窗口的关闭,和计算时间. 至于数据应该进入到哪个窗口里边还是基于数据自身的Event Time

1.2WaterMark的分类

有序流中的WaterMark

事件是有序的(生成数据的时间和被处理的时间顺序是一致的), watermark是流中一个简单的周期性的标记。
有序场景:
1、  底层调用的也是乱序的Watermark生成器,只是乱序程度传了一个0ms。
2、  Watermark = maxTimestamp – outOfOrdernessMills – 1ms
= maxTimestamp – 0ms – 1ms
=>事件时间 – 1ms

        //TODO 设置WaterMark 通常在开始时候设置/*** 参入填入 生成WaterMark的策略:有两个:单调增长和固定延迟的策略*/SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = map.assignTimestampsAndWatermarks(WatermarkStrategy//有序流WaterMark.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {//指定哪个字段作为事件时间字段@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));
//        waterSensorSingleOutputStreamOperator.print();//TODO 将相同key聚合到一起KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(r -> r.getId());//TODO 开启一个基于事件时间的滚动窗口WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));//TODO 窗口函数window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {String msg = "当前key:" + key + "窗口:[" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ")一共有" +elements.spliterator().estimateSize() + "条数据";out.collect(msg);}}).print();

乱序流中的WaterMark

在下图中, 按照他们时间戳来看, 这些事件是乱序的, 则watermark对于这些乱序的流来说至关重要.通常情况下, 水印是一种标记, 是流中的一个点, 所有在这个时间戳(水印中的时间戳)前的数据应该已经全部到达. 一旦水印到达了算子, 则这个算子会提高他内部的时钟的值为这个水印的值.乱序场景:
1、  什么是乱序 => 时间戳大的比时间戳小的先来
2、  乱序程度设置多少比较合适?
a)  经验值 => 对自身集群和数据的了解,大概估算。
b)  对数据进行抽样。
c)  肯定不会设置为几小时,一般设为 秒 或者 分钟。
3、  Watermark = maxTimestamp – outOfOrdernessMills – 1ms=>当前最大的事件时间 – 乱序程度(等待时间)- 1ms 

        //TODO 设置WaterMark(允许固定延迟)乱序程度或者说固定延迟时间为3sSingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = map.assignTimestampsAndWatermarks(WatermarkStrategy//参数是固定延迟时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs()*1000;}}));//TODO keyByKeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(r -> r.getId());//TODO 开启 基于事件时间的5s滚动窗口WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));//TODO 窗口聚合window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {String msg = "当前key:" + key + "窗口:[" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ")一共有" +elements.spliterator().estimateSize() + "条数据";out.collect(msg);}}).print();

1.3WaterMark产生方式

周期型(后期看源码补)

周期型产生Watermark中,源码有两个重要属性.
1.记录当前当前最大时间戳 currentWatermark 记录200ms内最大的时间戳
2.上次发送的WaterMark: lastEmittedWatermark默认每200ms,生成一个WaterMark.
当前(最大时间戳-乱序程度)与lastEmittedWatermark比较,谁大往下游发送

间歇型

每来一条生成一个Watermark

多并行度下WaterMark的传递

总结:
1.多并行度的条件下, 向下游传递WaterMark的时候是以广播的方式传递的
2.总是以最小的那个WaterMark为准! 木桶原理!
3.并且当watermark值没有增长的时候不会向下游传递,注意:生成不变。

2.窗口允许迟到数据

已经添加了wartemark之后, 仍有数据会迟到怎么办?
Flink的窗口, 也允许迟到数据. 当触发了窗口计算后, 会先计算当前的结果, 但是此时并不会关闭窗口.以后每来一条迟到数据, 则触发一次这条数据所在窗口计算(增量计算). 那么什么时候会真正的关闭窗口呢?
wartermark 超过了窗口结束时间+等待时间
        //TODO 设置WaterMark(允许固定延迟)乱序程度或者说叫固定延迟为3秒SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = map.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))//指定哪个字段作为事件时间字段.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));//将相同key的数据聚和到一块KeyedStream<WaterSensor, Tuple> keyedStream = waterSensorSingleOutputStreamOperator.keyBy("id");//TODO 开启一个基于事件时间的滚动窗口WindowedStream<WaterSensor, Tuple, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))//TODO 设置允许迟到的时间为3S.allowedLateness(Time.seconds(3));

3.侧输出流

当迟到窗口关闭后,还有数据过来.我们可以将其放入侧输出流中
public class SideOutput {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStream = env.socketTextStream("hadoop102", 9999);//端口中数据转为SensorReadingSingleOutputStreamOperator<SensorReading> mapDS = dataStream.map(line -> {String[] splits = line.split(" ");return new SensorReading(splits[0], new Long(splits[1]), new Double(splits[2]));});//侧输出流标签OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};//提取WatermarkSingleOutputStreamOperator<SensorReading> resultDS = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {@Overridepublic long extractTimestamp(SensorReading element, long recordTimestamp) {return element.getTs();}})).keyBy(r -> r.getId()).window(TumblingEventTimeWindows.of(Time.seconds(5)))//设置允许迟到时间.allowedLateness(Time.milliseconds(1))//迟到数据放入侧输出.sideOutputLateData(outputTag).max(2);//获取侧输出流resultDS.getSideOutput(outputTag);env.execute();}
}

三.代码注意事项

1.WaterMark生成策略时:泛型方法

mapDS.assignTimestampsAndWatermarks(
WatermarkStrategy.====forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(SensorReading element, long recordTimestamp) {
return element.getTs();
}
})
)

2.定义侧输出流标签

OutputTag outputTag = new OutputTag(“late”) {
};

//侧输出流标签
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};//提取Watermark
SingleOutputStreamOperator<SensorReading> resultDS = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {@Overridepublic long extractTimestamp(SensorReading element, long recordTimestamp) {return element.getTs();}})
).keyBy(r -> r.getId()).window(TumblingEventTimeWindows.of(Time.seconds(5)))//设置允许迟到时间.allowedLateness(Time.milliseconds(1))//迟到数据放入侧输出.sideOutputLateData(outputTag).max(2);//获取侧输出流resultDS.getSideOutput(outputTag);

Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流相关推荐

  1. Flink中window 窗口和时间以及watermark水印

    我们都知道,Flink的核心是流式处理,但同时也支持批处理,Flink底层是一个流式引擎,在这个上面实现了流处理和批处理,而窗口则是批处理的实现. 在Flink中window从大的分类上主要有三种:T ...

  2. 【Excel】乱序不同行数的两列数据对比匹配

    1 情境 表格需求: 以上乱序不同行数的两个表格分别为总名单和签到表,需要在总名单中找到未签到人员. 表格特点: [表2:签到表] 为 [表1:总名单] 的子集: 两表顺序错乱: 不同姓名对应身份证号 ...

  3. excel两列乱序姓名如何一一对应 excel 两列数据自动配对

    excel两列乱序姓名如何一一对应?Excel是非常好用的数据表格处理软件,能够帮助用户快速的处理复杂的数据,而excel中有很多实用的功能,需要我们合理使用.有时需要将两个名单中的数据自动对应,自己 ...

  4. 【Flink】迟到数据的处理

    窗口 在流上的工作方式与批处理不同,因为流通常是无限的,所以不可能计算流中的所有元素,流上的聚合事件则由窗口限定,例如"过去 5 分钟的计数"或"最后 100 个元素的总 ...

  5. Flink的时间语义和Watermark

    1 时间语义    数据迟到的概念是:数据先产生,但是处理的时候滞后了    在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:    Event Time:是事件创建的时间.它通常由事件 ...

  6. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  7. 【原理】Flink如何巧用WaterMark机制解决乱序问题

    这是彭文华的第91篇原创 问:数据工程师最期望数据怎么来? 答:按顺序来. MapReduce当初能用起来,就是因为Map阶段对所有数据都进行排序了,后面的Reduce阶段就可以直接用排序好的数据了. ...

  8. Flink乱序延迟时间处理-Watermark

    背景 一般我们都是用EventTime事件时间进行处理统计数据 但数据由于网络问题延迟.乱序到达会导致窗口计算数据不准确 需求:比如时间窗是 [12:01:01,12:01:10 ) ,但是有数据延迟 ...

  9. Flink事件时间、水印以及迟到数据处理的个人理解

    Flink中的时间概念 Flink在流式传输程序中支持不同的时间概念: ProcessingTime: 处理时间,正在执行操作的机器的时间 EventTime: 事件时间,事件发生的时间 Ingest ...

最新文章

  1. 买不到口罩怎么办?Python爬虫帮你时刻盯着自动下单!| 原力计划
  2. c++ RTTI(运行时类型识别)
  3. win8 C盘空间不足的几种解决方法
  4. linux基础-延时命令:sleep
  5. WPF代码模板-布局部分
  6. php strchr和strrchr,strrchr与Strchr
  7. 20165223《Java程序设计》第八周Java学习总结
  8. 【PMP学习笔记】:三、项目经理角色
  9. Qt实践| HTTP知识点-Qt填充referer请求头盗取图片
  10. python sendto(右键发送文件到执行的bat)功能的实现
  11. 几种简易APP制作方式!自留!
  12. 互联网晚报 | 10月16日 星期六 | 搜狗正式并入腾讯;宏光MINIEV累计销量破40万台;神舟十三号载人飞船成功发射...
  13. 如何快速找回丢失的数据?
  14. 多设备时设置default serial的方法
  15. 中国科学技术大学2021计算机考研分数线,【中国科学技术大学】2021考研复试分数线3月13日已公布!速看!...
  16. 基于固件的漏洞挖掘方法梳理
  17. linux远程可视化
  18. 达芬奇 - 构建数据查询API的框架
  19. 我们为什么需要光纤配线架
  20. 线性回归-多元线性回归

热门文章

  1. 【网络是怎么连接的】第四章 探索接入网和网络运营商
  2. NanoCore RAT流量分析报告
  3. python用什么敲代码_你还在纠结用什么库写 Python 命令行程序?看这一篇就够了...
  4. 【栈】L2-032 彩虹瓶 (25分)
  5. manjaro(linux)系统各类命令合集(渐渐更新)
  6. 请结合在播影视剧,对比一下腾讯、优酷和爱奇艺三家在付费内容推广策略上的差异。
  7. 107-周跳探测之MW
  8. C#使用checked检查溢出
  9. 命令查看主板和CPU温度
  10. 用Java写最简易版的银行系统