目录

Watermaker案例演示

需求

API

代码实现-1-开发版-掌握

代码实现-2-验证版-了解


Watermaker案例演示

需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。

API

注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor

代码实现-1-开发版-掌握

Apache Flink 1.12 Documentation: Generating Watermarks

package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。*/
public class WatermakerDemo01_Develop {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模拟实时订单数据(数据有延迟和乱序)DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模拟数据延迟和乱序!long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;ctx.collect(new Order(orderId, userId, money, eventTime));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.Transformation//-告诉Flink要基于事件时间来计算!//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime//-告诉Flnk数据中的哪一列是事件时间,因为Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间/*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允许的延迟时间或乱序时间@Overridepublic long extractTimestamp(Order element) {return element.eventTime;//指定事件时间是哪一列,Flink底层会自动计算://Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间}});*/DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额DataStream<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("money");//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}

​​​​​​​代码实现-2-验证版-了解


package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。*/
public class WatermakerDemo02_Check {public static void main(String[] args) throws Exception {FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模拟实时订单数据(数据有延迟和乱序)DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模拟数据延迟和乱序!long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;System.out.println("发送的数据为: "+userId + " : " + df.format(eventTime));ctx.collect(new Order(orderId, userId, money, eventTime));TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.Transformation/*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));*///开发中直接使用上面的即可//学习测试时可以自己实现DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(new WatermarkStrategy<Order>() {@Overridepublic WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Order>() {private int userId = 0;private long eventTime = 0L;private final long outOfOrdernessMillis = 3000;private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;@Overridepublic void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {userId = event.userId;eventTime = event.eventTime;maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//Watermaker = 当前最大事件时间 - 最大允许的延迟时间或乱序时间Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));output.emitWatermark(watermark);}};}}.withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额/* DataStream<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("money");*///开发中使用上面的代码进行业务计算即可//学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermaker时间DataStream<String> result = watermakerDS.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.seconds(5)))//把apply中的函数应用在窗口中的数据上//WindowFunction<IN, OUT, KEY, W extends Window>.apply(new WindowFunction<Order, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {//准备一个集合用来存放属于该窗口的数据的事件时间List<String> eventTimeList = new ArrayList<>();for (Order order : input) {Long eventTime = order.eventTime;eventTimeList.add(df.format(eventTime));}String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);out.collect(outStr);}});//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}

2021年大数据Flink(二十三):​​​​​​​Watermaker案例演示相关推荐

  1. 2021年大数据Flink(十三):流批一体API Sink

    目录 Sink 预定义Sink 基于控制台和文件的Sink 自定义Sink MySQL Sink 预定义Sink 基于控制台和文件的Sink API 1.ds.print 直接输出到控制台 2.ds. ...

  2. 2021年大数据HBase(十三):HBase读取和存储数据的流程

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase读取和存储数据的流程 一.HBase读取数据的流程 ...

  3. 2021年大数据Flink(二十二):Time与Watermaker

    目录 Flink-Time与Watermaker Time分类 EventTime的重要性 示例1 示例2 ​​​​​​​示例3 ​​​​​​​示例4 ​​​​​​​总结 Watermaker水印机制 ...

  4. 2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示

    Allowed Lateness案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解 ...

  5. 2021年大数据Flink(四十八):扩展阅读  Streaming File Sink

    目录 扩展阅读  Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读  配置详解 PartFile PartFile序列化编码 ...

  6. 2021年大数据Flink(四十六):扩展阅读 异步IO

    目录 扩展阅读  异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...

  7. 2021年大数据Flink(四十):​​​​​​​Flink模拟双十一实时大屏统计

    目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...

  8. 2021年大数据Flink(一):乘风破浪的Flink-Flink概述

    目录 乘风破浪的Flink-Flink概述 实时即未来 一切从Apache开始 富二代Flink Flink官方介绍 官网地址: Flink组件栈 ​​​​​​​Flink基石 Checkpoint ...

  9. 2021年大数据ELK(十三):Elasticsearch编程(添加职位数据)

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch编程 一.添加职位数据 1.初始化客户端连接 2.实 ...

最新文章

  1. SMILES | 简化分子线性输入规范
  2. C++ 虚析构函数
  3. Notepadd ++ PluginManager安装
  4. Java黑皮书课后题第6章:*6.4(反序显示一个整数)使用下面的方法体编写方法,反序显示一个整数…例如reverse(3456)返回6543,编写一个测试程序,提示用户输入一个整数,然后显示它的反序
  5. Linux系统利用Crontab命令实现定时重启
  6. Android性能调优利器StrictMode
  7. 设计模式学习笔记-代理模式
  8. Mybatis怎么能看是否执行了sql语句
  9. java中为什么要用注解_java中的注解,真的很重要,你理解了嘛?
  10. mysql容器重启数据是否丢失_docker容器重启 数据会丢吗
  11. CentosX64使用yum快速搭建xen虚拟化环境
  12. 编写Windows服务疑问2:探索服务与安装器的关系
  13. php怎么重命名文件,phpstorm如何进行文件或者文件夹重命名
  14. 【TSP】基于matlab GUI改进的遗传算法求解旅行商问题【含Matlab源码 926期】
  15. 实时取色器(RGB)
  16. (转)RemapKey等:小巧实用的键盘映射工具
  17. mysql车库管理系统_小区停车管理系统(JSP+JAVA+MySQL)
  18. xshell 4 中文乱码问题解决
  19. 谷歌有望回归中国市场
  20. 全国计算机能力挑战赛含金量高吗,大学里,有哪些含金量高,又容易得奖的国家级比赛?...

热门文章

  1. Linux下tomcat的安装与卸载以及配置(超简单)
  2. 导出swagger2生成的文档
  3. Idea groovy表生成实体类带注释
  4. Java 多线程概述
  5. mysql同事包含_mysql 包含关系处理
  6. java mybatis基础
  7. Python 中 (,|)和(and,or)之间的区别 逻辑判断
  8. python中使用指定GPU
  9. Python---哈夫曼树---Huffman Tree
  10. LLVM数据流分析的理论