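A window function defines the computation applied to the data collected in a window. Depending on how the data is processed, window functions fall into two categories: incremental aggregation functions and full window functions.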

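Contents

  • 1. Incremental aggregation functions
    • 1.1 ReduceFunction
    • 1.2 AggregateFunction
  • 2. Full window functions
    • 2.1 WindowFunction
    • 2.2 ProcessWindowFunction
  • 3. Combining incremental aggregation with full window functions

1. Incremental aggregation functions

  • Reduce function: ReduceFunction
  • Aggregate function: AggregateFunction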

1.1 ReduceFunction

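import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event (public fields user, url, timestamp) is defined outside this section.
public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        environment.getConfig().setAutoWatermarkInterval(100);

        DataStream<Event> dataStreamSource = environment.fromElements(
                        new Event("Mary", "./home", 1000L),
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./fav", 3000L),
                        new Event("Mary", "./fav", 2000L),
                        new Event("Bob", "./fav", 3000L),
                        new Event("Alice", "./fav", 3000L),
                        new Event("Bob", "./prod?id=1", 4000L))
                // Event time with up to 5 seconds of out-of-orderness
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long recordTimestamp) {
                                        return event.timestamp;
                                    }
                                }));

        dataStreamSource
                // Turn every event into (user, 1)
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event event) throws Exception {
                        return Tuple2.of(event.user, 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Add the counts of two tuples that share the same user
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> acc, Tuple2<String, Long> value) throws Exception {
                        return Tuple2.of(acc.f0, acc.f1 + value.f1);
                    }
                })
                .print();

        environment.execute();
    }
}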

The code first calls .reduce() on the WindowedStream and passes a ReduceFunction as the argument, so the elements collected in the window are reduced pairwise.

(Mary,2)
(Alice,2)
(Bob,3)

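Each time an element arrives, the reduce method is called: the count of the new element is added to the stored state, and the result is saved as the new state. When the end of the 10-second window is reached, the reduced state is emitted directly.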
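The incremental behaviour can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink's implementation: the class name IncrementalReduceSketch and the HashMap standing in for keyed window state are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for keyed window state; not part of Flink.
final class IncrementalReduceSketch {

    // Folds each arriving user name into a per-key running count,
    // the way reduce() combines a new (user, 1) tuple with the stored tuple.
    static Map<String, Long> countPerUser(List<String> users) {
        Map<String, Long> state = new HashMap<>();
        for (String user : users) {
            // Only one value per key is kept; the raw elements are never buffered.
            state.merge(user, 1L, Long::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("Mary", "Bob", "Alice", "Mary", "Bob", "Alice", "Bob");
        System.out.println(countPerUser(users));
    }
}
```

The input mirrors the seven events of the example, so the counts match the printed result: Mary 2, Alice 2, Bob 3.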

1.2 AggregateFunction

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

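AggregateFunction can be seen as a generalized version of ReduceFunction. It has three type parameters: the input type (IN), the accumulator type (ACC) and the output type (OUT). The accumulator type ACC is the type of the intermediate aggregation state.

  • createAccumulator: creates an accumulator, which is the initial state of the aggregation.
  • add: adds an input element to the accumulator. It is called for every element that arrives.
  • getResult: extracts the aggregation output from the accumulator.
  • merge: merges two accumulators. It is needed when windows are merged, for example with session windows.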
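The order in which the four methods are called can be sketched in plain Java. This is only an illustration under stated assumptions: SimpleAggregate is a hypothetical local mirror of the interface (the real one lives in Flink), and AverageAggregate and the run helper are invented for the example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical local mirror of the four methods; not the Flink interface itself.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average of long values; the accumulator is {sum, count}.
final class AverageAggregate implements SimpleAggregate<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value; // running sum
        acc[1] += 1;     // running count
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}

final class AggregateLifecycleSketch {
    // Create the accumulator once, call add per element, call getResult at the end.
    static <IN, ACC, OUT> OUT run(SimpleAggregate<IN, ACC, OUT> f, List<IN> input) {
        ACC acc = f.createAccumulator();
        for (IN value : input) {
            acc = f.add(value, acc);
        }
        return f.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(run(new AverageAggregate(), Arrays.asList(1L, 2L, 3L, 4L)));
    }
}
```

Here the input and output types differ (Long in, Double out) and the accumulator is a third type, which is exactly what ReduceFunction cannot express.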

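On an e-commerce site, PV (page views) and UV (unique visitors) are two very important traffic metrics.

  1. PV counts every click.
  2. UV counts distinct users, that is, the number of user ids after deduplication.
  3. PV/UV is the average number of visits per user.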
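import java.time.Duration;
import java.util.HashSet;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event and ClickSource are defined outside this section.
public class WindowAggregateTest_PvUv {
    public static void main(String[] args) throws Exception {
        // PV: add 1 per event
        // UV: distinct users
        // PV/UV: average number of visits per user, a measure of site activity
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long recordTimestamp) {
                                        return event.timestamp;
                                    }
                                }));
        dataStream.print("data: ");

        // Send every event to the same key so that one window sees all traffic
        dataStream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new AvgPv())
                .print();

        environment.execute();
    }

    // Accumulator: f0 = PV so far, f1 = users seen so far (its size is the UV)
    public static class AvgPv implements AggregateFunction<Event, Tuple2<Long, HashSet<String>>, Double> {
        @Override
        public Tuple2<Long, HashSet<String>> createAccumulator() {
            return Tuple2.of(0L, new HashSet<>());
        }

        @Override
        public Tuple2<Long, HashSet<String>> add(Event event, Tuple2<Long, HashSet<String>> acc) {
            acc.f1.add(event.user);
            return Tuple2.of(acc.f0 + 1, acc.f1);
        }

        @Override
        public Double getResult(Tuple2<Long, HashSet<String>> acc) {
            // long / int is integer division, so the average is truncated (10 / 3 gives 3.0).
            // Use (double) acc.f0 / acc.f1.size() for a fractional average.
            return Double.valueOf(acc.f0 / acc.f1.size());
        }

        @Override
        public Tuple2<Long, HashSet<String>> merge(Tuple2<Long, HashSet<String>> a, Tuple2<Long, HashSet<String>> b) {
            // Not called for tumbling windows; implemented so that merging windows also work
            a.f1.addAll(b.f1);
            return Tuple2.of(a.f0 + b.f0, a.f1);
        }
    }
}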
data: > Event{user='Mary', url='./fav', timestamp=2022-12-13 18:58:58.794}
data: > Event{user='Bob', url='./fav', timestamp=2022-12-13 18:58:59.8}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 18:59:00.809}
1.0
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 18:59:01.825}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 18:59:02.84}
data: > Event{user='Bob', url='./cart', timestamp=2022-12-13 18:59:03.853}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 18:59:04.865}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 18:59:05.914}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 18:59:06.922}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 18:59:07.936}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 18:59:08.951}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 18:59:09.965}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 18:59:10.973}
3.0
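The second window in the output above holds ten events from three distinct users (Mary, Bob, Cary). The printed 3.0 comes from dividing a long by an int before converting to Double. A minimal plain-Java sketch of that arithmetic follows; the class and method names are invented for the illustration, and it assumes the list contains at least one event.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical helper that reproduces the PV/UV arithmetic outside Flink.
final class PvUvSketch {

    // PV is the number of events.
    static long pv(List<String> users) {
        return users.size();
    }

    // UV is the number of distinct users.
    static long uv(List<String> users) {
        return new HashSet<>(users).size();
    }

    // Integer division first, conversion second: the fraction is lost.
    static double truncatedAverage(List<String> users) {
        return (double) (pv(users) / uv(users));
    }

    // Conversion first, division second: the fraction is kept.
    static double exactAverage(List<String> users) {
        return (double) pv(users) / uv(users);
    }

    public static void main(String[] args) {
        // Users of the ten events that fall into the second window of the sample output
        List<String> users = Arrays.asList(
                "Mary", "Bob", "Mary", "Bob", "Mary", "Cary", "Mary", "Cary", "Cary", "Cary");
        System.out.println(truncatedAverage(users));
        System.out.println(exactAverage(users));
    }
}
```

Whether truncation is acceptable depends on how the metric is consumed. For an "average visits per user" figure the fractional form is usually the more informative one.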

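ReduceFunction and AggregateFunction show that incremental aggregation applies the stream-processing approach to a bounded dataset: the core idea is to keep one aggregation state and update it whenever an element arrives. This is what Flink calls "stateful stream processing". Because it avoids buffering the window's elements and spreads the work over their arrival, it is far more efficient, and it is the most common choice in practice.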

2. Full window functions

  • Window function: WindowFunction
  • Process window function: ProcessWindowFunction

2.1 WindowFunction

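Call .apply() on the WindowedStream and pass an implementation of WindowFunction.

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

A WindowFunction receives the iterable collection of the window's elements and the window itself.

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

However, WindowFunction exposes relatively little information. ProcessWindowFunction provides everything WindowFunction does and more, so it can be used in its place.

2.2 ProcessWindowFunction

ProcessWindowFunction additionally receives a context object, which gives access to the window, the current processing time and the current event-time watermark.

Example: count the UV of an e-commerce site per window (the code below uses 10-second tumbling windows).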

import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Event and ClickSource are defined outside this section.
public class WindowProcessTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long recordTimestamp) {
                                        return event.timestamp;
                                    }
                                }));
        dataStream.print("data: ");

        dataStream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountProcess())
                .print();

        environment.execute();
    }

    public static class UvCountProcess extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {
        @Override
        public void process(Boolean key, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            // All elements of the window are available here; deduplicate the users
            HashSet<String> users = new HashSet<>();
            for (Event element : elements) {
                users.add(element.user);
            }
            int size = users.size();
            // Window bounds come from the context object
            long start = context.window().getStart();
            long end = context.window().getEnd();
            out.collect("Window " + new Timestamp(start) + " ~ " + new Timestamp(end) + " Uv: " + size);
        }
    }
}

data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 19:15:45.944}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 19:15:46.956}
data: > Event{user='Bob', url='./cart', timestamp=2022-12-13 19:15:47.971}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 19:15:48.973}
data: > Event{user='Cary', url='./home', timestamp=2022-12-13 19:15:49.986}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:15:51.001}
Window 2022-12-13 19:15:40.0 ~ 2022-12-13 19:15:50.0 Uv: 2
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:15:52.014}
data: > Event{user='Cary', url='./home', timestamp=2022-12-13 19:15:53.018}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 19:15:54.034}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 19:15:55.038}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:15:56.051}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 19:15:57.068}
data: > Event{user='Bob', url='./fav', timestamp=2022-12-13 19:15:58.084}
data: > Event{user='Mary', url='./fav', timestamp=2022-12-13 19:15:59.087}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 19:16:00.104}
Window 2022-12-13 19:15:50.0 ~ 2022-12-13 19:16:00.0 Uv: 4

The number of elements in the HashSet is the UV value.
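The full-window approach can also be sketched in plain Java: every element is buffered under the window it belongs to, and the distinct count is computed only once per window. This is a simplified illustration, not Flink code. The class and method names are hypothetical, timestamps are assumed to be non-negative milliseconds, and windows are aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a full window function over tumbling windows.
final class FullWindowUvSketch {

    // Start of the tumbling window that contains the given non-negative timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Buffers all users per window, then counts distinct users window by window.
    static Map<Long, Integer> uvPerWindow(List<String> users, List<Long> timestamps, long sizeMs) {
        Map<Long, List<String>> buffers = new TreeMap<>();
        for (int i = 0; i < users.size(); i++) {
            long start = windowStart(timestamps.get(i), sizeMs);
            // The raw element is stored; nothing is aggregated yet.
            buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(users.get(i));
        }
        Map<Long, Integer> result = new TreeMap<>();
        for (Map.Entry<Long, List<String>> entry : buffers.entrySet()) {
            // All work happens here, when the window is evaluated.
            result.put(entry.getKey(), new HashSet<>(entry.getValue()).size());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("Cary", "Cary", "Bob", "Bob", "Cary", "Mary");
        List<Long> timestamps = Arrays.asList(5000L, 6000L, 7000L, 8000L, 9000L, 11000L);
        System.out.println(uvPerWindow(users, timestamps, 10_000L));
    }
}
```

The cost of this style is memory: the buffer grows with the number of elements in the window, whereas an incremental function keeps a single accumulator.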

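3. Combining incremental aggregation with full window functions

Incremental aggregation functions compute more efficiently, while full window functions expose more information.

The .reduce() and .aggregate() methods of WindowedStream both accept a second function.

The first argument is a ReduceFunction (for .reduce()) or an AggregateFunction (for .aggregate()); the second argument is a WindowFunction or a ProcessWindowFunction.

The window data is processed by the first argument (the incremental aggregation function), which aggregates once per arriving element. When the window fires, the second argument (the full window function) is called to produce the output. Its Iterable then contains a single element, the aggregated result.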
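The division of labour between the two functions can be sketched in plain Java. This is a hedged illustration rather than Flink's mechanism: CombinedWindowSketch, add and fire are hypothetical names, and the window bounds are passed in by hand instead of coming from a window assigner.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of one window that aggregates incrementally and
// attaches window metadata only when it fires.
final class CombinedWindowSketch {
    // Incremental part: only the set of distinct users seen so far is kept.
    private final Set<String> accumulator = new HashSet<>();
    private final long start;
    private final long end;

    CombinedWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Called once per arriving element, like the first function argument.
    void add(String user) {
        accumulator.add(user);
    }

    // Called once when the window fires, like the second function argument:
    // it sees one aggregated value plus the window bounds.
    String fire() {
        long uv = accumulator.size();
        return "window [" + start + ", " + end + ") uv=" + uv;
    }

    public static void main(String[] args) {
        CombinedWindowSketch window = new CombinedWindowSketch(0L, 10_000L);
        window.add("Alice");
        window.add("Mary");
        window.add("Alice");
        System.out.println(window.fire());
    }
}
```

The final step does no aggregation of its own; it only wraps the already computed value, which is why the combination keeps the cost profile of incremental aggregation.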

Example: UV of the whole site

import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Event and ClickSource are defined outside this section.
public class UvCountExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long recordTimestamp) {
                                        return event.timestamp;
                                    }
                                }));
        dataStream.print("data: ");

        dataStream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // UvAgg aggregates per element; UvCountResult formats the result when the window fires
                .aggregate(new UvAgg(), new UvCountResult())
                .print();

        environment.execute();
    }

    // Incremental part: the accumulator is the set of users seen so far
    public static class UvAgg implements AggregateFunction<Event, HashSet<String>, Long> {
        @Override
        public HashSet<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public HashSet<String> add(Event event, HashSet<String> users) {
            users.add(event.user);
            return users;
        }

        @Override
        public Long getResult(HashSet<String> users) {
            return Long.valueOf(users.size());
        }

        @Override
        public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
            // Not called for tumbling windows; implemented so that merging windows also work
            a.addAll(b);
            return a;
        }
    }

    // Full-window part: receives the single aggregated value and adds the window bounds
    public static class UvCountResult extends ProcessWindowFunction<Long, String, Boolean, TimeWindow> {
        @Override
        public void process(Boolean key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
            Long uv = elements.iterator().next();
            long start = context.window().getStart();
            long end = context.window().getEnd();
            out.collect("Window: " + new Timestamp(start) + " ~ " + new Timestamp(end) + " Uv: " + uv);
        }
    }
}

data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:54:39.832}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:54:40.837}
Window: 2022-12-13 20:54:30.0 ~ 2022-12-13 20:54:40.0 Uv: 1
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 20:54:41.84}
data: > Event{user='Alice', url='./fav', timestamp=2022-12-13 20:54:42.853}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 20:54:43.855}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 20:54:44.863}
data: > Event{user='Bob', url='./fav', timestamp=2022-12-13 20:54:45.875}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 20:54:46.89}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 20:54:47.891}
data: > Event{user='Mary', url='./fav', timestamp=2022-12-13 20:54:48.905}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 20:54:49.909}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:54:50.916}
Window: 2022-12-13 20:54:40.0 ~ 2022-12-13 20:54:50.0 Uv: 4

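Example: view count per URL. Note that UvCountViewAgg below adds 1 for every event, so it counts clicks per URL rather than distinct users.

The POJO UrlCountView: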

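import java.sql.Timestamp;

// Result record: number of views of one URL within one window
public class UrlCountView {
    public String url;
    public Long count;
    public Long start;
    public Long end;

    public UrlCountView() {
    }

    public UrlCountView(String url, Long count, Long start, Long end) {
        this.url = url;
        this.count = count;
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "UrlCountView{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", start=" + new Timestamp(start) +
                ", end=" + new Timestamp(end) +
                '}';
    }
}

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Event and ClickSource are defined outside this section.
public class UvCountViewExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long recordTimestamp) {
                                        return event.timestamp;
                                    }
                                }));
        dataStream.print("data: ");

        // Key by URL so that every URL gets its own window and its own count
        dataStream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new UvCountViewAgg(), new UvCountViewResult())
                .print();

        environment.execute();
    }

    // Incremental part: counts the events of one URL
    public static class UvCountViewAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event event, Long count) {
            return count + 1;
        }

        @Override
        public Long getResult(Long count) {
            return count;
        }

        @Override
        public Long merge(Long a, Long b) {
            // Not called for tumbling windows; implemented so that merging windows also work
            return a + b;
        }
    }

    // Full-window part: wraps the count together with the key (URL) and the window bounds
    public static class UvCountViewResult extends ProcessWindowFunction<Long, UrlCountView, String, TimeWindow> {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlCountView> out) throws Exception {
            Long count = elements.iterator().next();
            long start = context.window().getStart();
            long end = context.window().getEnd();
            out.collect(new UrlCountView(url, count, start, end));
        }
    }
}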
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 20:56:18.065}
data: > Event{user='Alice', url='./fav', timestamp=2022-12-13 20:56:19.077}
data: > Event{user='Cary', url='./fav', timestamp=2022-12-13 20:56:20.091}
UrlCountView{url='./cart', count=1, start=2022-12-13 20:56:10.0, end=2022-12-13 20:56:20.0}
UrlCountView{url='./fav', count=1, start=2022-12-13 20:56:10.0, end=2022-12-13 20:56:20.0}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 20:56:21.105}
data: > Event{user='Mary', url='./home', timestamp=2022-12-13 20:56:22.111}
data: > Event{user='Alice', url='./cart', timestamp=2022-12-13 20:56:23.115}
data: > Event{user='Alice', url='./home', timestamp=2022-12-13 20:56:24.119}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 20:56:25.125}
data: > Event{user='Cary', url='./cart', timestamp=2022-12-13 20:56:26.129}
data: > Event{user='Mary', url='./cart', timestamp=2022-12-13 20:56:27.14}
data: > Event{user='Cary', url='./home', timestamp=2022-12-13 20:56:28.145}
data: > Event{user='Bob', url='./home', timestamp=2022-12-13 20:56:29.155}
data: > Event{user='Bob', url='./cart', timestamp=2022-12-13 20:56:30.171}
UrlCountView{url='./fav', count=1, start=2022-12-13 20:56:20.0, end=2022-12-13 20:56:30.0}
UrlCountView{url='./cart', count=4, start=2022-12-13 20:56:20.0, end=2022-12-13 20:56:30.0}
UrlCountView{url='./home', count=5, start=2022-12-13 20:56:20.0, end=2022-12-13 20:56:30.0}

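The bulk of the window processing is still incremental aggregation, while the added full window function gives access to more information for wrapping the output. The combination has the advantages of both kinds of window function: it keeps the performance and low latency of incremental aggregation and supports a wider range of use cases.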
