flink设置watermark以及事件时间字段源码分析

背景

1.1、提取时间戳字段,用于事件时间语义处理数据

1.2、设置水位线(水印)watermark

TimestampAssigner 核心接口介绍

TimestampAssigner 时间分配器接口 实现类关系图:

提取时间戳字段方法

TimestampAssigner 时间戳分配器, 提取数据流中的时间戳字段,

public interface TimestampAssigner<T> extends Function {/*** Assigns a timestamp to an element, in milliseconds since the Epoch.** <p>The method is passed the previously assigned timestamp of the element.* That previous timestamp may have been assigned from a previous assigner,* by ingestion time. If the element did not carry a timestamp before, this value is* {@code Long.MIN_VALUE}.** @param element The element that the timestamp will be assigned to.* @param previousElementTimestamp The previous internal timestamp of the element,*                                 or a negative value, if no timestamp has been assigned yet.* @return The new timestamp.*/long extractTimestamp(T element, long previousElementTimestamp);
}

TimestampAssigner 实现类

AssignerWithPeriodicWatermarks  //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor  //进入flink系统时间分配器

设置时间戳、水印方法

DataStream类设置时间戳的方法:assignTimestampsAndWatermarks,指定watermark

 public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism = getTransformation().getParallelism();final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator<T> operator =new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator).setParallelism(inputParallelism);}


1、AssignerWithPeriodicWatermarks接口:

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {@NullableWatermark getCurrentWatermark();
}

官方实现类:

BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor //进入flink系统时间分配器

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SwSBcHQr-1608622738958)(C:\Users\64483\AppData\Roaming\Typora\typora-user-images\image-20201222130237492.png)]

因此我们一般选择使用实现类即可

2、AscendingTimestampExtractor 周期性生成watermark,升序数据

//实现类提取时间戳字段方法,调用者实现

public abstract long extractAscendingTimestamp(T element);
//根据数据流时间戳,计算watermark的时间戳   --升序处理数据
@Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {//数据流中获取的时间戳final long newTimestamp = extractAscendingTimestamp(element);//如果当前数据的时俱戳大于当前已知的时间戳中的,则更新watermark中的时间戳if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {//否则打印日志violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}

打印日志处理方法:

public static final class LoggingHandler implements MonotonyViolationHandler {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);@Overridepublic void handleViolation(long elementTimestamp, long lastTimestamp) {LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);}}

获取当前watermark的方法

 @Overridepublic final Watermark getCurrentWatermark() {//默认延迟1毫秒//如果当前时间戳等于Long.MIN_VALUE 则返回Long.MIN_VALUE,否则返回最大时间戳-1return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}

AscendingTimestampExtractor 继承 AscendingTimestampExtractor

@PublicEvolving
@Deprecated
public abstract class AscendingTimestampExtractor<T>extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {}

3、BoundedOutOfOrdernessTimestampExtractor 周期性的乱序数据

1、在创建对象时,默认给了一个最大的时间戳, Long.MIN_VALUE + this.maxOutOfOrderness;

2、来一条数据,判断当前时间戳和最大时间戳的大小,如果当前时间戳大于最大时间戳,则更新

3、生成watermark,用最大时间戳减去最大延迟,也就是watermark中的时间戳调慢的时间,比如原本是3点结束的窗口,延迟为1分钟,那么watermark中的时间应该为2分59秒

4、默认是 Long.MIN_VALUE是防止出现最大的时间戳减去最大延迟为负数,watermark中的时间戳为负数,出现时间倒转

BoundedOutOfOrdernessTimestampExtractor 有参构造函数:

/** The current maximum timestamp seen so far. */private long currentMaxTimestamp; //截至目前最大的时间戳/** The timestamp of the last emitted watermark. */private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中时间戳/*** The (fixed) interval between the maximum seen timestamp seen in the records* and that of the watermark to be emitted.*/private final long maxOutOfOrderness; //最大延迟时间//构造函数  maxOutOfOrderness为乱序可容忍的最大程度,单位可以为milliseconds  seconds minutes等等public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {//如果延迟时间小于0,抛出异常if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}//最大延迟转换为毫秒数this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();//计算最大的默认的时间戳 防止数据溢出,这里要要加上最大延迟this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}

实现类重写提取时间戳字段的方法: 调用者使用,提取出指定字段的数据并返回当前时间戳的大小

public abstract long extractTimestamp(T element);

extractTimestamp重载方法,用于更新最大的时间戳,每来一条数据进行判断

@Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {//获取当前数据的时间戳大小long timestamp = extractTimestamp(element);//如果当前数据的时间戳大小大于目前最大的时间戳,则赋值if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}//如果当前数据的时间戳小于目前最大的时间戳,则不变return timestamp;}

获取watermark中的时间戳:

@Override
public final Watermark getCurrentWatermark() {// this guarantees that the watermark never goes backwards.//当前时间的最大时间戳 - 最大延迟时间 =watermark中的时间戳long potentialWM = currentMaxTimestamp - maxOutOfOrderness;// 如果当前的最大时间戳延迟后的时间戳大于上次的watermark中的时间戳,则更新watermarkif (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);
}

4、接口AssignerWithPunctuatedWatermarks

每一条数据都生成watermark的接口

    public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {@NullableWatermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

没有实现类,需我们自己实现

 public static class Test implements AssignerWithPunctuatedWatermarks {/*** 生成Watermark** @param lastElement        上一条数据* @param extractedTimestamp 水印的时间戳* @return*/@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) {return null;}//提取时间戳字段@Overridepublic long extractTimestamp(Object element, long previousElementTimestamp) {return 0;}}

Watermark类介绍

@PublicEvolving
public final class Watermark extends StreamElement {/** The watermark that signifies end-of-event-time. */public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);// ------------------------------------------------------------------------/** The timestamp of the watermark in milliseconds. */private final long timestamp;/*** Creates a new watermark with the given timestamp in milliseconds.*///构造函数,传入时间戳public Watermark(long timestamp) {this.timestamp = timestamp;}/*** Returns the timestamp associated with this {@link Watermark} in milliseconds.*///获取当前水印的时间戳大小public long getTimestamp() {return timestamp;}// ------------------------------------------------------------------------@Overridepublic boolean equals(Object o) {return this == o ||o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;}@Overridepublic int hashCode() {return (int) (timestamp ^ (timestamp >>> 32));}@Overridepublic String toString() {return "Watermark @ " + timestamp;}
}

总结

1、Watermark可以理解为一个带着时间戳的空数据或者带着时间戳的标志数据,和其他数据一样,一条一条的处理

2、Watermark只能一直递增

3、Watermark计算方式为当前时间戳减去延迟时间 ,实现窗口延迟

4、window的执行由watermark触发,watermark机制结合window实现

5、升序数据-AscendingTimestampExtractor

​ 乱序数据-BoundedOutOfOrdernessTimestampExtractor

6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor区别就在于,使用了一个最大的时间戳的值,

来对每个数据进行判断,大于则更新,不大于则不更新。而AscendingTimestampExtractor后面的数据如果小于则会出现预警日志

以上仅为个人学习时的理解,如果不确定,麻烦大佬指正!

flink设置watermark以及事件时间字段源码分析相关推荐

  1. VC++设置文件最后修改时间(附源码)

      VC++开发常用功能一系列文章 (欢迎订阅,持续更新...) 第21章:VC++设置文件最后修改时间(附源码) 源代码demo已上传到百度网盘:永久生效  ,代码实现了设置文件最后修改时间 上一篇 ...

  2. View的Touch事件分发(二.源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,先来看简单的View的touch事件分发. 主要分析View的dispatchTouchEvent()方法和onTou ...

  3. android 事件分发 代码解析,Android事件分发之源码分析

    原文首发于微信公众号:躬行之,欢迎关注交流! 上篇文章中叙述了 Android 事件分发的大致流程,下面从 Activity.ViewGroup.View 三个方面介绍事件的相关方法,小节如下: Ac ...

  4. 从flink-example分析flink组件(1)WordCount batch实战及源码分析

    上一章<windows下flink示例程序的执行> 简单介绍了一下flink在windows下如何通过flink-webui运行已经打包完成的示例程序(jar),那么我们为什么要使用fli ...

  5. Android事件分发之源码分析,kotlin库

    //这里又调用了FrameLayout中的dispatchTouchEvent方法 return super.dispatchTouchEvent(event); } - } 由于在 FrameLay ...

  6. 升级SpringCloud到Hoxton.SR3出现The bean 'xxx.FeignClientSpecification' could not be registered. 源码分析和解决

    最近提升项目的SpringCloud版本后出错误导致项目无法启动 关键词 The bean 'xxx.FeignClientSpecification' could not be registered ...

  7. ViewGroup的Touch事件分发(源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,View的touch事件分发相对比较简单,可参考 View的Touch事件分发(一.初步了解) View的Touch事 ...

  8. 从flink-example分析flink组件(3)WordCount 流式实战及源码分析

    前面介绍了批量处理的WorkCount是如何执行的 <从flink-example分析flink组件(1)WordCount batch实战及源码分析> <从flink-exampl ...

  9. 【Android源码】源码分析深度好文+精编内核解析分享

    阅读Android源码的好处有很多,比如:可以加深我们对系统的了解:可以参考牛人优雅的代码实现:可以从根本上找出一些bug的原因-我们应该庆幸Android是开源的,所有的功能都可以看到实现,所有的b ...

最新文章

  1. 外包干了四年,废了!
  2. vue中使用Base64编码和解码
  3. 20200912 texstudio 添加到字典的错误单词如何删除
  4. Linux 命令[3]:cd
  5. QuartusII下verilog设计使用OC8051和VGA两个IP核组成片上系统
  6. scala写入mysql_Scala:读写文件
  7. 机器学习课程笔记【三】广义线性模型(2)-构建广义线性模型
  8. mysql 协议解析源码 c_MySQL协议分析2
  9. 翻译:Swift5 使用日期类型:Date、DateFormatter、DateComponent
  10. 在Maya和ZBrush中制作战士模型
  11. Ubuntu18网络配置
  12. 洞烛幽微系列 之 梯度 散度 旋度
  13. PHP与MySQL交互实现网页登录注册功能(步骤超详细!!!)
  14. 【Typecho主题】情侣博客Brave主题源码
  15. 编写短信验证码(Java基础)
  16. 线性回归-----标准方程法实现线性回归方程
  17. uniapp跨域设置
  18. 特岗计算机考试面试,你应该知道的特岗教师面试注意事项!快来收藏吧!
  19. 异步处理,Event Souring,事务补偿,实现最终一致性和服务的弹性和批处理
  20. Android录音转为MP2的实现

热门文章

  1. matlab傅里叶变换去噪代码,[转载]MATLAB小波去噪
  2. 无安装费、无服务费、无管理费的“三无”门诊管理软件
  3. 双模sa_三大运营商发声:5G首班车,NSA+SA双模手机是最低门槛
  4. [开源]java版QQ机器人集成小黄鸡功能
  5. linux怎么做路由跟踪_Linux 路由追踪命令:traceroute
  6. 网站访客系统php,2套网站访客IP黑名单源码有效屏蔽ip(PHP实现,CC防火墙)
  7. 华为ICT大赛辅导——防火墙高可用技术(HA)
  8. 待办事项下拉html代码,HTML5 + jQuery 实现日历待办事项demo
  9. 浪人棋牌游戏德闲麻将简介
  10. Python爬虫笔记之用BeautifulSoup及requests库爬取