文章目录

  • 背景
  • 新的水印生成接口
  • 内置水印生成策略
    • 固定延迟生成水印
    • 单调递增生成水印
  • event时间的获取
  • 处理空闲数据源

背景

在flink 1.11之前的版本中,提供了两种生成水印(Watermark)的策略,分别是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,这两个接口都继承自TimestampAssigner接口。

用户想使用不同的水印生成方式,则需要实现不同的接口,但是这样引发了一个问题,对于想给水印添加一些通用的、公共的功能则变得复杂,因为我们需要给这两个接口都同时添加新的功能,这样还造成了代码的重复。

所以为了避免代码的重复,在flink 1.11 中对flink的水印生成接口进行了重构,

新的水印生成接口

当我们构建了一个DataStream之后,使用assignTimestampsAndWatermarks方法来构造水印,新的接口需要传入一个WatermarkStrategy对象。

DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)

WatermarkStrategy 这个接口是做什么的呢?这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是下面的这个方法。

 /*** Instantiates a WatermarkGenerator that generates watermarks according to this strategy.*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个
WatermarkGenerator,我们在进入这里边看看。

@Public
public interface WatermarkGenerator<T> {/*** Called for every event, allows the watermark generator to examine and remember the* event timestamps, or to emit a watermark based on the event itself.*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** Called periodically, and might emit a new watermark, or not.** <p>The interval in which this method is called and Watermarks are generated* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.*/void onPeriodicEmit(WatermarkOutput output);
}

这个方法简单明了,主要是有两个方法:

  • onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.
  • onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:env.getConfig().setAutoWatermarkInterval(5000L);

我们自己实现一个简单的周期性的发射水印的例子:

在这个onEvent方法里,我们从每个元素里抽取了一个时间字段,但是我们并没有生成水印发射给下游,而是自己保存了在一个变量里,在onPeriodicEmit方法里,使用最大的日志时间减去我们想要的延迟时间作为水印发射给下游。

     DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple2<String,Long>>(){@Overridepublic WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context){return new WatermarkGenerator<Tuple2<String,Long>>(){private long maxTimestamp;private long delay = 3000;@Overridepublic void onEvent(Tuple2<String,Long> event,long eventTimestamp,WatermarkOutput output){maxTimestamp = Math.max(maxTimestamp, event.f1);}@Overridepublic void onPeriodicEmit(WatermarkOutput output){output.emitWatermark(new Watermark(maxTimestamp - delay));}};}});

内置水印生成策略

为了方便开发,flink提供了一些内置的水印生成方法供我们使用。

固定延迟生成水印

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

我们实现一个延迟3秒的固定延迟水印,可以这样做:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

他的底层使用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。我们看下源码中的这两个方法,是不是和我们上面自己写的很像.

 @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));}

单调递增生成水印

通过静态方法forMonotonousTimestamps来提供.

WatermarkStrategy.forMonotonousTimestamps()

这个也就是相当于上述的延迟策略去掉了延迟时间,以event中的时间戳充当了水印。

在程序中可以这样使用:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

它的底层实现是AscendingTimestampsWatermarks,其实它就是BoundedOutOfOrdernessWatermarks类的一个子类,没有了延迟时间,我们来看看具体源码的实现.

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/*** Creates a new watermark generator with for ascending timestamps.*/public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0));}
}

event时间的获取

上述我们讲了flink自带的两种水印生成策略,但是对于我们使用eventtime语义的时候,我们想从我们的自己的数据中抽取eventtime,这个就需要TimestampAssigner了.

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {............long extractTimestamp(T element, long recordTimestamp);
}

使用的时候我们主要就是从我们自己的元素element中提取我们想要的eventtime。

使用flink自带的水印策略和eventtime抽取类,可以这样用:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp)->event.f1));

处理空闲数据源

在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。

所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。

当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。

通过下面的代码来实现对于空闲数据流的处理

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/datastream/WatermarkTest.java

更多精彩内容,欢迎关注我的公众号【大数据技术与应用实战】

flink教程-聊聊 flink 1.11 中新的水印策略相关推荐

  1. Visual C++ 11 中新的并发功能

    最新的 C++ 迭代(称为 C++11,在去年通过了国际标准化组织 (ISO) 的审批)形式化了一组新库和一些保留字以处理并发. 许多开发者以前都在 C++ 中使用过并发功能,但都是通过第三方的库,即 ...

  2. C++11中新特性之:lambda 表达式

    首先摆出Lambda表达式语法 lambda-expression: lambda-introducer lambda-declaratoropt compound-statement lambda- ...

  3. Java 11中的新功能和API详解系列1

    Java 11中的新功能和API详解系列1 2018.9.27 版权声明:本文为博主chszs的原创文章,未经博主允许不得转载. JDK 11在语言语法方面有一个小改动,增加了相当数量的新API,以及 ...

  4. Java 11 中 11 个不为人知的瑰宝

    作者 | Nicolai Parlog 译者 | 罗昭成 出品 | CSDN(ID:CSDNnews) 我们已经迎来了 Java 11,尽管它的升级介绍里没有什么跨时代的特性,但却有一些不为人知的瑰宝 ...

  5. Flink教程(11)- Flink高级API(Window)

    文章目录 01 引言 02 Window 2.1 为什么需要Window? 2.2 Window分类 2.2.1 按照time和count分类 2.2.2 按照slide和size分类 2.2.3 总 ...

  6. Flink 教程 gitbook 从入门到入土(详细教程)

    Flink从入门到入土(详细教程) 和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分 1.Environment Flink J ...

  7. Flink教程(20)- Flink高级特性(双流Join)

    文章目录 01 引言 02 双流join介绍 03 Window Join 3.1 Tumbling Window Join 3.2 Sliding Window Join 3.3 Session W ...

  8. Flink教程(16)- Flink Table与SQL

    文章目录 01 引言 02 Table API & SQL 介绍 2.1 Flink Table模块 2.2 Table API & SQL特点 2.3 Table API& ...

  9. Flink教程(31)- Flink网络流控及反压

    文章目录 01 引言 02 为什么需要网络流控? 03 网络流控的实现:静态限速 04 网络流控的实现:动态反馈/自动反压 4.1 案例一:Storm 反压实现 4.2 案例二:Spark Strea ...

最新文章

  1. form表单提交编码的问题
  2. 物联网成网络安全防护新重点!
  3. C语言一级指针(char *)易错模型分析
  4. 高斯模糊为什么叫高斯滤波_为什么高斯是所有发行之王?
  5. VMware Converter P2V 时,卡住
  6. 查看一个定义的方法在哪些地方被使用过(vs2008)
  7. 利用SqlBuikCopy实现数据批量写入
  8. 梯度消失和梯度爆炸_梯度消失梯度爆炸-Gradient Clip
  9. Yii Framework2.0开发教程(8)输入验证
  10. qnap 文件传输服务器,如何将 QNAP NAS 作为 RADIUS 服务器使用?
  11. 重磅 | 数据挖掘之父韩家炜:文本语料库的数据挖掘(附视频+PPT下载)
  12. (C语言)2048游戏实现
  13. 有一根27厘米的细木杆java_[转载]推荐最近看过的最好的java视频
  14. [MATLAB]最邻近插值法进行图像放大
  15. Go语言的errors
  16. PDF压缩有哪些方法?用迅读PDF大师,压缩清晰无损
  17. 第十一部分 项目采购管理
  18. java.lang.IllegalArgumentException: interface UserMapper is not visible from class loader
  19. Android夜间模式原理
  20. 华为鸿蒙创始人,华为鸿蒙操作系统怎样 华为创始人任正非:苹果是隐私榜样...

热门文章

  1. 纯html+css打造一款特殊的生日贺卡
  2. python编程midi键盘按键错乱_电脑键盘按键错乱怎么回事 电脑键盘按键错乱原因【图文】...
  3. 知识付费-windows+宝塔安装教程
  4. SWD是一种串行调试接口
  5. [工业互联-1]:工业互联全局概述
  6. 硬件MSB最高位优先、LSB最低位优先的CRC计算原理详细解释和程序,正算反算成功等效,DS18B20和HTU31D传感器CRC
  7. 爬虫学习笔记(十八)—— 点触验证码:超级鹰、12306自动登录
  8. 回眸 2022,展望 2023
  9. appinventor跑酷游戏_基于APPInventor的一款益智游戏的设计与实现
  10. 湖北计算机一级证书样本,湖北计算机一级(Hubei computer level).doc