自定义生成水位线

package com.claroja;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class CusWaterMark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);
//        env.getConfig().setAutoWatermarkInterval(60 * 1000L); //默认200ms更新水位线,每进来一个event都记录最新的时间,但200ms才会更新DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);stream.map(new MapFunction<String, Tuple2<String, Long>>() {// Tuple2<key, timestamp>,事件事件必须是毫秒时间戳@Overridepublic Tuple2<String, Long> map(String s) throws Exception {String[] arr = s.split(" ");return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);}}).assignTimestampsAndWatermarks(// 水位线必须在keyby之前new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {//设置最大延迟时间final long bound = 5 * 1000L;//系统观察到的元素包含的最大时间戳long maxTs = Long.MIN_VALUE + bound + 1;// 每来一条数据执行一次@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {maxTs = Math.max(maxTs, stringLongTuple2.f1); // 更新观察到的最大的事件时间return stringLongTuple2.f1; // 告诉系统哪一个字段是事件时间}// 系统在流中插入水位线时执行,默认200ms执行一次@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound - 1);}}).keyBy(r -> r.f0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {long count = 0L;for (Tuple2<String, Long> i : iterable) {count += 1;}collector.collect("窗口中共有 " + count + " 条元素");}}).print();env.execute();}
}

Flink CusWaterMark相关推荐

  1. hadoop,spark,scala,flink 大数据分布式系统汇总

    20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...

  2. 2021年大数据Flink(四十八):扩展阅读  Streaming File Sink

    目录 扩展阅读  Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读  配置详解 PartFile PartFile序列化编码 ...

  3. 2021年大数据Flink(四十六):扩展阅读 异步IO

    目录 扩展阅读  异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...

  4. 2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join

    目录 扩展阅读  双流Join 介绍 Window Join Interval Join ​​​​​​​代码演示1 ​​​​​​​代码演示2 重点注意 扩展阅读  双流Join 介绍 https:// ...

  5. 2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once

    目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...

  6. 2021年大数据Flink(四十二):​​​​​​​BroadcastState

    ​​​​​目录 ​BroadcastState BroadcastState介绍 需求-实现配置动态更新 编码步骤 1.env 2.source 3.transformation 4.sink 5.e ...

  7. 2021年大数据Flink(四十一):​​​​​​​Flink实现订单自动好评

    目录 Flink实现订单自动好评 需求 数据 编码步骤 1.env 2.source 3.transformation 4.sink 5.execute 参考代码 参考效果 实现代码: Flink实现 ...

  8. 2021年大数据Flink(四十):​​​​​​​Flink模拟双十一实时大屏统计

    目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...

  9. 2021年大数据Flink(三十九):​​​​​​​Table与SQL ​​​​​​总结 Flink-SQL常用算子

    目录 总结 Flink-SQL常用算子 SELECT WHERE ​​​​​​​DISTINCT ​​​​​​​GROUP BY ​​​​​​​UNION 和 UNION ALL ​​​​​​​JOI ...

最新文章

  1. 重磅直播|大规模点云可视化技术
  2. iOS用户设计指南 - 平台特征
  3. python怎么导入包-如何理解Python中包的引入
  4. 使用字符流 创建文件 写入文件 复制文件
  5. 吴恩达《Machine Learning》精炼笔记 4:神经网络基础
  6. JS中的事件冒泡——总结
  7. docker和java容器_使用Docker容器和Java EE进行持续交付
  8. 第一章 概率论的基本概念
  9. apache php的权限,Unix上的Apache PHP写权限
  10. java 回调函数传值_说明Java的传递与回调机制的代码示例分享
  11. VINS-Mono 代码解析六、边缘化(3)
  12. python之迷宫小游戏
  13. 小程序全套购物车(全选,单选,反选,删除,价格计算)
  14. jenkins安装下载
  15. 武汉计算机软件应届毕业生工资,精打细算告诉你一个应届毕业生在武汉工资多少才能活下来(汉口物件)...
  16. 幽默感七个技巧_如何让自己变得幽默-16个聊天幽默技巧
  17. 玩客云root成功一键获取root权限
  18. R语言入门与数据分析(1)
  19. 排序算法之——归并排序和快速排序
  20. 远程服务器拷贝数据库或者大量数据,出现会话空闲时间已超出限制,将在2分钟之内断开连接

热门文章

  1. 渲染已保存的几何图形
  2. python 模块 类 函数_Python17之函数、类、模块、包、库
  3. go 基准测试 找不到函数_初学TDD:测试也能推动开发啦!
  4. 代码管理_阿里巴巴如何管理代码分支?
  5. 蓝鸽英语学习平台_蓝鸽集团携手英特尔,共筑智慧校园新生态——蓝鸽amp;英特尔智慧校园建设高峰论坛顺利举办...
  6. html5语音闹钟代码,HTML5数字时钟之闹钟
  7. c++ std 方法 取两个数的较大_【数据结构C++】两数交换(4种方法)
  8. 计算机由简单的二进制阴阳,二进制之美,大道至简,二生万物!
  9. comsol稀物质传递_COMSOL电弧仿真
  10. android 得到目录,android 获取路径的获取