WaterMark

  • 一、什么是水位线?
  • 二、案例分析
  • 三、如何生成水位线?
    • (一)、在SourceFunction中直接定义Timestamps和Watermarks
    • (二)、自定义生成Timstamps和Watermarks

一、什么是水位线?

  • 通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数据乱序或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的过程和进度,这种机制就是水位线
  • 水位线本质上是一个时间戳,且是动态变化的,会根据最大事件时间生成
watermark = 进入Flink窗口的最大事件时间(maxEventTime) - 一定的延迟时间(t)
//这个延迟时间t是在程序当中配置的
  • watermark时间戳是与窗口结束时间比较的,当watermark大于窗口结束时间时,意味着窗口结束,需要触发窗口计算
  • 举个例子,某条数据的事件时间为2023:03:16 9:00:00,它的下一条数据的事件时间为2023:03:16 9:06:00,窗口设置为滚动窗口为5分钟,延迟时间t设置为2分钟,此时窗口结束时间为2023:03:16 9:00:00,水位线是2023:03:16 9:06:00 - 2min = 2023:03:16 9:04:00,watermark < window endtime,这两条数据应该在同一个窗口内,下面是具体的例子

二、案例分析

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658000)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '5' MINUTE),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}
  • 如上述代码所示,三条数据时间戳一样,也就是事件时间相同,水位线为自带的时间戳*1000L转成毫秒,滚动窗口时间间隔为5min,这时计算结果应该是3条数据都在一个窗口中计算,最终会产生2条数据

  • 如下所示,把第三条时间戳增加300000,也就是增加了5分钟,下面三条数据的真实日期分别为2017-11-26 9:0:0、2017-11-26
    9:0:0、2017-11-26 9:5:0
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000000L),User(2,"hu",20,1511658000000L),User(2,"xiao",19,1511658300000L)).assignAscendingTimestamps(_.timestamp) //指定水位线
  • 第三条数据正好晚了5分钟,此时前两条数据在一个窗口,第三条数据在一个窗口,最终应该产生三条数据,如下所示


我们直到可以利用水位线处理延迟情况,上面assignAscendingTimestamps方法针对的是数据有序,无法设定允许延迟时间,也就无法处理数据延迟的情况,下面介绍几种生成水位线的方式

三、如何生成水位线?

生成水位线分为两步:

  • 第一步需要指定eventTime,可以通过StreamExecutionEnvironment的TimeCharacteristic指定,还需要在Flink程序中指定event
    time时间戳在数据中的字段信息,在Flink程序中会通过指定字段抽取出对应的事件时间,该过程叫做Timestamps Assigning
  • 第二步就是创建相应的Watermarks,需要用户定义根据Timestamps计算出Watermarks的生成策略
  • 目前Flink支持两种方式指定Timestamps和生成WaterMarks,一种方式在DataStream Source算子接口的Source Function定义,另一种方式是通过自定义Timestamp Assigner和Watermark Generator生成

(一)、在SourceFunction中直接定义Timestamps和Watermarks

(二)、自定义生成Timstamps和Watermarks

自定义生成分为两种:

  • Periodic Watermarks:根据设定时间间隔周期性地生成Watermarks
  • Punctuated Watermarks:根据接入数据的数量生成

1、Periodic Watermarks

  • Periodic Watermark又分为两种:升序模式和固定时延间隔

1)、升序模式

  • 会将数据中的Timestamp根据指定字段提取,并用当前的Timestamp作为最新的watermarks,适用于事件按顺序生成
  • 调用DataStream API中的assignAscendingTimestamps来指定Timestamp字段

eg:

    //指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线

2)、使用固定时延间隔的Timestamp Assigner

  • 通过设定固定的时间间隔来指定Watermark落后于Timestamp的区间长度,也就是最长容忍到多长时间内的数据到达系统

如下代码所示,通过创建BoundedOutOfOrdernessTimestampExtractor实现类来定义Timestamp Assigner,其中第一个参数Time.seconds(10)代表了最长的时延为10s,第二个为extractTimestamp抽取逻辑,选择样例类User的第三个元素作为Timestamps
eg:

 case class User(id:Int,name:String,age:Int,timestamp:Long)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658000),User(2, "hu", 20, 1511658000),User(2, "xiao", 19, 1511658000)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(10)) {override def extractTimestamp(t: User): Long = t.timestamp})

eg:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658000000L),User(2, "hu", 20, 1511658000000L),User(2, "xiao", 19, 1511658003000L),User(2, "feng", 31, 1511658002000L)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(1)) {override def extractTimestamp(t: User): Long = t.timestamp})tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '2' SECOND),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}

如上述代码所示,设置滚动窗口,窗口大小为2s,允许延迟时间为1s,四条数据的日期分别为2017-11-26 9:0:0、2017-11-26 9:0:0、2017-11-26 9:0:3、2017-11-26 9:0:2,可以看到第1、2、4条数据应该属于同一个窗口,只不过第四条数据延迟了,当第三条数据到达后,水位线应该为1511658003000L - 1000 = 1511658002000L,没有超过窗口结束时间1511658002000L,所以不触发窗口计算,第1、2、4条数据应该还是在一个窗口中计算的

这时候我们修改下代码,如下所示,修改了第三条数据的时间戳

    val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658000000L),User(2, "hu", 20, 1511658000000L),User(2, "xiao", 19, 1511658004000L),User(2, "feng", 31, 1511658002000L)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(1)) {override def extractTimestamp(t: User): Long = t.timestamp})

这时候当第三条数据到达的时候,水位线为1511658004000L - 1000 = 1511658003000L,超过了窗口结束时间1511658002000L,前两条数据触发计算,这时候第四条数据就没有加入计算

2、Punctuated Watermarks

  • 上述两种是根据时间周期生成Periodic Watermark,用户也可以根据某些特殊条件生成Punctuated Watermarks
  • 如判断数据流中某特殊元素的数量满足条件后生成Watermarks
  • 生成Punctuated Watermarks的逻辑需要通过实现AssignerWithPunctuatedWatermarks接口定义,然后分别复写extractTimestamp方法和checkAndGetNextWatermark方法

eg:判断某个元素的当前状态,如果状态为0则触发生成Watermarks,如果状态不为0,则不触发生成Watermarks。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String,Long,Int)]{//复写extractTimestamps方法,定义抽取Timestamp逻辑override def extractTimestamp(element:(String,Long,Int),
previousElementTimestamp:Long):Long = {element._2}//复写checkAndGetNextWatermark方法,定义Watermark生成逻辑override def checkAndGetNextWatermark(lastElement:(String,Long,Int),
extractedTimestamp:Long):Watermark = {//根据元素中第三位字段状态是否为0生成Watermarkif (lastElement._3 == 0) new Watermark(extractedTimestamp) else null}
}

Flink学习:WaterMark相关推荐

  1. Flink学习-DataStream-HDFSConnector(StreamingFileSink)

    Flink学习-DataStream-HDFSConnector(StreamingFileSink) Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好 ...

  2. 全网第一 | Flink学习面试灵魂40问答案,文末有福利!

    大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 来源:王知无 作者:王知无 By 暴走大数据 场景描述:这是一份Flink学习面试指北.看看你搞清楚 ...

  3. Flink学习4-流式SQL

    Flink学习4-流式SQL Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 介绍Flink Table Sql API相关概念,还会提供 ...

  4. flink设置watermark以及事件时间字段源码分析

    flink设置watermark以及事件时间字段源码分析 背景 1.1.提取时间戳字段,用于事件时间语义处理数据 1.2.设置水位线(水印)watermark TimestampAssigner 核心 ...

  5. Flink学习1-基础概念

    Flink学习1-基础概念 Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 本文是作者学习Flink的一些文档整理.记录和心得体会,希望与 ...

  6. 年度总结 | 2020 Flink 学习路线总结

    2020年,最后几天了,不管这一年过的怎么样,也都过来了,来年还是得继续努力呀.大数据学习指南给大家整理了一份年度总结系列文章,今天分享的是 Flink 学习路线. 以下资料来源都有标注,基本都属于一 ...

  7. 2020 年 Flink 学习资料整合,建议收藏

    精选30+云产品,助力企业轻松上云!>>> 点击蓝色"大数据每日哔哔"关注我 加个"星标",第一时间获取大数据架构,实战经验 以下资料来源都有 ...

  8. 【Flink】FLink 如果watermark水印时间超出今天会是什么问题呢

    1.概述 FLink 如果watermark水印时间超出今天会是什么问题呢 测试如下 /*** 测试点:测试事件时间,如果中途突然来了一个时间是未来时间 会导致什么?* 当前时间* 2022-01-0 ...

  9. flink的watermark简单理解

    1.flink的watermark的作用是处理乱序,核心有两点: a.延迟等待一段时间,等乱序的数据到达 b.不能一直等,得有个限度,到了时间点没到,那么后面再来的乱序数据只能丢弃 2.对某个时间窗开 ...

最新文章

  1. BDTC 2017 | 中国大数据技术大会全日程和讲师曝光
  2. 输出区间内素数的c语言程序,1137C/C++经典程序训练7---求某个范围内的所有素数...
  3. 图解javascript中this指向
  4. python列表间隔合并_Python使用zip合并相邻列表项的方法示例
  5. ThreadPoolExecutor机制
  6. elasticsearch 客户端工具_ELK集群部署 - elasticsearch-7.9.3
  7. java字节码反编译_javap 反编译 java 字节码文件
  8. 前端学习(557):css与百分比单位
  9. 提到刺这种兵器的guandan
  10. 数据结构(二)——堆
  11. 为linux添加新字体
  12. MAC配置thinkPHP的心路历程(课设vue-tpadmin商城)
  13. 德鲁克的时间管理法—《可以量化的…
  14. 基于SSM高校教室管理系统毕业设计-附源码181523
  15. 灯塔资产系统(ARL)部署
  16. CCNP13:QOS【分类、队列、标记、丢弃】技术
  17. hdu 1116 并查集和欧拉路径
  18. QT之CheckBox单项选择与多项选择
  19. 利用FFmpeg将H.264文件读入内存,再输出封装格式文件
  20. PCSX2:PlayStation 2 游戏模拟器

热门文章

  1. 通信协议——MII/GMII
  2. 归并排序(C语言版)
  3. 动态规划之01背包问题
  4. Eclipse-CDT
  5. CSharp(C#)语言_命名空间和程序集
  6. R语言机器学习 格兰杰因果关系检验(Granger cointegratance)
  7. 启发式函数在A* 中的作用
  8. 【深度学习】ReID相关知识点解析(PCB、BoT、MGN)
  9. GameJam线下48小时极限游戏开发体验
  10. 人脸识别之FaceNet