flink设置watermark以及事件时间字段源码分析
flink设置watermark以及事件时间字段源码分析
背景
1.1、提取时间戳字段,用于事件时间语义处理数据
1.2、设置水位线(水印)watermark
TimestampAssigner 核心接口介绍
TimestampAssigner 时间分配器接口 实现类关系图:
提取时间戳字段方法:
TimestampAssigner 时间戳分配器, 提取数据流中的时间戳字段,
public interface TimestampAssigner<T> extends Function {/*** Assigns a timestamp to an element, in milliseconds since the Epoch.** <p>The method is passed the previously assigned timestamp of the element.* That previous timestamp may have been assigned from a previous assigner,* by ingestion time. If the element did not carry a timestamp before, this value is* {@code Long.MIN_VALUE}.** @param element The element that the timestamp will be assigned to.* @param previousElementTimestamp The previous internal timestamp of the element,* or a negative value, if no timestamp has been assigned yet.* @return The new timestamp.*/long extractTimestamp(T element, long previousElementTimestamp);
}
TimestampAssigner 实现类
AssignerWithPeriodicWatermarks //周期性的生成水印
AssignerWithPunctuatedWatermarks //打断式的生成,也就是可以每一条数据都生成
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor //进入flink系统时间分配器
设置时间戳、水印方法
DataStream类设置时间戳的方法:assignTimestampsAndWatermarks,指定watermark
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism = getTransformation().getParallelism();final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator<T> operator =new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator).setParallelism(inputParallelism);}
1、AssignerWithPeriodicWatermarks接口:
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {@NullableWatermark getCurrentWatermark();
}
官方实现类:
BoundedOutOfOrdernessTimestampExtractor //乱序数据周期性生成
AscendingTimestampExtractor //升序数据周期性生成
IngestionTimeExtractor //进入flink系统时间分配器
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SwSBcHQr-1608622738958)(C:\Users\64483\AppData\Roaming\Typora\typora-user-images\image-20201222130237492.png)]
因此我们一般选择使用实现类即可
2、AscendingTimestampExtractor 周期性生成watermark,升序数据
//实现类提取时间戳字段方法,调用者实现
public abstract long extractAscendingTimestamp(T element);
//根据数据流时间戳,计算watermark的时间戳 --升序处理数据
@Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {//数据流中获取的时间戳final long newTimestamp = extractAscendingTimestamp(element);//如果当前数据的时俱戳大于当前已知的时间戳中的,则更新watermark中的时间戳if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {//否则打印日志violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}
打印日志处理方法:
public static final class LoggingHandler implements MonotonyViolationHandler {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);@Overridepublic void handleViolation(long elementTimestamp, long lastTimestamp) {LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);}}
获取当前watermark的方法
@Overridepublic final Watermark getCurrentWatermark() {//默认延迟1毫秒//如果当前时间戳等于Long.MIN_VALUE 则返回Long.MIN_VALUE,否则返回最大时间戳-1return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}
AscendingTimestampExtractor 继承 AscendingTimestampExtractor
@PublicEvolving
@Deprecated
public abstract class AscendingTimestampExtractor<T>extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {}
3、BoundedOutOfOrdernessTimestampExtractor 周期性的乱序数据
1、在创建对象时,默认给了一个最大的时间戳, Long.MIN_VALUE + this.maxOutOfOrderness;
2、来一条数据,判断当前时间戳和最大时间戳的大小,如果当前时间戳大于最大时间戳,则更新
3、生成watermark,用最大时间戳减去最大延迟,也就是watermark中的时间戳调慢的时间,比如原本是3点结束的窗口,延迟为1分钟,那么watermark中的时间应该为2分59秒
4、默认是 Long.MIN_VALUE是防止出现最大的时间戳减去最大延迟为负数,watermark中的时间戳为负数,出现时间倒转
BoundedOutOfOrdernessTimestampExtractor 有参构造函数:
/** The current maximum timestamp seen so far. */private long currentMaxTimestamp; //截至目前最大的时间戳/** The timestamp of the last emitted watermark. */private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中时间戳/*** The (fixed) interval between the maximum seen timestamp seen in the records* and that of the watermark to be emitted.*/private final long maxOutOfOrderness; //最大延迟时间//构造函数 maxOutOfOrderness为乱序可容忍的最大程度,单位可以为milliseconds seconds minutes等等public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {//如果延迟时间小于0,抛出异常if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}//最大延迟转换为毫秒数this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();//计算最大的默认的时间戳 防止数据溢出,这里要要加上最大延迟this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}
实现类重写提取时间戳字段的方法: 调用者使用,提取出指定字段的数据并返回当前时间戳的大小
public abstract long extractTimestamp(T element);
extractTimestamp重载方法,用于更新最大的时间戳,每来一条数据进行判断
@Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {//获取当前数据的时间戳大小long timestamp = extractTimestamp(element);//如果当前数据的时间戳大小大于目前最大的时间戳,则赋值if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}//如果当前数据的时间戳小于目前最大的时间戳,则不变return timestamp;}
获取watermark中的时间戳:
@Override
public final Watermark getCurrentWatermark() {// this guarantees that the watermark never goes backwards.//当前时间的最大时间戳 - 最大延迟时间 =watermark中的时间戳long potentialWM = currentMaxTimestamp - maxOutOfOrderness;// 如果当前的最大时间戳延迟后的时间戳大于上次的watermark中的时间戳,则更新watermarkif (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);
}
4、接口AssignerWithPunctuatedWatermarks
每一条数据都生成watermark的接口
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {@NullableWatermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
没有实现类,需我们自己实现
public static class Test implements AssignerWithPunctuatedWatermarks {/*** 生成Watermark** @param lastElement 上一条数据* @param extractedTimestamp 水印的时间戳* @return*/@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) {return null;}//提取时间戳字段@Overridepublic long extractTimestamp(Object element, long previousElementTimestamp) {return 0;}}
Watermark类介绍
@PublicEvolving
public final class Watermark extends StreamElement {/** The watermark that signifies end-of-event-time. */public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);// ------------------------------------------------------------------------/** The timestamp of the watermark in milliseconds. */private final long timestamp;/*** Creates a new watermark with the given timestamp in milliseconds.*///构造函数,传入时间戳public Watermark(long timestamp) {this.timestamp = timestamp;}/*** Returns the timestamp associated with this {@link Watermark} in milliseconds.*///获取当前水印的时间戳大小public long getTimestamp() {return timestamp;}// ------------------------------------------------------------------------@Overridepublic boolean equals(Object o) {return this == o ||o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;}@Overridepublic int hashCode() {return (int) (timestamp ^ (timestamp >>> 32));}@Overridepublic String toString() {return "Watermark @ " + timestamp;}
}
总结:
1、Watermark可以理解为一个带着时间戳的空数据或者带着时间戳的标志数据,和其他数据一样,一条一条的处理
2、Watermark只能一直递增
3、Watermark计算方式为当前时间戳减去延迟时间 ,实现窗口延迟
4、window的执行由watermark触发,watermark机制结合window实现
5、升序数据-AscendingTimestampExtractor
乱序数据-BoundedOutOfOrdernessTimestampExtractor
6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor区别就在于,使用了一个最大的时间戳的值,
来对每个数据进行判断,大于则更新,不大于则不更新。而AscendingTimestampExtractor后面的数据如果小于则会出现预警日志
以上仅为个人学习时的理解,如果不确定,麻烦大佬指正!
flink设置watermark以及事件时间字段源码分析相关推荐
- VC++设置文件最后修改时间(附源码)
VC++开发常用功能一系列文章 (欢迎订阅,持续更新...) 第21章:VC++设置文件最后修改时间(附源码) 源代码demo已上传到百度网盘:永久生效 ,代码实现了设置文件最后修改时间 上一篇 ...
- View的Touch事件分发(二.源码分析)
Android中Touch事件的分发又分为View和ViewGroup的事件分发,先来看简单的View的touch事件分发. 主要分析View的dispatchTouchEvent()方法和onTou ...
- android 事件分发 代码解析,Android事件分发之源码分析
原文首发于微信公众号:躬行之,欢迎关注交流! 上篇文章中叙述了 Android 事件分发的大致流程,下面从 Activity.ViewGroup.View 三个方面介绍事件的相关方法,小节如下: Ac ...
- 从flink-example分析flink组件(1)WordCount batch实战及源码分析
上一章<windows下flink示例程序的执行> 简单介绍了一下flink在windows下如何通过flink-webui运行已经打包完成的示例程序(jar),那么我们为什么要使用fli ...
- Android事件分发之源码分析,kotlin库
//这里又调用了FrameLayout中的dispatchTouchEvent方法 return super.dispatchTouchEvent(event); } - } 由于在 FrameLay ...
- 升级SpringCloud到Hoxton.SR3出现The bean 'xxx.FeignClientSpecification' could not be registered. 源码分析和解决
最近提升项目的SpringCloud版本后出错误导致项目无法启动 关键词 The bean 'xxx.FeignClientSpecification' could not be registered ...
- ViewGroup的Touch事件分发(源码分析)
Android中Touch事件的分发又分为View和ViewGroup的事件分发,View的touch事件分发相对比较简单,可参考 View的Touch事件分发(一.初步了解) View的Touch事 ...
- 从flink-example分析flink组件(3)WordCount 流式实战及源码分析
前面介绍了批量处理的WorkCount是如何执行的 <从flink-example分析flink组件(1)WordCount batch实战及源码分析> <从flink-exampl ...
- 【Android源码】源码分析深度好文+精编内核解析分享
阅读Android源码的好处有很多,比如:可以加深我们对系统的了解:可以参考牛人优雅的代码实现:可以从根本上找出一些bug的原因-我们应该庆幸Android是开源的,所有的功能都可以看到实现,所有的b ...
最新文章
- 外包干了四年,废了!
- vue中使用Base64编码和解码
- 20200912 texstudio 添加到字典的错误单词如何删除
- Linux 命令[3]:cd
- QuartusII下verilog设计使用OC8051和VGA两个IP核组成片上系统
- scala写入mysql_Scala:读写文件
- 机器学习课程笔记【三】广义线性模型(2)-构建广义线性模型
- mysql 协议解析源码 c_MySQL协议分析2
- 翻译:Swift5 使用日期类型:Date、DateFormatter、DateComponent
- 在Maya和ZBrush中制作战士模型
- Ubuntu18网络配置
- 洞烛幽微系列 之 梯度 散度 旋度
- PHP与MySQL交互实现网页登录注册功能(步骤超详细!!!)
- 【Typecho主题】情侣博客Brave主题源码
- 编写短信验证码(Java基础)
- 线性回归-----标准方程法实现线性回归方程
- uniapp跨域设置
- 特岗计算机考试面试,你应该知道的特岗教师面试注意事项!快来收藏吧!
- 异步处理,Event Souring,事务补偿,实现最终一致性和服务的弹性和批处理
- Android录音转为MP2的实现
热门文章
- matlab傅里叶变换去噪代码,[转载]MATLAB小波去噪
- 无安装费、无服务费、无管理费的“三无”门诊管理软件
- 双模sa_三大运营商发声:5G首班车,NSA+SA双模手机是最低门槛
- [开源]java版QQ机器人集成小黄鸡功能
- linux怎么做路由跟踪_Linux 路由追踪命令:traceroute
- 网站访客系统php,2套网站访客IP黑名单源码有效屏蔽ip(PHP实现,CC防火墙)
- 华为ICT大赛辅导——防火墙高可用技术(HA)
- 待办事项下拉html代码,HTML5 + jQuery 实现日历待办事项demo
- 浪人棋牌游戏德闲麻将简介
- Python爬虫笔记之用BeautifulSoup及requests库爬取