This article takes a look at Flink's Tumbling Window.

WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time,
     * {@code false} otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the
     * current processing time.
     *
     * <p>This is provided to the assigner by its containing
     * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
     * which, in turn, gets it from the containing
     * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /**
         * Returns the current processing time.
         */
        public abstract long getCurrentProcessingTime();

    }
}
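  • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime, and also defines the abstract static class WindowAssignerContext; it has two type parameters, where T is the element type and W is the window type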
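
To make the roles of the two type parameters concrete, here is a minimal sketch that does not depend on Flink. MiniAssigner and SingleWindowAssigner are hypothetical names invented for illustration, and the contract is cut down to two methods; it is not the real WindowAssigner API.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, dependency-free stand-in for the WindowAssigner contract.
// T is the element type, W is the window type.
abstract class MiniAssigner<T, W> {

    // Returns the windows the element should be placed in.
    abstract Collection<W> assignWindows(T element, long timestamp);

    // true if assignment is driven by the element's event timestamp.
    abstract boolean isEventTime();
}

// Simplest possible implementation: every element lands in one shared window,
// represented here by a String label (so W = String).
final class SingleWindowAssigner<T> extends MiniAssigner<T, String> {

    @Override
    Collection<String> assignWindows(T element, long timestamp) {
        // The timestamp is ignored because there is only one window.
        return Collections.singletonList("all");
    }

    @Override
    boolean isEventTime() {
        return false;
    }
}
```

A tumbling assigner has the same shape, except that W becomes a time-bounded window and assignWindows derives it from a timestamp.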

Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

@PublicEvolving
public abstract class Window {

    /**
     * Gets the largest timestamp that still belongs to this window.
     *
     * @return The largest timestamp that still belongs to this window.
     */
    public abstract long maxTimestamp();
}
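  • A Window represents one of the finite buckets into which an unbounded stream is divided; its maxTimestamp method returns the largest timestamp that still belongs to the window; it has two subclasses, GlobalWindow and TimeWindow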

TimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

@PublicEvolving
public class TimeWindow extends Window {

    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Gets the starting timestamp of the window. This is the first timestamp that belongs
     * to this window.
     *
     * @return The starting timestamp of this window.
     */
    public long getStart() {
        return start;
    }

    /**
     * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
     * is the first timestamp that does not belong to this window any more.
     *
     * @return The exclusive end timestamp of this window.
     */
    public long getEnd() {
        return end;
    }

    /**
     * Gets the largest timestamp that still belongs to this window.
     *
     * <p>This timestamp is identical to {@code getEnd() - 1}.
     *
     * @return The largest timestamp that still belongs to this window.
     *
     * @see #getEnd()
     */
    @Override
    public long maxTimestamp() {
        return end - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        TimeWindow window = (TimeWindow) o;

        return end == window.end && start == window.start;
    }

    @Override
    public int hashCode() {
        return MathUtils.longToIntWithBitMixing(start + end);
    }

    @Override
    public String toString() {
        return "TimeWindow{" +
                "start=" + start +
                ", end=" + end +
                '}';
    }

    /**
     * Returns {@code true} if this window intersects the given window.
     */
    public boolean intersects(TimeWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }

    /**
     * Returns the minimal window covers both this window and the given window.
     */
    public TimeWindow cover(TimeWindow other) {
        return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    // ------------------------------------------------------------------------
    // Serializer
    // ------------------------------------------------------------------------

    //......

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Merge overlapping {@link TimeWindow}s. For use by merging
     * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
     */
    public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {

        // sort the windows by the start time and then merge overlapping windows

        List<TimeWindow> sortedWindows = new ArrayList<>(windows);

        Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
            @Override
            public int compare(TimeWindow o1, TimeWindow o2) {
                return Long.compare(o1.getStart(), o2.getStart());
            }
        });

        List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
        Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

        for (TimeWindow candidate: sortedWindows) {
            if (currentMerge == null) {
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            } else if (currentMerge.f0.intersects(candidate)) {
                currentMerge.f0 = currentMerge.f0.cover(candidate);
                currentMerge.f1.add(candidate);
            } else {
                merged.add(currentMerge);
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            }
        }

        if (currentMerge != null) {
            merged.add(currentMerge);
        }

        for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
            if (m.f1.size() > 1) {
                c.merge(m.f1, m.f0);
            }
        }
    }

    /**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start.
     * @param offset The offset which window start would be shifted by.
     * @param windowSize The size of the generated windows.
     * @return window start
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}
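  • TimeWindow has start and end fields; start is inclusive and end is exclusive, so maxTimestamp returns end - 1; the class also overrides equals and hashCode
  • TimeWindow provides the intersects method, which reports whether this window intersects a given window (as written, two windows that merely touch at a boundary also count as intersecting), and the cover method, which returns the minimal window that covers both this window and the given window, i.e. their combined span rather than their overlap
  • TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset; the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset, and windowSize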
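
The intersects, cover, and merge behaviour described above can be tried out without Flink. The sketch below uses a hypothetical SimpleWindow class; the two comparisons are copied from the source shown earlier. Unlike mergeWindows, which reports each merged group through a MergeCallback, this simplified merge just returns the resulting spans.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for TimeWindow: start is inclusive, end is exclusive.
final class SimpleWindow {
    final long start;
    final long end;

    SimpleWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp that still belongs to the window.
    long maxTimestamp() {
        return end - 1;
    }

    // Same comparison as TimeWindow.intersects: touching windows count as intersecting.
    boolean intersects(SimpleWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }

    // Smallest window spanning both windows (their combined span, not their overlap).
    SimpleWindow cover(SimpleWindow other) {
        return new SimpleWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    // Sort by start, then fold each intersecting neighbour into the current span.
    // Simplification: returns the spans instead of invoking a merge callback.
    static List<SimpleWindow> merge(List<SimpleWindow> windows) {
        List<SimpleWindow> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((SimpleWindow w) -> w.start));
        List<SimpleWindow> merged = new ArrayList<>();
        SimpleWindow current = null;
        for (SimpleWindow candidate : sorted) {
            if (current == null) {
                current = candidate;
            } else if (current.intersects(candidate)) {
                current = current.cover(candidate);
            } else {
                merged.add(current);
                current = candidate;
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }
}
```

For example, merging [0, 10), [5, 15), and [20, 30) leaves two spans, [0, 15) and [20, 30). Tumbling windows never overlap, so this merging logic matters for merging assigners rather than for the tumbling assigners discussed next.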

TumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingEventTimeWindows(" + size + ")";
    }

    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
  • TumblingEventTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type; it takes two parameters, size and offset, which must satisfy 0 <= offset < size
  • assignWindows returns a single window spanning start to start + size, where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); getDefaultTrigger returns an EventTimeTrigger; getWindowSerializer returns a TimeWindow.Serializer; isEventTime returns true
  • TumblingEventTimeWindows provides the static factory method of, which accepts size and, optionally, offset
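
The window arithmetic is easier to follow with concrete numbers. The sketch below reproduces only the formula from getWindowStartWithOffset and the parameter check from the constructor; TumblingAssignSketch and its method names are hypothetical, and a plain long[] {start, end} stands in for TimeWindow.

```java
// Hypothetical helper class; only the arithmetic and the checks mirror the source above.
final class TumblingAssignSketch {

    // Same formula as TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Returns {start, end} for the single tumbling window containing the timestamp.
    // start is inclusive, end is exclusive.
    static long[] assign(long timestamp, long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        if (timestamp == Long.MIN_VALUE) {
            // Long.MIN_VALUE marks a record that carries no timestamp.
            throw new RuntimeException("record has no timestamp");
        }
        long start = windowStart(timestamp, offset, size);
        return new long[] {start, start + size};
    }
}
```

With a size of 5000 ms and no offset, a record stamped 12345 falls into [10000, 15000); with an offset of 1000 ms the same record falls into [11000, 16000). Every timestamp maps to exactly one window, which is why assignWindows returns a singleton list.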

TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    private TumblingProcessingTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    public long getSize() {
        return size;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingProcessingTimeWindows(" + size + ")";
    }

    public static TumblingProcessingTimeWindows of(Time size) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
    }

    public static TumblingProcessingTimeWindows of(Time size, Time offset) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}
  • TumblingProcessingTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type; it takes two parameters, size and offset, which must satisfy 0 <= offset < size
  • assignWindows returns a single window spanning start to start + size, where start = TimeWindow.getWindowStartWithOffset(now, offset, size) and now is context.getCurrentProcessingTime(); this is where it differs from TumblingEventTimeWindows: TumblingProcessingTimeWindows ignores the timestamp parameter and uses now instead; getDefaultTrigger returns a ProcessingTimeTrigger, and isEventTime returns false
  • TumblingProcessingTimeWindows also provides the static factory method of, which accepts size and, optionally, offset
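
The following sketch illustrates the point that the element's own timestamp plays no part in processing-time assignment. ProcessingTimeAssignSketch is a hypothetical class, and the LongSupplier clock is an assumed stand-in for WindowAssignerContext.getCurrentProcessingTime(); injecting it lets the example use a fixed time.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; the clock stands in for the assigner context's processing time.
final class ProcessingTimeAssignSketch {
    private final long size;
    private final long offset;
    private final LongSupplier clock;

    ProcessingTimeAssignSketch(long size, long offset, LongSupplier clock) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
        this.clock = clock;
    }

    // Returns {start, end}; the element timestamp is accepted but deliberately ignored.
    long[] assign(long elementTimestamp) {
        long now = clock.getAsLong();
        // Same formula as TimeWindow.getWindowStartWithOffset, applied to "now".
        long start = now - (now - offset + size) % size;
        return new long[] {start, start + size};
    }
}
```

With a one-minute window and a clock fixed at 61000 ms, every element is assigned to [60000, 120000) regardless of the timestamp it carries.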

Summary

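  • Flink's Tumbling Window comes in two forms, TumblingEventTimeWindows and TumblingProcessingTimeWindows; both extend WindowAssigner with Object as the element type and TimeWindow as the window type, and both take two parameters, size and offset, which must satisfy 0 <= offset < size
  • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime, and also defines the abstract static class WindowAssignerContext; it has two type parameters, where T is the element type and W is the window type; the window type of TumblingEventTimeWindows and TumblingProcessingTimeWindows is TimeWindow, which has start and end fields, where start is inclusive and end is exclusive, so maxTimestamp returns end - 1; TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset; the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset, and windowSize
  • TumblingEventTimeWindows and TumblingProcessingTimeWindows differ in assignWindows, getDefaultTrigger, and isEventTime; the former's assignWindows uses the timestamp parameter, while the latter uses the current processing time (now); the former's getDefaultTrigger returns an EventTimeTrigger, while the latter's returns a ProcessingTimeTrigger; the former's isEventTime returns true, while the latter's returns false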

doc

  • Tumbling Windows

Reposted from: https://my.oschina.net/go4it/blog/2995872
