一、概述

在Flink中,使用event-time模式时,默认提供的window有TumblingEventTimeWindows,SlidingEventTimeWindows,EventTimeSessionWindow等,其中这些是属于window operator中的一部分,称作 window assigner。window operator包含四个组件,除了 window assigner外,还包括 trigger , evictor, window process。其作用分别如下:

  • window assigner 指明数据流中的数据属于哪个window
  • trigger 指明在哪些条件下触发window计算,基于处理数据时的时间以及事件的特定属性、
  • evictor 可选组件,在window执行计算前或后,将window中的数据移除,如使用
  • globalWindow时,由于该window的默认trigger为永不触发,所以既需要实现自定义trigger,也需要实现evictor,移除部分已经计算完毕的数据。
    window process flink默认提供的有 ReduceFunction,AggragateFunction.还可以自定义实现 windowProcessFunction

二、Trigger 触发器

  • 窗口触发器,决定了窗口什么时候使用窗口函数处理窗口内元素。每个窗口分配器都带有一个默认的触发器。
  • TriggerResult四个值:CONTINUE、FIRE、FIRE_AND_PURGE、PURGE;
    FIRE、FIRE_AND_PURGE区别:FIRE触发计算不清空窗口数据,FIRE_AND_PURGE:触发计算并清空窗口数据;
    如果后面的Function等计算用户自己增量维护状态,可以只接受增量数据则使用FIRE_AND_PURGE;
  • FIRE之后的Function中会受到整个窗口的数据而FIRE_AND_PURGE只会收到增量数据,特别是在一些大窗口大数据量案例中不清理数据可能会oom
  • Flink带有一些内置触发器:
    EventTimeTrigger 窗口默认的Triiger,根据 watermarks 度量的事件时间进度进行触发。
    ProcessingTimeTrigger 窗口默认的Triiger,基于处理时间触发。
    CountTrigger 一旦窗口中的元素数量超过给定限制就会触发,FIRE不清理数据。
    ContinuousEventTimeTrigger 每隔一段时间触发,FIRE不清理数据。
    PurgingTrigger 将其作为另一个触发器的参数,并将其转换为带有清除功能(transforms it into a purging one)。

如下代码展示了,每条数据触发一次计算并清空窗口数据

class UtcTrigger() extends Trigger[MyTime, TimeWindow] {//当每个元素被添加窗口时调用override def onElement(t: MyTime, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {println("触发器onElement")TriggerResult.FIRE_AND_PURGE}//当注册的处理时间计时器被触发时调用override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {println("触发器onProcessingTime")TriggerResult.CONTINUE}//当注册的事件时间计时器被触发时调用override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {println("触发器onEventTime")TriggerResult.FIRE_AND_PURGE}//在清除(removal)窗口时调用override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {println("触发器clear")}
}

三、Evictor 驱逐器

  • Flink 窗口模型还允许在窗口分配器和触发器之外指定一个可选的驱逐器(Evictor)。
    可以使用 evictor(…) 方法来完成。
  • 驱逐器能够在触发器触发之后,窗口函数使用之前或之后从窗口中清除元素。
    evictBefore()在窗口函数之前使用。而 evictAfter() 在窗口函数之后使用。
    在使用窗口函数之前被逐出的元素将不被处理。
  • Flink带有三种内置驱逐器:
    CountEvictor:在窗口维护用户指定数量的元素,如果多于用户指定的数量,从窗口缓冲区的开头丢弃多余的元素。
    DeltaEvictor:使用 DeltaFunction 和一个阈值,来计算窗口缓冲区中的最后一个元素与其余每个元素之间的差值,并删除差值大于或等于阈值的元素。
    TimeEvictor:以毫秒为单位的时间间隔(interval)作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。
  • 默认情况下,所有内置的驱逐器在窗口函数之前使用。指定驱逐器可以避免预聚合(pre-aggregation),因为窗口内所有元素必须在窗口计算之前传递给驱逐器。
  • Flink 不保证窗口内元素的顺序。这意味着虽然驱逐器可以从窗口开头移除元素,但这些元素不一定是先到的还是后到的。
class MyEvictor() extends Evictor[MyTime, TimeWindow] {override def evictBefore(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {val ite: util.Iterator[TimestampedValue[MyTime]] = iterable.iterator()while (ite.hasNext) {val elment: TimestampedValue[MyTime] = ite.next()//指定事件事件获取到的就是事件时间println("驱逐器获取到的时间:" + elment.getTimestamp)//模拟去掉非法参数数据if (elment.getValue.timestamp <= 0) {ite.remove()}}}override def evictAfter(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {}
}

Flink之Trigger与Evictor相关推荐

  1. flink自定义trigger详解

    适用的场景解释: [1]中有句话是这样的: "其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现." 这句话的意思是 ...

  2. Flink中Trigger的介绍及使用

    Flink中Trigger的介绍及使用 Flink中的Trigger用来确认一个窗口是否应该出发结果的计算,每个windowAssigner都有一个默认的Trigger,先来看看Trigger的定义及 ...

  3. Flink之Trigger

      Flink是真正的实时处理,数据流入flink的source之后,假如需要窗口函数,我们就要使用一定的规则来判断或者叫决定该数据应该属于哪个窗口,然后是窗口要是基于事件时间的话我们还要提供时间戳抽 ...

  4. 【Flink】Flink 自定义 trigger并且进行分析

    1.概述 对于flink的窗口操作,尤其是基于事件时间的窗口操作,大家还要掌三个重要的知识点: 窗口分配器:就是决定着流入flink的数据,该属于哪个窗口. 时间戳抽取器/watermark生成器:抽 ...

  5. flink 自定义trigger

    背景:一般情况下, 窗口操作都有默认的窗口触发器,有时候默认的trigger不满足条件,就需要我们自己去定义相应的trigger 去决定处理窗口数据的时间. Trigger抽象类 触发器接口有五种方法 ...

  6. Flink Window基本概念与实现原理

    Window意为窗口.在流处理系统中数据源源不断流入到系统,我们可以逐条处理流入的数据,也可以按一定规则一次处理流中的多条数据.当处理数据时程序需要知道什么时候开始处理.处理哪些数据.窗口提供了这样一 ...

  7. Flink – window operator

    参考, http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/ http://wuchong.me/blog/2016/ ...

  8. Flink笔记--深度全面总结

    1.Flink 基础 1.1.Flink特性 流式计算是大数据计算的痛点,第1代实时计算引擎Storm对Exactly Once 语义和窗口支持较弱,使用的场景有限且无法支持高吞吐计算:Spark S ...

  9. 彻底搞清 Flink 中的 Window 机制

    [CSDN 编者按]Window是处理无限流的核心.Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.Flink提 ...

最新文章

  1. Python3.5源码分析-垃圾回收机制
  2. 7月17日云栖精选夜读:深度 | 两个案例,掌握AI在大数据领域的前沿应用
  3. Action访问Servlet API的三种方法
  4. 在.net中读写XML方法的总结[转]
  5. 如何当好独立CIO(1)
  6. 最近的一些感想(关于移动客户端开发android,ios)
  7. 引入Hub再生的最短帧长及主机之间距离的最大值计算
  8. 第二周函数-的基本格式:
  9. Netty ChannelBuffer
  10. logstash之codec插件
  11. 草根站长建站需要掌握或者了解的5种技术
  12. 转:使用java生成数字验证码
  13. 【KnockoutJS】KnockoutJS 绑定列表数据。实现表头合并,列生成,图片上传等功能
  14. 从软件的价值体系开始向技术的反向分析
  15. 永恒之蓝漏洞紧急应对方案
  16. CSP-S2020总结
  17. matlab读取txt函数,matlab读取txt某一行
  18. android 支付宝第三方应用授权,支付宝开发平台—第三方应用授权
  19. 睡眠不好易失眠怎么回事?中医怎么辨证调理失眠
  20. OL实现属性查询的功能

热门文章

  1. Scrapy爬取图片网站——最详细的入门爬虫教程,新手入门干货,不进来看一下?
  2. zabbix 监控域名注册到期时间
  3. 触发浏览器回流的属性方法一览表
  4. 龙芯开始支持主流开发栈: Java, .Net以及NodeJS
  5. 这些学校招收!!!应用统计学专硕(025200),你知道吗?
  6. 考研 英语一 大作文-图画作文 (一)----第一段描述图画写作攻略
  7. 力挺 Linux 是一种怎样的体验?
  8. tortoise git 更换邮箱和账号时,报错because commituserEmail is not oncorrect
  9. java web下载文件设置
  10. vert.x中future的简单使用