1.案例

/*** 测试点:测试多 多并行度下的 watermark触发机制* 参考:链接:https://juejin.im/post/5bf95810e51d452d705fef33** @throws Exception*/@Testpublic void mainTest1() throws Exception {//定义socket的端口号int port = 9010;//获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置使用eventtime,默认是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度为1,默认并行度是当前机器的cpu数量env.setParallelism(1);//连接socket获取输入的数据DataStream<String> text = env.socketTextStream("localhost", port, "\n");//解析输入的数据DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定义生成watermark的逻辑* 默认100ms被调用一次*/@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定义如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);long id = Thread.currentThread().getId();System.out.println("键值 :" + element.f0 + "线程验证 :" + id + " , 事件事件:[ " + sdf.format(element.f1) + " ],currentMaxTimestamp:[ " +sdf.format(currentMaxTimestamp) + " ],水印时间:[ " + sdf.format(getCurrentWatermark().getTimestamp()) + " ]");return timestamp;}});//保存被丢弃的数据OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};//注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样//.allowedLateness(Time.seconds(2))//允许数据迟到2秒.sideOutputLateData(outputTag).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 对window内的数据进行排序,保证数据的顺序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = "\n键值 : " + key + "\n              触发窗内数据个数 : " + arrarList.size() + "\n              触发窗起始数据: " + sdf.format(arrarList.get(0)) + "\n              触发窗最后(可能是延时)数据:" + sdf.format(arrarList.get(arrarList.size() - 1))+ "\n              实际窗起始和结束时间: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";out.collect(result);}});//把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);sideOutput.print();//测试-把结果打印到控制台即可window.print();//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行env.execute("eventtime-watermark");}

2.前面代码中设置了并行度为 1

    env.setParallelism(1);

如果这里不设置的话, 代码在运行的时候会默认读取本机 CPU 数量设置并行度。
下面我们来验证一下, 把代码中的并行度调整为 2:

env.setParallelism(2);
  1. 发现玄机如下:在第二条事件时,其实已经达到窗的触发时机,但是因为并行度为2,只有等到最小
  2. watermark 到的时候才会触发窗计算。发现线程44处理的是001和003 ,线程42处理的是0002,所以只有等到线程42到达后,水印才会起作用执行2018-10-01 10:11:33.000所在的窗。
0001,1538359890000       2018-10-01 10:11:30
0002,1538359903000      2018-10-01 10:11:43
0003,1538359908000      2018-10-01 10:11:48

3.现在代码中设置了并行度为 8

发现 这 7 条数据都是被不同的线程处理的。 每个线程都有一个 watermark。且每一个线程都是基于自己接收数据的事件时间最大值。

因此,导致到最后现在还没获取到最小的 watermark, 所以 window 无法被触发执行。

只有所有的线程的最小watermark都满足watermark 时间 >= window_end_time时,触发历史窗才会执行。

  0001,1538359882000     2018-10-01 10:11:220002,1538359886000       2018-10-01 10:11:260003,1538359892000       2018-10-01 10:11:320004,1538359893000       2018-10-01 10:11:330005,1538359894000       2018-10-01 10:11:340006,1538359896000       2018-10-01 10:11:360007,1538359897000       2018-10-01 10:11:37


当持续发生事件数据时。一旦所有线程都达到最低的窗触发时机时,就会进行窗触发执行了。输入数据如下:
0007,1538359897000 2018-10-01 10:11:37
0008,1538359897000 2018-10-01 10:11:37
0009,1538359897000 2018-10-01 10:11:37
0010,1538359897000 2018-10-01 10:11:37
0011,1538359897000 2018-10-01 10:11:37
0012,1538359897000 2018-10-01 10:11:37
0013,1538359897000 2018-10-01 10:11:37
0014,1538359897000 2018-10-01 10:11:37
0015,1538359897000 2018-10-01 10:11:37


这里证明了这个结论

注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark

【Flink】Flink 多并行度下的 watermark触发机制相关推荐

  1. 4.1.19 Flink-流处理框架-Flink中的时间语义和watermark水位线

    目录 1.Flink中的时间语义 1.1 EventTime 的代码设置 2.Watermark水位线 2.1 watermark的基本概念 2.2 watermark的特点和传递 2.3 Water ...

  2. Flink并行运行情况下watermark的传递机制

    今天学习了watermark传递机制,弄清楚了在多并行度情况下watermark的传递机制,特此备忘 此处使用的案例是参考了尚硅谷武老师flink教程中的案例,在此表示感谢 案例源代码 package ...

  3. 第一天:什么是Flink、WordCount入门、Flink安装、并行度

    1. 初识 Flink 在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题.目前比较流行的大数据处理引擎 Apach ...

  4. 【Flink】flink on yarn 并行度设置高导致任务失败

    文章目录 1.概述 1.概述 flink on yarn 并行度设置高了就失败有遇到过没,400的并行度就没事,设置成600就不断失败,是需要做什么额外的配置吗,比如网络缓冲?看报错日志没找到什么相关 ...

  5. Flink的Parallelism并行度

    一.Flink的Parallelism并行度 Flink的Parallelism并行度 在flink-conf.yaml中通过parallelism.default配置项给所有execution nv ...

  6. 1.21.Flink Slot和并行度(parallelism)\Flink的并行度由什么决定的?\Flink的task是什么?\slot和parallelism

    1.21.Flink Slot和并行度(parallelism) 1.21.1.Flink的并行度由什么决定的? 1.21.2.Flink的task是什么? 1.21.3.slot和paralleli ...

  7. 【Flink】Flink CDH6.3.2 下的yarn per job模式 savepoint和checkpoint,卡住,没有保存成功文件

    文章目录 1.场景1 1.1 概述 1.场景1 1.1 概述 我们遇见CDH6.3.2 下的yarn per job模式 savepoint和checkpoint,卡住,没有保存成功文件 非常简单的一 ...

  8. 【Flink】flink sql的并行度怎么单独设置

    1.概述 小记一下,记录flink sql的并行度怎么单独设置

  9. 【Flink状态】FsStateBackend 下 ValueState > MapState

    [Flink状态]FsStateBackend 下 ValueState > MapState 背景: 对程序进行状态后端替换(Rocks -> Fs)时,程序产生了背压.(状态开启了TT ...

最新文章

  1. SSH 远程连接原理及故障排错详解
  2. 吴裕雄--天生自然 HADOOP大数据分布式处理:修改CenterOS 7系统时间为北京时间
  3. 刷题总结——烽火传递(单调队列+dp)
  4. aliddns ipv6_群晖使用阿里云DDNS(ipv4和ipv6)
  5. TortoiseSVN每个菜单项都表示什么意思
  6. java 中映射关系_java – 在Hibernate中映射一对多的关系?
  7. Spring中的事件机制
  8. 【需求工程】需求分析的5W1H8C1D方法
  9. Flutter 即将占领整个 Web 开发
  10. linux转发邮件,转发Linux服务器上的传入邮件?
  11. 如何搭建反欺诈策略与模型
  12. IDEA破解图文教程
  13. 形式语言与自动机理论期末复习
  14. linux应用项目(一)数码相框(2)数码相框之字符编码与字符的点阵显示
  15. 2017《Java预备作业2》计科1502杨雪莹
  16. Golang线程池gpool
  17. iPhone 移除描述文件详细步骤(Apple Configurator 2)
  18. 查询数据库空间(mysql和oracle)
  19. 入选31个细分领域丨通付盾荣登嘶吼安全产业研究院《2022网络安全产业图谱》
  20. json进阶---jackson底层之JsonParser理解使用(springboot多结构参数的映射方法的实现思路)

热门文章

  1. 《和平精英》玩跨界,特斯拉主题店超级充电站现身海岛
  2. 嘀嗒公司被约谈 要求全面暂停进出京跨城网约车、顺风车等业务
  3. OPPO Find X2系列获蓝牙5.1认证:距离正式亮相又近了一步
  4. OPPO Reno 3 Pro再曝光:5G手机也有轻薄机身
  5. 外媒称青客公寓计划赴美IPO 筹资1.5亿美元
  6. 蔚来汽车:ES6和ES8首任车主可享受终生免费换电服务
  7. 苹果为什么收购英特尔手机基带业务?库克解释了一下
  8. 今天起高考能查分了!这种方式超方便的
  9. 华为P30 Pro现身安兔兔:未开性能模式跑分超28万
  10. 台达伺服驱动器说明书_干货:伺服调机实例讲解