话不多说,直接上手今天的主题,探索一个容易让人忽略和困惑的问题:Flink 时间窗口的起始时间

就以最简单的demo为例:

timeWindow(Time.seconds(5))

上述定义一个步长为5s的滚动窗口,就以这个简单的入口进入Flink的源码开始探索

1)timeWindow的定义

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {return window(TumblingProcessingTimeWindows.of(size));} else {return window(TumblingEventTimeWindows.of(size));}
}

这段源码比较贴近大众,就是一个普通的判断,而且environment.getStreamTimeCharacteristic()这个东西我们再熟悉不过了,判断当前是ProcessingTime还是EventTime,当然除了EventTime还有IngestionTime,但是比较常用的还是ProcessingTime和EventTime,所以我们就非ProcessingTime即EventTime这样理解,因为生产环境比较常用的是EventTime,所以我们就进入else的代码继续查看

2)TumblingEventTimeWindows的定义

window(TumblingEventTimeWindows.of(size))这段代码,window利用TumblingEventTimeWindows来分配元素,所以我们要了解的核心是TumblingEventTimeWindows.of(size)的定义

public static TumblingEventTimeWindows of(Time size) {return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
}protected TumblingEventTimeWindows(long size, long offset) {if (Math.abs(offset) >= size) {throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");}this.size = size;this.offset = offset;
}

可以看到通过of方法我们构建了一个offset为0,size为5的TumblingEventTimeWindows对象,然后就是我们需要的核心方法,assignWindows,窗口分配元素的核心方法

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {// Long.MIN_VALUE is currently assigned when no timestamp is presentlong start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);return Collections.singletonList(new TimeWindow(start, start + size));} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");}
}

重点来了

long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
/*** Method to get the window start for a timestamp.** @param timestamp epoch millisecond to get the window start.* @param offset The offset which window start would be shifted by.* @param windowSize The size of the generated windows.* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;
}

Method to get the window start for a timestamp.翻译过来就是这个方法用来获取窗口的开始时间戳

核心算法就是

timestamp - (timestamp - offset + windowSize) % windowSize

上一段代码小测一下

  case class TestData(timestamp:Long,word:String)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 便于输出,设置并行度为1env.setParallelism(1)val socketStream = env.socketTextStream("localhost",9999)val windowedStream = socketStream.map(row=>TestData(row.split(" ")(0).toLong,row.split(" ")(0))).assignTimestampsAndWatermarks(new             BoundedOutOfOrdernessTimestampExtractor[TestData](Time.seconds(1)) {override def extractTimestamp(element: TestData): Long = element.timestamp * 1000}).keyBy(_.word).timeWindow(Time.seconds(5)).reduce((r1,r2)=>TestData(r1.timestamp,"hello "+r2.word))windowedStream.print("window output is")socketStream.print("input data is")env.execute("window_test_job")}
准备一下测试数据
1599623712 word(2020-09-09 11:55:12)
1599623715 word(2020-09-09 11:55:15)
根据公式算出开始时间:1599623712 - (1599623712 - 0 + 5) % 5 == 1599623710也就是开始时间为 1599623710,步长为5s,也就是下次触发窗口计算为1599623715

验证一下:
    nc录入数据:

1599623712 word
1599623715 word

控制台输出结果:

input data is> 1599623712 word
input data is> 1599623715 word
window output is> TestData(1599623712,word)

结果验证了公式结果即为窗口的开始时间,ProcessingTime与之类似就不测试了,其实也可以看到公式的计算结果一般为自然时间的开始,如2020-09-09 11:55:12的开始时间为2020-09-09 11:55:10

【随记】Flink 时间窗口的起始时间相关推荐

  1. vue elementUI 时间控件优化 选择起始时间不能在结束时间之后,结束时间不能在起始时间之前

    时间控件优化 elementUI 选择起始时间不能在结束时间之后,结束时间不能在起始时间之前 <el-form-item label="起始时间:" class=" ...

  2. sentinel 时间窗口的实现

    本文的github地址点击这里 获取时间窗口的主要流程 在 Sentinel 中,主要是通过 LeapArray 类来实现滑动时间窗口的实现和选择.在 sentinel 的这个获取时间窗口并为时间窗口 ...

  3. Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例

    我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题.Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理.本文将 ...

  4. Flink从入门到真香(12、Flink一大利器-时间窗口)

    flink中支持多种窗口,包括:时间窗口,session窗口,统计窗口等等,能想到的基本都可以实现 时间窗口(Time Windows) 最简单常用的窗口形式是基于时间的窗口,flink支持三种种时间 ...

  5. 【Flink】各种窗口的使用(处理时间窗口、事件时间窗口、窗口聚合窗口)

    文章目录 一 Flink 中的 Window 1 Window (1)Window概述 (2) Window类型 a 滚动窗口(Tumbling Windows) b 滑动窗口(Sliding Win ...

  6. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  7. sentinel滑动时间窗口算法学习

    滑动时间窗口 先不说sentinel的算法实现,先说什么是滑动时间窗口, 我们在进行限流的时候,比如通过QPS进行限流,那假如我们以秒为单位,举个例子: 我设置了限流规则,qps是10 如果不使用滑动 ...

  8. 时间窗口(Time Windows)的原理和使用

    概述 flink中支持多种窗口,包括:时间窗口,session窗口,count窗口等,本文简单介绍这些窗口的原理,并通过例子说明如何使用这些窗口. 时间窗口(Time Windows) 最简单常用的窗 ...

  9. iOS-自定义起始时间选择器视图

    概述 自定义起始时间选择器视图, 调起时间选择器, 传值(起始时间/截止时间), 两者时间均要合理, 不能超过未来时间, 并且起始时间不能大于截止时间. 点击取消或空白处收起时间选择器. 详细 代码下 ...

最新文章

  1. centos7 设置中文
  2. 区块链的价值在于建立信任,而ICO却在摧毁信任
  3. Swift の 函数式编程
  4. puppet 基础篇
  5. spring boot(一)入门
  6. 编写第一个Linux环境下程序的编译,下载记录
  7. “听话”的苏宁少东家
  8. Python for循环倒序遍历列表
  9. 【Linux】yum卸载恢复
  10. PhpStorm在上传文件的时候提示 Upload to 虚拟机 failed: invalid descendent file name \.
  11. bzoj 1022: [SHOI2008]小约翰的游戏John(反nim游戏)
  12. 智慧工地系统包括哪些部分
  13. nvcc: command not found
  14. 超神学院暗质计算机,超神学院之黑白守护者
  15. android手电筒功能吗,android通过led实现手电筒功能
  16. WIN7修改“桌面”存储位置
  17. 【Paper】2017_Consensus of linear multi-agent systems with exogenous disturbance generated from hetero
  18. mysql 2038年问题_关于PHP转换超过2038年日期出错的问题解决
  19. 下一半***清除全集
  20. react 添加css_在JS中使用情感CSS将暗模式添加到您的React应用中

热门文章

  1. type-c转HDMI+PD+USB3.0多合一拓展转换芯片
  2. 使用NW——Dragging files into page
  3. UIScrollView Dragging Decelerating Delegate
  4. MATLAB图像去噪算法设计
  5. Linux-ifcfg-eth0配置介绍
  6. 正基AP6XXX系列模块硬件排查方法
  7. 阿里云镜像地址网易云镜像
  8. 三类IP地址的私有地址
  9. 聆听(Monitor)系列驱动下载方法
  10. 严重违法失信?额度不够用?老赖惩治措施?你知道几条?