References:

http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

http://wuchong.me/blog/2016/06/06/flink-internals-session-window/

WindowOperator

The window operator implements its logic through a WindowAssigner and a Trigger.

When an element arrives, the KeySelector first assigns it a key. The WindowAssigner then assigns it to zero or more windows, so the element is placed into one pane per assigned window.

A pane holds all elements that have the same key and the same window.
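The pane idea can be made concrete with a small in-memory sketch. It assumes a single tumbling window of fixed size, non-negative timestamps and integer values. The names `Pane`, `PaneBuffer`, `add` and `contents` are hypothetical and are not Flink's API. Each (key, window) pair maps to its own bucket, so two keys in the same time range never share a pane.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical pane identity: one bucket per (key, window start).
final class Pane {
    final String key;
    final long windowStart;

    Pane(String key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Pane)) return false;
        Pane p = (Pane) o;
        return windowStart == p.windowStart && key.equals(p.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart);
    }
}

// Hypothetical buffer that groups elements into panes (assumes timestamps >= 0).
final class PaneBuffer {
    private final long windowSize;
    private final Map<Pane, List<Integer>> panes = new HashMap<>();

    PaneBuffer(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assign the element to its tumbling window and append it to the matching pane.
    void add(String key, long timestamp, int value) {
        long start = timestamp - (timestamp % windowSize);
        panes.computeIfAbsent(new Pane(key, start), p -> new ArrayList<>()).add(value);
    }

    List<Integer> contents(String key, long windowStart) {
        return panes.getOrDefault(new Pane(key, windowStart), new ArrayList<>());
    }

    int paneCount() {
        return panes.size();
    }
}
```

For example, with a window size of 10, elements ("a", t=1), ("a", t=5), ("b", t=3) and ("a", t=12) end up in three panes: (a, 0), (b, 0) and (a, 10).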

/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>
 * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>
 * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

    // ------------------------------------------------------------------------
    // Configuration values and user functions
    // ------------------------------------------------------------------------

    protected final WindowAssigner<? super IN, W> windowAssigner;

    protected final KeySelector<IN, K> keySelector;

    protected final Trigger<? super IN, ? super W> trigger;

    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

    /**
     * The allowed lateness for elements. This is used for:
     * <ul>
     *     <li>Deciding if an element should be dropped from a window due to lateness.
     *     <li>Clearing the state of a window if the system time passes the
     *         {@code window.maxTimestamp + allowedLateness} landmark.
     * </ul>
     */
    protected final long allowedLateness; // how long elements may still arrive after the watermark has passed the window's end

    /**
     * To keep track of the current watermark so that we can immediately fire if a trigger
     * registers an event time callback for a timestamp that lies in the past.
     */
    protected transient long currentWatermark = Long.MIN_VALUE;

    protected transient Context context = new Context(null, null); // the Trigger context

    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; // only used for getCurrentProcessingTime()

    // ------------------------------------------------------------------------
    // State that needs to be checkpointed
    // ------------------------------------------------------------------------

    /**
     * Processing time timers that are currently in-flight.
     */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; // a Timer holds (timestamp, key, window); the queue is ordered by timestamp

    /**
     * Current waiting watermark callbacks.
     */
    protected transient Set<Timer<K, W>> watermarkTimers;
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;

    protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; // per key: records which state window each merged window maps to

    // ...
}

For the window operator, the two most important components are the WindowAssigner and the Trigger.

WindowAssigner

The WindowAssigner determines which windows an element (tuple) should be assigned to.

[Figure: the different kinds of WindowAssigner]

The most important method of WindowAssigner is assignWindows. It assigns a set of windows, a Collection<W>, to an element.

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    // ...
}

Now let's look at some concrete WindowAssigner implementations.

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    // ... (field declarations, including size, omitted)

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size)); // simple: exactly one TimeWindow
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create(); // the default trigger is, as the name suggests, ProcessingTimeTrigger
    }

    // ...
}
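The tumbling assignment reduces to rounding the current time down to a multiple of `size`. The sketch below isolates that arithmetic. It assumes non-negative millisecond timestamps. `TimeWindowSketch` and `TumblingSketch` are hypothetical stand-ins for Flink's `TimeWindow` and the assigner. The window is treated as `[start, end)`, so `maxTimestamp()` is `end - 1`. That value is also the timestamp `fire()` attaches to the results.

```java
// Hypothetical stand-in for Flink's TimeWindow: covers [start, end), end exclusive.
final class TimeWindowSketch {
    final long start;
    final long end;

    TimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // The last timestamp that still belongs to the window.
    long maxTimestamp() {
        return end - 1;
    }
}

final class TumblingSketch {
    // Round 'now' down to the nearest multiple of 'size' (assumes now >= 0).
    static TimeWindowSketch assign(long now, long size) {
        long start = now - (now % size);
        return new TimeWindowSketch(start, start + size);
    }
}
```

With `size = 1000`, a processing time of 12345 falls into the window `[12000, 13000)`, whose `maxTimestamp()` is 12999.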

public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    private final long slide;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = timestamp - timestamp % slide;
            for (long start = lastStart; start > timestamp - size; start -= slide) {
                windows.add(new TimeWindow(start, start + size)); // sliding windows overlap, so several TimeWindows are assigned here
            }
            return windows;
        } else {
            // elided in the original excerpt: a record without a timestamp is rejected
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    // ...
}
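The sliding loop walks backwards from the latest window start that is not after the timestamp, stepping by `slide`. It stops once a window would end at or before the timestamp. The sketch below returns only the start times. It assumes non-negative timestamps, and the class name `SlidingSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingSketch {
    // Start times of every window [start, start + size) that contains 'timestamp',
    // newest first, using the same loop bounds as SlidingEventTimeWindows (assumes timestamp >= 0).
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>((int) (size / slide));
        long lastStart = timestamp - timestamp % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, with `size = 10` and `slide = 5`, timestamp 12 lands in `[10, 20)` and `[5, 15)`. The window `[0, 10)` is excluded because it ends before 12.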

Trigger, Evictor

See also: Flink – Trigger, Evictor

Next, let's look at the three main methods. They invoke onElement, onEventTime and onProcessingTime respectively.

processElement

Handles the arrival of an element and invokes onElement.

public void processElement(StreamRecord<IN> element) throws Exception {
    // assign the element to a set of windows via the WindowAssigner
    Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

    final K key = (K) getStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
        // .......
    } else { // regular (non-merging) windows
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isLate(window)) { // late data is dropped by default
                continue;
            }

            // fetch this window's state, i.e. its buffered elements, from the state backend
            AppendingState<IN, ACC> windowState = getPartitionedState(
                    window, windowSerializer, windowStateDescriptor);
            windowState.add(element.getValue()); // add the current element to the buffered state

            context.key = key;
            context.window = window; // the design of context is rather tricky and obscure

            TriggerResult triggerResult = context.onElement(element); // call onElement to obtain a TriggerResult

            // act on the TriggerResult
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                fire(window, contents); // on FIRE, actually evaluate the elements in the window
            }

            if (triggerResult.isPurge()) {
                cleanup(window, windowState, null); // on PURGE, clean up the buffered elements
            } else {
                registerCleanupTimer(window);
            }
        }
    }
}
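The non-merging branch of processElement follows a fixed order:

1. Assign windows.
2. Drop late windows.
3. Append the element to the window's buffer.
4. Ask the trigger for a result.
5. Fire and/or purge.

The sketch below mimics that control flow under simplifying assumptions: a single key, tumbling windows, `allowedLateness = 0`, and a hypothetical count-based trigger that answers FIRE_AND_PURGE once a pane holds `maxCount` elements. `MiniWindowOperator`, its `Result` enum (modeled loosely on Flink's TriggerResult) and the summing "window function" are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MiniWindowOperator {
    // Simplified counterpart of TriggerResult: whether to fire and whether to purge.
    enum Result {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    private final long size;
    private final int maxCount;
    private long currentWatermark = Long.MIN_VALUE;
    private final Map<Long, List<Integer>> buffers = new HashMap<>(); // window start -> buffered elements
    final List<String> emitted = new ArrayList<>();

    MiniWindowOperator(long size, int maxCount) {
        this.size = size;
        this.maxCount = maxCount;
    }

    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
    }

    void processElement(long timestamp, int value) {
        long start = timestamp - (timestamp % size);   // assignWindows: one tumbling window
        long maxTimestamp = start + size - 1;
        if (maxTimestamp <= currentWatermark) {        // isLate with allowedLateness = 0
            return;                                    // late data is dropped
        }
        List<Integer> state = buffers.computeIfAbsent(start, s -> new ArrayList<>());
        state.add(value);                              // windowState.add(...)
        Result result = state.size() >= maxCount ? Result.FIRE_AND_PURGE : Result.CONTINUE;
        if (result.fire) {                             // fire(window, contents): sum the pane
            int sum = state.stream().mapToInt(Integer::intValue).sum();
            emitted.add("[" + start + "," + (start + size) + ") sum=" + sum);
        }
        if (result.purge) {                            // cleanup(...): drop the pane's state
            buffers.remove(start);
        }
    }
}
```

Here is what happens with `size = 10` and `maxCount = 2`:

* Elements at t=1 and t=4 fill `[0,10)`, which fires with sum 7 and is purged.
* After the watermark moves to 25, an element at t=15 is dropped, because `[10,20)` has `maxTimestamp` 19, which is already behind the watermark.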

The logic for deciding whether a window is late (and therefore whether its data is late):

protected boolean isLate(W window) {
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}

private long cleanupTime(W window) {
    long cleanupTime = window.maxTimestamp() + allowedLateness; // end of the window plus allowedLateness
    // if the addition overflowed, the sum is smaller than maxTimestamp(); fall back to Long.MAX_VALUE
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}
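In `cleanupTime`, a window counts as late only once the watermark reaches `maxTimestamp() + allowedLateness`. If that sum overflows, the method returns `Long.MAX_VALUE`, so such a window is never late. The arithmetic can be checked in isolation with the standalone sketch below. `LatenessSketch` is a hypothetical name, and the event-time check from `isLate` is assumed to be true.

```java
final class LatenessSketch {
    // Same arithmetic as cleanupTime(): use Long.MAX_VALUE if the addition overflows.
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long cleanupTime = maxTimestamp + allowedLateness;
        return cleanupTime >= maxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    // Same comparison as isLate(), assuming an event-time assigner.
    static boolean isLate(long maxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(maxTimestamp, allowedLateness) <= currentWatermark;
    }
}
```

For a window ending at 9999 with 5000 ms of allowed lateness, elements are still accepted while the watermark is 14998. The window becomes late once the watermark reaches 14999.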

The fire logic:

private void fire(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    userFunction.apply(context.key, context.window, contents, timestampedCollector);
}

processWatermark

Handles a watermark; this is what invokes onEventTime.

@Override
public void processWatermark(Watermark mark) throws Exception {
    boolean fire;
    do {
        // the name watermarkTimersQueue is somewhat ambiguous; eventTimerQueue would be easier to understand
        Timer<K, W> timer = watermarkTimersQueue.peek();
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            fire = true;

            watermarkTimers.remove(timer);
            watermarkTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key); // stateBackend.setCurrentKey(key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else { // regular windows: get the window's state
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onEventTime(timer.timestamp); // invoke onEventTime
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire); // if a timer fired, check whether the next watermark timer also needs to fire

    output.emitWatermark(mark); // forward the watermark downstream

    this.currentWatermark = mark.getTimestamp(); // update currentWatermark
}
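Without the state handling, processWatermark is a loop over a time-ordered priority queue. It pops every timer whose timestamp is at or before the watermark, and only then forwards the watermark. The processing-time `trigger(long time)` method runs the same loop against a different queue. The sketch below assumes a timer carries only a timestamp and a window label. `TimerSketch`, `register` and `firedBy` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class TimerSketch {
    static final class Timer {
        final long timestamp;
        final String window;

        Timer(long timestamp, String window) {
            this.timestamp = timestamp;
            this.window = window;
        }
    }

    // Ordered by timestamp, like watermarkTimersQueue / processingTimeTimersQueue.
    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    void register(long timestamp, String window) {
        queue.add(new Timer(timestamp, window));
    }

    // Pops every timer that is due at 'time' and returns their windows in timestamp order.
    List<String> firedBy(long time) {
        List<String> fired = new ArrayList<>();
        Timer timer = queue.peek();
        while (timer != null && timer.timestamp <= time) {
            queue.remove();
            fired.add(timer.window); // the operator would call onEventTime / onProcessingTime here
            timer = queue.peek();
        }
        return fired;
    }
}
```

The queue guarantees that timers fire in timestamp order even if they were registered out of order. The loop can stop at the first timer that is not yet due.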

trigger

First, this method is poorly named: why doesn't it follow the process… naming of the methods above?

It invokes onProcessingTime and is driven by timers based on the system clock. The logic is essentially the same as processWatermark; only the firing condition differs.

@Override
public void trigger(long time) throws Exception {
    boolean fire;

    //Remove information about the triggering task
    processingTimeTimerFutures.remove(time);
    processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

    do {
        Timer<K, W> timer = processingTimeTimersQueue.peek();
        if (timer != null && timer.timestamp <= time) {
            fire = true;

            processingTimeTimers.remove(timer);
            processingTimeTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else {
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire);
}

EvictingWindowOperator

Compared with WindowOperator, the only addition in EvictingWindowOperator is the Evictor.

private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); // run the evictor

    FluentIterable<IN> projectedContents = FluentIterable
            .from(contents)
            .skip(toEvict)
            .transform(new Function<StreamRecord<IN>, IN>() {
                @Override
                public IN apply(StreamRecord<IN> input) {
                    return input.getValue();
                }
            });
    userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}

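The key logic is in fire. Before the window function is applied, the evictor decides how many elements to evict. That many elements are then skipped from the beginning of the buffered contents.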
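The skip-then-apply step can be reproduced in plain Java. The sketch below assumes a count-style policy that keeps only the last `maxCount` elements. `EvictSketch`, `countToEvict` and `evictAndProject` are hypothetical names. A `java.util.stream` pipeline stands in for Guava's `FluentIterable.skip`, and the `StreamRecord` unwrapping is omitted.

```java
import java.util.List;
import java.util.stream.Collectors;

final class EvictSketch {
    // Count-style policy: evict everything except the last maxCount elements.
    static int countToEvict(int size, int maxCount) {
        return Math.max(0, size - maxCount);
    }

    // Skip the evicted prefix; the remainder is what the window function would receive.
    static <T> List<T> evictAndProject(List<T> contents, int maxCount) {
        int toEvict = countToEvict(contents.size(), maxCount);
        return contents.stream().skip(toEvict).collect(Collectors.toList());
    }
}
```

With buffered elements `[1, 2, 3, 4, 5]` and `maxCount = 2`, the evictor reports 3, so the window function sees only `[4, 5]`. The eviction only affects what is passed to the function. In this version, fire() itself does not rewrite the buffered state.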

Reposted from: https://www.cnblogs.com/fxjwind/p/6137608.html
