方式1:设置水位线延迟时间

水位线延迟设置,一般设置为毫秒到秒级别。

SingleOutputStreamOperator<Event> watermarks = streamOperator.assignTimestampsAndWatermarks(// 方案1:设置水位线延迟时间2sWatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timeStamp;}}));

方式2:允许窗口处理迟到数据

由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果; 然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。

SingleOutputStreamOperator<String> result = watermarks.keyBy(data -> data.user).window(TumblingEventTimeWindows.of(Time.seconds(10)))// 方案2:允许窗口处理迟到1分钟的数据.allowedLateness(Time.minutes(1))

方案3:迟到的数据放在侧输出流

即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?
那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。

package com.wanshun.bigdata.chapter06;import com.wanshun.bigdata.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.sql.Timestamp;
import java.time.Duration;/*** Author:panghu* Date:2022-07-20* Description: 处理迟到的数据*/
public class _08ProcessLateDataExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从端口接收数据DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 7777);// 处理接受到的数据SingleOutputStreamOperator<Event> streamOperator = streamSource.map(new MapFunction<String, Event>() {@Overridepublic Event map(String line) throws Exception {String[] split = line.split(",");return new Event(split[0], split[1], Long.parseLong(split[2]));}});streamOperator.print("来自端口的数据");// 提取时间戳,设置水位线SingleOutputStreamOperator<Event> watermarks = streamOperator.assignTimestampsAndWatermarks(// 方案1:设置水位线延迟时间2sWatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timeStamp;}}));// 定义侧输出流标签OutputTag<Event> outputTag = new OutputTag<Event>("late"){};SingleOutputStreamOperator<String> result = watermarks.keyBy(data -> data.user).window(TumblingEventTimeWindows.of(Time.seconds(10)))// 方案2:允许窗口处理迟到1分钟的数据.allowedLateness(Time.minutes(1))// 方案3:将迟到的数据放在侧输出流.sideOutputLateData(outputTag).aggregate(new AggregateFunction<Event, Long, Long>() {// 创建累加器@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long acc, Long acc1) {return null;}},new ProcessWindowFunction<Long, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {out.collect(key + " 的访问次数为:" + elements.iterator().next()+ ", 窗口开始时间:" + new Timestamp(context.window().getStart())+ ", 窗口结束时间:" + new Timestamp(context.window().getEnd())+ ", 当前水位线时间:" + new Timestamp(context.currentWatermark()));}});result.print("result");// 获取侧输出流中的数据result.getSideOutput(outputTag).print("侧输出流中的数据");env.execute();}
}

Flink处理迟到数据的几种方式相关推荐

  1. Flink对迟到数据的处理的三种方式

    ** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...

  2. Flink中迟到数据的处理

    目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结:         我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...

  3. 【Flink】迟到数据的处理

    窗口 在流上的工作方式与批处理不同,因为流通常是无限的,所以不可能计算流中的所有元素,流上的聚合事件则由窗口限定,例如"过去 5 分钟的计数"或"最后 100 个元素的总 ...

  4. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  5. 向服务器端提交数据的两种方式

    朴素版:                                                                                                 ...

  6. es重建字段类型_关于elasticsearch中更新数据的几种方式

    作为一个成熟的框架,Elasticsearch里面提供了丰富的操作数据的api,本篇我们就来学习一下在es中更新数据的几种方式. (一)更新文档 (1)部分更新: java api: ` HashMa ...

  7. layui根据条件显示列_templet渲染layui表格数据的三种方式

    layui前端框架是我一直在使用,也很好用. 今天记录一下,templet渲染layui表格数据的三种方式. 第一种:直接渲染(对于表格数据样式要求不高) 直接在动态表格字段声明,添加templet属 ...

  8. html中获取modelandview中的json数据_从Bitmap中获取YUV数据的两种方式

    从Bitmap中我们能获取到的是RGB颜色分量,当需要获取YUV数据的时候,则需要先提取R,G,B分量的值,然后将RGB转化为YUV(根据具体的YUV的排列格式做相应的Y,U,V分量的排列) 所以这篇 ...

  9. ios网络学习------4 UIWebView的加载本地数据的三种方式

    ios网络学习------4 UIWebView的加载本地数据的三种方式 分类: IOS2014-06-27 12:56 959人阅读 评论(0) 收藏 举报 UIWebView是IOS内置的浏览器, ...

最新文章

  1. 用nagios检测内存
  2. Nagios 配置文件介绍
  3. C# 控件开发中常用属性整理
  4. @scheduled 每30s 执行一次_全球首发5G神U麒麟820,荣耀30S卡位5G档位最强,售价2399起...
  5. 长征五号运载火箭将于2019年7月复飞
  6. java测试字符串的编码_Java字符串测验
  7. python监控钉钉群消息_使用python对mysql主从进行监控,并调用钉钉发送报警信息...
  8. 从零开始学WEB前端——HTML理论讲解
  9. python format是什么意思_python的format什么意思
  10. matlab求复数的模
  11. 树莓派系列二:openCV之头像添加国旗
  12. iMeta | 南医大陈连民/孔祥清等综述从基因组功能角度揭示肠菌对复杂疾病的潜在影响...
  13. Windows Server 2012 R2 更改系统语言
  14. git push failed to push some refs to xxxx 失败与解决方法
  15. 法拉克机器人自动怎么调_FANUC机器人程序自动启动介绍汇总.ppt
  16. 本月,我最推荐的意外保险排行榜
  17. CSS之vertical-align之野史篇(超越官网的教程)
  18. 员工管理系统之添加修改删除操作
  19. 【源码分享】jquery+css实现侧边导航栏
  20. 服务器2003系统怎么卸载软件,WindowsXP系统添加删除程序的方法

热门文章

  1. 超图三维管线常见问题
  2. matlab分段函数M文件,MATLAB用命令文件编分段函数
  3. WorldFirst手续费是多少?WorldFirst收取的费用有哪些?
  4. 读书笔记{11} VLAN及其在生产中的应用
  5. 计算机学院剧本,【大学生微电影剧本】_大学生微电影剧本《流年》
  6. 听GlobalSources关于PSC系统的培训后有感...
  7. 考研英语长难句(刘晓燕)笔记 第三课 名词(短语)和名词性从句
  8. 进制转换,原码补码反码--学习笔记--03
  9. 离开nbsp;果真如此轻易?
  10. HTML5主体结构元素