flink自定义trigger详解
适用的场景解释:
[1]中有句话是这样的:
"其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。"
这句话的意思是,默认的自带的trigger一般是基于EventTime的。
那么这1000 个元素可能跨度是一小时,也可能跨度是两小时,对吧
但是显然默认的Trigger只能是盯着EventTime(时间戳)来决定是否触发计算,并不能根据元素个数进行触发。
也就是说,默认的Trigger盯着的跨度是"时间差"。而不是"个数差"
讲人话就是:
①例如Flink的Trigger默认每隔一天输出统计数据,
②但是不支持默认每隔一千个订单输出统计数据。
但是注意这里的一千个统计数据可能超过一天,甚至超过一周,耗时可能不固定。
因为你想啊,代码都是要把逻辑写死的对吧?
一千个订单可能一开始耗时一周,后来耗时一个月。那程序要怎么根据变化的时间来锁定一千个订单触发一次?
显然做不到,这个时候我们就希望锁定"个数间隔"、“个数差”,这个时候就需要自定义Trigger
官方文档说明:
下面是官方文档[4]中Triggers这一节的内容概括
.
需要override的函数 | 函数作用 |
onElement()
|
数据(element)被加入window的时候会调用该函数 |
onEventTime()
|
当一个注册的Event-Time定时器触发 |
onProcessingTime()
|
当一个注册的Processing-Time定时器触发 |
onMerge()
|
与有状态触发器(stateful triggers)和当两个窗口整合的时候整合(merge)状态相关。 例如使用session windows |
clear()
|
window清理数据需要 |
前面三个用来设定调用事件(invocation event)以后如何操作,
所以这些"操作"必须是一个TriggerResult
也就是说,前三个函数返回的TriggerResult可以是下面几种选择:
返回的TriggerResult | 作用 |
CONTINUE
|
什么都不做 |
FIRE
|
触发计算 |
PURGE
|
删除窗口中的所有数据 |
FIRE_AND_PURG
|
触发计算后删除窗口中所有数据 |
然后是Fire and Purge这一节的内容:
触发计算时,返回的一定是FIRE或者FIRE_AND_PURG(这个话仅仅是来自官方文档的翻译,其实Intellij提示的选项并不仅仅是上面几个)
具体示范代码参考[5]即可
private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);/*** 窗口最大数据量*/private int maxCount;/*** event time / process time*/private TimeCharacteristic timeType;/*** 用于储存窗口当前数据量的状态对象*/private ReducingStateDescriptor<Long> countStateDescriptor =new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {this.maxCount = maxCount;this.timeType = timeType;}private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {clear(window, ctx);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.add(1L);if (countState.get() >= maxCount) {LOG.info("fire with count: " + countState.get());return fireAndPurge(window, ctx);}if (timestamp >= window.getEnd()) {LOG.info("fire with tiem: " + timestamp);return fireAndPurge(window, ctx);} else {return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.ProcessingTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with process tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.EventTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with event tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.clear();}/*** 计数方法*/class Sum implements ReduceFunction<Long> {@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}}
}
Reference:
[1]flink自定义trigger-实现窗口随意输出
[2]Flink 自定义Trigger
[3]Flink 自定义trigger
[4]flink官方文档-窗口
[5]Flink 自定义触发器
flink自定义trigger详解相关推荐
- pandas dataframe中的列进行重新排序、倒排、正排、自定义排序详解及实践
pandas dataframe中的列进行重新排序,pandas dataframe列重排.倒排.正排.自定义排序详解及实践 实施数据构建: import pandas as pd import nu ...
- Android Gradle 自定义Task详解二:进阶
转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/78523958 本文出自[赵彦军的博客] 系列目录 Android Gradle使用 ...
- Android Gradle 自定义Task 详解
转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/76408024 本文出自[赵彦军的博客] 系列目录 Android Gradle使用 ...
- python数据处理常用函数_pytorch中的自定义数据处理详解
pytorch在数据中采用Dataset的数据保存方式,需要继承data.Dataset类,如果需要自己处理数据的话,需要实现两个基本方法. :.getitem:返回一条数据或者一个样本,obj[in ...
- html 自定义打印模板,HTML+CSS入门 自定义模板详解
本篇教程介绍了HTML+CSS入门 自定义模板详解,希望阅读本篇文章以后大家有所收获,帮助大家HTML+CSS入门.< 首先总的stylecss和大模板都是当初angel_Kitty学姐的,嗯, ...
- python构造自定义数据包_pytorch中的自定义数据处理详解
pytorch在数据中采用Dataset的数据保存方式,需要继承data.Dataset类,如果需要自己处理数据的话,需要实现两个基本方法. :.getitem:返回一条数据或者一个样本,obj[in ...
- ASP.NET技巧:GridView控件自定义分页详解第一页
ASP.NET技巧:GridView控件自定义分页详解 日期:2007年9月11日 作者: 查看:[大字体 中字体 小字体] <script src="../gg/info468.js ...
- C语言自定义类型详解
C语言自定义类型详解 一.结构体 1.结构的声明 2.特殊的声明 3.结构的自引用 4.结构体变量的定义和初始化 5.结构体内存对齐 6.修改默认对齐数 7.结构体传参 二.位段 1.什么是位段 2. ...
- android 自定义xmlns,Android xmlns 的作用及其自定义实例详解
Android xmlns 的作用及其自定义实例详解 xmlns:Android="http://schemas.android.com/apk/res/android的作用是: 这个是xm ...
最新文章
- druid拦截器_CMS基于SpringBoot+Shiro+Mybatis+Druid+layui后台管理系统
- 网友问答之:AD、DNS转发器、WINS不能自动启动
- 成功解决To fix this you could try to: 1. loosen the range of package versions you‘ve specified ​​​​​​​
- python外星人入侵飞船上下移动_python外星人入侵游戏左移正常,右移屏幕上不显示,但实际上已经移动了?...
- Oracle之索引、权限
- PHP 分布式集群中session共享问题以及session有效期的设置
- mysql数据库文件位置
- Purification(CF-330C)
- 万万没想到,AI算法开发、OCR应用已经进入零门槛时代!
- 模板上 php dede,DEDE模板中使用php和if判断语句实例
- JDK、STS、SVN、Tomcat 、mysql的下载安装及环境变量的配置和sts修改字体大小
- CSS 实现半透明边框效果实战
- flash 中.Swf 格式的文字 转化为 word 或pdf 或 xps
- 码农、程序员、工程师这三者之间有什么区别?
- html 走马看花还有vb,走马看花的故事
- edp接口规范_edp和lvds区别在哪里
- 微信小程序轨迹回放实现及遇到的坑
- Windows防火墙导致FTP服务器不能访问的解决方法
- 小程序——picker组件
- sim卡在苹果手机显示无服务器,iPhone手机没有信号怎么办 手机提示无服务怎么解决...