Event Time语义下我们使用Watermark来判断数据是否迟到。一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算。目前Flink有三种处理迟到数据的方式:直接将迟到数据丢弃

将迟到数据发送到另一个流

重新执行一次计算,将迟到数据考虑进来,更新计算结果

将迟到数据丢弃

如果不做其他操作,默认情况下迟到数据会被直接丢弃。

将迟到数据发送到另外一个流

如果想对这些迟到数据处理,我们可以使用Flink的侧输出(Side Output)功能,将迟到数据发到某个特定的流上。后续我们可以根据业务逻辑的要求,对迟到的数据流进行处理。

// 数据流有三个字段:(key, 时间戳, 数值)val input: DataStream[(String, Long, Int)] = ...

val mainStream = input.keyBy(item => item._1)

.timeWindow(Time.seconds(5))

// 将输出写到late-elements里 .sideOutputLateData(new OutputTag[(String, Long, Int)]("late-elements"))

.aggregate(new CountAggregate)

// 接受late-elements,形成一个数据流val lateStream: DataStream[(String, Long, Int)] = mainStream.getSideOutput(new OutputTag[(String, Long, Int)]("late-elements"))

上面的代码将迟到的内容写进名为“late-elements”的OutputTag下,之后使用getSideOutput获取这些迟到的数据。

更新计算结果

对于迟到数据,使用上面两种方法,都对计算结果的正确性有影响。如果将数据流发送到单独的侧输出,我们仍然需要完成单独的处理逻辑,相对比较复杂。更理想的情况是,将迟到数据重新进行一次,得到一个更新的结果。 allowedLateness允许用户在Event Time下对某个窗口先得到一个结果,如果在一定时间内有迟到数据,迟到数据会和之前的数据一起重新被计算,以得到一个更准确的结果。使用这个功能时需要注意,原来窗口中的状态数据在窗口已经触发的情况下仍然会被保留,否则迟到数据到来后也无法与之前数据融合。另一方面,更新的结果要以一种合适的形式输出到外部系统,或者将原来结果覆盖,或者同时保存且有时间戳以表明来自更新后的计算。比如,我们的计算结果是一个键值对(Key-Value),我们可以把这个结果输出到Redis这样的KV数据库中,使用某些Reids命令,对于同一个Key下,旧的结果被新的结果所覆盖。

如果不明确调用allowedLateness,默认的允许延迟的参数是0。如果对一个Processing Time下的程序使用allowedLateness,将引发异常。

// ProcessWindowFunction接收的泛型参数分别为:[输入类型、输出类型、Key、Window]class AllowedLatenessFunction extends ProcessWindowFunction[

(String, Long, Int), (String, String, Int, String), String, TimeWindow] {

override def process(key: String,

context: Context,

elements: Iterable[(String, Long, Int)],

out: Collector[(String, String, Int, String)]): Unit = {

// 是否被迟到数据更新 val isUpdated = context.windowState.getState(

new ValueStateDescriptor[Boolean]("isUpdated", Types.of[Boolean])

)

val count = elements.size

val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

if (isUpdated.value() == false) {

// 第一次使用process函数时, Boolean默认初始化为false,因此窗口函数第一次被调用时会进入这里 out.collect((key, format.format(Calendar.getInstance().getTime), count, "first"))

isUpdated.update(true)

} else {

// 之后isUpdated被置为true,窗口函数因迟到数据被调用时会进入这里 out.collect((key, format.format(Calendar.getInstance().getTime), count, "updated"))

}

}

}

// 数据流有三个字段:(key, 时间戳, 数值)val input: DataStream[(String, Long, Int)] = ...

val allowedLatenessStream = input.keyBy(item => item._1)

.timeWindow(Time.seconds(5))

.allowedLateness(Time.seconds(5))

.process(new AllowedLatenessFunction)

在上面的代码中,我们设置的窗口为5秒,5秒结束后,窗口计算会被触发,生成第一个计算结果。allowedLateness设置窗口结束后还要等待长为lateness的时间,某个迟到元素的Event Time大于窗口结束时间但是小于结束时间+lateness,该元素仍然会被加入到该窗口中。每新到一个迟到数据,迟到数据被加入ProcessWindowFunction的缓存中,窗口的Trigger会触发一次FIRE,窗口函数被重新调用一次,计算结果得到一次更新。

需要注意的是,会话窗口依赖Session gap来切分窗口,使用了allowedLateness可能会导致两个窗口合并成一个窗口。

Flink时间系列文章:

flink 三种时间机制_Flink时间系列:Event Time下如何处理迟到数据相关推荐

  1. gtest之断言宏的使用以及三种事件机制

    前面简单的介绍了一下gtest以及在Windows下如何安装gtest,今天再介绍一下在Linux下该如何安装. 本文重点: 在Linux下如何安装gtest: gtest下断言宏的介绍: gtest ...

  2. flink 三种时间机制_Flink1.10入门:时间机制简介

    一.概述 上篇文章介绍了Window窗口机制的相关知识,这里我们介绍下Flink的另外一个核心概念"Event Time机制",本篇文章只介绍相关概念不讲实战,实战会结合Windo ...

  3. 一文读懂三种并发控制机制(封锁、时间戳、有效性确认,大量例子+证明)

    文章目录 并发控制 概述 事务特性 定义 并发控制机制 串行调度和可串行调度 调度 串行调度 可串行化调度 事务和调度的记法 冲突可串行化 冲突 优先图 证明 使用锁的可串行化实现 锁 封锁调度器 两 ...

  4. 【Java】三种等待机制

    [AT]三种等待机制 一. 等待机制种类 二. 三种等待机制应用场景 三.显示等待介绍 3.1 相关概念 3.2 显示等待用到的两个类 3.2.1 WebDriverWait 3.2.2 Expect ...

  5. JDK/Dubbo/Spring 三种 SPI 机制,谁更好?

    点击关注公众号,Java干货及时送达 来源:juejin.cn/post/6950266942875779108 SPI 全称为 Service Provider Interface,是一种服务发现机 ...

  6. java同步异步调用_详解java 三种调用机制(同步、回调、异步)

    1:同步调用:一种阻塞式调用,调用方要等待对方执行完毕才返回,jsPwwCe它是一种单向调用 2:回调:一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口: 3:异步调用:一种类似消 ...

  7. JDK/Dubbo/Spring 三种 SPI 机制,谁更好呢?

    JDK/Dubbo/Spring 三种 SPI 机制,谁更好? SPI 全称为 Service Provider Interface,是一种服务发现机制.SPI 的本质是将接口实现类的全限定名配置在文 ...

  8. Yarn的三种资源调度机制

    在企业中并不是只有一个人来执行MapReduce程序单独使用Yarn的资源,实际开发中,会有很多人一起使用Yarn这个资源,如果每个人都提交了job,这个时候Yarn就需要进行调度去分配资源给job, ...

  9. Spring学习之Spring三种装配机制:(一)自动化装配bean

    装配:创建应用组件(对象)之间的协作的行为,这也是Spring依赖注入(DI)的本质. Spring提供了三种装配机制: 隐式的自动装配bean: 通过java代码装配bean(显示): 通XML中装 ...

最新文章

  1. 用css3简单实现进度条
  2. BZOJ1003: [ZJOI2006]物流运输
  3. 重新认识mysql基本知识
  4. linux进程运行队列,Linux进程调度中队列的使用
  5. 苹果电脑投屏到电视_最全小米电视投屏官方教程公布:手机、PC、APP通吃
  6. leveldb登山之路——cache
  7. mysql--MySQL数据库的简单认识
  8. 被坑的过来人告诉你,为什么数据中台永远都搞不成?
  9. 1、【软件测试工具安装教程】
  10. 使用uniapp获取当前位置
  11. Vue动态渲染echarts图表
  12. [转]Form中控制Tab画布不同标签间切换的方法
  13. 网站浏览器崩溃原因分析
  14. mysql直方图_MySQL · 特性分析 · 直方图的实现与分析
  15. u3m8缓存文件.ts合成mp4
  16. Python:利用matplotlib库画各种统计图
  17. Google浏览器网页,大部分网页出现无法加载样式
  18. 屏幕亮度自动调节的实现
  19. 如何理解代码覆盖率?
  20. 高度近视患者担心视网膜脱落,是不是杞人忧天?

热门文章

  1. PyTorch系列入门到精通——生成对抗网络一瞥
  2. springboot 启动后打印_SpringBoot实战(五):配置健康检查与监控
  3. 朋友圈发图多大不会被压缩_类风湿会不会引发肾病?会!本文告诉你对内脏的伤害有多大...
  4. CCF认证-2014-12-2 Z字形扫描
  5. 吴恩达机器学习作业Python实现(八):异常检测和推荐系统
  6. BP(反向传播)神经网络
  7. 测试管理工具QC第一篇-QC安装步骤(史上最详细的图解过程)-第一篇
  8. Android调用相册、相机(兼容6.0、7.0、8.0)
  9. 性能测试:基础(1)
  10. 第二:Postman做各种类型的http接口测试