1 时间语义

   数据迟到的概念是:数据先产生,但是处理的时候滞后了

   在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

   Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

   Ingestion Time:是数据进入Flink的时间。

   Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

   在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。引入EventTime的时间属性如下:

val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props))stream.keyBy( _.getUser ).timeWindow(Time.hours(1)).reduce( (a, b) => a.add(b) ).addSink(...)

   设置了EventTime后后面处理底层会判断

   注意:设置了事件时间,但是并不知道事件时间,Event Time 的使用一定要指定数据源中的时间戳,通过assignTimestampsAndWatermarks指定,时间戳要是ms单位。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter())val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>)withTimestampsAndWatermarks.keyBy( _.getGroup ).timeWindow(Time.seconds(10)).reduce( (a, b) => a.add(b) ).addSink(...)

   对于排序好的数据,不需要延迟触发,可以只指定时间戳就行了

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(_.timestamp)

   对于乱序数据调用 assignTimestampAndWatermarks 方法,传入一个 BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WC](Time.milliseconds(1000)){override def extractTimestamp(element: WC): Long = {element.timestamp * 1000}}

2 WaterMark

2.1 什么是WaterMark

   我们的数据从采集经过kafka,etl等操作要耗时的,再到流经source,到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生。

   迟到数据是因为有延迟,简单的想法就多等一下。不要5秒的事件到了就关闭窗口,多等一会。我们要考虑的是当前事件的时间进展到底要按照什么时间算,也就是说假设现在5秒的窗口要关闭,设置延迟为2秒,那么5秒的数据来了就多等2秒,5秒的事件来了就相当于还没有进展到5秒,是进展到了5-2=3秒,也就是时间才进展到3秒。按照这种多等2秒的方式的话要等到时间戳是7的数据来了之后7-2=5才关闭5秒的窗口。这就提出了Watermark

   乱序,其实就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

   Watermark可以从以下几个方面理解:①Watermark是一种衡量Event Time进展的机制。②Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。③数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。④Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行

   Watermark延迟时间的设置一般根据数据的乱序情况定义,通常设置成最大乱序程度

2.2 Watermark传递

   真正的Watermark其实就是一条特殊的记录,可以认为是插入数据流里面的一个特殊数据,Watermark可以理解为是一个有时间戳的特殊数据结构,就和数据一样一条一条来,后面处理数据如果是正常数据就正常处理,如果是Watermark就按照对于时间的操作该关闭窗口就关闭窗口。

   Watermark必须单调递增,既然表示当前事件时间的进展,时间只能朝前不停的推进,另外总和当前数据的时间戳相关,数据的时间戳就应该是当前的事件时间。

   当Flink接收到数据时,会按照一定的规则去生成Watermark。Watermark要求单调递增的话就选取所有当前已经来的数据里面最大的时间戳作为当前的事件时间,要多等一会的话在当前最大的时间戳基础上再减去一个延迟时间就可以了,即maxEventTime - 延迟时长。所以Watermark是基于数据携带的时间戳生成的,如果Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

   有序流的Watermarker(最大延迟时间为0)如下图所示:

   乱序流的Watermarker(最大延迟时间为4)如下图所示:

   上图中,采用周期性插入Watermark的生成策略,默认每200ms系统插入Watermark。我们设置的允许最大延迟到达时间为4s,当系统要插入第一个Watermark时查看此时数据中的最大事件时间为15,所以插入的Watermark是11s。过了200ms后到了第二次插入watermark的时候,此时数据中的最大事件时间为22,所以插入Watermark是18s。果我们的窗口1是1s-10s,窗口2是10s-20s,那么Watermarker为11到达之后需要触发窗口1。一旦触发以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。

2.3 Watermark的传递

   Watermark的传递如上图所示。

   Flink 的传递策略基本上遵循三点:①watermark 会以广播的形式在算子之间进行传播。并行任务没有数据交互不考虑,只要考虑上游有多少个任务给他发数据,下游要发送多少个数据到别的任务。②如果在程序里面收到了一个 Long.MAX_VALUE这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的标志。③单流输入取其大,多流输入取小。不同的上游任务发来的Watermark不一样,不能按照上游所有的Watermark中最大的Watermark来判定当前的事件时间,而是应该按照最小的那个来判定,因为Watermark代表的数据是他之前的数据都到期了,如果只接收到一个分区的Watermark是29表示这个分区29之前数据已经到齐了,但是不能保证当前任务不在接收29之前的数据,因为之前别的Watermark可能还没进展到29,所以应该按照最小的。

   底层实现:上游有2个分区就会对每一个分区都去创建一个分区的Watermark(PARTITION Watermark),分别是29,14所以当前任务的事件时间是14,那么下游的子任务广播出去也是14,14之前的数据都到齐了。接下来一个分区来了一个新的Watermark是17,相当于这个分区的时间进展为17之前的都到齐,那么首先更新当前的Watermark,然后观察现在所有分区的Watermark最小值是否改变,如果改变那么事件时间就朝前进展,事件时间更新就往下游广播。

2.4 WaterMark使用

   watermark对于有序数据,最常见的引用方式如下:

dataStream.assignTimestampsAndWatermarks(_.timestamp)

   升序数据不用管Watermark,本身数据来就带有时间戳

   watermark对于乱序数据,最常见的引用方式如下:

dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {override def extractTimestamp(element: Element): Long = element.ds})

   Watermark就是在assignTimestampsAndWatermarks里面定义出来的,BoundedOutOfOrdernessTimestampExtractor 是Flink内置提供的允许乱序最大延时的watermark生成方式,只需要重写其extractTimestamp方法。现在kafka源也支持直接生成Watermark,所以etl的时候可以把Watermark也产生。不过我们一般是在Flink把数据读进来做了转换之后马上分配一个Watermark。Watermark要保证正确性,延迟时间一般定义成最大的乱序程度(从数据里面提炼出来的参数)。同个分区数据可能会乱序,Watermark不会乱序(单调递增,取最大的时间戳减去延迟时间)

2.5 自定义WaterMark

   watermark的生成策略有两种:一种是AssignerWithPeriodicWatermarks周期性生成(隔一段时间系统自动插入),另外一种是AssignerWithPunctuatedWatermarks根据特定标记生成。这两个接口都是Flink暴露了TimestampAssigner接口的子类型。实际生成中大量密集数据比较多,稀疏较少,所以一般使用周期性AssignerWithPeriodicWatermarks方式。

   周期性的生成watermark系统会周期性的将watermark插入到流中。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval(watermarkInterval)方法进行设置。每隔watermarkInterval,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark(watermarkInterval)方法。如果方法返回的watermark大于之前的watermark,新的watermark会被插入到流中。这个检查保证了watermark是单调递增的。如果方法返回的时间戳小于等于之前watermark,则不会产生新的watermark。

   自定义一个周期性的时间戳抽取:

class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[Element] {val bound: Long = 60 * 1000 // 延时为1分钟var maxTs: Long = Long.MinValue // 观察到的最大时间戳override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}override def extractTimestamp(r: Element, previousTS: Long) = {maxTs = maxTs.max(r.timestamp)r.timestamp}
}

   间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理,自定义一个间断式地生成watermar:

class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Element] {val bound: Long = 60 * 1000override def checkAndGetNextWatermark(r: Element, extractedTS: Long): Watermark = {if (r.status == "sucess") {new Watermark(extractedTS - bound)} else {null}}override def extractTimestamp(r: Element, previousTS: Long): Long = {r.timestamp}
}

Flink的时间语义和Watermark相关推荐

  1. Flink时间语义与watermark的原理

    时间语义 我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性 val env: StreamExecutionEnvironment = ...

  2. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  3. 4.1.19 Flink-流处理框架-Flink中的时间语义和watermark水位线

    目录 1.Flink中的时间语义 1.1 EventTime 的代码设置 2.Watermark水位线 2.1 watermark的基本概念 2.2 watermark的特点和传递 2.3 Water ...

  4. Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流

    一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...

  5. 41-58-flink-window-时间语义和watermark

    41-Flink-window相关: Flink的Window 参考:https://ashiamd.github.io/docsify-notes/#/README 1.Window Flink_W ...

  6. 大数据——Flink 时间语义

    目录 一.时间语义 1.1 三种时间概念 1.1.1 ProcessTime 在代码中的使用 1.1.2 EventTime 在代码中的使用 1.1.3 关于窗口起始时间的计算值 二.对事件的处理 2 ...

  7. Flink中的时间和窗口——时间语义

    文章目录 前言 一.时间语义 1.1.Flink 中的时间语义 1.1.1.处理时间(Processing Time) 1.1.2.事件时间(Event Time) 1.2.哪种时间语义更重要 1.2 ...

  8. Flink 时间语义与水位线(Watermarks)

    文章目录 时间语义 水位线(Watermarks) 时间语义 对于流式数据处理,最大的特点就是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为如下三种时间概念 事件时间(Eve ...

  9. flink 三种时间机制_Flink的时间类型和watermark机制

    一FlinkTime类型 有3类时间,分别是数据本身的产生时间.进入Flink系统的时间和被处理的时间,在Flink系统中的数据可以有三种时间属性: Event Time 是每条数据在其生产设备上发生 ...

最新文章

  1. HTML - embed 与 object 之争
  2. 迭代器笔试题,看看你会不会?
  3. 从数仓到数据中台,谈技术选型最优解
  4. 【2017-02-19】数据类型、类型转换、常量、变量、转义符。
  5. 钉钉产品介绍_钉钉正式推出智能OA:免费开放、一站解决“人财物事”管理难题...
  6. 3 配置ftp文件服务器,03-FTP和TFTP配置
  7. 1-9月欧洲新能源车份额上升 混动车注册量增加8.8%
  8. 利用哈夫曼树编码与译码
  9. 计算机电脑五笔怎么打,卸五笔怎么打_电脑极品五笔输入法卸载方法介绍
  10. 计算机如何将两个磁盘合在一起,win10怎么把电脑自带的两个磁盘合并到一起
  11. 【阿里云盘变本地硬盘】CloudDrive1.1.59.2 (修复阿里云盘扫描二维码无法登录的问题)
  12. B站小甲鱼Python基础学习课堂笔记
  13. 使用 MEAN 进行全栈开发基础篇——2、弄一个简单的用户管理试试
  14. NOIp2018 pj 滚粗记
  15. 小饭馆引流推广流程,小饭馆活动促销方案
  16. 国家标准:电子计算机机房设计规范
  17. 侠众道武功最佳练级方案_《侠众道》武功选择推荐(图文)
  18. 在Linux系统中如何把文件拷贝到U盘
  19. Unity3D界面功能操作讲解
  20. 杰里之ANC降噪MIC的要求】【篇】

热门文章

  1. ios 后台下载,断点续传总结
  2. Luogu 4244 [SHOI2008]仙人掌图
  3. 重装系统失败后怎么用好系统U盘启动解决?
  4. 未知宽高元素的水平垂直居中
  5. DateTime和字符串转换问题
  6. (待解)静态构造器和静态字段调用的相互嵌套
  7. int *p = *******a是什么鬼?
  8. 面试常见的C语言字符串操作
  9. 1200可以读取modbus tcp_S7-1200 作 MODBUS TCP服务器
  10. 图像目标分割_2 FCN(Fully Convolutional Networks for Semantic Segmentation)