摘录仅供学习使用,原文来自: Flink详解系列之五--水位线(watermark) - 简书

1、概念

在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。

从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据), 可以触发窗口计算,这个机制就是 Watermark(水位线),具体如下图所示。

window的生成时间是基于wall clock(processing time)的,与event time无关。

2、水位线的计算

watermark本质上是一个时间戳,且是动态变化的,会根据当前最大事件时间产生。watermarks具体计算为:

watermark = 进入 Flink 窗口的最大的事件时间(maxEventTime)— 指定的延迟时间(t)

当watermark时间戳大于等于窗口结束时间时,意味着窗口结束,需要触发窗口计算。

watermark的本质是使用event time 做一个函数映射生成触发window计算的wall clock时间(processing time).

3、水位线生成

3.1 生成的时机

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。

3.2 水位线分配器

  • Periodic Watermarks

周期性分配水位线比较常用,是我们会指示系统以固定的时间间隔发出的水位线。在设置时间为事件时间时,会默认设置这个时间间隔为200ms, 如果需要调整可以自行设置。比如下面的例子是手动设置每隔1s发出水位线。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 手动设置时间间隔为1s
env.getConfig().setAutoWatermarkInterval(1000);

周期水位线需要实现接口:AssignerWithPeriodicWatermarks,下面是示例:

public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 1000L;// 延迟时长是1s@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}
}
  • Punctuated Watermarks

定点水位线不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。

由于数据流中每一个递增的EventTime都会产生一个Watermark。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {return new Watermark(extractedTimestamp);}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {return element.f1;}
}

4、水位线与数据完整性

水位线可以用于平衡延迟和结果的完整性,它控制着执行某些计算需要等待的时间。这个时间是预估的,现实中不存在完美的水位线,因为总会存在延迟的记录。现实处理中,需要我们足够了解从数据生成到数据源的整个过程,来估算延迟的上线,才能更好的设置水位线。

如果水位线设置的过于宽松,好处是计算时能保证近可能多的数据被收集到,但由于此时的水位线远落后于处理记录的时间戳,导致产生的数据结果延迟较大。

如果设置的水位线过于紧迫,数据结果的时效性当然会更好,但由于水位线大于部分记录的时间戳,数据的完整性就会打折扣。

所以,水位线的设置需要更多的去了解数据,并在数据时效性和完整性上有一个权衡。

===================================

从Flink 1.12 开始WaterMark接口进行了更新,原理没有变化。

WatermarkStrategy

-- TimestampAssigner
-- WatermarkGenerator

TimestampAssigner


/*** A {@code TimestampAssigner} assigns event time timestamps to elements. These timestamps are used* by all functions that operate on event time, for example event time windows.** <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations represent* it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC), the same way as {@link* System#currentTimeMillis()} does it.** @param <T> The type of the elements to which this assigner assigns timestamps.*/
@Public
@FunctionalInterface
public interface TimestampAssigner<T> {/*** The value that is passed to {@link #extractTimestamp} when there is no previous timestamp* attached to the record.*/long NO_TIMESTAMP = Long.MIN_VALUE;/*** Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of* any particular time zone or calendar.** <p>The method is passed the previously assigned timestamp of the element. That previous* timestamp may have been assigned from a previous assigner. If the element did not carry a* timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value* Long#MIN_VALUE}).** @param element The element that the timestamp will be assigned to.* @param recordTimestamp The current internal timestamp of the element, or a negative value, if*     no timestamp has been assigned yet.* @return The new timestamp.*/long extractTimestamp(T element, long recordTimestamp);
}

默认实现的类: RecordTimestampAssigner


/*** A {@link TimestampAssigner} that forwards the already-assigned timestamp. This is for use when* records come out of a source with valid timestamps, for example from the Kafka Metadata.*/
@Public
public final class RecordTimestampAssigner<E> implements TimestampAssigner<E> {@Overridepublic long extractTimestamp(E element, long recordTimestamp) {return recordTimestamp;}
}

WatermarkGenerator


/*** The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a* fixed interval).** <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code* AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/
@Public
public interface WatermarkGenerator<T> {/*** Called for every event, allows the watermark generator to examine and remember the event* timestamps, or to emit a watermark based on the event itself.*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** Called periodically, and might emit a new watermark, or not.** <p>The interval in which this method is called and Watermarks are generated depends on {@link* ExecutionConfig#getAutoWatermarkInterval()}.*/void onPeriodicEmit(WatermarkOutput output);
}

实现的类主要有:

BoundedOutOfOrdernessWatermarks
WatermarksWithIdleness

参考:

Flink详解系列之五--水位线(watermark) - 简书

(一)Flink WaterMark 详解及实例 - 掘金

https://mp.csdn.net/mp_blog/creation/editor/129186757

Flink WaterMark 详解相关推荐

  1. Flink/Blink 原理漫谈(一)时间,watermark详解

    系列文章目录 Flink/Blink 原理漫谈(零)运行时的组件 Flink/Blink 原理漫谈(一)时间,watermark详解 Flink/Blink 原理漫谈(二)流表对偶性和distinct ...

  2. 1.17.Flink 并行度详解(Parallel)、TaskManager与Slot、Operator Level、Execution Environment Level、Client Level等

    1.17.Flink 并行度详解(Parallel) 1.17.1.TaskManager与Slot 1.17.2.TaskManager与Slot 1.17.3.并行度(Parallel) 1.17 ...

  3. 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等

    1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...

  4. 数据湖架构Hudi(五)Hudi集成Flink案例详解

    五.Hudi集成Flink案例详解 5.1 hudi集成flink flink的下载地址: https://archive.apache.org/dist/flink/ Hudi Supported ...

  5. Flink Checkpoint 详解

    Flink Checkpoint 详解 一.checkpoint简介 二.checkpoint原理 三.精确一次 四.状态后端 五.配置推荐 一.checkpoint简介 Checkpoint是Fli ...

  6. Flink RPC 详解

    flink底层RPC调用 Flink架构分析之RPC详解 参考spark rpc 总览 总结: ActorSystem 和 Actor/ActorRef 的概念,工作方式,通信方式等,注意异步和 ma ...

  7. hive内置函数_flink教程flink modules详解之使用hive函数

    modules概念 通过hive module使用hive函数 内置函数 自定义函数 sql 客户端的使用 原理分析和源码解析 实现 modules概念 flink 提供了一个module的概念,使用 ...

  8. 【Flink】详解Flink的八种分区

    简介 Flink是一个流处理框架,一个Flink-Job由多个Task/算子构成,逻辑层面构成一个链条,同时Flink支持并行操作,每一个并行度可以理解为一个数据管道称之为SubTask.我们画图来看 ...

  9. Flink: CEP详解

    本文根据 Apache Flink 系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享.通过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,希望能够给计划使用或者已经使用的同学 ...

最新文章

  1. java treemap用法_Java TreeMap put()用法及代码示例
  2. Bash之break、continue和return命令在循环中的作用
  3. a,b为2个整型变量,在不引入第三个变量的前提下写一个算法实现 a与b的值互换...
  4. IPv6与IPv4比较
  5. MVC 使用Jquery实现AJax
  6. Jzoj3898 树的连通性
  7. 基于VS快速排序的单元测试
  8. 谷歌新政策的搜索字词紧密变体怎么应对?
  9. 深度系统安装移动硬盘启动_Legacy无损更改UEFI启动并安装双系统
  10. C++ Primer Plus习题及答案-第五章
  11. 关于Facebook发币的7个问题,专家都是怎么说的?
  12. 网站建设好但是访问不了的原因及解决方法
  13. 计算机输入法在桌面显示不出来的,电脑开机无法正常显示桌面只能看到输入法如何解决...
  14. openwrt利用arp获取局域网设备IP
  15. spring restTemplate的坑----会对String类型的url中的特殊字符进行转义
  16. 论文阅读_对比学习_SimCSE
  17. Tableau CA考试lod详细级别专题解析
  18. 测试用例-——教室和椅子
  19. LWIP协议栈设计与实现笔记:
  20. 《非对称风险》书中精髓:「风险共担」是我们理解人类社会和世界的前提,一个没有风险共担的系统会慢慢积累不平衡,最终垮掉。

热门文章

  1. 酒店三合一终端服务器,MOXA NPORT6650-32 三十二口三合一串口终端服务器
  2. 中职计算机课题研究题目参考,中职课题研究题目参考
  3. 11月25日在线研讨会 | 整车人机工效仿真及虚拟验证
  4. Jeff Dean-Google的那些传闻
  5. 研究生发计算机科学增刊,研究生学习成绩、科研成果计分办法(2017年6月修订)(试用稿) ......
  6. linux跑火车的命令sl
  7. 大辉谈-用redis和CDN实现百万并发架构
  8. JAVA扫码点餐(4)-遗留问题
  9. 基于intel soc+fpga智能驾驶舱和高级驾驶辅助系统软件设计(三)
  10. 计算机除用哪个函数,计算机常用函数表