目录

1.Flink中的时间语义

1.1 EventTime 的代码设置

2.Watermark水位线

2.1 watermark的基本概念

2.2 watermark的特点和传递

2.3 Watermark 的代码设置

2.3.1 Assigner with periodic watermarks

2.3.2 Assigner with punctuated watermarks

3.watermark的设定

4.代码演示

4.1  Flink 多并行度下的 watermark触发机制

(1)前面代码中设置了并行度为 1(env.setParallelism(1);)

(2)把代码中的并行度调整为 2 (env.setParallelism(2);)


1.Flink中的时间语义

在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示:

  • Event Time事件创建时间:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。绝大部分的业务都会使用 eventTime
  • Ingestion Time进入时间:是数据进入 Flink 的时间
  • Processing Time处理时间:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。

例如,一条日志进入 Flink 的时间为 2017-11-12 10:00:00.123,到达 Window 的 系统时间为 2017-11-12 10:00:01.234,日志的内容如下:

2017-11-02 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计 1min 内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。

1.1 EventTime 的代码设置

在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所 示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2.Watermark水位线

2.1 watermark的基本概念

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。

那么此时出现一个问题,一旦出现乱序,如果只根据 eventTime 决定 window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark水位线机制。

  • ⚫ Watermark 是一种衡量 Event Time 进展的机制。
  • ⚫ Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。
  • ⚫ 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
  • Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。
  • ⚫ watermark相当于是我们去坐公交车,9点发车,但是我们将司机的表调慢了2分钟,司机会在9:02发车。
  • ⚫ 乱序时间处理:(1)watermark等一会处理,发车  (2)设置延迟时间,来一个处理一个(跑的快,停车让他上车继续发车)  (3)设置侧边输出流,最后聚合,进行兜底保证数据不丢失。

有序流的 Watermarker 如下图所示:(Watermark 设置为 0)

乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)

当 Flink 接收到数据时,会按照一定的规则去生成 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是 基于数据携带的时间戳生成的,一旦 Watermark 比当前未触发的窗口的停止时间要 晚,那么就会触发相应窗口的执行。由于 event time 是由数据携带的,因此,如果 运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应 的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。

Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。

只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗

2.2 watermark的特点和传递

多个watermark的时候,选择一个最小的,表示可以保证在这个数之前的数字已经到了,相关的窗口可以关闭了。

2.3 Watermark 的代码设置

watermark 的引入很简单,对于乱序数据,最常见的引用方式如下:

下面代码:提取数据源中的时间戳+设定watermark最大延迟时间(最大的乱序程度)

dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000)) {@Overridepublic long extractTimestamp(element: SensorReading): Long = {return element.getTimestamp() * 1000L;}
});

Event Time 的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用 Processing Time 了)。

我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置事件时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<SensorReading> dataStream = env.addSource(new SensorSource()).assignTimestampsAndWatermarks(new MyAssigner());

MyAssigner 有两种类型:⚫ AssignerWithPeriodicWatermarks ⚫ AssignerWithPunctuatedWatermarks,以上两个接口都继承自 TimestampAssigner。

2.3.1 Assigner with periodic watermarks

周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也是一种特殊的事件!)。默认周期是 200 毫秒。可以使用 ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

// 每隔 5 秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);

产生 watermark 的逻辑:每隔 5 秒钟,Flink 会调用AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark。

例子,自定义一个周期性的时间戳抽取:

// 自定义周期性时间戳分配器
public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading>{private Long bound = 60 * 1000L; // 延迟一分钟private Long maxTs = Long.MIN_VALUE; // 当前最大时间戳@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound);}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp){maxTs = Math.max(maxTs, element.getTimestamp());return element.getTimestamp();}
}

一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用 AscendingTimestampExtractor,这个类会直接使用数据的时间戳生成 watermark。

DataStream<SensorReading> dataStream = …dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000;}
});

而对于乱序数据流,如果我们能大致估算出数据流中的事件的最大延迟时间, 就可以使用如下代码:

DataStream<SensorReading> dataStream = …dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}
});

2.3.2 Assigner with punctuated watermarks

间断式地生成 watermark。和周期性生成的方式不同,这种方式不是固定时间的, 而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给 sensor_1 的传感器的数据流插入 watermark:

public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading>{private Long bound = 60 * 1000L; // 延迟一分钟@Nullable@Overridepublic Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {if(lastElement.getId().equals("sensor_1"))return new Watermark(extractedTimestamp - bound);elsereturn null;}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp){return element.getTimestamp();}
}

3.watermark的设定

4.代码演示

package com.ucas.watermarkTest;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;/*** @author GONG* @version 1.0* @date 2021/3/10 9:58*/
public class WaterPar {/*** 测试点:测试多 多并行度下的 watermark触发机制* 参考:链接:https://juejin.im/post/5bf95810e51d452d705fef33** @throws Exception*/public static void main(String[] args) throws Exception {//定义socket的端口号int port = 7777;//获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置使用eventtime,默认是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度为1,默认并行度是当前机器的cpu数量env.setParallelism(1);//连接socket获取输入的数据DataStream<String> text = env.socketTextStream("8.131.72.75", port, "\n");//解析输入的数据DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定义生成watermark的逻辑* 默认100ms被调用一次*/@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定义如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);long id = Thread.currentThread().getId();System.out.println("键值 :" + element.f0 + "   线程验证 :" + id + " , 事件事件:[ " + sdf.format(element.f1) + " ],currentMaxTimestamp:[ " +sdf.format(currentMaxTimestamp) + " ],水印时间:[ " + sdf.format(getCurrentWatermark().getTimestamp()) + " ]");return timestamp;}});//保存被丢弃的数据OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};//注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样//.allowedLateness(Time.seconds(2))//允许数据迟到2秒.sideOutputLateData(outputTag).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 对window内的数据进行排序,保证数据的顺序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = "\n键值 : " + key + "\n              触发窗内数据个数 : " + arrarList.size() + "\n              触发窗起始数据: " + sdf.format(arrarList.get(0)) + "\n              触发窗最后(可能是延时)数据:" + sdf.format(arrarList.get(arrarList.size() - 1))+ "\n              实际窗起始和结束时间: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";out.collect(result);}});//把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);sideOutput.print();//测试-把结果打印到控制台即可window.print();//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行env.execute("eventtime-watermark");}
}

4.1  Flink 多并行度下的 watermark触发机制

(1)前面代码中设置了并行度为 1(env.setParallelism(1);)

参数:窗口设置滚动窗口,窗口大小为3s,比如开始30s,结束时候为33s。

设置最大允许乱序时间为10S,所以30s的数据,直到接收到43s(10秒最大乱序+3s窗口大小)时,第一个窗口才执行计算并且关闭。

我们设置水印时间戳为当前最大时间戳(针对该id)-最大乱序时间(10s)

后面的参数是根据键值计算的,不是根据这个分区计算。中间有一个keyby操作

我们使用了AssignerWithPeriodicWatermarks周期性生成水印,默认每隔100s计算一次水印,计算公式为:水印时间戳=当前最大时间戳(针对该id)-最大乱序时间(10s),并且计算后得到的水印需要与原来水印比较,如果小于之前水印不更新,保证水印是递增的。

(2)把代码中的并行度调整为 2 (env.setParallelism(2);)

  • 发现玄机如下:在第二条事件时,其实已经达到窗的触发时机,但是因为并行度为2,只有等到最小watermark 到的时候才会触发窗计算。发现线程44处理的是001和003 ,线程42处理的是0002,所以只有等到线程42到达后,水印才会起作用执行2018-10-01 10:11:33.000所在的窗。
  • 下面这三个数据是轮询放到两个分区,0001和0003放到一个分区,所以当0002来的时候,虽然满足了43-30=13的条件,但是因为是不同分区,所以没有关闭该窗口。
0001,1538359890000       2018-10-01 10:11:30
0002,1538359903000      2018-10-01 10:11:43
0003,1538359908000      2018-10-01 10:11:48

注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark

4.1.19 Flink-流处理框架-Flink中的时间语义和watermark水位线相关推荐

  1. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  2. Flink时间语义与watermark的原理

    时间语义 我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性 val env: StreamExecutionEnvironment = ...

  3. 流计算框架 Flink 与 Storm 的性能对比

    本文作者:孙梦瑶 | 美团点评 本文主要内容:通过将分布式实时计算框架 Flink 与 Storm 进行性能对比,为美团点评实时计算平台和业务提供数据参考. 一. 背景 Apache Flink 和 ...

  4. Flink(二十三)—— 流计算框架 Flink 与 Storm 的性能对比

    1. 背景 Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架.其中 Apache Storm(以下简称"Storm")在美团点评实 ...

  5. Flink的时间语义和Watermark

    1 时间语义    数据迟到的概念是:数据先产生,但是处理的时候滞后了    在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:    Event Time:是事件创建的时间.它通常由事件 ...

  6. Flink 中的时间和窗口

    时间和窗口 一.时间语义 1. Flink 中的时间语义 1.1 处理时间(Processing Time) 1.2 事件时间(Event Time) 1.3 两种时间语义的对比 二.水位线(Wate ...

  7. Flink中的时间和窗口——时间语义

    文章目录 前言 一.时间语义 1.1.Flink 中的时间语义 1.1.1.处理时间(Processing Time) 1.1.2.事件时间(Event Time) 1.2.哪种时间语义更重要 1.2 ...

  8. Flink水位线-详细说明

    文章目录 时间语义 Flink 中的时间语义? 哪种时间语义更重要? 1. 水位线(Watermark) 1.1 什么是水位线? 1.2 如何生成水位线? 1.3 水位线的传递 1.4 水位线的计算

  9. Flink流式计算从入门到实战 三

    文章目录 四.Flink DataStream API 1.Flink程序的基础运行模型 2.Environment 运行环境 3.Source 3.1 基于File的数据源 3.2 基于Socket ...

最新文章

  1. kafka 同步提交 异步_腾讯游戏工程师分享:简单理解 Kafka 的消息可靠性策略
  2. flash_erase and flash_eraseall
  3. 数据库数据格式化之Kettle Spoon
  4. 胡珀:从危到机,AI 时代下的安全挑战
  5. JavaScript 中的闭包和作用域链(读书笔记)
  6. Spring Data,MongoDB和JSF集成教程
  7. 【技术培训】招收Jeecg门徒 ---javaweb初级入门班
  8. 会动的图解 | 既然IP层会分片,为什么TCP层也还要分段?
  9. Dev-c++下载地址
  10. 含泪推荐5款WIN10装机必备的软件
  11. Vue项目实战:电商后台管理系统(Vue+VueRouter+Axios+Element)
  12. 基于51单片机ds18b20智能温控风扇Proteus仿真
  13. 小胜靠智、大胜靠德、永胜靠和
  14. Mac OS X添加网络打印机
  15. golang 支付宝小程序 登陆
  16. 已解决:[emerg] bind() to 0.0.0.0:80 failed (10013: An attempt was made to access a socket in a way forb
  17. 制作QQ会员页面导航
  18. 吐血整理:43种机器学习开源数据集(附地址/调用方法)
  19. PC端浏览器如何设置无图模式
  20. MySQL 中的boolean/bool/tinyint(1)表示布尔类型

热门文章

  1. 基于大数据个性化音乐推荐算法分析(附代码github地址)
  2. 咸鱼ZTMR实例—土壤湿度计检测模块
  3. 梯控数据分析教程 梯控分析软件 门禁电梯停车卡分析教程
  4. java.lang.ClassCastException: java.lang.Integer cannot be cast to java.math.BigD
  5. HDU 1574 RP问题(DP)
  6. 方圆三维支吊架设计系统技术特点
  7. 【ASSIC】ASSIC码概念+字母ASSIC码+利用ASSIC实现大小写转换
  8. 墨尔本大学计算机本科学费,墨尔本大学本科学费要多少
  9. 中山大学计算机学院官网万海,万海:《计算机网络》课程研修班报告 - 中山大学信息科学与技术学院.doc...
  10. mysql drop后回收站怎么恢复吗_回收站清空了怎么恢复