一、迟到的数据如何处理?

Event Time语义下我们使用Watermark来判断数据是否迟到。一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算。目前Flink有三种处理迟到数据的方式:
(1)直接将迟到数据丢弃:会造成数据丢失
(2)将迟到数据发送到另一个流:输出到侧输出流,保证数据的完整性,可更新计算结果
(3)重新执行一次计算,将迟到数据考虑进来,更新计算结果:数据准确率高,保证数据完整性

二、业务实现:将迟到数据输出到侧输出流

import org.apache.flink.api.common.functions.{AggregateFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object AllowedLatenessDemo {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 使用eventTimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream= env.socketTextStream("flink101", 8888).map(line => {var arr = line.split(",")Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)})val ds = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Log](Time.seconds(2)){override def extractTimestamp(element: Log): Long = {element.callTime  // EventTime}})// 定义一个侧输出流的标签val lateTag = new OutputTag[Log]("late")// 分组,开窗val result = ds.keyBy(_.sid).timeWindow(Time.seconds(10), Time.seconds(5))// 设置迟到的数据超过了2秒的情况下,交给AllowedLateness处理// 也分两种情况,第一种:允许数据迟到5秒(迟到2-5秒),再次迟到触发窗口函数,触发的条件是 watermark < end-of-window + AllowedLateness// 第二种:迟到的数据在5秒以上,输出到侧流中.allowedLateness(Time.seconds(5)) // 运行数据迟到5秒,还可以触发窗口.sideOutputLateData(lateTag).aggregate(new MyAggregateCountFunction, new OutputResultWindowFunction)  // 窗口聚合函数// 正常的结果数据result.print("normal data")result.getSideOutput(lateTag).print("late data")  // 迟到时间超过5秒的数据,根据业务做处理,如果正常数据存储到mysql中,迟到的数据需要进行updateenv.execute("AllowedLatenessDemo")}// 统计通话的次数class MyAggregateCountFunction extends AggregateFunction[Log, Long, Long] {override def createAccumulator(): Long = 0override def add(in: Log, acc: Long): Long = acc + 1override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1}// AggregateFunction 输出是这个函数的输入class OutputResultWindowFunction extends WindowFunction[Long, String, String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {var value = input.iterator.next()var sb = new StringBuildersb.append("窗口的范围:").append(window.getStart).append("--").append(window.getEnd).append(", 通话的次数是: ").append(value)out.collect(sb.toString())}}
}

Flink迟到数据输出到测输出流相关推荐

  1. flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)

    文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...

  2. Flink中迟到数据的处理

    目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结:         我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...

  3. Flink对迟到数据的处理的三种方式

    ** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...

  4. Flink-时间和窗口(水位线、窗口、迟到数据的处理等)

    文章目录 时间和窗口 时间 水位线(Watermark) 时间和窗口 水位线 有序和无序流的插入 水位线生成策略(Watermark Strategies) 水位线的传递 窗口(Window) 窗口 ...

  5. flink笔记8(接笔记7——窗口(Window),迟到数据的处理)

    flink 3. 窗口(Window) (1)窗口的概念 (2)窗口的分类 (3)窗口 API 概览 (4)窗口分配器(Window Assigners) (5)窗口函数(Window Functio ...

  6. Flink迟到数据处理

    一.迟到的数据如何处理? Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Fl ...

  7. Flink 窗口函数(Window Functions)处理迟到数据

    文章目录 将迟到的数据放入侧输出流 Lambda架构:用一个流处理器,先快速的得到一个正确,近似正确的结果,然后在另外一层是一个批处理器,然后在它是一直等着的,等所有数据都到齐了,计算出一个最终准确的 ...

  8. Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流

    一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...

  9. 【学习笔记 — Flink 处理迟到数据(★)】

    Flink 处理迟到数据(★) 处理迟到数据之前首先了解Lambda架构 Lambda架构的实现是:一个批处理器.一个流处理器.流处理器首先实时输出近似正确的结果(因为乱序流,可能导致流处理结果不准确 ...

最新文章

  1. oracle外部表使用详解,详解Oracle外部表的一次维护(图文)
  2. SDUTOJ2779_找朋友(BFS | | DFS双解法)
  3. android 一个很漂亮的控件ObservableScrollView(含片段代码和源码)
  4. canvas图像保存
  5. hadoop元数据合并过程_Hadoop元数据合并异常及解决方法
  6. function “printf“ declared implicitly
  7. 使用Asp.net Core3Blazor 的全栈式网站开发体验
  8. 汇编语言之转移指令和原理
  9. Java学习笔记之:Java String类
  10. Java-自增自减运算符 初始Math类
  11. Java串口通信具体解释
  12. mysql统计age大于20的数_数据库命令记录
  13. AcWing1075. 数字转换(树形DP)题解
  14. jQuery实现Ajax
  15. 游戏筑基开发之栈、队列及基本功能实现(使用C语言链表的相关知识)
  16. 浅谈人工智能(AI)
  17. 一阶倒立摆神经网络控制matlab,BP神经网络在一级倒立摆 控制系统中的应用设计...
  18. 什么是 CDN 边缘服务器 - Edge Server
  19. css3边框交替动画_用纯CSS3制作的效果非常炫酷的元素边框线条动画特效
  20. centos8安装smplayer

热门文章

  1. 阿里P7晒出工资单:狠补了这个,真香...
  2. UGUI血条渐渐减掉实现
  3. python中的自定义函数
  4. 2. Mybatis流程
  5. 【智商都是硬伤】分析与逻辑思考能力测试
  6. Java并发编程实战——显示锁
  7. 服务器端控件TextBox 设为只读属性后无法获取javascript给其赋的值
  8. 微软的某些东西,确实不敢恭维,其实,它可以做得更好
  9. NetworkStream
  10. IDEA 自定义代码片段/模板