只要水印watermark的时间大于等于窗口的结束时间,并且窗口内有数据存在,就会触发对应窗口计算。
除此之外,如果flink配置了allowedLateness参数,只要水印watermark的时间小于等于窗口的结束时间加上allowedLateness参数时间,将会重新触发对应窗口的计算。

滚动窗口联系watermark:

package Flink_Window;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import javax.annotation.Nullable;
import java.text.SimpleDateFormat;//watermark滚动窗口案例
public class SocketCountWindowEvent {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//连接socketDataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.208.121", 8888, "\n");DataStream<String> streamOperator = dataStreamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {//设定水印,水印是只增不减的//maxOutOfOrderness表示允许数据最大乱序事件是30sLong maxOutOfOrderness = 30000L;//30sLong currentMaxTimestamp =0L;@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp-maxOutOfOrderness);}@Overridepublic long extractTimestamp(String s, long l) {String[] split =s.split(",");long event_time=Long.parseLong(split[1]);//水位线只增不减currentMaxTimestamp=Math.max(event_time,currentMaxTimestamp);return event_time;}});//WordCount程序主逻辑DataStream<Tuple2<String,Integer>> windowCounts = streamOperator.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split("\\W+");collector.collect(Tuple2.of(split[0],1));}});DataStream<Tuple2<String, Integer>> result=windowCounts.keyBy(0).timeWindow(Time.seconds(60)).process(new ProcessWindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple, TimeWindow>() {SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Overridepublic void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> v2s, Collector<Tuple2<String,Integer>> collector) throws Exception {System.out.print("+++++++++++++++++++");System.out.print("算子是:"+Thread.currentThread().getId()+"窗口范围是:"+sdf.format(context.window().getStart())+"\t"+sdf.format(context.window().getEnd()));//查看那个水印出发了计算System.out.print(context.currentWatermark()+"\t"+sdf.format(context.currentWatermark()));int sum = 0;for(Tuple2<String,Integer> v2:v2s){sum += 1;}collector.collect(Tuple2.of(tuple.getField(0),sum));System.out.print("+++++++++++++++++++下++++++++++++++++++");}});result.print();env.execute("SocketCountWindowEvent");}}

watermark被认为是:eventtime减去30秒,这就是watermark的时间。

默认处理,当窗口被执行过后,数据再过来,Flink就会被遗弃掉。

Flink应该如何设置最大乱序时间maxOutOfOrderness

这个要结合自己的业务以及数据情况去设置。如果maxOutOfOrderness设置太小,而自身数据发送时由于网络等原因导致乱序或者late太多,
那么最终的结果就是会有很多单条的数据在window中被处罚,数据的正确性影响太大对于严重乱序的数据,需要严格统计数据最大延迟时间,才能保证计算的数据准确,
延时设置太小会影响数据准确性,延时设置太大不仅影响数据的实时性,更加会加重Flink作业的负担,不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

 滑动窗口的watermark案例

//滚动改为滑动窗口
//                .timeWindow(Time.seconds(60)).timeWindow(Time.seconds(60),Time.seconds(30))

Flink延迟数据三种处理方式:(延迟数据:eventTime<watermark时间的数据)

1、丢弃(默认的处理方式)
在Flink当中,如果输入数据所在的窗口已执行过了,Flink对这些延迟数据的处理方案默认就是丢弃,而不是再次出发响应的window窗口。
2、allowedLateness指定允许数据延迟的时间
在Flink当中,当输入数据所在的窗口已经执行过了,默认情况下即使再来心的数据,window也不会再次出发,但是如果我们希望再次被触发咋么解决?
即在某些情况下,我们希望为延迟的数据提供一个宽容的时间。

Flink提供了allowwedLateness方法,它可以实现对延迟数据设置一个延迟的时间,在指定延迟时间内到达的数据可以被再次出发窗口window计算。
在这里我们可以用一个列子来说明问题:
maxOutOfOrderness表示允许数据的最大乱序时间:好比我们的大货轮10:00开船,但是大货轮给乘客提供了5分钟的延迟时间,10:05开船;
allowedLateness表示是否可以再次触发窗口的延迟时间:好比大货轮10:05已经开船,但是大货轮又给乘客提供了2分钟的延迟时间,即只要大货轮在2min内的触发时间,都可以给你提供一个梯子,让你再次爬山来。

waterMark允许数据延迟时间与这个数据延迟的区别是;allowedLateness允许延迟时间在Watermark允许延迟时间的基础上增加的时间。

所谓的延迟数据,即窗口已经因为watermark进行了触发,则在此之后如果还有数据进入窗口,则默认情况下不会对窗口window进行再次触发和聚合计算。要想在数据进入已经被触发过的窗口后,还能继续触发窗口计算,则可以使用延迟数据处理机制。

因此如果你加上了allowedLateness参数,窗口的触发条件是:
a、窗口第一次触发是在Watermark时间>=window中的window_end_time;
b、第二次(或多次)触发的条件是Watermark时间<Window中的window_end_time + allowedLateness

3、sideOutputLateDate手机延迟数据
通过sideOutputLatedate函数可以把延迟的数据统一收集,统一存储,方便后续排查问题。

处理延迟数据:

package Flink_Window;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import javax.annotation.Nullable;
import java.text.SimpleDateFormat;//watermark滑动窗口案例
public class SocketCountWindowEvent {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//连接socketDataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.208.121", 8888, "\n");DataStream<String> streamOperator = dataStreamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {//设定水印,水印是只增不减的//maxOutOfOrderness表示允许数据最大乱序事件是30sLong maxOutOfOrderness = 30000L;//30sLong currentMaxTimestamp =0L;@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp-maxOutOfOrderness);}@Overridepublic long extractTimestamp(String s, long l) {String[] split =s.split(",");long event_time=Long.parseLong(split[1]);//水位线只增不减currentMaxTimestamp=Math.max(event_time,currentMaxTimestamp);return event_time;}});//WordCount程序主逻辑DataStream<Tuple2<String,Integer>> windowCounts = streamOperator.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split("\\W+");collector.collect(Tuple2.of(split[0],1));}});//保存被丢弃的数据OutputTag<Tuple2<String,Integer>> outputTag=new OutputTag<Tuple2<String,Integer>>("late_date"){};//注意:由于getSideOutput方法是SingleOutputStreamOperator类中特有的方法,所以这里不能用DataStream.SingleOutputStreamOperator<Tuple2<String, Integer>> result=windowCounts.keyBy(0)//滚动改为滑动窗口.timeWindow(Time.seconds(60))//保存被丢弃的数据.sideOutputLateData(outputTag).process(new ProcessWindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple, TimeWindow>() {SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Overridepublic void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> v2s, Collector<Tuple2<String,Integer>> collector) throws Exception {System.out.print("+++++++++++++++++++");System.out.print("算子是:"+Thread.currentThread().getId()+"窗口范围是:"+sdf.format(context.window().getStart())+"\t"+sdf.format(context.window().getEnd()));//查看那个水印出发了计算System.out.print(context.currentWatermark()+"\t"+sdf.format(context.currentWatermark()));int sum = 0;for(Tuple2<String,Integer> v2:v2s){sum += 1;}collector.collect(Tuple2.of(tuple.getField(0),sum));System.out.print("+++++++++++++++++++下++++++++++++++++++");}});//将延迟的数据暂时打印到控制台,实际中可以保存到其他存储介质当中DataStream<Tuple2<String,Integer>> sideOutput=result.getSideOutput(outputTag);sideOutput.print();result.print();env.execute("SocketCountWindowEvent");}}

Flink之Watermark滑动窗口案例相关推荐

  1. flink的(Sliding)滑动窗口

    flink的(Sliding)滑动窗口 未分组全局执行的滑动窗口(Sliding) 滑动窗口一般用于股票的可视化实现,他不会忽然高了忽然低了,他会有一个过渡缓冲区. public class Slid ...

  2. 2021年大数据Flink(二十):案例二 基于数量的滚动和滑动窗口

    目录 案例二 基于数量的滚动和滑动窗口 需求 代码实现 案例二 基于数量的滚动和滑动窗口 需求 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗 ...

  3. 2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口

    目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4 ...

  4. Flink 滚动窗口、滑动窗口详解

    1 滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行"均匀切片"的划分方式.窗口之间没有重叠,也不会有间隔,是"首尾相接"的 ...

  5. DataScience:数据处理技术之针对时间序列数据衍变—构造时间滑动窗口数据的简介、代码实现、案例应用之详细攻略

    DataScience:数据处理技术之针对时间序列数据衍变-构造时间滑动窗口数据的简介.代码实现.案例应用之详细攻略 目录 时间滑动窗口数据的简介

  6. 【FPGA教程案例76】通信案例2——基于FPGA的滑动窗口累加器实现

    FPGA教程目录 MATLAB教程目录 -------------------------------------------------------------------------------- ...

  7. Flink时间属性和窗口

    基于时间的操作,需要定义相关的时间语义和时间数据来源的信息.在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间 时间属性是每个表模式结构的一部分,它可以 ...

  8. Flink学习:WaterMark

    WaterMark 一.什么是水位线? 二.案例分析 三.如何生成水位线? (一).在SourceFunction中直接定义Timestamps和Watermarks (二).自定义生成Timstam ...

  9. Flink教程(17)- Flink Table与SQL(案例与SQL算子)

    文章目录 01 引言 02 Flink Table&SQL 案例 2.1 案例1(DataStream SQL统计) 2.2 案例2(DataStream Table&SQL统计) 2 ...

最新文章

  1. 一个中心、三大原则,阿里这样做智能对话开发平台
  2. 关于linux内核的wait等待事件和wakeup的核心原理
  3. 2.5 matlab稀疏矩阵
  4. 查看systemctl或service启动服务日志
  5. 将CloudWatch Logs与Cloudhub Mule集成
  6. 我花了十多分钟的i698源代码时间
  7. apache安装_kali Linux下的Apache的配置和安装:
  8. 什么是实验室人员比对人员_中实在线——程老师小课堂 实验室人员管理技巧...
  9. 基于平面 marker 的 Bundle Adjustmet
  10. python三引号的作用_Python学习笔记(三)基本数据类型
  11. 苹果计算机音乐谱大全,macOS乐谱制作软件大全推荐~
  12. 小米路由器r2d_小米路由器R2D亮黄灯维修
  13. 联想笔记本进入不了BIOS的解决方法
  14. @永和:为自己编码 --- 开源中国众包平台上线
  15. symbian塞班系统支持格式
  16. 一个案例深入Python中的__new__和__init__
  17. 欠定方程组的最小范数解
  18. 炫出我的色彩 HUAWEI nova青春版流光水波纹闪亮来袭
  19. win10计算机无法复制文件,Win10系统禁止U盘拷贝文件的方法【图文】
  20. 10/11论文关键词,自动去偏框架论文翻译,发文章思考

热门文章

  1. gitHub上传项目
  2. 新顶级域名、Cloud域名
  3. 牛客网-《剑指offer》-跳台阶
  4. ZT:公司绝对不会告诉你的潜规则(何杨)
  5. 实现查看订单详情功能
  6. mysql 表自动复制_mysql-10临时表、复制表
  7. android自定义textview销毁,Android自定义View去除TextView的Padding值
  8. 富文本框让最大四百像素_富文本框的使用
  9. idea js检查太卡_IntelliJ IDEA抑制、禁用与启用检查
  10. 对多用户分时系统最重要_互联网搜索引擎:让你的产品在最显眼的位置摆摊