2021年大数据Flink(二十三):Watermaker案例演示
目录
Watermaker案例演示
需求
API
代码实现-1-开发版-掌握
代码实现-2-验证版-了解
Watermaker案例演示
需求
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
API
注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor
代码实现-1-开发版-掌握
Apache Flink 1.12 Documentation: Generating Watermarks
package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。*/
public class WatermakerDemo01_Develop {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模拟实时订单数据(数据有延迟和乱序)DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模拟数据延迟和乱序!long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;ctx.collect(new Order(orderId, userId, money, eventTime));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.Transformation//-告诉Flink要基于事件时间来计算!//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime//-告诉Flnk数据中的哪一列是事件时间,因为Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间/*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允许的延迟时间或乱序时间@Overridepublic long extractTimestamp(Order element) {return element.eventTime;//指定事件时间是哪一列,Flink底层会自动计算://Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间}});*/DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额DataStream<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("money");//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}
代码实现-2-验证版-了解
package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。*/
public class WatermakerDemo02_Check {public static void main(String[] args) throws Exception {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模拟实时订单数据(数据有延迟和乱序)DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模拟数据延迟和乱序!long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;System.out.println("发送的数据为: "+userId + " : " + df.format(eventTime));ctx.collect(new Order(orderId, userId, money, eventTime));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.Transformation/*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));*///开发中直接使用上面的即可//学习测试时可以自己实现DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(new WatermarkStrategy<Order>() {@Overridepublic WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Order>() {private int userId = 0;private long eventTime = 0L;private final long outOfOrdernessMillis = 3000;private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;@Overridepublic void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {userId = event.userId;eventTime = event.eventTime;maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//Watermaker = 当前最大事件时间 - 最大允许的延迟时间或乱序时间Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));output.emitWatermark(watermark);}};}}.withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额/* DataStream<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("money");*///开发中使用上面的代码进行业务计算即可//学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermaker时间DataStream<String> result = watermakerDS.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.seconds(5)))//把apply中的函数应用在窗口中的数据上//WindowFunction<IN, OUT, KEY, W extends Window>.apply(new WindowFunction<Order, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {//准备一个集合用来存放属于该窗口的数据的事件时间List<String> eventTimeList = new ArrayList<>();for (Order order : input) {Long eventTime = order.eventTime;eventTimeList.add(df.format(eventTime));}String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);out.collect(outStr);}});//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}
2021年大数据Flink(二十三):Watermaker案例演示相关推荐
- 2021年大数据Flink(十三):流批一体API Sink
目录 Sink 预定义Sink 基于控制台和文件的Sink 自定义Sink MySQL Sink 预定义Sink 基于控制台和文件的Sink API 1.ds.print 直接输出到控制台 2.ds. ...
- 2021年大数据HBase(十三):HBase读取和存储数据的流程
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase读取和存储数据的流程 一.HBase读取数据的流程 ...
- 2021年大数据Flink(二十二):Time与Watermaker
目录 Flink-Time与Watermaker Time分类 EventTime的重要性 示例1 示例2 示例3 示例4 总结 Watermaker水印机制 ...
- 2021年大数据Flink(二十四):Allowed Lateness案例演示
Allowed Lateness案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解 ...
- 2021年大数据Flink(四十八):扩展阅读 Streaming File Sink
目录 扩展阅读 Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读 配置详解 PartFile PartFile序列化编码 ...
- 2021年大数据Flink(四十六):扩展阅读 异步IO
目录 扩展阅读 异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...
- 2021年大数据Flink(四十):Flink模拟双十一实时大屏统计
目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...
- 2021年大数据Flink(一):乘风破浪的Flink-Flink概述
目录 乘风破浪的Flink-Flink概述 实时即未来 一切从Apache开始 富二代Flink Flink官方介绍 官网地址: Flink组件栈 Flink基石 Checkpoint ...
- 2021年大数据ELK(十三):Elasticsearch编程(添加职位数据)
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch编程 一.添加职位数据 1.初始化客户端连接 2.实 ...
最新文章
- SMILES | 简化分子线性输入规范
- C++ 虚析构函数
- Notepadd ++ PluginManager安装
- Java黑皮书课后题第6章:*6.4(反序显示一个整数)使用下面的方法体编写方法,反序显示一个整数…例如reverse(3456)返回6543,编写一个测试程序,提示用户输入一个整数,然后显示它的反序
- Linux系统利用Crontab命令实现定时重启
- Android性能调优利器StrictMode
- 设计模式学习笔记-代理模式
- Mybatis怎么能看是否执行了sql语句
- java中为什么要用注解_java中的注解,真的很重要,你理解了嘛?
- mysql容器重启数据是否丢失_docker容器重启 数据会丢吗
- CentosX64使用yum快速搭建xen虚拟化环境
- 编写Windows服务疑问2:探索服务与安装器的关系
- php怎么重命名文件,phpstorm如何进行文件或者文件夹重命名
- 【TSP】基于matlab GUI改进的遗传算法求解旅行商问题【含Matlab源码 926期】
- 实时取色器(RGB)
- (转)RemapKey等:小巧实用的键盘映射工具
- mysql车库管理系统_小区停车管理系统(JSP+JAVA+MySQL)
- xshell 4 中文乱码问题解决
- 谷歌有望回归中国市场
- 全国计算机能力挑战赛含金量高吗,大学里,有哪些含金量高,又容易得奖的国家级比赛?...