Flink WatermarkKeyed
1.stream1
监听端口9999
,stream2
监听端口9998
此时stream1和stream2 以及keyed水位线都是最小值
2.输入 a 1,最新的水位线为999ms(1000ms-1ms)
3.stream2
a = 5
stream2
stream2
package com.claroja;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class WatermarkKeyed {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.socketTextStream("localhost", 9999,'\n').map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1]) * 1000L)).returns(new TypeHint<Tuple2<String, Long>>() {}).assignTimestampsAndWatermarks(// 升序时间戳抽取WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env.socketTextStream("localhost", 9998).map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1]) * 1000L)).returns(new TypeHint<Tuple2<String, Long>>() {}).assignTimestampsAndWatermarks(// 升序时间戳抽取WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.union(stream2).keyBy(r -> r.f0).process(new Keyed()).print();env.execute();}public static class Keyed extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {//每收到一个数据就会掉用一次@Overridepublic void processElement(Tuple2<String, Long> stringLongTuple2, Context context, Collector<String> collector) throws Exception {collector.collect("当前水位线是:" + context.timerService().currentWatermark());}}
}
Flink WatermarkKeyed相关推荐
- hadoop,spark,scala,flink 大数据分布式系统汇总
20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...
- 2021年大数据Flink(四十八):扩展阅读 Streaming File Sink
目录 扩展阅读 Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读 配置详解 PartFile PartFile序列化编码 ...
- 2021年大数据Flink(四十六):扩展阅读 异步IO
目录 扩展阅读 异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...
- 2021年大数据Flink(四十五):扩展阅读 双流Join
目录 扩展阅读 双流Join 介绍 Window Join Interval Join 代码演示1 代码演示2 重点注意 扩展阅读 双流Join 介绍 https:// ...
- 2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once
目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...
- 2021年大数据Flink(四十二):BroadcastState
目录 BroadcastState BroadcastState介绍 需求-实现配置动态更新 编码步骤 1.env 2.source 3.transformation 4.sink 5.e ...
- 2021年大数据Flink(四十一):Flink实现订单自动好评
目录 Flink实现订单自动好评 需求 数据 编码步骤 1.env 2.source 3.transformation 4.sink 5.execute 参考代码 参考效果 实现代码: Flink实现 ...
- 2021年大数据Flink(四十):Flink模拟双十一实时大屏统计
目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...
- 2021年大数据Flink(三十九):Table与SQL 总结 Flink-SQL常用算子
目录 总结 Flink-SQL常用算子 SELECT WHERE DISTINCT GROUP BY UNION 和 UNION ALL JOI ...
最新文章
- [python] 从GPS坐标获取国家名
- python每日一类(3):os和sys
- 大学python和vb哪个简单-python和vb哪个简单
- 本地---tcpserver与tcpclient
- Android之DrawText详解
- python界面gui随机生成器_Python 实现的、带GUI界面的词云生成器
- 学习Python+numpy数组运算和矩阵运算看这254页PPT就够了
- JS进阶Date format(日期格式化)
- CDN---共享单车算啥,阿里云发布共享网络黑科技PCDN,降低视频行业75%的成本
- Incorrect username or password (access token)
- c语言字符输出128,如何将128位整数转换为C中的十进制ASCII字符串?
- 在 Linux 上如何清除内存的 Cache、Buffer 和交换空间
- AXI仿真之AXI Chip2Chip
- CAD与GIS集成说明(在线CAD结合GIS,webCAD)
- linux下部署selenium爬虫程序
- stm32学习笔记——电容触摸按键的实现
- 关于STVP写保护等级2的问题
- 服务器维护封号,LOL客服的关于他们自己服务器问题导致账号被封号的问题
- 傻白入门芯片设计,三大基本定律(十)
- 使用C#.NET WebBrowser控件导航到不同的网站出现 所请求的资源正在使用中。 (从HRESULT异常:0x800700AA)
热门文章
- 学习笔记 - 002
- c语言RTK算法,C-RTK 9P定位系统
- Linux 冯诺依曼体系结构
- oracle设置控制文件多路径,【备份恢复】 控制文件多路径
- plsql导出表结构_mysqldump命令详解 Part 5-按条件备份表数据
- stm32 web get 参数_纯进口mpv销量排行榜 迈巴赫vs680商务车参数
- java mongo数据去重复_java – MongoSpark保存重复的密钥错误E11000
- linux 源码 网络驱动,Linux网络驱动源码分析(一)
- python中静态变量和静态方法_从静态变量引用静态方法
- java tree类子项的添加和删除_Java学习2-23 JTree节点的删除与添加(来源网上)