目录

  • 一、时间
    • 1.1 时间语义
      • 1.1.1 Event Time
      • 1.1.2 Ingestion Time
      • 1.1.3 Processing Time
    • 1.2 设置时间语义
  • 二、Watermark
    • 2.1 Watermark是什么?
    • 2.2 如何计算Watermark?
    • 2.3 何时出发窗口计算?
    • 2.4 原理
      • 2.4.1 窗口计算问题
      • 2.4.2 水印窗口
    • 2.5 Watermark设定策略
      • 2.5.1 AssignerWithPunctuatedWatermarks
      • 2.5.2 AssignerWithPeriodicWatermarks
  • 三、案例

Apache Flink称为终极流式框架,不仅仅提供高吞吐、低延迟和Exactly-Once语义的实时计算能力,还提供了基于流式引擎处理批量数据的计算能力,真正意义上实现了批流统一,无疑是继Spark和Storm的后起之秀。
但是刚刚入门Flink,会接触到水印水位线陌生的技术词汇,但到底什么是水位线,又给Apache Flink蒙上了一层神秘的面纱。这里就为大家揭开Watermark的神秘面纱。

一、时间

1.1 时间语义

在流式数据处理中,Flink根据时间产生的位置不同,将时间区分为三种时间语义,分别为事件时间、事件接入时间和事件处理时间。

1.1.1 Event Time

事件时间,即事件行为发生的时间,如系统终端用户注册时间、订单下单时间以及订单支付时间等,决定了事件真实产生时间。

1.1.2 Ingestion Time

事件接入时间,或摄入时间,即数据接入Flink系统,在DataSource接入时生成的接入时间。

1.1.3 Processing Time

处理时间,数据通过各个算子实例执行转换操作过程,算子实例所在系统的时间即为数据处理时间。

1.2 设置时间语义

在Flink中,默认情况下使用的是Process Time时间语义,如果用户选择使用Event Time或者Ingestion Time语义, 则需要在创建的StreamExecutionEnvironment中调用setStreamTimeCharacteristic()方法设定系统的时间概念,

    // 使用EventTimeenv.setStreamTImeCharacteristic(TimeCharacteristic.EventTime)// 使用IngestionTimeenv.setStreamTImeCharacteristic(TimeCharacteristic.IngestionTime)

二、Watermark

使用EventTime时间语义处理流式数据时,数据从Event产生,流经Source,再到Operator,这中间需要一定的时间。理论情况下,数据是按照EventTime先后顺序传输到Operator进行处理;但是也不排除由于网络延迟消息积压背压等原因而导致乱序情况;特别是使用Kafka的时候,无法保证多分区间数据的顺序。因此,在进行Window计算的时候,不能无限期地等下去,必须有机制保证在特定的时间后, 触发Window进行计算,即这个机制就是Watermark(水位线)。

2.1 Watermark是什么?

直译为水印,本质是时间戳,能一定程度上解决数据乱序或者延迟到达问题。

2.2 如何计算Watermark?

  • Watermark = 当前窗口最大的事件时间 - 最大允许数据延迟的时间/乱序时间

  • 最大允许数据延迟的时间设置

    // 设置时间语义
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 指定水印分配策略
    ds.assignTimestampsAndWatermarks(// 注意:WatermarkStrategy为Flink 1.11版本提供// 参数为最大延迟时间,或者最大无序度,或者最大乱序时间。// 值maxOutOfOrderness=2sWatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 指定事件时间数据.withTimestamAssigner(e, timestamp) -> e.getEventTime())
    );
    
  • BoundedOutOfOrdernessWatermarks源码分析

    @Public
    public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {// 当前窗口最大的事件时间private long maxTimestamp;// 窗口最大允许的延误时间private final long outOfOrdernessMillis;public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {// 延误时间, 以毫秒表示this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();// 刚开始保证时间为最小this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;}/*** 所有事件均会调用该方法,不限于该窗口数据* @param event 时间* @param eventTimestamp 事件时间* @param out Watermark输出器*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput out){// 计算窗口最大的事件时间,保证窗口水印单调递增maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 水印 = 当前窗口最大事件时间 - 最大延误时间 - 1// 水印需要减一的原因为:窗口为左闭右开的问题output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));}
    }
    

2.3 何时出发窗口计算?

  • Watermark >= 窗口结束时间
  • 推导
    Watermark = 当前窗口最大的事件时间 - 最大允许数据延迟的时间/乱序时间
    => Watermark = 当前窗口最大的事件时间 - 最大允许数据延迟的时间/乱序时间 >= 窗口结束时间
    => 当前窗口最大的事件时间 >= 窗口结束时间 + 最大允许数据延迟的时间/乱序时间

2.4 原理

在Apache Flink的窗口处理过程中,如果时间超过窗口最大结束时间,将触发数据的计算操作(如汇总、分组等)。但是,对于乱序数据来说,很容易错过窗口计算时间,导致数据丢失。而应用水位线(Watermark)机制,能一定程度上解决数据乱序或者延迟到达问题。

2.4.1 窗口计算问题


图示,当事件流数据C到达时,事件流数据C的时间超过窗口X的结束时间,因此窗口X将触发计算,并且新建窗口U接收事件流数据C。当事件流数据D和E接入时,由于窗口X已被触发计算,所以事件流数据D和E将丢失。

2.4.2 水印窗口


图示窗口加入Watermark计算,当事件流数据C到达时,Watermark为10:09:00但是小于窗口X的结束时间,没达到窗口X的计算条件,不触发窗口X计算。与此同时,新建窗口U接收事件流数据C。当事件流数据D/E到达时,窗口X还未触发计算,于是事件流数据D/E加入到窗口X,一定程度解决了2秒内数据乱序问题。当事件流数据F到达的时候,Watermark值为10:10:00且大于等于窗口X的结束时间,达到窗口X的计算条件,触发窗口X计算。

2.5 Watermark设定策略

2.5.1 AssignerWithPunctuatedWatermarks

标点水位线,通过数据流中某些特殊标记事件时间触发生成新的水位线。这种方式下,窗口的触发与时间无关,而是决定于何时收到标记事件。

在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

2.5.2 AssignerWithPeriodicWatermarks

周期性水位线,系统会周期性的(一定时间间隔)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。

在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

三、案例

package com.hotmail.ithink.watermark;import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;public class WatermarkMain {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 基于Watermark事件时间的窗口计算env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 获取事件流数据DataStreamSource<OrderEvent> eventDataStream = env.addSource(new SourceFunction<OrderEvent>() {private static final long serialVersionUID = 5652749729728486680L;// 开关private boolean switchFlag = true;@Overridepublic void run(SourceContext<OrderEvent> ctx) throws Exception {Random random = new Random();while (switchFlag) {String orderId = UUID.randomUUID().toString().replaceAll("-", "");int userId = random.nextInt(2);int money = random.nextInt(100);long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;OrderEvent orderEvent = new OrderEvent(orderId, userId, money, eventTime);System.out.println("data: " + orderEvent);// 发送元素ctx.collect(orderEvent);// 休眠1sTimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {switchFlag = false;}});// 为事件流数据添加Watermark和指定事件时间
//        SingleOutputStreamOperator<OrderEvent> eventWatermarkDataStream
//          = eventDataStream.assignTimestampsAndWatermarks(
//                // 设置最大允许延误时间为3s
//                WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(3))
//                        // 设置时间戳数据
//                        .withTimestampAssigner((e, timestamp) -> e.getEventTime())
//        );SingleOutputStreamOperator<OrderEvent> eventWatermarkDataStream = eventDataStream.assignTimestampsAndWatermarks(new WatermarkStrategy<OrderEvent>() {@Overridepublic WatermarkGenerator<OrderEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context ctx) {return new WatermarkGenerator<OrderEvent>() {/** 最大允许延迟时间 */private final int outOfOrdernessMills = 3000;/** 用户ID **/private Integer userId;/** 事件时间 **/private Long eventTime;/** 事件最大时间戳 */private Long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMills + 1;// 时间格式化private FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void onEvent(OrderEvent event, long eventTimestamp, WatermarkOutput output) {this.userId = event.userId;this.eventTime = event.eventTime;maxTimestamp = Math.max(maxTimestamp, eventTimestamp);System.out.println("watermark on event: "  + event);}@Overridepublic void onPeriodicEmit(WatermarkOutput out) {Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMills - 1);String note = String.format("watermark emit key:%s current time:%s " +"event time:%s watermark:%s",userId, System.currentTimeMillis(), df.format(eventTime),df.format(maxTimestamp - outOfOrdernessMills - 1));System.out.println(note);out.emitWatermark(watermark);}};}}.withTimestampAssigner((e, timestamp) -> e.getEventTime()));// 添加窗口计算SingleOutputStreamOperator<String> outDataStream = eventWatermarkDataStream.keyBy(OrderEvent::getUserId)// 设定滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 指定窗口应用函数.apply(new WindowFunction<OrderEvent, String, Integer, TimeWindow>() {private static final long serialVersionUID = 7034105248794615763L;// 时间格式化private FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");@Overridepublic void apply(Integer key, TimeWindow window, Iterable<OrderEvent> events,Collector<String> out) throws Exception {List<String> eventTimeList = Lists.newLinkedList();for (OrderEvent event : events) {String time = df.format(event.getEventTime());eventTimeList.add(time);}String windowStartTime = df.format(window.getStart());String windowEndTime = df.format(window.getEnd());String rs = String.format("key:%s window:[%s,%s) window event times:%s",key, windowStartTime, windowEndTime, eventTimeList.toString());out.collect(rs);}});outDataStream.print("WaterMarkResult::");env.execute("WatermarkMain");}@Data@NoArgsConstructor@AllArgsConstructorpublic static class OrderEvent implements Serializable {private static final long serialVersionUID = 2082940433103599734L;/** 订单ID */private String orderId;/** 用户ID */private Integer userId;/** 金额 */private Integer money;/** 事件时间 */private Long eventTime;}
}

Flink难点解析:揭开Watermark的神秘面纱相关推荐

  1. linux操作系统说课稿,信息技术《揭开LINUX的神秘面纱》教案范文

    信息技术<揭开LINUX的神秘面纱>教案范文 教学目标: 1.会启动LINUX系统: 2.会关闭LINUX系统: 3.LINUX基本界面的认识. 教学重点: 1.会启动LINUX系统: 2 ...

  2. 了解黑客的关键工具---揭开Shellcode的神秘面纱

    2019独角兽企业重金招聘Python工程师标准>>> ref:  http://zhaisj.blog.51cto.com/219066/61428/ 了解黑客的关键工具---揭开 ...

  3. [转]揭开正则表达式的神秘面纱

    揭开正则表达式的神秘面纱 关闭高亮 [原创文章,转载请保留或注明出处:http://www.regexlab.com/zh/regref.htm] 引言 正则表达式(regular expressio ...

  4. 揭开PC-Lint9的神秘面纱

    前言 今天,又定位了一个令人懊恼的C++内存使用异常问题,最终结果,竟然是减少接口类的方法后,为了避免编译错误,顺手添加的强制类型转换导致的. 对于这样的问题,我们碰到很多很多次了.没有这样的问题,我 ...

  5. 未来已来?揭开量子计算机的神秘面纱

    从第一台现代计算机ENIAC的诞生到个人PC时代的降临,从互联网概念的提出到移动互联的疾跑,在这个信息年代里,变革正以前所未有的速度改变着我们熟悉的世界.熟悉的生活. 作为个人,我们早已习惯于智能计算 ...

  6. ASP.NET 运行时详解 揭开请求过程神秘面纱

    对于ASP.NET开发,排在前五的话题离不开请求生命周期.像什么Cache.身份认证.Role管理.Routing映射,微软到底在请求过程中干了哪些隐秘的事,现在是时候揭晓了.抛开乌云见晴天,接下来就 ...

  7. 冰河浅析 - 揭开木马的神秘面纱(下)

    冰河浅析   -   揭开木马的神秘面纱(下)     作者:·   shotgun·yesky 四.破解篇(魔高一尺.道高一丈)         本文主要是探讨木马的基本原理,   木马的破解并非是 ...

  8. 揭开木马的神秘面纱 2

    揭开木马的神秘面纱zz 2 离冰河二的问世已经快一年了,大家对于木马这种远程控制软件也有了一定的认 识,比如:他会改注册表,他会监听端口等等,和一年前几乎没有人懂得木马是什么东   西相比,这是一个质 ...

  9. 【翻译】揭开HTML5的神秘面纱

    写在前面的话: 这篇文章摘自Mozilla官网,主要针对HTML5和本地应用发表了一些.没有设计到技术,所以基本是逐字翻译,但愿我蹩脚的英语水平能把大师的 Chris Heilmann的思想整理明白. ...

最新文章

  1. A Sequence-Based Novel Approach for Quality Evaluation of Third-Generation Sequencing Reads
  2. 动态代理及工厂的简单实现
  3. [转]改变UITextField placeHolder颜色、字体
  4. java HashMap的实现原理
  5. Luogu4099 HEOI2013 SAO 组合、树形DP
  6. numpy.argmax详解
  7. 基于Android系统开发的简易音乐播放器
  8. linux lpte_linux常用命令
  9. 36个非常有趣的互动网站设计作品范例
  10. Shiro使用redis作为缓存(解决shiro频繁访问Redis)
  11. FC冒险岛java版_冒险岛单机版
  12. 激光slam_机器人主流定位技术,激光SLAM与视觉SLAM谁更胜一筹
  13. java线程卡住排查_基于 Java 线程栈 排查问题
  14. aix查看oracle用户密码,AIX详细查看用户/进程使用内存
  15. 计算机网络负载均衡图片,负载均衡计算机网络课程网.ppt
  16. 为什么家里pm25比外面高_你绝对不会相信在家用卷发棒烫发竟然比在外面烫发更伤发?...
  17. 计算机桌面文件如何发送给qq好友,怎么把电脑里的文件夹发给qq好友
  18. c语言stl大全,C++ STL库应用汇总
  19. 数据库开发技术的课程记录
  20. python血脉贲张的cosplay小姐姐图片

热门文章

  1. audio标签的属性
  2. 主机下的虚拟机与外部电脑连接无线局域网(WIFI实现)
  3. 基于图像金字塔的模板匹配检测螺母
  4. Grafana 未经授权任意访问漏洞(CVE-2022-32275)
  5. 感冒自然好&推荐一个Mini录音棚
  6. php ppt自动播放,ppt如何循环播放,ppt如何添加视频设置自动播放(ppt转换成视频)...
  7. 2019.5.25 提高A组 总结
  8. Java经典面试题详解:抖音Java后端123面开挂
  9. 在线gps定位html,看看gps地图网(汽车在线gps定位平台)
  10. D3(V7)树图绘制-数据可视化