适用的场景解释:

[1]中有句话是这样的:

"其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。"

这句话的意思是,默认的自带的trigger一般是基于EventTime的。

那么这1000 个元素可能跨度是一小时,也可能跨度是两小时,对吧

但是显然默认的Trigger只能是盯着EventTime(时间戳)来决定是否触发计算,并不能根据元素个数进行触发。

也就是说,默认的Trigger盯着的跨度是"时间差"。而不是"个数差"

讲人话就是:

①例如Flink的Trigger默认每隔一天输出统计数据,

②但是不支持默认每隔一千个订单输出统计数据。

但是注意这里的一千个统计数据可能超过一天,甚至超过一周,耗时可能不固定。

因为你想啊,代码都是要把逻辑写死的对吧?

一千个订单可能一开始耗时一周,后来耗时一个月。那程序要怎么根据变化的时间来锁定一千个订单触发一次?

显然做不到,这个时候我们就希望锁定"个数间隔"、“个数差”,这个时候就需要自定义Trigger

官方文档说明:

下面是官方文档[4]中Triggers这一节的内容概括

.

需要override的函数 函数作用
onElement() 数据(element)被加入window的时候会调用该函数
onEventTime() 

当一个注册的Event-Time定时器触发

onProcessingTime()  当一个注册的Processing-Time定时器触发
onMerge()

与有状态触发器(stateful triggers)和当两个窗口整合的时候整合(merge)状态相关。

例如使用session windows

clear() window清理数据需要

前面三个用来设定调用事件(invocation event)以后如何操作,

所以这些"操作"必须是一个TriggerResult

也就是说,前三个函数返回的TriggerResult可以是下面几种选择:

返回的TriggerResult 作用
CONTINUE 什么都不做
FIRE 触发计算
PURGE 删除窗口中的所有数据
FIRE_AND_PURG 触发计算后删除窗口中所有数据

然后是Fire and Purge这一节的内容:

触发计算时,返回的一定是FIRE或者FIRE_AND_PURG(这个话仅仅是来自官方文档的翻译,其实Intellij提示的选项并不仅仅是上面几个)

具体示范代码参考[5]即可


private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);/*** 窗口最大数据量*/private int maxCount;/*** event time / process time*/private TimeCharacteristic timeType;/*** 用于储存窗口当前数据量的状态对象*/private ReducingStateDescriptor<Long> countStateDescriptor =new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {this.maxCount = maxCount;this.timeType = timeType;}private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {clear(window, ctx);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.add(1L);if (countState.get() >= maxCount) {LOG.info("fire with count: " + countState.get());return fireAndPurge(window, ctx);}if (timestamp >= window.getEnd()) {LOG.info("fire with tiem: " + timestamp);return fireAndPurge(window, ctx);} else {return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.ProcessingTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with process tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.EventTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with event tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.clear();}/*** 计数方法*/class Sum implements ReduceFunction<Long> {@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}}
}

Reference:

[1]flink自定义trigger-实现窗口随意输出

[2]Flink 自定义Trigger

[3]Flink 自定义trigger

[4]flink官方文档-窗口

[5]Flink 自定义触发器

flink自定义trigger详解相关推荐

  1. pandas dataframe中的列进行重新排序、倒排、正排、自定义排序详解及实践

    pandas dataframe中的列进行重新排序,pandas dataframe列重排.倒排.正排.自定义排序详解及实践 实施数据构建: import pandas as pd import nu ...

  2. Android Gradle 自定义Task详解二:进阶

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/78523958 本文出自[赵彦军的博客] 系列目录 Android Gradle使用 ...

  3. Android Gradle 自定义Task 详解

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/76408024 本文出自[赵彦军的博客] 系列目录 Android Gradle使用 ...

  4. python数据处理常用函数_pytorch中的自定义数据处理详解

    pytorch在数据中采用Dataset的数据保存方式,需要继承data.Dataset类,如果需要自己处理数据的话,需要实现两个基本方法. :.getitem:返回一条数据或者一个样本,obj[in ...

  5. html 自定义打印模板,HTML+CSS入门 自定义模板详解

    本篇教程介绍了HTML+CSS入门 自定义模板详解,希望阅读本篇文章以后大家有所收获,帮助大家HTML+CSS入门.< 首先总的stylecss和大模板都是当初angel_Kitty学姐的,嗯, ...

  6. python构造自定义数据包_pytorch中的自定义数据处理详解

    pytorch在数据中采用Dataset的数据保存方式,需要继承data.Dataset类,如果需要自己处理数据的话,需要实现两个基本方法. :.getitem:返回一条数据或者一个样本,obj[in ...

  7. ASP.NET技巧:GridView控件自定义分页详解第一页

    ASP.NET技巧:GridView控件自定义分页详解 日期:2007年9月11日 作者: 查看:[大字体 中字体 小字体] <script src="../gg/info468.js ...

  8. C语言自定义类型详解

    C语言自定义类型详解 一.结构体 1.结构的声明 2.特殊的声明 3.结构的自引用 4.结构体变量的定义和初始化 5.结构体内存对齐 6.修改默认对齐数 7.结构体传参 二.位段 1.什么是位段 2. ...

  9. android 自定义xmlns,Android xmlns 的作用及其自定义实例详解

    Android xmlns 的作用及其自定义实例详解 xmlns:Android="http://schemas.android.com/apk/res/android的作用是: 这个是xm ...

最新文章

  1. druid拦截器_CMS基于SpringBoot+Shiro+Mybatis+Druid+layui后台管理系统
  2. 网友问答之:AD、DNS转发器、WINS不能自动启动
  3. 成功解决To fix this you could try to: 1. loosen the range of package versions you‘ve specified ​​​​​​​
  4. python外星人入侵飞船上下移动_python外星人入侵游戏左移正常,右移屏幕上不显示,但实际上已经移动了?...
  5. Oracle之索引、权限
  6. PHP 分布式集群中session共享问题以及session有效期的设置
  7. mysql数据库文件位置
  8. Purification(CF-330C)
  9. 万万没想到,AI算法开发、OCR应用已经进入零门槛时代!
  10. 模板上 php dede,DEDE模板中使用php和if判断语句实例
  11. JDK、STS、SVN、Tomcat 、mysql的下载安装及环境变量的配置和sts修改字体大小
  12. CSS 实现半透明边框效果实战
  13. flash 中.Swf 格式的文字 转化为 word 或pdf 或 xps
  14. 码农、程序员、工程师这三者之间有什么区别?
  15. html 走马看花还有vb,走马看花的故事
  16. edp接口规范_edp和lvds区别在哪里
  17. 微信小程序轨迹回放实现及遇到的坑
  18. Windows防火墙导致FTP服务器不能访问的解决方法
  19. 小程序——picker组件
  20. sim卡在苹果手机显示无服务器,iPhone手机没有信号怎么办 手机提示无服务怎么解决...

热门文章

  1. 使用MyBatis Generator自动生成实体、mapper和dao层
  2. Python将JSON格式数据转换为SQL语句以便导入MySQL数据库
  3. 单例模式:Instance
  4. 函数传参和实际应用—JS学习笔记2015-6-5(第49天)
  5. 生成两个表的笛卡尔积
  6. 查找单项链表中间元素,若有相同,取第一个
  7. .NET静态类的概念
  8. 服务器系统使用30金手指,seo专业培训拾首选金手指三:自动设置锚文本的
  9. PHP实现文章的删除,php如何实现删除文章
  10. JS疑惑-1(连续赋值)