Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例
我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题。Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。本文将介绍如何在Flink上进行窗口的计算。
一个Flink窗口应用的大致骨架结构如下所示:
// Keyed Window
stream.keyBy(...) <- 按照一个Key进行分组.window(...) <- 将数据流中的元素分配到相应的窗口中[.trigger(...)] <- 指定触发器Trigger(可选)[.evictor(...)] <- 指定清除器Evictor(可选).reduce/aggregate/process() <- 窗口处理函数Window Function// Non-Keyed Window
stream.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中[.trigger(...)] <- 指定触发器Trigger(可选)[.evictor(...)] <- 指定清除器Evictor(可选).reduce/aggregate/process() <- 窗口处理函数Window Function
首先,我们要决定是否对一个DataStream
按照Key进行分组,这一步必须在窗口计算之前进行。经过keyBy
的数据流将形成多组数据,下游算子的多个实例可以并行计算。windowAll
不对数据流进行分组,所有数据将发送到下游算子单个实例上。决定是否分组之后,窗口的后续操作基本相同,本文所涉及内容主要针对经过keyBy
的窗口(Keyed Window),经过windowAll
的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。
Flink窗口的骨架结构中有两个必须的两个操作:
- 使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口。
- 当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(Window Function)进行处理,常用的Window Function有
reduce
、aggregate
、process
。
其他的trigger
、evictor
则是窗口的触发和销毁过程中的附加选项,主要面向需要更多自定义的高级编程者,如果不设置则会使用默认的配置。
上图是窗口的生命周期示意图,假如我们设置的是一个10分钟的滚动窗口,第一个窗口的起始时间是0:00,结束时间是0:10,后面以此类推。当数据流中的元素流入后,窗口分配器会根据时间(Event Time或Processing Time)分配给相应的窗口。相应窗口满足了触发条件,比如已经到了窗口的结束时间,会触发相应的Window Function进行计算。注意,本图只是一个大致示意图,不同的Window Function的处理方式略有不同。
从数据类型上来看,一个DataStream
经过keyBy
转换成KeyedStream
,再经过window
转换成WindowedStream
,我们要在之上进行reduce
、aggregate
或process
等Window Function,对数据进行必要的聚合操作。
WindowAssigner
Count-based Window根据事件到达窗口的先后顺序管理窗口,到达窗口的先后顺序和Event Time并不一致,因此Count-based Window的结果具有不确定性。
滚动窗口
val input: DataStream[T] = ...// tumbling event-time windows
input.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).<window function>(...)// tumbling processing-time windows
input.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<window function>(...)// 1 hour tumbling event-time windows offset by 15 minutes.
input.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))).<window function>(...)
滑动窗口
跟前面介绍的一样,我们使用Time
类中的时间单位来定义Slide和Size,也可以设置offset。同样,timeWindow
是一种缩写,根据执行环境中设置的时间语义来选择相应的方法初始化窗口。
val input: DataStream[T] = ...// sliding event-time windows
input.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<window function>(...)// sliding processing-time windows
input.keyBy(<...>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<window function>(...)// sliding processing-time windows offset by -8 hours
input.keyBy(<...>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<window function>(...)
会话窗口
val input: DataStream[T] = ...// event-time session windows with static gap
input.keyBy(...).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<window function>(...)// event-time session windows with dynamic gap
input.keyBy(...).window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {override def extract(element: T): Long = {// determine and return session gap}})).<window function>(...)// processing-time session windows with static gap
input.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<window function>(...)// processing-time session windows with dynamic gap
input.keyBy(...).window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {override def extract(element: T): Long = {// determine and return session gap}})).<window function>(...)
窗口函数
ReduceFunction
case class StockPrice(symbol: String, price: Double)val input: DataStream[StockPrice] = ...senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// reduce的返回类型必须和输入类型StockPrice一致
val sum = input.keyBy(s => s.symbol).timeWindow(Time.seconds(10)).reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
AggregateFunction
AggregateFunction
也是一种增量计算窗口函数,也只保存了一个中间状态数据,但AggregateFunction
使用起来更复杂一些。我们看一下它的源码定义:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {// 在一次新的aggregate发起时,创建一个新的Accumulator,Accumulator是我们所说的中间状态数据,简称ACC// 这个函数一般在初始化时调用ACC createAccumulator();// 当一个新元素流入时,将新元素与状态数据ACC合并,返回状态数据ACCACC add(IN value, ACC accumulator);// 将两个ACC合并ACC merge(ACC a, ACC b);// 将中间数据转成结果数据OUT getResult(ACC accumulator);}
case class StockPrice(symbol: String, price: Double)// IN: StockPrice
// ACC:(String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double) - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {override def createAccumulator() = ("", 0, 0)override def add(item: StockPrice, accumulator: (String, Double, Int)) =(item.symbol, accumulator._2 + item.price, accumulator._3 + 1)override def getResult(accumulator:(String, Double, Int)) = (accumulator._1 ,accumulator._2 / accumulator._3)override def merge(a: (String, Double, Int), b: (String, Double, Int)) =(a._1 ,a._2 + b._2, a._3 + b._3)
}senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)val input: DataStream[StockPrice] = ...val average = input.keyBy(s => s.symbol).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate)
ProcessWindowFunction
/*** IN 输入类型* OUT 输出类型* KEY keyBy中按照Key分组,Key的类型* W 窗口的类型*/
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {/*** 对一个窗口内的元素进行处理,窗口内的元素缓存在Iterable<IN>,进行处理后输出到Collector<OUT>中* 我们可以输出一到多个结果*/public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;/** * 当窗口执行完毕被清理时,删除各类状态数据。*/public void clear(Context context) throws Exception {}/*** 一个窗口的上下文,包含窗口的一些元数据、状态数据等。*/public abstract class Context implements java.io.Serializable {// 返回当前正在处理的Windowpublic abstract W window();// 返回当前Process Timepublic abstract long currentProcessingTime();// 返回当前Event Time对应的Watermarkpublic abstract long currentWatermark();// 返回某个Key下的某个Window的状态public abstract KeyedStateStore windowState();// 返回某个Key下的全局状态public abstract KeyedStateStore globalState();// 迟到数据发送到其他位置public abstract <X> void output(OutputTag<X> outputTag, X value);}
}
下面的代码是一个ProcessWindowFunction
的简单应用,我们对价格出现的次数做了统计,选出出现次数最多的输出出来。
case class StockPrice(symbol: String, price: Double)class FrequencyProcessFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {// 股票价格和该价格出现的次数var countMap = scala.collection.mutable.Map[Double, Int]()for(element <- elements) {val count = countMap.getOrElse(element.price, 0)countMap(element.price) = count + 1}// 按照出现次数从高到低排序val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)// 选出出现次数最高的输出到Collectorif (sortedMap.size > 0) {out.collect((key, sortedMap(0)._1))}}
}senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)val input: DataStream[StockPrice] = ...val frequency = input.keyBy(s => s.symbol).timeWindow(Time.seconds(10)).process(new FrequencyProcessFunction)
ProcessWindowFunction与增量计算相结合
case class StockPrice(symbol: String, price: Double)case class MaxMinPrice(symbol: String, max: Double, min: Double, windowEndTs: Long)class WindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MaxMinPrice, String, TimeWindow] {override def process(key: String,context: Context,elements: Iterable[(String, Double, Double)],out: Collector[MaxMinPrice]): Unit = {val maxMinItem = elements.headval windowEndTs = context.window.getEndout.collect(MaxMinPrice(key, maxMinItem._2, maxMinItem._3, windowEndTs))}}senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)val input: DataStream[StockPrice] = ...// reduce的返回类型必须和输入类型相同
// 为此我们将StockPrice拆成一个三元组 (股票代号,最大值、最小值)
val maxMin = input
.map(s => (s.symbol, s.price, s.price))
.keyBy(s => s._1)
.timeWindow(Time.seconds(10))
.reduce(((s1: (String, Double, Double), s2: (String, Double, Double)) => (s1._1, Math.max(s1._2, s2._2), Math.min(s1._3, s2._3))),new WindowEndProcessFunction
)
Trigger
我们先看Trigger返回一个什么样的结果。当满足某个条件,Trigger会返回一个名为TriggerResult
的结果:
当这些已有的Trigger无法满足我们的需求时,我们需要自定义Trigger,接下来我们看一下Flink的Trigger源码。
/*** T为元素类型* W为窗口*/
public abstract class Trigger<T, W extends Window> implements Serializable {/*** 当某窗口增加一个元素时调用onElement方法,返回一个TriggerResult*/public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;/*** 当一个基于Processing Time的Timer触发了FIRE时调用onProcessTime方法*/public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;/*** 当一个基于Event Time的Timer触发了FIRE时调用onEventTime方法*/public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;/*** 如果这个Trigger支持状态合并,则返回true*/public boolean canMerge() {return false;}/*** 当多个窗口被合并时调用onMerge*/public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}/*** 当窗口数据被清理时,调用clear方法来清理所有的Trigger状态数据*/public abstract void clear(W window, TriggerContext ctx) throws Exception/*** 上下文,保存了时间、状态、监控以及定时器*/public interface TriggerContext {/*** 返回当前Processing Time*/long getCurrentProcessingTime();/*** 返回MetricGroup */MetricGroup getMetricGroup();/*** 返回当前Watermark时间*/long getCurrentWatermark();/*** 将某个time注册为一个Timer,当系统时间到达time这个时间点时,onProcessingTime方法会被调用*/void registerProcessingTimeTimer(long time);/*** 将某个time注册为一个Timer,当Watermark时间到达time这个时间点时,onEventTime方法会被调用*/void registerEventTimeTimer(long time);/*** 将注册的Timer删除*/void deleteProcessingTimeTimer(long time);/*** 将注册的Timer删除*/void deleteEventTimeTimer(long time);/*** 获取该窗口Trigger下的状态*/<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);}/*** 将多个窗口下Trigger状态合并*/public interface OnMergeContext extends TriggerContext {<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);}
}
class MyTrigger extends Trigger[StockPrice, TimeWindow] {override def onElement(element: StockPrice,time: Long,window: TimeWindow,triggerContext: Trigger.TriggerContext): TriggerResult = {val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))// 设置返回默认值为CONTINUEvar triggerResult: TriggerResult = TriggerResult.CONTINUE// 第一次使用lastPriceState时状态是空的,需要先进行判断// 状态数据由Java端生成,如果是空,返回一个null// 如果直接使用Scala的Double,需要使用下面的方法判断是否为空if (Option(lastPriceState.value()).isDefined) {if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {// 如果价格跌幅大于5%,直接FIRE_AND_PURGEtriggerResult = TriggerResult.FIRE_AND_PURGE} else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {val t = triggerContext.getCurrentProcessingTime + (10 * 1000 - (triggerContext.getCurrentProcessingTime % 10 * 1000))// 给10秒后注册一个TimertriggerContext.registerProcessingTimeTimer(t)}}lastPriceState.update(element.price)triggerResult}// 我们不用EventTime,直接返回一个CONTINUEoverride def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {TriggerResult.CONTINUE}override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {TriggerResult.FIRE_AND_PURGE}override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {val lastPrice: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPrice", classOf[Double]))lastPrice.clear()}
}senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)val input: DataStream[StockPrice] = ...val average = input.keyBy(s => s.symbol).timeWindow(Time.seconds(60)).trigger(new MyTrigger).aggregate(new AverageAggregate)
在自定义Trigger时,如果使用了状态,一定要使用clear
方法将状态数据清理,否则随着窗口越来越多,状态数据会越积越多。
Evictor
清除器(Evictor)是在WindowAssigner
和Trigger
的基础上的一个可选选项,用来清除一些数据。我们可以在Window Function执行前或执行后调用Evictor。
/*** T为元素类型* W为窗口*/
public interface Evictor<T, W extends Window> extends Serializable {/*** 在Window Function前调用*/void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** 在Window Function后调用*/void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Evictor的上下文*/interface EvictorContext {long getCurrentProcessingTime();MetricGroup getMetricGroup();long getCurrentWatermark();}
}
Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例相关推荐
- flink 三种时间机制_Flink的时间与watermarks详解
当我们在使用Flink的时候,避免不了要和时间(time).水位线(watermarks)打交道,理解这些概念是开发分布式流处理应用的基础.那么Flink支持哪些时间语义?Flink是如何处理乱序事件 ...
- flink 三种时间机制_Flink1.10入门:时间机制简介
一.概述 上篇文章介绍了Window窗口机制的相关知识,这里我们介绍下Flink的另外一个核心概念"Event Time机制",本篇文章只介绍相关概念不讲实战,实战会结合Windo ...
- Linux文件的三种时间属性
一.Linux文件时间属性的分类 我们在用windows系统时,在查看磁盘文件时,经常会看到文件或目录的后面有一个时间信息,这个是文件在磁盘上别创建的时间.其实,在windows系统中,文件还有文件的 ...
- linux文件三种时间及stat的用法
转自:http://blog.csdn.net/signjing/article/details/7723516 在windows下,一个文件有三种时间属性: 创建时间.修改时间.访问时间. 而在Li ...
- web自动化测试-第四讲: 三种时间等待
我们在做web自动化测试,执行脚本的时候,想要对一些页面对象(输入框.按钮等)进行操作,需要对获取该元素的对象,才能对其操作(点击.输入文本内容等),但是,可能由于页面加载过慢导致代码报错:Messa ...
- 时间戳、中国标准时间、年月日三种时间格式转换
以2022年4月9号为例,列出三种时间格式形式: 时间戳-格式: 1649462400000 中国标准时间-格式: Sat Apr 09 2022 08:00:00 GMT+0800 (中国标准时间) ...
- 查看linux 文件创建时间,在Linux下查看文件三种时间
原标题:在Linux下查看文件三种时间 在Linux下,文件包含三种时间属性,分别为: atime(access time):最近访问文件内容时间(Last Access Time). mtime(m ...
- 视频监控系统中的流媒体服务器,视频监控系统中的流媒体服务器、直写和全切换三种取流架构方案...
原标题:视频监控系统中的流媒体服务器.直写和全切换三种取流架构方案 一.流媒体服务器架构 前摄像头视频信号通过转发流媒体服务器转发至上壁面显示和终端接入,视频存储磁阵列通过流媒体存储服务器写入.实时流 ...
- flink 三种时间机制_360深度实践:Flink 与 Storm 协议级对比
本文从数据传输和数据可靠性的角度出发,对比测试了 Storm 与 Flink 在流处理上的性能,并对测试结果进行分析,给出在使用 Flink 时提高性能的建议. Apache Storm.Apache ...
最新文章
- 我们是在搞学术,还是被学术搞?
- Html.ActionLink 几种重载方式说明及例子
- 线性布局 相对布局 参数
- luabind-0.9.1在windows、linux下的使用详解及示例
- php打印出函数的内容吗,PHP打印函数集合详解以及PHP打印函数对比详解(精)
- python中res代表什么_在下面的代码中,zip(*res)在python中是什么意思?
- matlab的开方算法_区域生长算法(附MATLAB代码实现)
- 数学模型--预测模型、BP神经网络预测
- MapReduce作业运行机制
- 看雪CTF.TSRC 2018 团队赛 第二题 半加器 writeup
- CLion的Toolchains are not configured和no CMAKE profiles问题
- 华为手机误删照片,除了相册恢复,还有这招能救命
- 使用大华NetSDK对接大华相机
- 情话套路大全,哈哈哈~~~
- 【POI2004】【Bzoj2069】T2 洞穴 zaw
- 创智汇集,汉韵流芳!大创智国风汉服赏与您相约十月
- Photoshop CS6 for Mac破解版/序列号简介
- [编程基础] Python命令行解析库argparse学习笔记
- 视图、存储过程、触发器
- 中国工程院院士高文ICTC演讲《国家新一代人工智能发展规划》
热门文章
- rt thread studio使用QBOOT和片外flash实现OTA升级
- 可达100K/月,美团招聘各类安全工程师(地点:北京/上海,内含大量岗位)
- 专攻国内实体瘤CAR-T细胞疗法,南京卡提医学获数千万元A轮融资
- Weasis研究(一): IDEA启动运行Weasis3.0.4
- 反思:项目开发中的语言沟通与文档沟通
- TrueLicense实现产品License验证
- ExternalException (0x80004005): 无法执行程序
- win7 下MCR的安装以及环境变量配置
- 贵阳python培训价格
- 什么叫封装?封装有什么作用?