【随记】Flink 时间窗口的起始时间
话不多说,直接上手今天的主题,探索一个容易让人忽略和困惑的问题:Flink 时间窗口的起始时间
就以最简单的demo为例:
timeWindow(Time.seconds(5))
上述定义一个步长为5s的滚动窗口,就以这个简单的入口进入Flink的源码开始探索
1)timeWindow的定义
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {return window(TumblingProcessingTimeWindows.of(size));} else {return window(TumblingEventTimeWindows.of(size));}
}
这段源码比较贴近大众,就是一个普通的判断,而且environment.getStreamTimeCharacteristic()这个东西我们再熟悉不过了,判断当前是ProcessingTime还是EventTime,当然除了EventTime还有IngestionTime,但是比较常用的还是ProcessingTime和EventTime,所以我们就非ProcessingTime即EventTime这样理解,因为生产环境比较常用的是EventTime,所以我们就进入else的代码继续查看
2)TumblingEventTimeWindows的定义
window(TumblingEventTimeWindows.of(size))这段代码,window利用TumblingEventTimeWindows来分配元素,所以我们要了解的核心是TumblingEventTimeWindows.of(size)的定义
public static TumblingEventTimeWindows of(Time size) {return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
}protected TumblingEventTimeWindows(long size, long offset) {if (Math.abs(offset) >= size) {throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");}this.size = size;this.offset = offset;
}
可以看到通过of方法我们构建了一个offset为0,size为5的TumblingEventTimeWindows对象,然后就是我们需要的核心方法,assignWindows,窗口分配元素的核心方法
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {// Long.MIN_VALUE is currently assigned when no timestamp is presentlong start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);return Collections.singletonList(new TimeWindow(start, start + size));} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");}
}
重点来了
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
/*** Method to get the window start for a timestamp.** @param timestamp epoch millisecond to get the window start.* @param offset The offset which window start would be shifted by.* @param windowSize The size of the generated windows.* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;
}
Method to get the window start for a timestamp.翻译过来就是这个方法用来获取窗口的开始时间戳
核心算法就是
timestamp - (timestamp - offset + windowSize) % windowSize
上一段代码小测一下
case class TestData(timestamp:Long,word:String)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 便于输出,设置并行度为1env.setParallelism(1)val socketStream = env.socketTextStream("localhost",9999)val windowedStream = socketStream.map(row=>TestData(row.split(" ")(0).toLong,row.split(" ")(0))).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TestData](Time.seconds(1)) {override def extractTimestamp(element: TestData): Long = element.timestamp * 1000}).keyBy(_.word).timeWindow(Time.seconds(5)).reduce((r1,r2)=>TestData(r1.timestamp,"hello "+r2.word))windowedStream.print("window output is")socketStream.print("input data is")env.execute("window_test_job")}
准备一下测试数据 1599623712 word(2020-09-09 11:55:12) 1599623715 word(2020-09-09 11:55:15)
根据公式算出开始时间:1599623712 - (1599623712 - 0 + 5) % 5 == 1599623710也就是开始时间为 1599623710,步长为5s,也就是下次触发窗口计算为1599623715
验证一下:
nc录入数据:1599623712 word 1599623715 word
控制台输出结果:
input data is> 1599623712 word input data is> 1599623715 word window output is> TestData(1599623712,word)
结果验证了公式结果即为窗口的开始时间,ProcessingTime与之类似就不测试了,其实也可以看到公式的计算结果一般为自然时间的开始,如2020-09-09 11:55:12的开始时间为2020-09-09 11:55:10
【随记】Flink 时间窗口的起始时间相关推荐
- vue elementUI 时间控件优化 选择起始时间不能在结束时间之后,结束时间不能在起始时间之前
时间控件优化 elementUI 选择起始时间不能在结束时间之后,结束时间不能在起始时间之前 <el-form-item label="起始时间:" class=" ...
- sentinel 时间窗口的实现
本文的github地址点击这里 获取时间窗口的主要流程 在 Sentinel 中,主要是通过 LeapArray 类来实现滑动时间窗口的实现和选择.在 sentinel 的这个获取时间窗口并为时间窗口 ...
- Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例
我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题.Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理.本文将 ...
- Flink从入门到真香(12、Flink一大利器-时间窗口)
flink中支持多种窗口,包括:时间窗口,session窗口,统计窗口等等,能想到的基本都可以实现 时间窗口(Time Windows) 最简单常用的窗口形式是基于时间的窗口,flink支持三种种时间 ...
- 【Flink】各种窗口的使用(处理时间窗口、事件时间窗口、窗口聚合窗口)
文章目录 一 Flink 中的 Window 1 Window (1)Window概述 (2) Window类型 a 滚动窗口(Tumbling Windows) b 滑动窗口(Sliding Win ...
- 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
目录 事件时间窗口分析 时间概念 event-time 延迟数据处理 延迟数据 Watermarking 水位 官方案例演示 事件 ...
- sentinel滑动时间窗口算法学习
滑动时间窗口 先不说sentinel的算法实现,先说什么是滑动时间窗口, 我们在进行限流的时候,比如通过QPS进行限流,那假如我们以秒为单位,举个例子: 我设置了限流规则,qps是10 如果不使用滑动 ...
- 时间窗口(Time Windows)的原理和使用
概述 flink中支持多种窗口,包括:时间窗口,session窗口,count窗口等,本文简单介绍这些窗口的原理,并通过例子说明如何使用这些窗口. 时间窗口(Time Windows) 最简单常用的窗 ...
- iOS-自定义起始时间选择器视图
概述 自定义起始时间选择器视图, 调起时间选择器, 传值(起始时间/截止时间), 两者时间均要合理, 不能超过未来时间, 并且起始时间不能大于截止时间. 点击取消或空白处收起时间选择器. 详细 代码下 ...
最新文章
- centos7 设置中文
- 区块链的价值在于建立信任,而ICO却在摧毁信任
- Swift の 函数式编程
- puppet 基础篇
- spring boot(一)入门
- 编写第一个Linux环境下程序的编译,下载记录
- “听话”的苏宁少东家
- Python for循环倒序遍历列表
- 【Linux】yum卸载恢复
- PhpStorm在上传文件的时候提示 Upload to 虚拟机 failed: invalid descendent file name \.
- bzoj 1022: [SHOI2008]小约翰的游戏John(反nim游戏)
- 智慧工地系统包括哪些部分
- nvcc: command not found
- 超神学院暗质计算机,超神学院之黑白守护者
- android手电筒功能吗,android通过led实现手电筒功能
- WIN7修改“桌面”存储位置
- 【Paper】2017_Consensus of linear multi-agent systems with exogenous disturbance generated from hetero
- mysql 2038年问题_关于PHP转换超过2038年日期出错的问题解决
- 下一半***清除全集
- react 添加css_在JS中使用情感CSS将暗模式添加到您的React应用中