今天学习了watermark传递机制,弄清楚了在多并行度情况下watermark的传递机制,特此备忘
此处使用的案例是参考了尚硅谷武老师flink教程中的案例,在此表示感谢

案例源代码

package com.atguigu.apitest.window;import com.atguigu.apitest.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** @author mars* @version 1.0* Create by 2022/1/8 22:57*/public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<String> dataStreamSource = env.socketTextStream("hadoop1", 9999);SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});SingleOutputStreamOperator<SensorReading> assignTimestampsAndWatermarks = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//基于事件时间的开窗聚合SingleOutputStreamOperator<SensorReading> cntStream = assignTimestampsAndWatermarks.keyBy("id").window(TumblingEventTimeWindows.of(Time.seconds(10))).minBy("temperature");cntStream.print("cntStream");env.execute();}
}

pojo类

package com.atguigu.apitest.beans;/*** @author mars* @version 1.0* Create by 2021/12/30 20:38*//*** 传感器温度读数的数据类型*/
public class SensorReading {private String id;private Long timestamp;private Double temperature;public SensorReading() {}public SensorReading(String id, Long timestamp, Double temperature) {this.id = id;this.timestamp = timestamp;this.temperature = temperature;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}public Double getTemperature() {return temperature;}public void setTemperature(Double temperature) {this.temperature = temperature;}@Overridepublic String toString() {return "SensorReading{" +"id='" + id + '\'' +", timestamp=" + timestamp +", temperature=" + temperature +'}';}
}

测试数据

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

情景1:设置并行度为1时


情景2:设置并行度为4时

watermark的传递机制是向下游广播

补充:如何知道上图中的划分的window起止点为[190,200)?

在使用事件时间语义的窗口函数中

package org.apache.flink.streaming.api.windowing.windows;

函数如下

 /*** Method to get the window start for a timestamp.** @param timestamp epoch millisecond to get the window start.* @param offset The offset which window start would be shifted by.* @param windowSize The size of the generated windows.* @return window start*/public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;}```
因此
1547718199 的起始点为1547718190

Flink并行运行情况下watermark的传递机制相关推荐

  1. 凯撒密码:设想在某些情况下给朋友传递字条信息,但又不希望传递中途被第三方看懂这些信息,因此需要对字条信息进行加密处理

    题目 凯撒密码:设想在某些情况下给朋友传递字条信息,但又不希望传递中途被第三方看懂这些信息,因此需要对字条信息进行加密处理.凯撒密码采用了替换算法对信息中的每一个英文字符循环替换为该字符后面第三个字符 ...

  2. python继承如何进行引用传递的区别_python在什么情况下会引用传递呢?

    我写了一个对单链表进行排序的程序,核心部分如下: class Solution: def insertionSortList(self, head): ans = None cur = head wh ...

  3. Android 中Touch(触屏)事件传递机制

    版本:2.0 日期:2014.3.21 2014.3.29 版权:© 2014 kince 转载注明出处 一.基本概念 在实际开发中,经常会遇到与触屏事件有关的问题,最典型的一个就是滑动冲突.比如在使 ...

  4. Android事件传递机制(转)

    Android事件构成 在Android中,事件主要包括点按.长按.拖拽.滑动等,点按又包括单击和双击,另外还包括单指操作和多指操作.所有这些都构成了Android中的事件响应.总的来说,所有的事件都 ...

  5. iOS 消息的传递机制

    2019独角兽企业重金招聘Python工程师标准>>> 转载原地址:http://beyondvincent.com/blog/2013/12/14/124-communicatio ...

  6. PyTorch 深度剖析:并行训练的 DP 和 DDP 分别在什么情况下使用及实例

    ↑ 点击蓝字 关注极市平台 作者丨科技猛兽 编辑丨极市平台 极市导读 这篇文章从应用的角度出发,介绍 DP 和 DDP 分别在什么情况下使用,以及各自的使用方法.以及 DDP 的保存和加载模型的策略, ...

  7. python单核运行_python下多核,单核CPU对于并行,并发执行效率的对比-Go语言中文社区...

    ** ** 这篇博客主要内容为python 中多线程以及多进程的效率对比,以及记录自己在做这个实验中遇到的一些问题以及心得 背景引入: CPU制造商为了追求CPU效率放弃了在CPU频率上的追求(CPU ...

  8. linux 下怎么将可执行文件做成镜像 开机就能运行这个可执行文件,圣诞老人的ELFs:在没有execve的情况下运行Linux可执行文件...

    这篇博客是年度的12 Days of HaXmas系列博客中的第11篇. Executable and Linkable Format ( ELF ) 是许多类Unix操作系统(如Linux,大多数现 ...

  9. Java不用main方法运行_如何在不定义main方法的情况下运行Java程序?

    我正在查看一些Java源代码,并注意到main方法没有定义. Java如何编译源代码而不知道从哪里开始? main方法仅在Java虚拟机执行代码时使用.没有main方法就无法执行代码,但仍然可以编译代 ...

最新文章

  1. Attach Volume 操作(Part I) - 每天5分钟玩转 OpenStack(53)
  2. 为什么白帽SEO更好?
  3. 程序员如何选择适合的公司
  4. 「高并发秒杀」linux安装软件有哪几种方式
  5. java永久冻结_Java如何解决脆弱基类(基类被冻结)问题
  6. 工业计算机改造报告,工业计算机实验报告..docx
  7. python---可执行文件的转换
  8. C#的变迁史08 - C# 5.0 之并行编程总结篇
  9. 【记忆化递归+DP】LeetCode 139. Word Break
  10. 在设计四人抢答器中灯全亮_EDA课程设计—四人抢答器设计
  11. c语言程序设计实验总结范文,《c语言程序设计》课程实验报告模板.doc
  12. c 语言 登陆窗口界面,c/c++语言实现登陆界面
  13. 弓形锯床主传动及工作机构设计
  14. ES自定义评分机制:function_score查询详解
  15. python 等高线图标注_Pyplot等高线图-clabel间距
  16. 互联网与移动互联网仍是本世纪最大创业机会
  17. oracle狎鸥亭_韩国Oracle(奥拉克)整形医院
  18. 计算器(C++QT)——有全部代码哦
  19. C++020-C++因数,公因数,公倍数
  20. 句子反转(“hello xiao mi”- “mi xiao hello”)

热门文章

  1. Unity优化 调整画质(贴图)质量
  2. 【针对产品说测试】之测试“用户登录”
  3. 阿里巴巴2020年双11: Flink流批一体化真的来了 (Flink Forward 2020  PPT)
  4. DUC的matlab仿真
  5. 超级计算机多少钱100万,超级计算机的100万亿倍!中国量子计算机“九章”为何这么快?...
  6. JPA简单入门以及IDEA第一个JPA程序
  7. ABAP中 PERFORM USING与CHANGING的用法
  8. SpringBoot https双向认证操作
  9. FPGA程序如何模块化设计?
  10. C语言中的int8_t,uint8_t, int16_t,uint16_t, int32_t,uint32_t, int64_t,uint64_t和int数组,char数组以及sizeof()的理解