Window意为窗口。在流处理系统中数据源源不断流入到系统,我们可以逐条处理流入的数据,也可以按一定规则一次处理流中的多条数据。当处理数据时程序需要知道什么时候开始处理、处理哪些数据。窗口提供了这样一种依据,决定了数据何时开始处理。

Flink内置Window

Flink有3个内置Window

  • 以事件数量驱动的Count Window

  • 以会话间隔驱动的Session Window

  • 以时间驱动的Time Window

本文围绕这3个内置窗口展开讨论,我们首先了解这3个窗口在运行时产生的现象,最后再讨论它们的实现原理。

Count Window

计数窗口,采用事件数量作为窗口处理依据。计数窗口分为滚动和滑动两类,使用keyedStream.countWindow实现计数窗口定义。

  • Tumbling Count Window 滚动计数窗口
    例子:以用户分组,当每位用户有3次付款事件时计算一次该用户付款总金额。下图中“消息A、B、C、D”代表4位不同用户,我们以A、B、C、D分组并计算金额。

/** 每3个事件,计算窗口内数据 */
keyedStream.countWindow(3);
复制代码

  • Sliding Count Window 滑动计数窗口
    例子:一位用户每3次付款事件计算最近4次付款事件总金额。

/** 每3个事件,计算最近4个事件消息 */
keyedStream.countWindow(4,3);
复制代码

Session Window

会话窗口,采用会话持续时长作为窗口处理依据。设置指定的会话持续时长时间,在这段时间中不再出现会话则认为超出会话时长。

例子:每只股票超过2秒没有交易事件时计算窗口内交易总金额。下图中“消息A、消息B”代表两只不同的股票。

/** 会话持续2秒。当超过2秒不再出现会话认为会话结束 */
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))复制代码

Time Window

时间窗口,采用时间作为窗口处理依据。时间窗分为滚动和滑动两类,使用keyedStream.timeWindow实现时间窗定义。

  • Tumbling Time Window 滚动时间窗口:

/** 每1分钟,计算窗口数据 */
keyedStream.timeWindow(Time.minutes(1));复制代码

  • Sliding Time Window 滑动时间窗口:

/** 每半分钟,计算最近1分钟窗口数据 */
keyedStream.timeWindow(Time.minutes(1), Time.seconds(30));
复制代码

Flink Window组件

Flink Window使用3个组件协同实现了内置的3个窗口。通过对这3个组件不同的组合,可以满足许多场景的窗口定义。

WindowAssigner组件为数据分配窗口、Trigger组件决定如何处理窗口中的数据、借助Evictor组件实现灵活清理窗口中数据时机。

WindowAssigner

当有数据流入到Window Operator时需要按照一定规则将数据分配给窗口,WindowAssigner为数据分配窗口。下面代码片段是WindowAssigner部分定义,assignWindows方法定义返回的结果是一个集合,也就是说数据允许被分配到多个窗口中。

/*** WindowAssigner关键接口定义 ***/
public abstract class WindowAssigner<T, W extends Window> implements Serializable {/** 分配数据到窗口集合并返回 */public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
}复制代码

Flink内置WindowAssigner

Flink针对不同窗口类型实现了相应的WindowAssigner。Flink 1.7.0继承关系如下图

Trigger

Trigger触发器,它定义了3个触发动作,并且定义了触发动作处理完毕后的返回结果。返回结果交给Window Operator后由Window Operator决定后续操作。也就是说,Trigger通过具体的动作处理结果决定窗口是否应该被处理、被清除、被处理+清除、还是什么都不做。

/** Trigger关键接口定义 */
public abstract class Trigger<T, W extends Window> implements Serializable {/*** 新的数据进入窗口时触发 ***/public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;/*** 处理时间计数器触发 ***/public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;/*** 事件时间计数器触发 ***/public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
}
复制代码

当有数据流入Window Operator时会触发onElement方法、当处理时间和事件时间生效时会触发onProcessingTime和onEventTime方法。每个触发动作的返回结果用TriggerResult定义。

TriggerResult返回类型及说明

Trigger触发运算后返回处理结果,处理结果使用TriggerResult枚举表示。

public enum TriggerResult {CONTINUE,FIRE,PURGE,FIRE_AND_PURGE;
}复制代码

Flink内置Trigger

Flink的内置窗口(Counter、Session、Time)有自己的触发器实现。下表为不同窗口使用的触发器。

Evictor

Evictor驱逐者,如果定义了Evictor当执行窗口处理前会删除窗口内指定数据再交给窗口处理,或等窗口执行处理后再删除窗口中指定数据。

public interface Evictor<T, W extends Window> extends Serializable {/** 在窗口处理前删除数据 */void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/** 在窗口处理后删除数据 */void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}复制代码

Flink内置Evictor

实现原理

通过KeyedStream可以直接创建Count Window和Time Window。他们最终都是基于window(WindowAssigner)方法创建,在window方法中创建WindowedStream实例,参数使用当前的KeyedStream对象和指定的WindowAssigner。

/** 依据WindowAssigner实例化WindowedStream */
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);
}复制代码

/** WindowedStream构造器 */
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.windowAssigner = windowAssigner;this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
复制代码

构造器执行完毕后,WindowedStream创建完成。构造器中初始化了3个属性。默认情况下trigger属性使用WindowAssigner提供的DefaultTrigger作为初始值。

同时,WindowedStream提供了trigger方法用来覆盖默认的trigger。Flink内置的计数窗口就使用windowedStream.trigger方法覆盖了默认的trigger。

public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {throw new UnsupportedOperationException();}if (windowAssigner instanceof BaseAlignedWindowAssigner) {throw new UnsupportedOperationException();}this.trigger = trigger;return this;
}复制代码

在WindowedStream中还有一个比较重要的属性evictor,可以通过evictor方法设置。

public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {if (windowAssigner instanceof BaseAlignedWindowAssigner) {throw new UnsupportedOperationException();}this.evictor = evictor;return this;
}复制代码

WindowedStream实现中根据evictor属性是否空(null == evictor)决定是创建WindowOperator还是EvictingWindowOperator。EvictingWindowOperator继承自WindowOperator,它主要扩展了evictor属性以及相关的逻辑处理。

public class EvictingWindowOperator extends WindowOperator {private final Evictor evictor;
}复制代码

Evictor定义了清理数据的时机。在EvictingWindowOperator的emitWindowContents方法中,实现了清理数据逻辑调用。这也是EvictingWindowOperator与WindowOperator的主要区别。「在WindowOperator中压根就没有evictor的概念」

private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {/** Window处理前数据清理 */evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));/** Window处理 */userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);/** Window处理后数据清理 */evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
}
复制代码

Count Window API

下面代码片段是KeyedStream提供创建Count Window的API。

/** 滚动计数窗口 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}/** 滑动计数窗口 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
}
复制代码

滚动计数窗口与滑动计数窗口有几个差异

  • 入参不同

  • 滑动窗口使用了evictor组件

  • 两者使用的trigger组件不同

下面我们对这几点差异做深入分析,看一看他们是如何影响滚动计数窗口和滑动计数窗口的。

Count Window Assigner
通过方法window(GlobalWindows.create())创建WindowedStream实例,滚动计数窗口处理和滑动计数窗口处理都是基于GlobalWindows作为WindowAssigner来创建窗口处理器。GlobalWindows将所有数据都分配到同一个GlobalWindow中。「这里需要注意GlobalWindows是一个WindowAssigner,而GlobalWindow是一个Window」

/** GlobalWindows是一个WindowAssigner实现,这里只展示实现assignWindows的代码片段 */
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {/** 返回一个GlobalWindow */public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {return Collections.singletonList(GlobalWindow.get());}
}
复制代码

GlobalWindow继承了Window,表示为一个窗口。对外提供get()方法返回GlobalWindow实例,并且是个全局单例。所以当使用GlobalWindows作为WindowAssigner时,所有数据将被分配到一个窗口中。

/** GlobalWindow是一个Window */
public class GlobalWindow extends Window {private static final GlobalWindow INSTANCE = new GlobalWindow();/** 永远返回GlobalWindow单例 */public static GlobalWindow get() {return INSTANCE;}
}
复制代码

Count Window Trigger
滚动计数窗口创建时使用PurgingTrigger.of(CountTrigger.of(size))覆盖了GlobalWindows默认的Trigger,而滑动计数窗口创建时使用CountTrigger.of(size)覆盖了GlobalWindows默认的Trigger。

PurgingTrigger是一个代理模式的Trigger实现,在计数窗口中PurgingTrigger代理了CountTrigger。

/** PurgingTrigger代理的Trigger */
private Trigger<T, W> nestedTrigger;
/** PurgingTrigger私有构造器 */
private PurgingTrigger(Trigger<T, W> nestedTrigger) {this.nestedTrigger = nestedTrigger;
}/** 为代理的Trigger构造一个PurgingTrigger实例 */
public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {return new PurgingTrigger<>(nestedTrigger);
}
复制代码

在这里比较一下PurgingTrigger.onElement和CountTrigger.onElement方法实现,帮助理解PurgingTrigger的作用。

/** CountTrigger实现 */
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {ReducingState<Long> count = ctx.getPartitionedState(stateDesc);count.add(1L);if (count.get() >= maxCount) {count.clear();return TriggerResult.FIRE;}return TriggerResult.CONTINUE;
}
/** PurgingTrigger实现 */
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
复制代码

在CountTrigger实现中,当事件流入窗口后计数+1,之后比较窗口中事件数是否大于设定的最大数量,一旦大于最大数量返回FIRE。也就是说只处理窗口数据,不做清理。

在PurgingTrigger实现中,依赖CountTrigger的处理逻辑,但区别在于当CounterTrigger返回FIRE时PurgingTrigger返回FIRE_AND_PURGE。也就是说不仅处理窗口数据,还做数据清理。通过这种方式实现了滚动计数窗口数据不重叠。

Count Window Evictor
滚动计数窗口和滑动计数窗口另一个区别在于滑动计数窗口通过windowedStream.evictor(CountEvictor.of(size))方法设置了Evictor,而滚动窗口并没有设置Evictor。

滑动计数窗口依赖Evictor组件在窗口处理前清除了指定数量以外的数据,再交给窗口处理。通过这种方式实现了窗口计算最近指定次数的事件数量。

总结

Time Window API

下面代码片段是KeyedStream中提供创建Time Window的API。

/** 创建滚动时间窗口 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {return window(TumblingProcessingTimeWindows.of(size));} else {return window(TumblingEventTimeWindows.of(size));}
}
/** 创建滑动时间窗口 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {return window(SlidingProcessingTimeWindows.of(size, slide));} else {return window(SlidingEventTimeWindows.of(size, slide));}
}
复制代码

创建TimeWindow时会根据Flink应用当前时间类型environment.getStreamTimeCharacteristic()来决定使用哪个WindowAssigner创建窗口。

Flink对时间分成了3类。处理时间、摄入时间、事件时间。使用TimeCharacteristic枚举定义。

public enum TimeCharacteristic {/** 处理时间 */ProcessingTime,/** 摄入时间 */IngestionTime,/** 事件时间 */EventTime
}
复制代码

对于Flink的3个时间概念,我们目前只需要了解

  • 处理时间(TimeCharacteristic.ProcessingTime)就是运行Flink环境的系统时钟产生的时间

  • 事件时间(TimeCharacteristic.EventTime)是业务上产生的时间,由数据自身携带

  • 摄入时间(TimeCharacteristic.IngestionTime)是数据进入到Flink的时间,它在底层实现上与事件时间相同。

Time Window Assigner

下面的表格中展示了窗口类型和时间类型对应的WindowAssigner的实现类

我们以一个TumblingProcessingTimeWindows和一个SlidingEventTimeWindows为例,讨论它的实现原理。

TumblingProcessingTimeWindows
TumblingProcessingTimeWindows基于处理时间的滚动时间窗口分配器,它是一个WindowAssigner。Flink提供两个接口初始化TumblingProcessingTimeWindows

public static TumblingProcessingTimeWindows of(Time size) {return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
}
public static TumblingProcessingTimeWindows of(Time size, Time offset) {return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
复制代码

不管使用哪种方式初始化TumblingProcessingTimeWindows,最终都会调用同一个构造方法初始化,构造方法初始化size和offset两个属性。

/** TumblingProcessingTimeWindows构造器 */
private TumblingProcessingTimeWindows(long size, long offset) {if (offset < 0 || offset >= size) {throw new IllegalArgumentException();}this.size = size;this.offset = offset;
}
复制代码

TumblingProcessingTimeWindows是一个WindowAssigner,所以它实现了assignWindows方法来为流入的数据分配窗口。

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {final long now = context.getCurrentProcessingTime();long start = TimeWindow.getWindowStartWithOffset(now, offset, size);return Collections.singletonList(new TimeWindow(start, start + size));
}
复制代码

第一步assignWindows首先获得系统当前时间戳,context.getCurrentProcessingTime();最终实现实际是调用System.currentTimeMillis()。

第二步执行TimeWindow.getWindowStartWithOffset(now, offset, size);这个方法根据当前时间、偏移量、设置的间隔时间最终计算窗口起始时间。

第三步根据起始时间和结束时间创建一个新的窗口new TimeWindow(start, start + size)并返回。

比如,希望每10秒处理一次窗口数据keyedStream.timeWindow(Time.seconds(10))。当数据源源不断的流入Window Operator时,它会按10秒切割一个时间窗。

我们假设数据在2019年1月1日 12:00:07到达,那么窗口以下面方式切割(请注意,窗口是左闭右开)。

Window[2019年1月1日 12:00:00, 2019年1月1日 12:00:10)复制代码

如果在2019年1月1日 12:10:09又一条数据到达,窗口是这样的

Window[2019年1月1日 12:10:00, 2019年1月1日 12:10:10)复制代码

如果我们希望从第15秒开始,每过1分钟计算一次窗口数据,这种场景需要用到offset。基于处理时间的滚动窗口可以这样写

keyedStream.window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15)))复制代码

我们假设数据从2019年1月1日 12:00:14到达,那么窗口以下面方式切割

Window[2019年1月1日 11:59:15, 2019年1月1日 12:00:15)复制代码

如果在2019年1月1日 12:00:16又一数据到达,那么窗口以下面方式切割

Window[2019年1月1日 12:00:15, 2019年1月1日 12:01:15)复制代码

TumblingProcessingTimeWindows.assignWindows方法每次都会返回一个新的窗口,也就是说窗口是不重叠的。但因为TimeWindow实现了equals方法,所以通过计算后start, start + size相同的数据,在逻辑上是同一个窗口。

public boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}TimeWindow window = (TimeWindow) o;return end == window.end && start == window.start;
}
复制代码

SlidingEventTimeWindows
SlidingEventTimeWindows基于事件时间的滑动时间窗口分配器,它是一个WindowAssigner。Flink提供两个接口初始化SlidingEventTimeWindows

public static SlidingEventTimeWindows of(Time size, Time slide) {return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),offset.toMilliseconds() % slide.toMilliseconds());
}复制代码

同样,不管使用哪种方式初始化SlidingEventTimeWindows,最终都会调用同一个构造方法初始化,构造方法初始化三个属性size、slide和offset。

protected SlidingEventTimeWindows(long size, long slide, long offset) {if (offset < 0 || offset >= slide || size <= 0) {throw new IllegalArgumentException();}this.size = size;this.slide = slide;this.offset = offset;
}复制代码

SlidingEventTimeWindows是一个WindowAssigner,所以它实现了assignWindows方法来为流入的数据分配窗口。

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {List<TimeWindow> windows = new ArrayList<>((int) (size / slide));long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start = lastStart; start > timestamp - size;start -= slide) {windows.add(new TimeWindow(start, start + size));}return windows;} else {throw new RuntimeException();}
}复制代码

与基于处理时间的WindowAssigner不同,基于事件时间的WindowAssigner不依赖于系统时间,而是依赖于数据本身的事件时间。在assignWindows方法中第二个参数timestamp就是数据的事件时间。

第一步assignWindows方法会先初始化一个List<TimeWindow>,大小是size / slide。这个集合用来存放时间窗对象并作为返回结果。

第二步执行TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);计算窗口起始时间。

第三步根据事件时间、滑动大小和窗口大小计算并生成数据能落入的窗口new TimeWindow(start, start + size),最后加入到List集合并返回。「因为是滑动窗口一个数据可能落在多个窗口」

比如,希望每5秒滑动一次处理最近10秒窗口数据keyedStream.timeWindow(Time.seconds(10), Time.seconds(5))。当数据源源不断流入Window Operator时,会按10秒切割一个时间窗,5秒滚动一次。

我们假设一条付费事件数据付费时间是2019年1月1日 17:11:24,那么这个付费数据将落到下面两个窗口中(请注意,窗口是左闭右开)。

Window[2019年1月1日 17:11:20, 2019年1月1日 17:11:30)
Window[2019年1月1日 17:11:15, 2019年1月1日 17:11:25)复制代码

Time Window Trigger
Flink API在创建Time Window时没有使用windowStream.trigger方法覆盖默认Trigger。

TumblingProcessingTimeWindows使用ProcessingTimeTrigger作为默认Trigger。ProcessingTimeTrigger在onElement的策略是永远返回CONTINUE,也就是说它不会因为数据的流入触发窗口计算和清理。在返回CONTINUE前调用registerProcessingTimeTimer(window.maxTimestamp());注册一个定时器,并且逻辑相同窗口只注册一次,事件所在窗口的结束时间与系统当前时间差决定了定时器多久后触发。

ScheduledThreadPoolExecutor.schedule(new TriggerTask(), timeEndTime - systemTime, TimeUnit.MILLISECONDS);
复制代码

定时器一旦触发会回调Trigger的onProcessingTime方法。ProcessingTimeTrigger中实现的onProcessingTime直接返回FIRE。也就是说系统时间大于等于窗口最大时间时,通过回调方式触发窗口计算。但因为返回的是FIRE只是触发了窗口计算,并没有做清除。

SlidingEventTimeWindows使用EventTimeTrigger作为默认Trigger。事件时间、摄入时间与处理时间在时间概念上有一点不同,处理时间处理依赖的是系统时钟生成的时间,而事件时间和摄入时间依赖的是Watermark(水印)。我们现在只需要知道水印是一个时间戳,可以由Flink以固定的时间间隔发出,或由开发人员根据业务自定义。水印用来衡量处理程序的时间进展。

EventTimeTrigger的onElement方法中比较窗口的结束时间与当前水印时间,如果窗口结束时间已小于或等于当前水印时间立即返回FIRE。

「个人理解这是由于时间差问题导致的窗口时间小于或等于当前水印时间,正常情况下如果窗口结束时间已经小于水印时间则数据不会被处理,也不会调用onElement」

如果窗口结束时间大于当前水印时间,调用registerEventTimeTimer(window.maxTimestamp())注册一个事件后直接返回CONTINUE。EventTime注册事件没有使用Scheduled,因为它依赖水印时间。所以在注册时将逻辑相同的时间窗封装为一个特定对象添加到一个排重队列,并且相同窗口对象只添加一次。

上面提到水印是以固定时间间隔发出或由开发人员自定义的,Flink处理水印时从排重队列头获取一个时间窗对象与水印时间戳比较,一旦窗口时间小于或等于水印时间回调trigger的onEventTime。

EventTimeTrigger中onEventTime并不是直接返回FIRE,而是判断窗口结束时间与获取的时间窗对象时间做比较,仅当时间相同时才返回FIRE,其他情况返回CONTINUE。「个人理解这么做是为了满足滑动窗口的需求,因为滑动窗口在排重队列中存在两个不同的对象,而两个窗口对象的时间可能同时满足回调条件」

Time Window Evictor

Flink内置Time Window实现没有使用Evictor。

Session Window API

KeyedStream中没有为Session Window提供类似Count Windown和Time Window一样能直接使用的API。我们可以使用window(WindowAssigner assigner)创建Session Window。

比如创建一个基于处理时间,时间间隔为2秒的SessionWindow可以这样实现

keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))复制代码

Assigner
Flink内置的Session Window Assigner全部继承MergingWindowAssigner。下图展示了MergingWindowAssigner的上下结构关系。

MergingWindowAssigner继承了WindowAssigner,所以它具备分配时间窗的能力。MergingWindowAssigner自身是一个可以merge的Window,它的内部定义了一个mergeWindows抽象方法以及merge时的回调定义。

public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);public interface MergeCallback<W> {void merge(Collection<W> toBeMerged, W mergeResult);
}复制代码

我们以ProcessingTimeSessionWindows为例介绍Session Window。ProcessingTimeSessionWindows提供了一个静态方法用来初始化ProcessingTimeSessionWindows

public static ProcessingTimeSessionWindows withGap(Time size) {return new ProcessingTimeSessionWindows(size.toMilliseconds());
}复制代码

静态方法withGap接收一个时间参数,用来描述时间间隔。并调用构造方法将时间间隔赋值给sessionTimeout属性。

protected ProcessingTimeSessionWindows(long sessionTimeout) {if (sessionTimeout <= 0) {throw new IllegalArgumentException();}this.sessionTimeout = sessionTimeout;
}复制代码

ProcessingTimeSessionWindows是一个WindowAssigner,所以它实现了数据分配窗口的能力。

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {long currentProcessingTime = context.getCurrentProcessingTime();return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}复制代码

ProcessingTimeSessionWindows会为每个数据都分配一个新的时间窗口。由于是基于处理时间,所以窗口的起始时间就是系统当前时间,而结束时间是系统当前时间+设置的时间间隔。通过起始时间和结束时间确定了窗口的时间范围。

Trigger
如果在代码中我们不手动覆盖Trigger,那么将使用ProcessingTimeSessionWindows默认的ProcessingTimeTrigger

public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {return ProcessingTimeTrigger.create();
}复制代码

ProcessingTimeTrigger在基于处理时间的Time Window介绍过,它通过注册、onProcessorTime回调方式触发窗口计算,这里不再讨论。

Evictor
Session Window不由Flink API控制生成,完全取决于客户端如何创建。在创建Window实例后可以通过调用evictor方法并传入Flink内置的Evictor或自己实现的Evictor。

Merging
Session Window继承MergingWindowAssigner,MergingWindowAssigner继承WindowAssigner。所以本质上Session Window还是一个WindowAssigner,但因继承了MergingWindowAssigner使得自己具有了一个「可以合并时间窗口」的特性。

public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {TimeWindow.mergeWindows(windows, c);
}复制代码

Session Window处理流程大致是这样

  1. 使用WindowAssigner为流入的数据分配窗口

  2. Merge窗口,将存在交集的窗口合并,取最小时间和最大时间作为窗口的起始和关闭。假设有两条数据流入系统后,通过WindowAssigner分配的窗口分别是
    数据A:Window[2019年1月1日 10:00:00, 2019年1月1日 10:20:00)
    数据B:Window[2019年1月1日 10:05:00, 2019年1月1日 10:25:00)
    经过合并后,使用数据A的起始时间和数据B的结束时间作为节点,窗口时间变为了
    [2019年1月1日 10:00:00, 2019年1月1日 10:25:00)

  3. 执行Trigger.onMerge,为合并后的窗口注册回调事件

  4. 移除其他注册的回调事件

  5. Window State合并

  6. 开始处理数据,执行Trigger.onElement
    …后续与其他Window处理一样

可以看到,Session Window与Time Window类似,通过注册回调方式触发数据处理。但不同的是Session Window通过不断为新流入的数据做Merge操作来改变回调时间点,以实现Session Window的特性。

总结

  • Window Operator创建
    Window处理流程由WindowOperator或EvictingWindowOperator控制,他们的关系及区别体现在以下几点

  1. EvictingWindowOperator继承自WindowOperator,所以EvictingWindowOperator是一个WindowOperator,具备WindowOperator的特性。

  2. 清理窗口数据的机制不同,EvictingWindowOperator内部依赖Evictor组件,而WindowOperator内部不使用Evictor。这也导致它们两个Operator初始化时的差异

  • MergeWindow特殊处理
    可以合并窗口的WindowAssigner会继承MergingWindowAssigner。当数据流入Window Operator后,根据WindowAssigner是否为一个MergingWindowAssigner决定了处理流程。

  • 窗口生命周期
    Flink内置的窗口生命周期是不同的,下表描述了他们直接的差异

  • 侧路输出
    当Flink应用采用EventTime作为时间机制时,Window不会处理延迟到达的数据,也就是说不处理在水印时间戳之前的数据。Flink提供了一个SideOutput机制可以处理这些延迟到达的数据。通过WindowedStream.sideOutputLateData方法实现侧路输出。

  • 自定义窗口
    Flink内置窗口利用WindowAssigner、Trigger、Evictor3个组件的相互组合实现了多种非常强大的功能,我们也可以尝试通过组件实现一个自定义的Window。由于篇幅原因,自定义窗口下篇再细聊。

作者:TalkingData 史天舒

转载于:https://juejin.im/post/5cb6d4426fb9a068af37aa60

Flink Window基本概念与实现原理相关推荐

  1. 1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等

    1.16.Flink Window和Time详解 1.16.1.Window(窗口) 1.16.2.Window的类型 1.16.3.Window类型汇总 1.16.4.TimeWindow的应用 1 ...

  2. Flink Window机制详解

    Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch ...

  3. GIS基础知识汇总篇(五)-无人机真正射影像的概念和制作原理

    正射影像应同时具有地图的几何精度和影像的视觉特征,特别是对于高分辨率.大比例尺的正射影像图,它可作为背景控制信息去评价其他地图空间数据的精度.现势性和完整性.然而作为一个视觉影像地图产品,影像上由于投 ...

  4. OSPF路由协议概念及工作原理

    随着Internet技术在全球范围内的飞速发展,IP网络作为一种最有前景的网络技术,受到了人们的普遍关注.而作为IP网络生存.运作.组织的核心--IP路由技术提供了解决IP网络动态可变性.实时性.Qo ...

  5. 轻松认识HTTP协议的概念和工作原理

    轻松认识HTTP协议的概念和工作原理 当我们想浏览一个网站的时候,只要在浏览器的地址栏里输入网站的地址就可以了,例如:www.microsoft.com,但是在浏览器的地址栏里面出现的却是:http: ...

  6. 区块链的基本概念和工作原理

    区块链的基本概念和工作原理 1.基本概念 区块链是分布式数据存储.点对点传输.共识机制.加密算法等计算机技术的新型应用模式.所谓共识机制是区块链系统中实现不同节点之间建立信任.获取权益的数学算法. 区 ...

  7. DI的概念和实现原理—Spring系列介绍

    DI的概念和实现原理-Spring系列介绍 DI和AOP是Spring中的两个核心概念,要学习DI和AOP,首先就需要了解清楚什么是DI,什么是AOP,这篇文章会讲解一下DI的概念和实现原理,不足之处 ...

  8. 无人机真正射影像的概念和制作原理

    正射影像应同时具有地图的几何精度和影像的视觉特征,特别是对于高分辨率.大比例尺的正射影像图,它可作为背景控制信息去评价其他地图空间数据的精度.现势性和完整性.然而作为一个视觉影像地图产品,影像上由于投 ...

  9. 第12节 DNS服务器基本概念、解析原理及部署——以win2003为例

    DNS服务器基本概念.解析原理及部署 1 DNS概述 1.1 基本概念 1.2 域名的结构--树形结构 2 DNS解析分类及过程 2.1 按查询方式分类: 2.2 按查询的内容分类 2.3 普通用户机 ...

最新文章

  1. LeetCode实战:寻找两个有序数组的中位数
  2. 我的zsh配置, 2019最新方案
  3. 判断一个数是否是素数,为什么只要除到根号那个数就够了
  4. Spring MVC Controller中返回json数据中文乱码处理
  5. fastText初探
  6. 2018年90后薪资报告出炉:你在哪个级别???
  7. 【oracle ocp知识点一】
  8. Atitit freemarker模板总结 D:\workspace\springboothelloword\src\com\attilax\util\TempleteFreemarkerUtil.
  9. 表情包网站项目(学习自程序员鱼皮)
  10. opencv学习——翻转摄像头
  11. AutoCAD 天正建筑2014安装破解
  12. 硬盘出现“文件或目录损坏且无法读取”的故障,怎么解决?
  13. 2017 robotart x86_RobotArt:机器人离线编程仿真软件领航者
  14. Centos7为yum设置代理
  15. pdf文件怎么缩小兆数
  16. 输入框只允许输入数字字母下划线
  17. 《编译 - 编译杂记》GCC优化等级说明
  18. OAuth2.0,CodeChallenge的生成问题
  19. Nginx特性、安装、配置
  20. Nmap使用教程 - 一

热门文章

  1. Python 之 matplotlib (十三) subplot分格显示
  2. 世界一流大学如何建设人工智能学科
  3. 揭秘人工智能背后鲜为人知的人工力量——数据标注
  4. 薛其坤院士对话马斯克:下一个颠覆性创新是什么?
  5. Gartner发布2021年重要战略科技趋势!
  6. 五问智能教育未来发展:重点解决什么问题?
  7. 教育部:建设100+AI特色专业, 500万AI人才缺口要补上!
  8. 百度谷歌等联合推出机器学习基准 加速AI软硬件发展
  9. Java面试高Spring Boot+Sentinel+Nacos高并发已撸完
  10. NLP的神经网络训练的新模式