Allowed Lateness案例演示

需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。

并使用OutputTag+allowedLateness解决数据丢失问题

API

package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/*** Author lanson* Desc* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。*/
public class WatermakerDemo03_AllowedLateness {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模拟实时订单数据(数据有延迟和乱序)DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模拟数据延迟和乱序!long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;ctx.collect(new Order(orderId, userId, money, eventTime));//TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));SingleOutputStreamOperator<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(outputTag).sum("money");DataStream<Order> result2 = result.getSideOutput(outputTag);//4.Sinkresult.print("正常的数据和迟到不严重的数据");result2.print("迟到严重的数据");//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}

2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示相关推荐

  1. 2021年大数据Flink(十四):流批一体API Connectors JDBC

    目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector 代码演示 package ...

  2. 2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口

    目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4 ...

  3. 2021年大数据HBase(十四):HBase的原理及其相关的工作机制

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的原理及其相关的工作机制 一.HBase的flus ...

  4. 2021年大数据Hadoop(十四):HDFS的高可用机制

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...

  5. 2021年大数据Flink(十二):流批一体API Transformation

    目录 Transformation 官网API列表 基本操作-略 map flatMap keyBy filter sum reduce 代码演示 合并-拆分 union和connect split. ...

  6. 2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka

    目录 Kafka pom依赖 参数设置 参数说明 Kafka命令 代码实现-Kafka Consumer 代码实现-Kafka Producer 代码实现-实时ETL Kafka pom依赖 Flin ...

  7. 2021年大数据Flink(十):流处理相关概念

    目录 流处理相关概念 数据的时效性 ​​​​​​​流处理和批处理 ​​​​​​​流批一体API DataStream API 支持批执行模式 API 编程模型 ​​​​​​​流处理相关概念 数据的时效 ...

  8. 大数据实战二十四课 - Spark SQL04

    第一章:上次课回顾 第二章:Spark SQL Functions 2.1 简单小应用 2.2 Spark SQL自定义函数 第三章:Catalog 第四章:DataSet 第五章:窗口函数 第六章: ...

  9. 2021年大数据Flink(十八):Flink Window操作

    目录 ​​​​​​​Flink-Window操作 为什么需要Window Window的分类 按照time和count分类 ​​​​​​​按照slide和size分类 ​​​​​​​总结 Window ...

  10. 2021年大数据Flink(十六):流批一体API Connectors ​​​​​​​​​​​​​​Redis

    目录 Redis API 使用RedisCommand设置数据结构类型时和redis结构对应关系 需求 代码实现 Redis API 通过flink 操作redis 其实我们可以通过传统的redis ...

最新文章

  1. rb c语言,C语言,RB和RBT什么区别啊???这里的typedef 什么作用???
  2. hive 字段不包含某个字符_hive之面试必问 hive调优
  3. Python 技术篇-基于PyHook3+threading多线程实现鼠标单击事件和双击事件的识别实例演示
  4. linux查看服务器网络状态
  5. python 控制手机摄像头_python+open cv调用手机摄像头,保存文件
  6. Android 自定义组件学习 3
  7. smalot-bootstrap-datetimepicker 使用心得
  8. 精确到门牌号的地图_IP地址精准查询工具:能精确到门牌号
  9. 主成分分析(PCA)详解
  10. encapsulation dot1q vlan-id命令
  11. 微信墙html5,微信墙怎么做?微信墙制作流程介绍
  12. 这是我见过最牛逼的Shell脚本!
  13. 6.STC15W408AS单片机外部中断
  14. 搭建ruby + jekyll + github pages
  15. Don‘t Cry Over Spilt Milk
  16. Spring框架学习笔记,超详细!!(4)
  17. 外包3年,吃透这三份Java程序员必刷的算法宝典后,已从13K涨到25K
  18. 以太坊POA共识算法解析
  19. 2021.12- 参加第四届海淀区青少年科普科幻创作活动(一等奖)
  20. 【电机驱动芯片(单、双极性步进电机驱动方式/四相五线和42步进电机)——ULN2003、双H桥芯片(DRV8833/DRV8825)】

热门文章

  1. 两步完成项目定时启动,java项目定时启动
  2. 2022-2028年中国蛋制品行业市场专项调查及前瞻分析报告
  3. 常用开源协议介绍以及开源软件规范列表
  4. 外包工作经历暨2021年终总结
  5. tf.variable_scope 参数
  6. 最全Pycharm教程(43)——Pycharm扩展功能之UML类图使用 代码结构
  7. TVM yolov3优化代码修改(编译运行OK)
  8. MinkowskiEngine语义分割
  9. FPGA与ASIC:它们之间的区别以及使用哪一种?
  10. 蓝牙mesh网络技术的亮点