1.stream1监听端口9999,stream2监听端口9998

此时stream1和stream2 以及keyed水位线都是最小值

2.输入 a 1,最新的水位线为999ms(1000ms-1ms)

3.stream2

a = 5

stream2

stream2


package com.claroja;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class WatermarkKeyed {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.socketTextStream("localhost", 9999,'\n').map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1]) * 1000L)).returns(new TypeHint<Tuple2<String, Long>>() {}).assignTimestampsAndWatermarks(// 升序时间戳抽取WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env.socketTextStream("localhost", 9998).map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1]) * 1000L)).returns(new TypeHint<Tuple2<String, Long>>() {}).assignTimestampsAndWatermarks(// 升序时间戳抽取WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.union(stream2).keyBy(r -> r.f0).process(new Keyed()).print();env.execute();}public static class Keyed extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {//每收到一个数据就会掉用一次@Overridepublic void processElement(Tuple2<String, Long> stringLongTuple2, Context context, Collector<String> collector) throws Exception {collector.collect("当前水位线是:" + context.timerService().currentWatermark());}}
}

Flink WatermarkKeyed相关推荐

  1. hadoop,spark,scala,flink 大数据分布式系统汇总

    20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...

  2. 2021年大数据Flink(四十八):扩展阅读  Streaming File Sink

    目录 扩展阅读  Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读  配置详解 PartFile PartFile序列化编码 ...

  3. 2021年大数据Flink(四十六):扩展阅读 异步IO

    目录 扩展阅读  异步IO 介绍 异步IO操作的需求 使用Aysnc I/O的前提条件 Async I/O API 案例演示 扩展阅读 原理深入 AsyncDataStream 消息的顺序性 扩展阅读 ...

  4. 2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join

    目录 扩展阅读  双流Join 介绍 Window Join Interval Join ​​​​​​​代码演示1 ​​​​​​​代码演示2 重点注意 扩展阅读  双流Join 介绍 https:// ...

  5. 2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once

    目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...

  6. 2021年大数据Flink(四十二):​​​​​​​BroadcastState

    ​​​​​目录 ​BroadcastState BroadcastState介绍 需求-实现配置动态更新 编码步骤 1.env 2.source 3.transformation 4.sink 5.e ...

  7. 2021年大数据Flink(四十一):​​​​​​​Flink实现订单自动好评

    目录 Flink实现订单自动好评 需求 数据 编码步骤 1.env 2.source 3.transformation 4.sink 5.execute 参考代码 参考效果 实现代码: Flink实现 ...

  8. 2021年大数据Flink(四十):​​​​​​​Flink模拟双十一实时大屏统计

    目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...

  9. 2021年大数据Flink(三十九):​​​​​​​Table与SQL ​​​​​​总结 Flink-SQL常用算子

    目录 总结 Flink-SQL常用算子 SELECT WHERE ​​​​​​​DISTINCT ​​​​​​​GROUP BY ​​​​​​​UNION 和 UNION ALL ​​​​​​​JOI ...

最新文章

  1. [python] 从GPS坐标获取国家名
  2. python每日一类(3):os和sys
  3. 大学python和vb哪个简单-python和vb哪个简单
  4. 本地---tcpserver与tcpclient
  5. Android之DrawText详解
  6. python界面gui随机生成器_Python 实现的、带GUI界面的词云生成器
  7. 学习Python+numpy数组运算和矩阵运算看这254页PPT就够了
  8. JS进阶Date format(日期格式化)
  9. CDN---共享单车算啥,阿里云发布共享网络黑科技PCDN,降低视频行业75%的成本
  10. Incorrect username or password (access token)
  11. c语言字符输出128,如何将128位整数转换为C中的十进制ASCII字符串?
  12. 在 Linux 上如何清除内存的 Cache、Buffer 和交换空间
  13. AXI仿真之AXI Chip2Chip
  14. CAD与GIS集成说明(在线CAD结合GIS,webCAD)
  15. linux下部署selenium爬虫程序
  16. stm32学习笔记——电容触摸按键的实现
  17. 关于STVP写保护等级2的问题
  18. 服务器维护封号,LOL客服的关于他们自己服务器问题导致账号被封号的问题
  19. 傻白入门芯片设计,三大基本定律(十)
  20. 使用C#.NET WebBrowser控件导航到不同的网站出现 所请求的资源正在使用中。 (从HRESULT异常:0x800700AA)

热门文章

  1. 学习笔记 - 002
  2. c语言RTK算法,C-RTK 9P定位系统
  3. Linux 冯诺依曼体系结构
  4. oracle设置控制文件多路径,【备份恢复】 控制文件多路径
  5. plsql导出表结构_mysqldump命令详解 Part 5-按条件备份表数据
  6. stm32 web get 参数_纯进口mpv销量排行榜 迈巴赫vs680商务车参数
  7. java mongo数据去重复_java – MongoSpark保存重复的密钥错误E11000
  8. linux 源码 网络驱动,Linux网络驱动源码分析(一)
  9. python中静态变量和静态方法_从静态变量引用静态方法
  10. java tree类子项的添加和删除_Java学习2-23 JTree节点的删除与添加(来源网上)