【Flink】Flink 多并行度下的 watermark触发机制
1.案例
/*** 测试点:测试多 多并行度下的 watermark触发机制* 参考:链接:https://juejin.im/post/5bf95810e51d452d705fef33** @throws Exception*/@Testpublic void mainTest1() throws Exception {//定义socket的端口号int port = 9010;//获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置使用eventtime,默认是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度为1,默认并行度是当前机器的cpu数量env.setParallelism(1);//连接socket获取输入的数据DataStream<String> text = env.socketTextStream("localhost", port, "\n");//解析输入的数据DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定义生成watermark的逻辑* 默认100ms被调用一次*/@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定义如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);long id = Thread.currentThread().getId();System.out.println("键值 :" + element.f0 + "线程验证 :" + id + " , 事件事件:[ " + sdf.format(element.f1) + " ],currentMaxTimestamp:[ " +sdf.format(currentMaxTimestamp) + " ],水印时间:[ " + sdf.format(getCurrentWatermark().getTimestamp()) + " ]");return timestamp;}});//保存被丢弃的数据OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};//注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样//.allowedLateness(Time.seconds(2))//允许数据迟到2秒.sideOutputLateData(outputTag).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 对window内的数据进行排序,保证数据的顺序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = "\n键值 : " + key + "\n 触发窗内数据个数 : " + arrarList.size() + "\n 触发窗起始数据: " + sdf.format(arrarList.get(0)) + "\n 触发窗最后(可能是延时)数据:" + sdf.format(arrarList.get(arrarList.size() - 1))+ "\n 实际窗起始和结束时间: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";out.collect(result);}});//把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);sideOutput.print();//测试-把结果打印到控制台即可window.print();//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行env.execute("eventtime-watermark");}
2.前面代码中设置了并行度为 1
env.setParallelism(1);
如果这里不设置的话, 代码在运行的时候会默认读取本机 CPU 数量设置并行度。
下面我们来验证一下, 把代码中的并行度调整为 2:
env.setParallelism(2);
- 发现玄机如下:在第二条事件时,其实已经达到窗的触发时机,但是因为并行度为2,只有等到最小
- watermark 到的时候才会触发窗计算。发现线程44处理的是001和003 ,线程42处理的是0002,所以只有等到线程42到达后,水印才会起作用执行2018-10-01 10:11:33.000所在的窗。
0001,1538359890000 2018-10-01 10:11:30
0002,1538359903000 2018-10-01 10:11:43
0003,1538359908000 2018-10-01 10:11:48
3.现在代码中设置了并行度为 8
发现 这 7 条数据都是被不同的线程处理的。 每个线程都有一个 watermark。且每一个线程都是基于自己接收数据的事件时间最大值。
因此,导致到最后现在还没获取到最小的 watermark, 所以 window 无法被触发执行。
只有所有的线程的最小watermark都满足watermark 时间 >= window_end_time时,触发历史窗才会执行。
0001,1538359882000 2018-10-01 10:11:220002,1538359886000 2018-10-01 10:11:260003,1538359892000 2018-10-01 10:11:320004,1538359893000 2018-10-01 10:11:330005,1538359894000 2018-10-01 10:11:340006,1538359896000 2018-10-01 10:11:360007,1538359897000 2018-10-01 10:11:37
当持续发生事件数据时。一旦所有线程都达到最低的窗触发时机时,就会进行窗触发执行了。输入数据如下:
0007,1538359897000 2018-10-01 10:11:37
0008,1538359897000 2018-10-01 10:11:37
0009,1538359897000 2018-10-01 10:11:37
0010,1538359897000 2018-10-01 10:11:37
0011,1538359897000 2018-10-01 10:11:37
0012,1538359897000 2018-10-01 10:11:37
0013,1538359897000 2018-10-01 10:11:37
0014,1538359897000 2018-10-01 10:11:37
0015,1538359897000 2018-10-01 10:11:37
这里证明了这个结论
注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark
。
【Flink】Flink 多并行度下的 watermark触发机制相关推荐
- 4.1.19 Flink-流处理框架-Flink中的时间语义和watermark水位线
目录 1.Flink中的时间语义 1.1 EventTime 的代码设置 2.Watermark水位线 2.1 watermark的基本概念 2.2 watermark的特点和传递 2.3 Water ...
- Flink并行运行情况下watermark的传递机制
今天学习了watermark传递机制,弄清楚了在多并行度情况下watermark的传递机制,特此备忘 此处使用的案例是参考了尚硅谷武老师flink教程中的案例,在此表示感谢 案例源代码 package ...
- 第一天:什么是Flink、WordCount入门、Flink安装、并行度
1. 初识 Flink 在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.目前比较流行的大数据处理引擎 Apach ...
- 【Flink】flink on yarn 并行度设置高导致任务失败
文章目录 1.概述 1.概述 flink on yarn 并行度设置高了就失败有遇到过没,400的并行度就没事,设置成600就不断失败,是需要做什么额外的配置吗,比如网络缓冲?看报错日志没找到什么相关 ...
- Flink的Parallelism并行度
一.Flink的Parallelism并行度 Flink的Parallelism并行度 在flink-conf.yaml中通过parallelism.default配置项给所有execution nv ...
- 1.21.Flink Slot和并行度(parallelism)\Flink的并行度由什么决定的?\Flink的task是什么?\slot和parallelism
1.21.Flink Slot和并行度(parallelism) 1.21.1.Flink的并行度由什么决定的? 1.21.2.Flink的task是什么? 1.21.3.slot和paralleli ...
- 【Flink】Flink CDH6.3.2 下的yarn per job模式 savepoint和checkpoint,卡住,没有保存成功文件
文章目录 1.场景1 1.1 概述 1.场景1 1.1 概述 我们遇见CDH6.3.2 下的yarn per job模式 savepoint和checkpoint,卡住,没有保存成功文件 非常简单的一 ...
- 【Flink】flink sql的并行度怎么单独设置
1.概述 小记一下,记录flink sql的并行度怎么单独设置
- 【Flink状态】FsStateBackend 下 ValueState > MapState
[Flink状态]FsStateBackend 下 ValueState > MapState 背景: 对程序进行状态后端替换(Rocks -> Fs)时,程序产生了背压.(状态开启了TT ...
最新文章
- SSH 远程连接原理及故障排错详解
- 吴裕雄--天生自然 HADOOP大数据分布式处理:修改CenterOS 7系统时间为北京时间
- 刷题总结——烽火传递(单调队列+dp)
- aliddns ipv6_群晖使用阿里云DDNS(ipv4和ipv6)
- TortoiseSVN每个菜单项都表示什么意思
- java 中映射关系_java – 在Hibernate中映射一对多的关系?
- Spring中的事件机制
- 【需求工程】需求分析的5W1H8C1D方法
- Flutter 即将占领整个 Web 开发
- linux转发邮件,转发Linux服务器上的传入邮件?
- 如何搭建反欺诈策略与模型
- IDEA破解图文教程
- 形式语言与自动机理论期末复习
- linux应用项目(一)数码相框(2)数码相框之字符编码与字符的点阵显示
- 2017《Java预备作业2》计科1502杨雪莹
- Golang线程池gpool
- iPhone 移除描述文件详细步骤(Apple Configurator 2)
- 查询数据库空间(mysql和oracle)
- 入选31个细分领域丨通付盾荣登嘶吼安全产业研究院《2022网络安全产业图谱》
- json进阶---jackson底层之JsonParser理解使用(springboot多结构参数的映射方法的实现思路)