Flink迟到数据输出到测输出流
一、迟到的数据如何处理?
Event Time语义下我们使用Watermark来判断数据是否迟到。一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算。目前Flink有三种处理迟到数据的方式:
(1)直接将迟到数据丢弃:会造成数据丢失
(2)将迟到数据发送到另一个流:输出到侧输出流,保证数据的完整性,可更新计算结果
(3)重新执行一次计算,将迟到数据考虑进来,更新计算结果:数据准确率高,保证数据完整性
二、业务实现:将迟到数据输出到侧输出流
import org.apache.flink.api.common.functions.{AggregateFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object AllowedLatenessDemo {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 使用eventTimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream= env.socketTextStream("flink101", 8888).map(line => {var arr = line.split(",")Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)})val ds = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Log](Time.seconds(2)){override def extractTimestamp(element: Log): Long = {element.callTime // EventTime}})// 定义一个侧输出流的标签val lateTag = new OutputTag[Log]("late")// 分组,开窗val result = ds.keyBy(_.sid).timeWindow(Time.seconds(10), Time.seconds(5))// 设置迟到的数据超过了2秒的情况下,交给AllowedLateness处理// 也分两种情况,第一种:允许数据迟到5秒(迟到2-5秒),再次迟到触发窗口函数,触发的条件是 watermark < end-of-window + AllowedLateness// 第二种:迟到的数据在5秒以上,输出到侧流中.allowedLateness(Time.seconds(5)) // 运行数据迟到5秒,还可以触发窗口.sideOutputLateData(lateTag).aggregate(new MyAggregateCountFunction, new OutputResultWindowFunction) // 窗口聚合函数// 正常的结果数据result.print("normal data")result.getSideOutput(lateTag).print("late data") // 迟到时间超过5秒的数据,根据业务做处理,如果正常数据存储到mysql中,迟到的数据需要进行updateenv.execute("AllowedLatenessDemo")}// 统计通话的次数class MyAggregateCountFunction extends AggregateFunction[Log, Long, Long] {override def createAccumulator(): Long = 0override def add(in: Log, acc: Long): Long = acc + 1override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1}// AggregateFunction 输出是这个函数的输入class OutputResultWindowFunction extends WindowFunction[Long, String, String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {var value = input.iterator.next()var sb = new StringBuildersb.append("窗口的范围:").append(window.getStart).append("--").append(window.getEnd).append(", 通话的次数是: ").append(value)out.collect(sb.toString())}}
}
Flink迟到数据输出到测输出流相关推荐
- flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)
文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...
- Flink中迟到数据的处理
目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结: 我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...
- Flink对迟到数据的处理的三种方式
** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...
- Flink-时间和窗口(水位线、窗口、迟到数据的处理等)
文章目录 时间和窗口 时间 水位线(Watermark) 时间和窗口 水位线 有序和无序流的插入 水位线生成策略(Watermark Strategies) 水位线的传递 窗口(Window) 窗口 ...
- flink笔记8(接笔记7——窗口(Window),迟到数据的处理)
flink 3. 窗口(Window) (1)窗口的概念 (2)窗口的分类 (3)窗口 API 概览 (4)窗口分配器(Window Assigners) (5)窗口函数(Window Functio ...
- Flink迟到数据处理
一.迟到的数据如何处理? Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Fl ...
- Flink 窗口函数(Window Functions)处理迟到数据
文章目录 将迟到的数据放入侧输出流 Lambda架构:用一个流处理器,先快速的得到一个正确,近似正确的结果,然后在另外一层是一个批处理器,然后在它是一直等着的,等所有数据都到齐了,计算出一个最终准确的 ...
- Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流
一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...
- 【学习笔记 — Flink 处理迟到数据(★)】
Flink 处理迟到数据(★) 处理迟到数据之前首先了解Lambda架构 Lambda架构的实现是:一个批处理器.一个流处理器.流处理器首先实时输出近似正确的结果(因为乱序流,可能导致流处理结果不准确 ...
最新文章
- oracle外部表使用详解,详解Oracle外部表的一次维护(图文)
- SDUTOJ2779_找朋友(BFS | | DFS双解法)
- android 一个很漂亮的控件ObservableScrollView(含片段代码和源码)
- canvas图像保存
- hadoop元数据合并过程_Hadoop元数据合并异常及解决方法
- function “printf“ declared implicitly
- 使用Asp.net Core3Blazor 的全栈式网站开发体验
- 汇编语言之转移指令和原理
- Java学习笔记之:Java String类
- Java-自增自减运算符 初始Math类
- Java串口通信具体解释
- mysql统计age大于20的数_数据库命令记录
- AcWing1075. 数字转换(树形DP)题解
- jQuery实现Ajax
- 游戏筑基开发之栈、队列及基本功能实现(使用C语言链表的相关知识)
- 浅谈人工智能(AI)
- 一阶倒立摆神经网络控制matlab,BP神经网络在一级倒立摆 控制系统中的应用设计...
- 什么是 CDN 边缘服务器 - Edge Server
- css3边框交替动画_用纯CSS3制作的效果非常炫酷的元素边框线条动画特效
- centos8安装smplayer