专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档 ,本节内容对应示例源码

1 Time(时间)

所有由 Flink 事件-时间流应用生成的条目都必须伴随着一个时间戳。时间戳将一个条目与一个特定的时间点关联起来,一般这个时间点表示的是这条 record 发生的时间。不过 application 可以随意选择时间戳的含义,只要流中条目的时间戳是随着流的前进而递增即可。

支持的时间模型:

  • EventTime 是事件创建的时间,即事件产生时自带时间戳,在 Flink 处理计算中,事件时间难免有延迟,为了处理延迟,必须指定 Watermark 的生成方式
  • IngestionTime 是事件进入 Flink 的时间,即进入 source operator 时获取所在主机时间
  • ProcessingTime 是每一个算子操作的获取所在主机时间

时间模型比较

  • 性能: ProcessingTime> IngestTime> EventTime
  • 延迟: ProcessingTime< IngestTime< EventTime
  • 确定性: EventTime> IngestTime> ProcessingTime

注意:Flink 从数据流模型中实现了许多技术。有关事件时间和水印的一个很好的介绍,请查看下

  • 流式处理概念:时间域、窗口化 , 中文译文
  • 流式处理概念:水印、触发器、积累模式 , 中文译文
  • 流式处理概念:会话窗口 , 中文译文

如何设置时间域?

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 默认使用 TimeCharacteristic.ProcessTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// 可选的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2 Watermark(水印/水位线)

实时系统中,由于各种原因造成的延时,造成某些消息发到 flink 的时间延时于事件产生的时间。
如果基于event time构建window,但是对于迟到的事件,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是 Watermark。

Watermark 作为数据处理流中的一部分进行传输,并且携带一个时间戳 t。 一个 Watermark(t) 表示流中应该不再有事件时间比 t 小的元素(某个事件的时间戳比 Watermark 时间大)

水印有两个基本属性:

  1. 它们必须单调递增,以确保任务的 event-time 时钟向前推进,而不是向后
  2. 它们与记录的时间戳是相关的。一个时间戳为 T 的水印表示的是:在它之后接下来的所有记录的时间戳,都必须大于 T

Watermarks(水印) 处理机制如下:

  1. 参考 google 的 DataFlow。
  2. 是 event time 处理进度的标志。
  3. 表示比 watermark 更早 (更老) 的事件都已经到达 (没有比水位线更低的数据 )。
  4. 基于 watermark 来进行窗口触发计算的判断。

2.1 有序流中 Watermarks

某些情况下,基于 Event Time 的数据流是有续的 (相对 event time)。
在有序流中,watermark 就是一个简单的周期性标记。

2.2 乱序流中 Watermarks

在更多场景下,基于 Event Time 的数据流是无续的 (相对 event time)。

在无序流中,Watermarks 至关重要,他告诉 operator 比 Watermarks 更早 (更老/时间戳更小) 的事件已经到达,operator 可以将内部事件时间提前到 Watermarks 的时间戳 (可以触发 window 计算)

2.3 并行流中的 Watermarks

通常情况下, watermark 在源函数中或源函数后生成。如果指定多次 watermark,后面指定的 watermark 会覆盖前面的值。 源函数的每个 sub-task 独立生成水印。

随着水印在算子操作中的流动,它们会提前到达其到达的算子操作的事件时间。每当算子操作提前其事件时间时,同时算子操作会为下游生成一个新的 watermark。

一些算子消耗多个输入流;例如,keyBy(…) or partition(…) function。这样的算子的当前事件时间是其输入流的事件时间中的最小值。随着其输入流更新其事件时间,算子也将更新。

现在详细的解释一下,一个 task 如何释放一个水印,并在收到一个新的水印时如何更新它自身的 event-time 时钟(clock)。Flink 会将数据流分成不同的分区(partition),对于每个分区,都会有不同的 operator task 处理,这些 task 并行工作处理整个数据流。每个分区都是记录(包含时间戳)与水印的数据流。对于一个 operator,基于它与上游/下游 operators 连接的方式,它的 tasks 可以从一个或多个输入分区接受 records 和水印,并释放 records 和水印到一个或多个输出分区。下面我们会详细的介绍一个 task 如何释放水印到多个 output tasks,以及它如何根据(从输入 tasks)收到的水印,推进它自身的 event-time 时钟。

一个 task 对每个输入分区,都维护了一个分区水印。当 task 从一个分区收到一个水印,它会将对应分区的水印,更新为收到的水印最大值,并设置为当前值。然后,task 更新它的 event-time 时钟为所有分区水印中的最小值。如果 event-time 时钟相较之前有增加,则 task 处理所有被触发的计时器,并最终广播它的新事件-时间到所有下游 task,此操作通过释放一个对应的水印到所有连接的输出分区完成。

对于有多个输入流的(例如 Union 或 CoFlatMap 操作)operators,它们的 tasks 也会计算它们自身的 event-time 时钟,并作为所有分区水印的最小值– 他们并不(从不同的输入流中)区分 partition watermarks。这样做的结果是,两个不同的输入流中的数据会根据同一 event-time 时钟进行处理。但是,如果一个 application 的各个输入流的事件时间并不是一致的,则这个行为会导致问题。

Flink 的水印处理以及传播算法,确保了 operator task 恰当地释放一致时间戳的记录和水印。然而它依赖的基础是:所有分区持续提供递增的水印。一旦一个分区的水印不再递增,或者完全空闲(不再发送任何记录与水印),则 task 的事件-时间时钟不会再向前推进,并且 task 的计时器也不会被触发。在基于时间的、依赖于向前(advancing)时钟执行计算(并做清理)的 operators 中,便会造成问题。最终会导致处理延时、state 大小剧增(如果没有定期从所有的输入任务中接收到新的水印)。

若是两个输入流的水印差异太大,也会造成类似的影响。在有两个输入流的 task 中,它的事件-时钟会对应于较慢的流,并且较快的流的 records 或是中间结果一般会缓存到 state 中,直到 event-time 时钟允许处理它们。

3 指定 Timestamp 与生成 Watermarks

3.1 SourceFunction 直接定义

class GameSourceFunction[T <: GameModel](seq: Seq[T], millis: Long = 0) extends SourceFunction[T] {private var counter = 0private var isRunning = trueoverride def run(ctx: SourceFunction.SourceContext[T]): Unit = {while (isRunning && counter < seq.length) {// ctx.collect(seq(counter))val next = seq(counter)ctx.collectWithTimestamp(next, next.eventTimestamp) // 毫秒时间戳//      if (next.hasWatermarkTime) {//        ctx.emitWatermark(new Watermark(next.getWatermarkTime))//      }counter = counter + 1Thread.sleep(millis)}}override def cancel(): Unit = {isRunning = false}
}

3.2 通过 Flink 的 Timestamp Assigner 指定

Flink 提供了两个接口用于指定 Timestamp 与 Watermarks

  • AssignerWithPeriodicWatermarks 按时间间隔周期性生成 Watermarks
  • AssignerWithPunctuatedWatermarks 根据接入的事件触发条件生成 Watermarks

生成类图关系如下:

DataStream支持指定Timestamp 与 Watermarks API

def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] // 已废弃 @Deprecated
def assignAscendingTimestamps(extractor: T => Long): DataStream[T] // 底层转换为 AssignerWithPeriodicWatermarks
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]

简单示例:

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置时间域
stream.assignTimestampsAndWatermarks(new GameAscendingTimestampExtractor[UserLogin]) // 设置水印生成器

####3.2.1 AssignerWithPeriodicWatermarks(周期性水印生成器)
通过定义生成水印的间隔(每 n 毫秒) ExecutionConfig.setAutoWatermarkInterval(...)
调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法,如果返回的水印非空且大于前一个水印,则覆盖以前的水印。

总结为:

  • 基于 Timer
  • ExecutionConfig.setAutoWatermarkInterval(msec) (默认是 200ms, 设置 Watermarks 发送的周期)
  • 实现 AssignerWithPeriodicWatermarks 接口
3.2.1.1 Flink-API 提供:时间戳单调递增的分配器

适用于 event-time 戳单调递增的场景,数据没有太多延时。

底层实现为 AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

3.2.1.2 Flink-API 提供:允许固定延迟的分配器

适用于预先知道最大延迟的场景 (例如最多比之前的元素延迟 3000ms)。

底层实现为 BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T>

3.2.1.3 自定义实现 AssignerWithPeriodicWatermarks 示例

// 设置水印生成器
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator[UserLogin]())
stream.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator[UserLogin]())/**
周期性水印生成器 = 示例 1
此生成器生成的水印支持处理给定延迟时间范围内的数据
支持延迟的时间动态计算 = 当前处理事件中的最大时间 - 支持最大延迟时间*/
class BoundedOutOfOrdernessGenerator[T <: GameModel] extends AssignerWithPeriodicWatermarks[T] {val maxOutOfOrderness = 3500L // 支持最大延迟时间 3.5 secondsvar currentMaxTimestamp: Long = _ // 当前最大时间override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {val timestamp = element.eventTimestampcurrentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)timestamp}// 返回水印为当前最大时间减去支持最大延迟时间override def getCurrentWatermark: Watermark =new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}/**
周期性水印生成器 = 示例 2
此生成器生成的水印支持处理给定延迟时间范围内的数据。
支持延迟的时间动态计算 = 当前系统时间 - 支持最大延迟时间*/
class TimeLagWatermarkGenerator[T <: GameModel] extends AssignerWithPeriodicWatermarks[T] {val maxTimeLag = 5000L // 支持最大延迟时间 5 secondsoverride def extractTimestamp(element: T, previousElementTimestamp: Long): Long =element.eventTimestamp// 返回水印为当前时间减去支持最大延迟时间override def getCurrentWatermark: Watermark =new Watermark(System.currentTimeMillis() - maxTimeLag)
}

3.2.2 AssignerWithPunctuatedWatermarks(条件水印生成器)

使用 AssignerWithPunctuatedWatermarks接口。
首先调用该 extractTimestamp(...)方法为元素分配时间戳,然后立即调用checkAndGetNextWatermark(...)方法。
如果返回的水印非空且大于前一个水印,则覆盖以前的水印。

总结为:

  • 实现 AssignerWithPunctuatedWatermarks 接口
  • 生成水印逻辑自定义

注意:可以在每个事件上生成水印。但是,由于每个水印都会在下游引起一些计算,因此过多的水印会降低性能。

// 设置水印生成器
stream.assignTimestampsAndWatermarks(new PunctuatedAssigner[UserLogin]())/** 带条件的水印生成器 = 示例
在特定事件规则,可能会生成新的水印时生成水印*/
class PunctuatedAssigner[T <: GameModel] extends AssignerWithPunctuatedWatermarks[T] {override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {element.eventTimestamp}override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = {if (lastElement.hasWatermarkMarker) new Watermark(extractedTimestamp) else null}
}

4.为每个 Kafka 分区分配时间戳/水印

当 kafka 作为数据源时,kafka 的每个 Partition 分区里面时间戳可能是升序或者乱序模式。通常情况,我们会多个 Partition 分区并行处理,我们可以为 kafka 配置水印。
kafka 内部为每个 Partition 分区维护一个水印,并且在流进行 shuffle 时以2.3 并行流中的 Watermarks进行水印合并

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})val stream: DataStream[MyType] = env.addSource(kafkaSource)

Flink DataStream时间水印机制相关推荐

  1. flink DataStream API(三)事件时间-生成水印

    文章目录 生成水印 水印策略介绍 使用水印策略 处理空闲源 编写 `WatermarkGenerators` 编写周期 WatermarkGenerator 编写标点WatermarkGenerato ...

  2. flink 不设置水印_Flink基础:时间和水印

    ​ 往期推荐: 本篇终于到了Flink的核心内容:时间与水印.最初接触这个概念是在Spark Structured Streaming中,一直无法理解水印的作用.直到使用了一段时间Flink之后,对实 ...

  3. Flink事件时间、水印以及迟到数据处理的个人理解

    Flink中的时间概念 Flink在流式传输程序中支持不同的时间概念: ProcessingTime: 处理时间,正在执行操作的机器的时间 EventTime: 事件时间,事件发生的时间 Ingest ...

  4. Flink事件时间和水印详解

    前言 Flink使用版本:1.12.1.   水印是一个标记的时间戳,是一个标记:意味着水印代表时间前的数据均已到达(人为的设定--开发人员可以控制延迟和完整性之间的权衡),这一点水印保障了乱序问题的 ...

  5. 2023-01-18 flink 11.6 时间水印 和 窗口周期的关系计算方法

    forBoundedOutOfOrderness 和 TumblingEventTimeWindows forBoundedOutOfOrderness(M) TumblingEventTimeWin ...

  6. 【Flink】FLink 如果watermark水印时间超出今天会是什么问题呢

    1.概述 FLink 如果watermark水印时间超出今天会是什么问题呢 测试如下 /*** 测试点:测试事件时间,如果中途突然来了一个时间是未来时间 会导致什么?* 当前时间* 2022-01-0 ...

  7. 彻底搞清 Flink 中的 Window 机制

    [CSDN 编者按]Window是处理无限流的核心.Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.Flink提 ...

  8. Flink的时间语义和Watermark

    1 时间语义    数据迟到的概念是:数据先产生,但是处理的时候滞后了    在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:    Event Time:是事件创建的时间.它通常由事件 ...

  9. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

最新文章

  1. vmware虚拟机怎么让窗口自动调整大小适应主机
  2. ActiveMQ—安装配置及使用
  3. mysql数据库主从同步过程详述(三)
  4. 火墙之firewalld
  5. 牛客网 PAT 算法历年真题 1003: 数素数 (20)
  6. [vue] vue要做权限管理该怎么做?如果控制到按钮级别的权限怎么做?
  7. 中国石油大学(华东)计算机科学与技术,2017年中国石油大学(华东)函授本科计算机科学与技术专业...
  8. 这8种保证线程安全的技术你都知道吗?
  9. mysql中文问号 linux,解决Linux系统下Mysql数据库中文显示成问号的问题
  10. exists sql用法_彻底弄懂sql select各种查询用法
  11. Python学习笔记:用Python获取数据(本地数据与网络数据)
  12. Makefile 的使用
  13. lazy-load-img.js 源码 学习笔记及原理说明
  14. pdf去除密码 html,pdf密码移除工具
  15. 重新思考路易斯维尔足球品牌
  16. 微信小程序毕业设计 基于微信小程序在线考试系统开题报告
  17. 模拟器怎么安装xposed框架
  18. Traceback (most recent call last):IndexError: list assignment index out of range
  19. mysql消除冗余_mysql剔除冗余数据
  20. Sqlserver的身份验证模式

热门文章

  1. DEBUG:惠普打印机页边距总是不对
  2. 简转繁的JS代码(转)
  3. 在职研究生(多重继承)Python
  4. ArcGIS中无法复制粘贴的问题
  5. 设计模式详解:Singleton(单例类)
  6. FPGA电源设计方案
  7. Macbook Pro 鼠标卡顿问题
  8. flappy+bird+c语言程序,C语言实现flappy bird游戏
  9. 科研篇一:NeurIPS2019 分类整理-对抗样本Meta-Learning
  10. 联想笔记本声音太小怎么办_笔记本电脑声音变小了怎么办 这里有妙招