文章目录

  • 定时器(Timer)和定时服务(TimerService)
  • KeyedProcessFunction 的使用

在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy 算子对数据流进行“按键分区”,得到一个 KeyedStream。也就是指定一个键(key),按照它的哈希值(hash code)将数据分成不同的“组”,然后分配到不同的并行子任务上执行计算;这相当于做了一个逻辑分流的操作,从而可以充分利用并行计算的优势实时处理海量数据。

只有在 KeyedStream 中才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy 分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction,最基本的 ProcessFunction 反而出镜率没那么高。

定时器(Timer)和定时服务(TimerService)

KeyedProcessFunction 的一个特色,就是可以灵活地使用定时器。定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction 的上下文(Context)中提供了 .timerService() 方法,可以直接返回一个 TimerService 对象:

public abstract TimerService timerService();

TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间。

对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳(timestamp)保存起来,排队等待执行。可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个 key 注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。

基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的。这样,我们的代码并不需要做额外的处理,底层就可以直接对不同key 进行独立的处理操作了。

利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次。

long coalescedTime = time / 1000 * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);

这里注意定时器的时间戳必须是毫秒数,所以我们得到整秒之后还要乘以 1000。定时器默认的区分精度是毫秒。另外 Flink 对.onTimer()和.processElement()方法是同步调用(synchronous),所以也不会出现状态的并发修改。

KeyedProcessFunction 的使用

KeyedProcessFunction 可以说是处理函数中的“嫡系部队”,可以认为是 ProcessFunction 的一个扩展。我们只要基于 keyBy 之后的 KeyedStream,直接调用.process()方法,这时需要传入的参数就是 KeyedProcessFunction 的实现类。

类似地,KeyedProcessFunction 也是继承自 AbstractRichFunction 的一个抽象类,源码中定义如下:

@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

可以看到与 ProcessFunction 的定义几乎完全一样,区别只是在于类型参数多了一个 K,这是当前按键分区的 key 的类型。同样地,我们必须实现一个 .processElement() 抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法 .onTimer(),用来定义定时器触发时的回调操作。由于定时器只能在 KeyedStream 上使用,所以到了 KeyedProcessFunction 这里,我们才真正对时间有了精细的控制,定时方法.onTimer()才真正派上了用场。

使用处理时间定时器的具体示例:具体示例

public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());stream.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {// 获取当前处理时间long currTs = ctx.timerService().currentProcessingTime();out.collect(ctx.getCurrentKey() + "数据到达,到达时间:" + new Timestamp(currTs));//注册一个10s之后的定时器ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(ctx.getCurrentKey() + "定时器触发,触发时间:" + new Timestamp(timestamp));}}).print();env.execute();}
}

我们自定义了一个 KeyedProcessFunction,其中.processElement()方法是每来一个数据都会调用一次,主要是定义了一个 10 秒之后的定时器;而 .onTimer() 方法则会在定时器触发时调用。所以我们会看到,程序运行后先在控制台输出“数据到达”的信息,等待 10 秒之后,又会输出“定时器触发”的信息,打印出的时间间隔正是 10 秒。

上面的例子是处理时间的定时器,所以我们是真的需要等待 10 秒才会看到结果。事件时间语义下,又会有什么不同呢?我们可以对上面的代码略作修改,做一个测试:具体测试

public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));//事件时间定时器stream.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {@Overridepublic void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {// 获取数据当前的时间long currTs = ctx.timestamp();out.collect(ctx.getCurrentKey() + "数据到达,时间戳:" + new Timestamp(currTs) + " watermark:" + ctx.timerService().currentWatermark());//注册一个10s之后的定时器ctx.timerService().registerEventTimeTimer(currTs + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.err.println(ctx.getCurrentKey() + "定时器触发,触发时间:" + new Timestamp(timestamp) + " watermark:" + ctx.timerService().currentWatermark());//out.collect(ctx.getCurrentKey() + "定时器触发,触发时间:" + new Timestamp(timestamp) + " watermark:" + ctx.timerService().currentWatermark());}}).print();env.execute();}
}

事件时间语义下,定时器触发的条件就是水位线推进到设定的时间。

设置了10s的定时器,定时器触发的时间是最开始的时间 + 10 * 1000L 之后的数据到达之后,才会触发。


当没有更多的数据生成了,整个程序运行结束将要退出,此时 Flink 会自动将水位线推进到长整型的最大值(Long.MAX_VALUE)。

Flink 案件分区 处理函数(KeyedProcessFunction)相关推荐

  1. Flink SQL中的函数

    Table API是内嵌在Java语言中的,很多方法需要在类中额外添加,扩展功能比较麻烦,目前支持的函数比较少,故一般情况下我们使用Flink SQL中的函数 Flink SQL中的函数主要分为两类: ...

  2. 【Flink】Flink Table SQL 用户自定义函数: UDF、UDAF、UDTF

    本文总结Flink Table & SQL中的用户自定义函数: UDF.UDAF.UDTF. UDF: 自定义标量函数(User Defined Scalar Function).一行输入一行 ...

  3. Flink AggOperator 增量聚合函数

    增量聚合函数(incremental aggregation functions) 1.来一条数据计算一次 2.每次计算,保持一个简单的状态(累加器) 3.当窗口闭合时,增量聚合完成 ReduceFu ...

  4. Flink中增量聚合函数和全量聚合函数的关系

    在上一篇博客当中,我们对Window的整体分类即使用进行了介绍,今天我们将从另外一个角度对Window进行分类,这个角度就是聚合角度. Window的聚合操作分为2种:一种是增量聚合,另外一种是全量聚 ...

  5. mysql分区函数_mysql 分区可用函数

    DAY() DAYOFMONTH() DAYOFWEEK() DAYOFYEAR() DATEDIFF() EXTRACT() HOUR() MICROSECOND() MINUTE() MOD() ...

  6. Flink SQL自定义聚合函数

    <2021年最新版大数据面试题全面开启更新> 本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法.撤回定义以及与源码结合分析每个方法的调用位置. 基本使用 F ...

  7. Flink java 自定义reduce函数,以wordcount为例

    maven项目的文本文件与pom.xml配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120555968 同样以wordcou ...

  8. 电脑爱好——PE系统分区工具 分区时函数错误,报000000001错误 解决方法

    1.启动硬盘分区软件diskgenius(一般都是这个分区软件,这个PE系统自带的居多) 2.将现有的分区全部删掉 3.选择菜单栏--"硬盘"--"转换分区表类型为MBR ...

  9. Flink 处理函数

    概述   之前所介绍的流处理 API,无论是基本的转换.聚合,还是更为复杂的窗口操作,其实都是基于 DataStream 进行转换的:所以可以统称为DataStream API,这也是 Flink 编 ...

最新文章

  1. 绝对定位下margin的作用
  2. mysql 数据库快照迁移_快照方式备份mysql 数据库
  3. 使用rsync实现数据实时同步备份--实战
  4. mysqls压力测试怎么用_MySQL压力测试工具使用
  5. python写的软件怎么逆向_python逆向工程:通过代码生成类图
  6. gz键盘增强小工具_干货推荐∣6个超有用的在线工具,日常必备
  7. sql字段合并mysql_sql合并字段
  8. vue 判断是否function_vue2.0组件的prop验证中的Function类型怎么使用(向子组件传递函数对象的正确方法)?...
  9. Java的新项目学成在线笔记-day14(二)
  10. Navicat安装问题及解决办法
  11. JAVA JVM优化总结
  12. 汉字编码对照表(gb2312/unicode/utf8)
  13. 修改element $prompt的文字样式
  14. Matlab与Excel文件的数据交换
  15. php中大于等于的表示方法,php:判断php版本是否大于等于某个版本的方法
  16. c# MVC 网页开发
  17. android项目实现查询功能,Android实现归属地查询功能(示例代码)
  18. 如何保存微信视频到本地,微信朋友圈怎么发本地的视频。
  19. 他是“中国第一程序员”,一人之力单挑微软!
  20. 《孢子》我脑瘫一样的坐在沙发上玩了一个晚上

热门文章

  1. Vector 类存储原理
  2. 电脑只有飞行模式,没有WiFi(多种有效解决方式)
  3. PotPlayer不支持S/W HEVC(H.265)解码怎么办
  4. ASP.NET 2.0揭秘(中文版)国庆献礼
  5. 泛微E9 流程 独立选择框对应数据库表 查询
  6. 2023年宜昌市中职组“网络安全”赛项竞赛任务书-2
  7. 人脸识别——脸部属性辅助(Attribute-Centered Loss)
  8. php模拟post提交json数据,如何在PHP中利用curl模拟post提交json数据
  9. Google Earth Engine——地图对象
  10. 被动环绕解码混音方法