Flink -- Time and Window

  • Flink 时间语义
  • 水位线 Watermark
    • 水位线的概念
      • 有序流中的水位线
      • 乱序流中的水位线
      • 水位线的特性
    • 水位线的基本使用
      • 水位线生成策略
      • 内置水位线生成器
      • 自定义水位线策略
      • 在自定义数据源中发送水位线
  • 窗口 Window
    • 窗口的基本概述
      • 窗口的基本概念
      • 窗口的分类
      • 窗口的 API
    • 窗口的基本使用
      • 窗口分配器
        • 时间窗口
        • 计数窗口
      • 窗口函数
        • 增量聚合函数
        • 全窗口函数
        • 增量聚合和全窗口函数的结合使用
      • 测试水位线与窗口的使用
    • 窗口的其他 API
      • 触发器 Trigger
      • 移除器 Evictor
      • 允许延迟 Allowed Lateness
      • 侧入流 Side Output
    • 迟到数据的处理

Flink 时间语义

在一台机器当中,所谓的时间当然就是指系统时间;但是在一个分布式的系统当中,各个节点彼此独立、互不影响,因此并不存在一个统一的时钟。因此,当我们需要进行数据的聚合计算时,”并不同时被处理“的事件可能会导致计算结果出错。

此外,当流中的数据进行传输时,也会存在传输的延迟,延迟将会导致时间语义的模糊,比如:上游任务在8点59分59秒发出一条数据,到下游要做窗口计算时已经是9点零1秒了,那这条数据到底该不该被收到 8~9 点的窗口呢?

因此,我们需要指定一个统一的时间标准,按照这个标准进行数据的收集和计算。Flink 中为我们提供了两种时间语义,分别为事件时间和处理时间。

  • 处理时间(Processing Time)

    处理时间就是在执行处理操作时机器的系统时间。这种时间语义简单粗暴,不需要各个节点之间的同步协调,因此处理时间是最简单的语义。

  • 事件时间(Event Time)

    事件时间指的是数据生成的时间,这个时间伴随着数据的生成而生成,并且是不会变化的,因此我们可以将这个时间作为一个属性嵌入数据当中,并以该时间作为指标进行数据的收集和计算。

在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12 版本开始,Flink 已经将事件时间作为了默认的时间语义。

水位线 Watermark

水位线的概念

当使用事件时间作为时间语义时,就相当于每条数据都携带了一个时钟。因此,在数据处理过程中的时钟则根据数据的到来而不停的驱动。即何时产生的数据到达处理系统时,系统的时间就会推进到当前数据产生的时间,从而实现数据的时间标准化。

但是在分布式系统中,上述的时间驱动方式会存在以下问题:

  1. 若数据在处理转换的过程当中存在窗口聚合操作,那么会导致下游的数据量减少,因此对于时间控制的精细度便会下降;

  2. 在一般情况下,数据向下游的传递只会传递给一个子任务,此时其他并行子任务的时钟便无法推进;

为解决上述问题,我们在进行数据传递时,需要把时钟也以数据的形式进行传递,这个时间的标志不会因窗口聚合运算而停滞,同时可以直接广播到下游。

在 Flink 中,这种用来衡量事件时间 Event Time 进展的标记,就被称作”水位线“ Watermark 。水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后,这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

如上图所示,当某个数据到来之后,就可以依据这条数据的事件时间,在其后插入一个水位线,并伴随着数据一起向下游流动,以推进时间进度。

有序流中的水位线

在理想状态下,数据会按照生成的顺序,排队进入流中进行处理。这就可以保证时间的推进都是从小到大的,即插入的水位线是不断增长的。

在实际应用中,若当前的数据量非常大,就可能会存在很多具有相同时间戳的数据,此时若针对每条数据都插入水位线,就会产生很多相同的无意义的水位线。因此对于有序流,一般采取周期性的水位线。如下图所示。

注意,这里的周期也是一个时间的概念,周期是以系统时间为基准的。

乱序流中的水位线

这里所说的乱序是指数据到达的顺序是混乱的,由于网络延迟的存在,先产生的数据可能之后才会到达。如果还采取之前的策略,那么有可能会产生“时间倒流”的问题。比如第9秒的数据到达之后,流中的时间水位推移到第 9 秒,随后第7秒数据到达,此时再插入水位线将导致时间水位变回第 7 秒。

针对这个问题,Flink 采取判断 + 周期生成的办法。假设我们的周期设置为 5s,那么 Flink 会记录五秒内数据携带的时间戳的最大值,并每隔五秒将水位线添加进流中传递。采用这种方法可以有效避免“时间倒流”的问题。如下图所示。

此外,乱序流还存在另外一个头疼的问题,即当我们需要对数据按照时间窗口进行聚合计算时,“迟到”的数据将会导致汇总数据结果不准确。比如我们需要每 10 秒计算一下某电站的发电量,若第8秒的发电量在第10秒之后达到,那么在第10秒的数据到达之后,该时间窗口就已经关闭进行计算并输出结果,因此第8秒的数据就不会被统计,这将导致错误的汇总结果。

为解决“迟到”数据的问题,Flink 允许我们采取“等待策略”。在 Flink 当中,水位线就是时钟,若我们在确定水位线时故意延迟几秒,这样就可以达到等待“迟到”数据的效果。如下图所示。我们对“迟到”数据的容忍度设置为 2 秒。因此,在第一次插入水位线时,即使该周期内的最大时间为第9秒的数据,但我们插入的水位线时间为 7,这意味着第7秒前的数据已经全部到达,即使第8秒的数据随后才抵达,那也不会影响聚合计算的结果,因为水位线就是时钟。

该方法通过牺牲数据的实时性而保证了数据计算的准确定。值得注意的一点是,如果数据"迟到"的太久,那么我们只能通过加长等待时间来处理。

水位线的特性

  • 水位线是插入数据流中的一个标记,可以看作一个特殊的数据;

  • 水位线主要内容就是一个时间戳,用于推进时钟;

  • 水位线是基于数据的时间戳而生成的;

  • 水位线的时间戳必须单调递增,以保证任务时钟一直向前推进;

  • 水位线可以通过设置延迟的方式来保证乱序数据的正确处理;

  • 若水位线到达某个时间 t,则代表第 t 秒之前的数据已经全部到达(包含 t),在之后的数据中不会有时间戳小于等于 t 的数据出现;

水位线的基本使用

水位线生成策略

Flink 的 DataStream API 中提供了一个用于生成水位线的方法.assignTimestampsAndWatermarks(),该方法可以为流中的数据分配时间戳,并生成水位线来指示时钟。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks
(WatermarkStrategy<T> watermarkStrategy)

该方法需要传入一个WatermarkStrategy类对象作为参数,该类中包含一个时间戳分配器方法createTimestampAssigner()以及生成水位线生成器方法createWatermarkGenerator()

下述代码中使用的实体类与源算子的代码分别如下:

实体类 Event

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {public String user;public String url;public Long timestamp;}

源算子 EventSource

public class EventSource implements SourceFunction<Event> {private Boolean flag = true;String[] users = {"曹操", "刘备", "孙权", "诸葛亮"};String[] urls = {"/home", "/test?id=1", "/test?id=2", "/play/football", "/play/basketball"};@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {Random random = new Random();while (flag) {sourceContext.collect(new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis()));Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}
}

内置水位线生成器

Flink 提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,
而且也为我们自定义水位线策略提供了模板。这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

示例代码如下,该代码段中提取实体类 Event 的 Timestamp 字段作为时间戳,并设置 5s 的延迟时间用于处理乱序流的迟到数据。

public class WaterMarkDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源environment.addSource(new EventSource())// 3. 设置水位线逻辑.assignTimestampsAndWatermarks(WatermarkStrategy// 3.1 针对乱序流插入水位线,延迟时间设置为 5s// 针对有序流周期性生成水位线即可:.<Event>forMonotonousTimestamps();.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))// 3.2 抽取时间戳的逻辑.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}))// 4. 打印输出.print();// 5. 执行程序environment.execute();}}

这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳 – 延 迟时间 – 1,位是毫秒。

为什么要减 1 毫秒呢?回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳小于等于 t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。这一点可以在BoundedOutOfOrdernessWatermarks的源码中明显地看到:

public void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));
}

自定义水位线策略

自定义水位线策略时,需要我们创建一个类并实现WatermarkStrategy接口,重写接口中的时间戳分配器方法createTimestampAssigner与水位线生成方法createWatermarkGenerator即可。

  • 时间戳分配器:接收数据的实体类,提取某个字段作为时间戳即可;

  • 水位线生成器:需要返回一个WatermarkGenerator接口类,实现该接口需要实现两个方法

    • onEvent():每来一条数据便调用一次,用于观察判断输入的事件;

    • onPeriodicEmit():由框架周期性的进行调用,生成水位线;

示例代码如下:

public class CustomWaterMarkDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源environment.addSource(new EventSource())// 3. 设置水位线逻辑.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())// 4. 打印输出.print();// 5. 执行程序environment.execute();}}

自定义水位线策略 CustomWatermarkStrategy:

public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomPeriodicGenerator();
//        return new CustomPunctuatedGenerator();}/*** 周期性水位线生成器(Periodic Generator)*/public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L;private Long maxTs = Long.MIN_VALUE + delayTime + 1L;/*** 每来一条数据就调用一次*/@Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {// 更新最大时间戳maxTs = Math.max(event.timestamp, maxTs);}/*** 发射水位线,默认 200ms 调用一次* 可以调用环境配置的.setAutoWatermarkInterval()方法来设置*/@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}/*** 断点式水位线生成器(Punctuated Generator)*/public static class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {/*** 遇到特定的事件时发出水位线*/@Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {if ("曹操".equals(event.user)) {watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));}}/*** 发射水位线*/@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {// 不需要做任何处理,在 onEvent() 中发射水位线}}
}

在自定义数据源中发送水位线

也可以在自定义的源算子中定义发送水位线的逻辑,采用该源算子就不需要在代码中使用assignTimestampsAndWatermarks重复设置水位线了。

示例代码如下:

public class ClickSourceWithWatermark implements SourceFunction<Event> {private Boolean flag = true;String[] users = {"曹操", "刘备", "孙权", "诸葛亮"};String[] urls = {"/home", "/test?id=1", "/test?id=2", "/play/football", "/play/basketball"};@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {Random random = new Random();while (flag) {Event event = new Event(users[random.nextInt(users.length)],urls[random.nextInt(users.length)],Calendar.getInstance().getTimeInMillis());// 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段sourceContext.collectWithTimestamp(event, event.timestamp);// 发送水位线sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}}

窗口 Window

窗口的基本概述

窗口的基本概念

Flink 是一种流式计算的引擎,主要用于处理无界数据流。但是对于无界的数据,有时也需要进行一些“批处理”的操作,即将无限的数据按照一定的规则切割成有限的数据块进行处理,这就是所谓的窗口。

在 Flink 中,窗口实际上就是一个数据的存储桶,如下图所示。窗口可以把流切割成有限大小的多个“存储桶”,每个数据都会分发到对应的桶中。当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

窗口的分类

Flink 中的窗口可以按照不同的角度进行分类:

  • 按照驱动类型分类

    • 时间窗口Time Window:可以定义窗口的开始和结束时间,到达结束时间时,出发计算并输出结果,同时窗口关闭销毁。时间窗口是一个左闭右开的区间;

    • 计数窗口Count Window:基于元素的个数来截取数据,到达固定的个数时出发计算并关闭窗口;

  • 按照窗口分配数据的规则分类

    • 滚动窗口Tumbling Windows:滚动窗口拥有固定的大小,是一种对数据均匀划分的切割方式,窗口之间没有间隔、没有重叠,是一种“首尾相接”的状态;

    • 滑动窗口Sliding Windows:滑动窗口同样拥有固定的大小,与滚动窗口不同的是,滑动窗口是存在重叠部分的。其存在一个滑动步长,每间隔该步长,则向前滑动一个步长的距离;

    • 会话窗口Session Windows:类似 web 应用中 session 的概念,需要制定一个超时时间参数,超过该时间没有数据到来则关闭窗口,因此绘画窗口只能基于时间定义;

    • 全局窗口Global Windows:该窗口没有切割数据流,而是将所有相同 key 的数据分配到同一个窗口,数据依旧是无界流。若希望对数据进行计算处理,还需要自定义“触发器”实现;

窗口的 API

在定义窗口操作之前,首先需要确定是基于按键分区的数据流KeyedStream来开窗还是在没有按键分区的DataStream上开窗。

  • 按键分区窗口:相同 key 的数据会被发送至同一个并行子任务,故窗口计算会被分配到多个并行子任务上执行,即每一组 key 都定义了一组窗口,各自独立统计;

    stream.keyBy(...).window(...)
    
  • 非按键分区:并行度为 1,一般不使用

    stream.windowAll(...)
    

窗口的基本使用

窗口分配器

定义窗口分配器是构建窗口算子的第一步,定义数据应该被分到哪个窗口,即窗口分配器就是在指定窗口的类型。

时间窗口

  • 滚动处理时间窗口

    • 窗口分配器类:TumblingProcessingTimeWindows

    • 方法:of

    • 参数

      • 参数一:传入Time类型的参数 size,表示滚动窗口的大小;

      • 参数二:可以传入Time类型的参数 offset,表示窗口开始的时间偏移量;

  • 滑动处理时间窗口

    • 窗口分配器类:SlidingProcessingTimeWindows

    • 方法:of

    • 参数:

      • 参数一:传入Time类型的参数 size,表示滚动窗口的大小;

      • 参数二:传入Time类型的参数 slide,表示滑动窗口的滑动步长;

  • 处理时间会话窗口

    • 窗口分配器类:ProcessingTimeSessionWindows

    • 方法:withGap()withDynamicGap()

    • 参数:

      • withGap()传入一个Time类型的参数 size,表示会话的超时时间;

      • withDynamicGap()传入一个SessionWindowTimeGapExtractor,定义动态提取超时时间的逻辑;

  • 滚动事件时间窗口

    • 窗口分配器类:TumblingEventTimeWindows

    • 使用方法同“滚动处理时间窗口”;

  • 滑动事件时间窗口

    • 窗口分配器类:SlidingEventTimeWindows

    • 使用方法同“滑动处理时间窗口”;

  • 事件时间会话窗口

    • 窗口分配器类:EventTimeSessionWindows

    • 使用方法同“处理时间会话窗口”;

计数窗口

  • 滚动计数窗口

    • 直接调用countWindow()方法,传入长整型的参数 size,表示窗口的大小;
  • 滑动计数窗口

    • 直接调用countWindow()方法,传入长整型的参数 size 和 slide,表示窗口的大小以及窗口的滑动步长;
  • 全局窗口

    • 直接调用window(),分配器由GlobalWindows类提供;

窗口函数

窗口分配器定义了数据的收集逻辑,窗口函数则定义了收集起来的数据的计算逻辑。

经窗口分配器处理之后,数据被分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是DataStream,因此并不能直接进行其他转换,必须
进一步调用窗口函数,对收集的数据进行处理计算之后,才能最终再次得到DataStream

增量聚合函数

规约函数 ReduceFunction

窗口的归约就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

示例代码如下,该代码段统计了在 5s 的时间窗口内各个 url 的访问量。

public class WindowReduceDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源并设置水位线SingleOutputStreamOperator<Event> stream = environment// 2.1 加载数据源.addSource(new EventSource())// 2.2 获取时间戳、设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));// 3. 数据处理及输出stream// 3.1 将数据转换为 tuple 二元组,方便计算.map(new MapFunction<Event, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(Event event) throws Exception {return Tuple2.of(event.getUrl(), 1L);}})// 3.2 按键进行分区.keyBy(tuple -> tuple.f0)// 3.3 设置滚动事件时间窗口,窗口大小为 5s.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 3.4 定义规约规则,窗口闭合时向下游发送结果.reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) {return Tuple2.of(value1.f0, value1.f1 + value2.f1);}})// 3.5 输出结果.print();// 4. 执行程序environment.execute();}}

聚合函数 AggregateFunction

增量聚合函数可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就要求我们必须在聚合前,先将数据转换成预期结果类型。

Flink 的 Window API 中的aggregate则提供了更加灵活的操作,该方法需要传入一个AggregateFunction的实现类作为参数。该接口存在四个方法,在下列代码示例中有功能解释。该代码段计算了在 10s 的时间窗口内,所有 url 的人均重复访问量(总访问量 / 访问用户数),每 2s 统计一次。

public class WindowAggregateDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源并设置水位线SingleOutputStreamOperator<Event> stream = environment// 2.1 加载数据源.addSource(new EventSource())// 2.2 获取时间戳、设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));// 3. 数据处理及输出stream// 3.1 分区,将所有数据发送到一个分区进行统计.keyBy(item -> true)// 3.2 设置滑动事件事件窗口,窗口大小为 10s,滑动步长为 2s.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))// 3.3 定义聚合规则.aggregate(new CustomAggregate())// 3.4 输出结果.print();// 4. 执行程序environment.execute();}public static class CustomAggregate implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {/*** 创建累加器*/@Overridepublic Tuple2<HashSet<String>, Long> createAccumulator() {return Tuple2.of(new HashSet<String>(), 0L);}/*** 累加逻辑设计*/@Overridepublic Tuple2<HashSet<String>, Long> add(Event event, Tuple2<HashSet<String>, Long> accumulator) {// 每当一条属于该窗口的数据到来,就进行累加并返回累加器accumulator.f0.add(event.user);return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);}/*** 窗口闭合时执行计算*/@Overridepublic Double getResult(Tuple2<HashSet<String>, Long> accumulator) {// 计算结果并发送到下游return (double) accumulator.f1 / accumulator.f0.size();}/*** 合并两个累加器*/@Overridepublic Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> hashSetLongTuple2, Tuple2<HashSet<String>, Long> acc1) {return null;}}}

全窗口函数

ProcessWindowFunction是 Window API 中最底层的通用窗口函数接口。之所以说它“最底
层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”Context。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

ProcessWindowFunction的优势是以牺牲性能和资源为代价的,作为一个全窗口函数其需要将所有的数据缓存下来,等到窗口触发计算时再使用。

示例代码如下,该代码段计算了在 5s 的时间窗口内所有 url 的独立访客数量。

public class ProcessWindowDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源并设置水位线SingleOutputStreamOperator<Event> stream = environment// 2.1 加载数据源.addSource(new EventSource())// 2.2 获取时间戳、设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));// 3. 数据处理及输出stream// 3.1 分区,将所有数据发送到一个分区进行统计.keyBy(item -> true)// 3.2 设置滚动事件时间窗口,窗口大小为 10s.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 3.3 定义窗口函数处理规则.process(new CustomProcessWindow())// 3.4 输出结果.print();// 4. 执行程序environment.execute();}public static class CustomProcessWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {/*** 窗口函数处理规则,窗口关闭时执行处理*/@Overridepublic void process(Boolean aBoolean, ProcessWindowFunction<Event, String, Boolean, TimeWindow>.Context context,Iterable<Event> iterable, Collector<String> collector) {// 创建用户统计SetHashSet<String> userSet = new HashSet<>();for (Event event: iterable) {userSet.add(event.user);}long start = context.window().getStart();long end = context.window().getEnd();// 定制输出内容collector.collect("窗口【" + new TimeStamp(start) + "~" + new TimeStamp(end)+ "】的独立访客数量为>>>" + userSet.size());}}}

增量聚合和全窗口函数的结合使用

通过上面对增量聚合以及全窗口函数的了解,不难发现增量聚合的计算更加高效,其将计算过程分摊到窗口收集数据的过程当中,而全窗口函数则是将数据全部收集起来,等到窗口要闭合时再统一处理,因此增量聚合函数效率更高。

但是全窗口函数的优势在于其可以提供更多的信息,其只负责收集数据并提供信息,这让我们可以对数据执行更加灵活的操作。因此,在实际使用中,常把增量聚合以及全窗口函数结合使用。

通过结合使用,只需要在增量聚合中维护一个状态。当窗口触发需要执行计算时,全窗口函数直接获取增量函数的结果即可。

示例代码如下,该代码段计算了在 10s 的时间窗口内各个 url 的人均重复访问量(url 访问量 / url 访问用户数),并以 EventUrlCount 对象的形式输出结果,每 5s 计算一次。

public class CombinedUseDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源并设置水位线SingleOutputStreamOperator<Event> stream = environment// 2.1 加载数据源.addSource(new EventSource())// 2.2 获取时间戳、设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));// 3. 数据处理及输出stream// 3.1 分区,将所有数据发送到一个分区进行统计.keyBy(item -> item.url)// 3.2 设置滑动事件事件窗口,窗口大小为 10s,滑动步长为 5s.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))// 3.3 结合使用增量聚合函数和全窗口函数.aggregate(new UrlCountAgg(), new UrlCountRes())// 3.4 输出结果.print();// 4. 执行程序environment.execute();}/*** 增量聚合函数定义*/public static class UrlCountAgg implements AggregateFunction<Event, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}/*** 全窗口函数定义*/public static class UrlCountRes extends ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow> {@Overridepublic void process(String url, ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow>.Context context,Iterable<Long> iterable, Collector<EventUrlCount> collector) {collector.collect(new EventUrlCount(url,iterable.iterator().next(),context.window().getStart(),context.window().getEnd()));}}}

实体类 EventUrlCount

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EventUrlCount {public String url;public Long count;public Long windowStart;public Long windowEnd;}

测试水位线与窗口的使用

在之前的介绍中,水位线就是程序时钟的指示,而时间窗口也是根据时钟对数据进行分割的,在了解完水位线和窗口的知识后,可以将两者结合起来,通过下列测试程序简单的感受一下水位线对窗口的控制。

测试代码如下:此处采用 socket 数据源算子,详情参考Flink – DataStream API

public class WaterMarkWithWindowDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源进行数据处理environment// 2.1 加载 socket 数据源.socketTextStream("XXX.XX.XX.XXX", 8080)// 2.2 对数据源进行简单处理,封装成对象.map(new MapFunction<String, Event>() {@Overridepublic Event map(String s) throws Exception {String[] split = s.split(",");return new Event(split[0].trim(),split[1].trim(),Long.valueOf(split[2].trim()));}})// 2.3 设置水位线,时延5s,timestamp 字段作为时间戳.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long recordTimestamp) {return event.timestamp;}}))// 2.4 按照 user 字段进行分区.keyBy(event -> event.user)// 2.5 配置滚动时间窗口,窗口大小为 10s.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 2.6 配置全窗口处理函数.process(new WaterMarkResult())// 2.7 输出结果.print();// 3. 执行处理environment.execute();}public static class WaterMarkResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {/*** 全局窗口处理函数逻辑*/@Overridepublic void process(String s, ProcessWindowFunction<Event, String, String, TimeWindow>.Context context,Iterable<Event> iterable, Collector<String> collector) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();long currentWatermark = context.currentWatermark();long count = iterable.spliterator().getExactSizeIfKnown();collector.collect("窗口[" + start + "~" + end + "]中共有" + count + "个元素|窗口闭合计算时,水位线处于>>>" + currentWatermark);}}}

执行测试,观察结果:

注意:窗口左闭右开,如 10000ms 的数据,应属于【10000-20000】窗口

窗口的其他 API

对于一个窗口算子,窗口分配器与窗口函数是必不可少的。除此之外,Flink 还提供了一些可选的 API,让我们可以更加灵活的控制窗口行为。

触发器 Trigger

触发器主要用于控制窗口何时进行触发计算,其是窗口算子的内部属性。对于 Flink 内置的窗口类型,其内部触发器都已经做了相应的实现,因此一般情况下不需要我们自定义触发器执行。

若业务逻辑确实需求自定义触发器,则直接调用 WindowedStream 的trigger()方法并传入自定义的触发器即可。如下所示。

stream.keyBy(...).window(...).trigger(new CustomTrigger())

自定义触发器需要继承Trigger抽象类,并实现下列四个方法:

  • onElement():窗口中每到来一个元素,都会调用该方法;

  • onEventTime():当注册的事件时间定时器触发时,将调用该方法;

  • onProcessingTime:当注册的处理时间定时器触发时,将调用该方法;

  • clear():当窗口关闭销毁时,将调用该方法,一般用于清除某些自定义的状态;

上述的方法中,处最后一个方法,另外三个方法的返回值均为枚举类型TriggerResult,其中定义了对窗口进行操作的四种类型,用于响应事件:

  • CONTINUE:继续,什么都不做;

  • FIRE:触发,触发计算,输出结果;

  • PURGE:清除,清空窗口中的所有数据,销毁窗口;

  • FIRE_AND_PURGE:触发并清除,触发计算输出结果,并清除窗口;

移除器 Evictor

移除器用于定义移除某些数据的逻辑,不同的窗口类型都有各自预实现的移除器。

若业务逻辑确实需求自定义移除器,则直接调用 WindowedStream 的evictor()方法并传入自定义的移除器即可。如下所示。

stream.keyBy(...).window(...).evictor(new CustomEvictor())

自定义移除器需要实现 Evictor 接口并实现下列两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作;

  • evictAfter():定义执行窗口函数之后的一处数据操作;

允许延迟 Allowed Lateness

在事件时间语义当中,窗口中会出现迟到数据。若这部分数据在窗口触发计算并输出结果之后才到达,此时窗口一般已经销毁,因此迟到数据一般就会直接丢弃,这将导致统计结果不准确。

为解决迟到数据问题,除了之前提到过的设置水位线延迟的方法,Flink 还为开发者提供了一个特殊的接口。该接口可以为窗口算子提供一个”允许最大延迟“,在这段时间内,窗口不会被销毁,迟到的数据仍然可以被收集到相应的窗口并触发计算,直到时钟被推进到”水位线 + 允许最大延迟“时才会真正的关闭窗口。

基于 WindowedStream 调用allowedLateness()方法并传入一个Time类型的延迟时间即可完成允许延迟的设置,如下所示。

stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).allowedLateness(Time.minutes(5))

侧入流 Side Output

在某些情况下,即便设置了”水位线延迟“+”允许延迟时间“,很多数据还是会被丢弃。Flink 提供了侧输出流 side output 的方式来处理迟到的数据。该输出流相当于是数据流的一个分支,专门用于存放迟到过久的、本应该丢弃的数据。

基于 WindowedStream 调用sideOutputLateData()方法并传入一个OutputTag类型的侧入流输出标签即可实现该功能,OutputTag中的类型应与数据流中的类型相同,如下所示。

OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).sideOutputLateData(outputTag)

基于窗口处理完成之后的 DataStream,调用getSideOutput()方法并传入对应的输出标签,就可以获取到迟到数据所在的流,如下所示。

这里需要注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和OutputTag指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).sideOutputLateData(outputTag).aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

迟到数据的处理

通过对水位线以及窗口相关知识的了解,在此总结一下对于迟到数据的处理方式。在 Flink 中,对迟到数据的处理主要有下述三种方式:

  1. 设置水位线延迟时间;

  2. 设置允许延时,允许窗口处理迟到数据;

  3. 将迟到数据放入侧输出流;

一般来说,对于可控的较小延迟,通过 1、2 所述的方法就可以解决。但某些情况下,即便有前面的双重保证,但窗口也无法一直等待下去,总归是要关闭的。此时只能用 3 所属方法来兜底,保证数据不会丢失。后续开发者可以从侧输出流中重新获取数据并判断其所属的窗口,对数据进行手动的汇总更新。

下述代码在之前测试增量聚合函数与全窗口函数结合使用的代码上加以改进,增加了对迟到数据的处理,可以作为简单示例感受一下整个处理过程,代码如下:

public class LateDataDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源并设置水位线SingleOutputStreamOperator<Event> stream = environment// 2.1 加载数据源.socketTextStream("XXX.XX.XX.XXX", 8080)// 2.2 对数据源进行简单处理,封装成对象.map(new MapFunction<String, Event>() {@Overridepublic Event map(String s) throws Exception {String[] split = s.split(",");return new Event(split[0].trim(),split[1].trim(),Long.valueOf(split[2].trim()));}})// 2.3 获取时间戳、设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy// 迟到数据处理方式一:设置水位线延时.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));// 定义侧输出流标签OutputTag<Event> outputTag = new OutputTag<Event>("late"){};// 3. 数据处理及输出SingleOutputStreamOperator<EventUrlCount> result = stream// 3.1 分区,将所有数据发送到一个分区进行统计.keyBy(event -> event.url)// 3.2 设置滚动窗口,窗口大小为 10s.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 3.3 设置窗口允许延迟时间为 1min// 迟到数据处理方式二:设置窗口允许延时时间.allowedLateness(Time.minutes(1L))// 3.4 将最终的迟到数据输出到侧输出流// 迟到数据处理方式三:迟到数据入侧输出流.sideOutputLateData(outputTag)// 3.5 结合使用增量聚合函数和全窗口函数.aggregate(new UrlCountAgg(), new UrlCountRes());// 4. 输出结果// 4.1 正常数据输出result.print("result");// 4.2 迟到数据输出result.getSideOutput(outputTag).print("late");// 5. 执行程序environment.execute();}/*** 增量聚合函数定义*/public static class UrlCountAgg implements AggregateFunction<Event, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}/*** 全窗口函数定义*/public static class UrlCountRes extends ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow> {@Overridepublic void process(String url, ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow>.Context context,Iterable<Long> iterable, Collector<EventUrlCount> collector) {collector.collect(new EventUrlCount(url,iterable.iterator().next(),context.window().getStart(),context.window().getEnd()));}}}

测试结果如下,数据的关键时间点用红色方框标出,箭头指示的是聚合结果中包含的数据

【基础】Flink -- Time and Window相关推荐

  1. 聊聊flink的Tumbling Window

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下flink的Tumbling Window WindowAssigner flink-streaming-java ...

  2. Flink中的window知识体系与scala完整案例

    [1]中得到大类,插图来自[2] 窗口大类(官方) 子分类 数据是否在窗口之间重叠 Time Windows Tumbling Windows Sliding Windows Count Window ...

  3. flink sql 知其所以然(八):flink sql tumble window 的奇妙解析之路

    感谢您的小爱心(关注  +  点赞 + 再看),对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 大数据羊说 用数据提升美好事物发生的概率~ 34篇原创内容 公众号 源码 ...

  4. Flink 窗口函数(Window Functions)处理迟到数据

    文章目录 将迟到的数据放入侧输出流 Lambda架构:用一个流处理器,先快速的得到一个正确,近似正确的结果,然后在另外一层是一个批处理器,然后在它是一直等着的,等所有数据都到齐了,计算出一个最终准确的 ...

  5. Flink之窗口 (Window) 下篇

    窗口函数(Window Functions) 定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了:至于收集起来到底要做什么,其实还完全没有头绪.所以在窗口分配器之后,必须再接上一个 ...

  6. Flink之窗口 (Window) 上篇

    我们已经了解了 Flink 中事件时间和水位线的概念,那它们有什么具体应用呢?当然是做基于时间的处理计算了.其中最常见的场景,就是窗口聚合计算. 之前我们已经了解了 Flink 中基本的聚合操作.在流 ...

  7. 彻底搞清 Flink 中的 Window 机制

    [CSDN 编者按]Window是处理无限流的核心.Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.Flink提 ...

  8. flink EventTime与Window

    EventTime的引入 在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTim ...

  9. 彻底搞清Flink中的Window(Flink版本1.8)

    flink-window 窗口 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理.当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分 ...

最新文章

  1. 字节腾讯阿里罕见联手:发布直播技术新标准,要让手机直播像电视一样丝滑...
  2. java 二叉树_二叉树实现java
  3. 20行python代码的入门级小游戏-200行Python代码实现的2048小游戏
  4. SET IDENTITY_INSERT [Table] [ON|OFF]
  5. SAP ABAP和C4C,Hybris Commerce里一些性能分析工具
  6. java jai create 方法_使用JAI扩展Java Image的功能
  7. 粗暴,干就完了----徐晓冬似的C语言自学笔记-----实现一个链表结构
  8. php 可视化neo4j,开源图形数据库Neo4j使用 php开发
  9. url decode problem
  10. oracle sql 分区查询语句_Oracle分区表跨分区查询数据为空
  11. 算法:回溯十六 Add and Search Word添加并查找单词
  12. 计算机二级数据库题库百度云,计算机二级数据库试题及答案
  13. 微信团购小程序怎么做?一般要多少钱?
  14. ppt菜鸟学飞第一天——基础知识及字体知识
  15. 魔兽争霸lostTemple地图
  16. 2018.11.07【NOIP训练】lzy的游戏(01背包)
  17. 原笔迹手写实现平滑和笔锋效果之:笔迹的平滑(一)
  18. sd卡驱动分析之host
  19. 北理工嵩天Python语言程序设计笔记(10 Python计算生态概览)
  20. Android 版本号---版本名

热门文章

  1. react antd-menu中使用阿里巴巴图片矢量库iconfont
  2. css中间镂空,css3如何实现遮罩层镂空效果 css3遮罩层镂空效果的多种实现方法
  3. 【备忘】用命令测试SMTP的发邮件功能
  4. 【小白学java】D36》》》线程入门学习,线程同步机制 和 线程等待与唤醒机制
  5. 隐藏PPT2003与PPT2007幻灯片中的声音图标
  6. 视频编解码之关于AI、RA、LD的解释
  7. ASP.net MySQL ExecuteScalar的简单使用
  8. 区块链应用 | 企业区块链如何转型?学学当年的“干脆面”吧
  9. HotSpot汇编器与模板解释器
  10. Linux 系统 ldirectord.cf 文件详解