Flink CusWaterMark
自定义生成水位线
package com.claroja;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class CusWaterMark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);
// env.getConfig().setAutoWatermarkInterval(60 * 1000L); //默认200ms更新水位线,每进来一个event都记录最新的时间,但200ms才会更新DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);stream.map(new MapFunction<String, Tuple2<String, Long>>() {// Tuple2<key, timestamp>,事件事件必须是毫秒时间戳@Overridepublic Tuple2<String, Long> map(String s) throws Exception {String[] arr = s.split(" ");return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);}}).assignTimestampsAndWatermarks(// 水位线必须在keyby之前new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {//设置最大延迟时间final long bound = 5 * 1000L;//系统观察到的元素包含的最大时间戳long maxTs = Long.MIN_VALUE + bound + 1;// 每来一条数据执行一次@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {maxTs = Math.max(maxTs, stringLongTuple2.f1); // 更新观察到的最大的事件时间return stringLongTuple2.f1; // 告诉系统哪一个字段是事件时间}// 系统在流中插入水位线时执行,默认200ms执行一次@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound - 1);}}).keyBy(r -> r.f0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {long count = 0L;for (Tuple2<String, Long> i : iterable) {count += 1;}collector.collect("窗口中共有 " + count + " 条元素");}}).print();env.execute();}
}
Flink CusWaterMark相关推荐
- hadoop,spark,scala,flink 大数据分布式系统汇总
20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...
- 2021年大数据Flink(四十八):扩展阅读 Streaming File Sink
目录 扩展阅读 Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读 配置详解 PartFile PartFile序列化编码 ...
- 2021年大数据Flink(四十六):扩展阅读 异步IO
目录 扩展阅读 异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...
- 2021年大数据Flink(四十五):扩展阅读 双流Join
目录 扩展阅读 双流Join 介绍 Window Join Interval Join 代码演示1 代码演示2 重点注意 扩展阅读 双流Join 介绍 https:// ...
- 2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once
目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...
- 2021年大数据Flink(四十二):BroadcastState
目录 BroadcastState BroadcastState介绍 需求-实现配置动态更新 编码步骤 1.env 2.source 3.transformation 4.sink 5.e ...
- 2021年大数据Flink(四十一):Flink实现订单自动好评
目录 Flink实现订单自动好评 需求 数据 编码步骤 1.env 2.source 3.transformation 4.sink 5.execute 参考代码 参考效果 实现代码: Flink实现 ...
- 2021年大数据Flink(四十):Flink模拟双十一实时大屏统计
目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...
- 2021年大数据Flink(三十九):Table与SQL 总结 Flink-SQL常用算子
目录 总结 Flink-SQL常用算子 SELECT WHERE DISTINCT GROUP BY UNION 和 UNION ALL JOI ...
最新文章
- 重磅直播|大规模点云可视化技术
- iOS用户设计指南 - 平台特征
- python怎么导入包-如何理解Python中包的引入
- 使用字符流 创建文件 写入文件 复制文件
- 吴恩达《Machine Learning》精炼笔记 4:神经网络基础
- JS中的事件冒泡——总结
- docker和java容器_使用Docker容器和Java EE进行持续交付
- 第一章 概率论的基本概念
- apache php的权限,Unix上的Apache PHP写权限
- java 回调函数传值_说明Java的传递与回调机制的代码示例分享
- VINS-Mono 代码解析六、边缘化(3)
- python之迷宫小游戏
- 小程序全套购物车(全选,单选,反选,删除,价格计算)
- jenkins安装下载
- 武汉计算机软件应届毕业生工资,精打细算告诉你一个应届毕业生在武汉工资多少才能活下来(汉口物件)...
- 幽默感七个技巧_如何让自己变得幽默-16个聊天幽默技巧
- 玩客云root成功一键获取root权限
- R语言入门与数据分析(1)
- 排序算法之——归并排序和快速排序
- 远程服务器拷贝数据库或者大量数据,出现会话空闲时间已超出限制,将在2分钟之内断开连接
热门文章
- 渲染已保存的几何图形
- python 模块 类 函数_Python17之函数、类、模块、包、库
- go 基准测试 找不到函数_初学TDD:测试也能推动开发啦!
- 代码管理_阿里巴巴如何管理代码分支?
- 蓝鸽英语学习平台_蓝鸽集团携手英特尔,共筑智慧校园新生态——蓝鸽amp;英特尔智慧校园建设高峰论坛顺利举办...
- html5语音闹钟代码,HTML5数字时钟之闹钟
- c++ std 方法 取两个数的较大_【数据结构C++】两数交换(4种方法)
- 计算机由简单的二进制阴阳,二进制之美,大道至简,二生万物!
- comsol稀物质传递_COMSOL电弧仿真
- android 得到目录,android 获取路径的获取