触发器(Triggers)

触发器确定窗口(由窗口分配器形成)何时准备好由窗口功能处理。每个WindowAssigner都带有一个默认触发器。如果默认触发器不适合您的需求,则可以使用trigger(...)指定自定义触发器。

trigger触发器接口有五个方法允许trigger对不同的事件做出反应:

  • onElement()进入窗口的每个元素都会调用该方法。

  • onEventTime()事件时间timer触发的时候被调用。

  • onProcessingTime()处理时间timer触发的时候会被调用。

  • onMerge()有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。

  • clear()该方法主要是执行窗口的删除操作。

关于上述方法需要注意两点:

1).前三方法决定着如何通过返回一个TriggerResult来操作输入事件。

CONTINUE:什么都不做。

FIRE:触发计算。

PURE:清除窗口的元素。

FIRE_AND_PURE:触发计算和清除窗口元素。

2). 这些方法中的任何一个都可用于为将来的操作注册处理或事件时间计时器

Fire和Purge

一旦触发器确定窗口已准备好进行处理,它将触发,即返回FIRE或FIRE_AND_PURGE。这是窗口操作员发出当前窗口结果的信号。给定一个带有ProcessWindowFunction的窗口,所有元素都将传递给ProcessWindowFunction(可能在将它们传递给逐出者之后)。具有ReduceFunction,AggregateFunction或FoldFunction的Windows只会发出其急切的聚合结果。

当触发器触发时,它可以是FIRE或FIRE_AND_PURGE。在FIRE保留窗口内容的同时,FIRE_AND_PURGE删除其内容。默认情况下,预实现的触发器仅触发FIRE,而不会清除窗口状态。

注意⚠️:
清除将仅删除窗口的内容,并将保留有关该窗口的任何潜在元信息以及任何触发状态。

默认触发器

WindowAssigner的默认触发器适用于许多用例。例如,所有事件时间窗口分配器都有一个EventTimeTrigger作为默认触发器。一旦WaterMark通过窗口的末端,该触发器便会触发。

注意⚠️:
GlobalWindow的默认触发器是NeverTrigger,它从不触发。因此,在使用GlobalWindow时,您始终必须定义一个自定义触发器。
通过使用trigger()指定触发器,您将覆盖WindowAssigner的默认触发器。例如,如果为TumblingEventTimeWindows指定CountTrigger,
则将不再基于时间进度而是仅通过计数来获取窗口触发。现在,如果要基于时间和计数做出反应,则必须编写自己的自定义触发器。

内置和自定义触发器

Flink带有一些内置触发器。

  • EventTimeTrigger基于事件时间和watermark机制来对窗口进行触发计算。

  • ProcessingTimeTrigger基于处理时间触发。

  • CountTrigger窗口元素数超过预先给定的限制值的话会触发计算。

  • PurgingTrigger作为其它trigger的参数,将其转化为一个purging触发器。

如果需要实现自定义触发器,则应该实现Trigger类。请注意,API仍在不断发展,并可能在Flink的未来版本中更改。

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** <p/>* <li>title: DataStream 触发器</li>* <li>@author: li.pan</li>* <li>Date: 2019/12/29 5:00 下午</li>* <li>Version: V1.0</li>* <li>Description: 自定义元素个数触发器</li>*/
public class CustomProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private CustomProcessingTimeTrigger() {}private static int flag = 0;@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());// CONTINUE是代表不做输出,也即是,此时我们想要实现比如100条输出一次,// 而不是窗口结束再输出就可以在这里实现。if(flag > 9){flag = 0;return TriggerResult.FIRE;}else{flag++;}System.out.println("onElement : "+element);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE_AND_PURGE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/*** 创建一个自定义触发器对象*/public static CustomProcessingTimeTrigger create() {return new CustomProcessingTimeTrigger();}}

Flink触发器Triggers相关推荐

  1. 老猿学5G:融合计费基于QoS流计费QBC的触发器Triggers

    ☞ ░ 前往老猿Python博文目录 ░ 一.引言 SMF中的功能体CTF在用户上网时达到一定条件就会向CHF上报流量,而CTF什么时候触发流量上报是由CTF中的触发器来控制的.在<老猿学5G: ...

  2. WPF 触发器Triggers

    这一篇,总结Triggers. FrameworkElement.Style.ControlTemplate和DataTemplate都具有一个类型为TriggerCollection 的Trigge ...

  3. WPF 触发器Triggers

    这一篇,总结Triggers. FrameworkElement.Style.ControlTemplate和DataTemplate都具有一个类型为TriggerCollection 的Trigge ...

  4. WPF之触发器Triggers

    Trigger触发器:表示一个触发器,它按单个条件应用属性值或执行操作. SourceName属性: 获取或设置与导致关联的 setter 要应用的属性对象的名称. Property属性: 设置需要判 ...

  5. (02) 任务(Jobs)和触发器(Triggers)

    Quart 的 API Quartz API 中的关键接口和类如下: IScheduler-与调度器(scheduler)进行交互的主要 API: IJob-被组件继承和实现,由调度器来执行的接口: ...

  6. WPF系列教程(二十九):触发器Triggers、MultiTrggers、EventTrigger——属性触发器、多触发器、事件触发器

    使用触发器可以自动完成简单的样式改变. 项目源码 触发器 在Style定义时使用Style.Triggers属性来实现: <!--设置触发器--> <Style.Triggers&g ...

  7. 老猿学5G:融合计费基于流计费的触发器Triggers

    ☞ ░ 前往老猿Python博文目录 ░ 一.概述 每个触发条件都是一个可计费事件.SMF中的功能体CTF在用户上网时达到一定条件就会向CHF上报流量,而CTF什么时候触发流量上报是由CTF中的触发器 ...

  8. oracle apex触发器,triggers - 插入之前/更新Apex触发器之前,测试覆盖率失败 - 堆栈内存溢出...

    在机会上插入/更新触发器之前,我有一个非常简单的方法,它可以根据包含销售处(州)位置信息的下拉值自动选择价目表. 这是我的触发器 : trigger SelectPriceBook on Opport ...

  9. zabbix探究告警触发器Triggers

    Triggers函数的分类 功能 函数 值的比较与查找 abschange.delta.diff.band.change.nodata.last.prev 值的计算 max.min.avg.sum.c ...

最新文章

  1. html怎么移动文字的位置,css怎么移动文字
  2. oracle表空间最大30G?如果一张表超过30G怎么办
  3. 20应用统计考研复试要点(part38)--概率论与数理统计
  4. 人口预测和阻尼-增长模型_使用分类模型预测利率-第2部分
  5. 大专计算机应用技术答辩,计算机应用技术专业硕士答辩.ppt
  6. ICCV 2019 | 基于全局类别表征的小样本学习
  7. supersr--图形上下文的注意点
  8. 51单片机实验-蜂鸣器播放音乐
  9. Java:结合JavaSocket编程开发文本处理程序
  10. Python中IO编程-StringIO和BytesIO
  11. LinuxQt打包发布
  12. DTMF信号检测分析(Matlab)
  13. 计算机的按cpu分类,英特尔处理器分类有哪些 英特尔处理器分类详解
  14. L Norms 范数
  15. CH340G的调试过程
  16. 企业加速推进数字化转型,程序员进国企靠谱吗?
  17. 为什么商家有了收款二维码还要使用聚合支付?
  18. 估计量的无偏性,有效性和一致性
  19. Cascaded Shadow Map(CSM)中的一些问题
  20. 免费html5代码,HTML5(示例代码)

热门文章

  1. 深入探索Android内存优化(炼狱级别)
  2. 不看遗憾啊,韩版《大长今》中文音译,天才之作!
  3. 应该怎样学习Unity3D
  4. Pandas 根据category自定义排序
  5. 中职计算机老师的一天,信息技术教师的一天
  6. windows10计算机无法启动不了,windows10不能开机怎么办
  7. 美的大数据挖掘笔试总结
  8. 2020年R2移动式压力容器充装多少分及格及R2移动式压力容器充装考试申请表
  9. 服务器ui制作盘启动,U启动制作U盘启动盘详细教程
  10. 接口中的变量为什么不能是普通变量,只能是static final