1.16.Flink Window and Time Explained
1.16.1.Window
1.16.2.Window Types
1.16.3.Window Type Summary
1.16.4.Using TimeWindow
1.16.5.Using CountWindow
1.16.6.Window Aggregation Categories
1.16.7.Window Aggregation: Incremental Aggregation
1.16.7.1.Incremental Aggregation State Transitions: Running Sum
1.16.7.2.reduce(reduceFunction)
1.16.7.3.aggregate(aggregateFunction)
1.16.8.Window Aggregation: Full Aggregation
1.16.8.1.Full Aggregation State Transitions: Computing the Maximum
1.16.8.2.apply(windowFunction)
1.16.8.3.process(processWindowFunction)
1.16.9.Introduction to Time
1.16.9.1.Setting the Time Type
1.16.9.2.EventTime and Watermarks
1.16.9.3.Watermarks on Ordered Streams
1.16.9.4.Watermarks on Unordered Streams
1.16.9.5.Watermarks with Multiple Parallelism
1.16.9.6.How Watermarks Are Generated
1.16.9.7.How Should Flink Set the Maximum Out-of-Orderness?

1.16.Flink Window and Time Explained

1.16.1.Window

Aggregating events (for example counting or summing) works differently on streams than in batch processing.

  • For example, it is impossible to count all elements in a stream, because streams are usually infinite (unbounded). Aggregations on streams therefore have to be scoped by a window, such as "the count over the last 5 minutes" or "the sum of the last 100 elements".
  • A window is a mechanism for cutting an unbounded stream into finite chunks of data.
    Windows can be time-driven (Time Window, e.g. every 30 seconds) or data-driven (Count Window, e.g. every 100 elements).

1.16.2.Window Types

Windows are usually divided into the following types (a short sketch of all three follows after this list):

1. Tumbling windows: no overlap

2. Sliding windows: windows overlap

3. Session windows
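
A minimal sketch of how the three types look in the DataStream API. The window sizes and the processing-time assigners are illustrative only, and keyedStream stands for any stream after keyBy:

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));                  // tumbling: fixed 5 s windows, no overlap
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))); // sliding: 10 s windows, a new one every 5 s
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));              // session: a window closes after a 5 s gap without data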

1.16.3.Window Type Summary

Both TimeWindow and CountWindow support tumbling windows and sliding windows, as the sketch below shows.
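
As a sketch, the four combinations on a keyed stream look like this (the sizes are illustrative only; keyedStream stands for any stream after keyBy):

keyedStream.timeWindow(Time.seconds(30));                    // tumbling time window
keyedStream.timeWindow(Time.seconds(30), Time.seconds(10));  // sliding time window
keyedStream.countWindow(100);                                // tumbling count window
keyedStream.countWindow(100, 10);                            // sliding count window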

1.16.4.Using TimeWindow

1.16.5.Using CountWindow

1.16.6.Window Aggregation Categories

Incremental aggregation
Full aggregation

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * window
 *
 * Created by xxxx on 2020/10/09 .
 */
public class SocketDemoFullCount {
    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer, Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String value) throws Exception {
                return new Tuple2<>(1, Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements,
                                        Collector<String> out) throws Exception {
                        System.out.println("Executing process...");
                        long count = 0;
                        for (Tuple2<Integer, Integer> element : elements) {
                            count++;
                        }
                        out.collect("window:" + context.window() + ",count:" + count);
                    }
                }).print();

        // This line is mandatory, otherwise the program never runs
        env.execute("Socket window count");
    }
}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * window
 *
 * Created by xxxx on 2020/10/09 .
 */
public class SocketDemoIncrAgg {
    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer, Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String value) throws Exception {
                return new Tuple2<>(1, Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
                                                           Tuple2<Integer, Integer> value2) throws Exception {
                        System.out.println("Executing reduce: " + value1 + "," + value2);
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        // This line is mandatory, otherwise the program never runs
        env.execute("Socket window count");
    }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding window computation
 *
 * Word data is produced through a socket,
 * and Flink computes the aggregation.
 *
 * Requirement: every 1 second, aggregate the data of the last 2 seconds.
 *
 * Created by xxxx on 2020/10/09 .
 */
public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // input:  a a c
        // output: a 1
        //         a 1
        //         c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
          .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, slide interval 1 s
          .sum("count"); // either sum or reduce works here
        /*.reduce(new ReduceFunction<WordWithCount>() {
            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                return new WordWithCount(a.word, a.count + b.count);
            }
        })*/

        // Print the result to the console and set the parallelism
        windowCounts.print().setParallelism(1);

        // This line is mandatory, otherwise the program never runs
        env.execute("Socket window count");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * checkpoint
 *
 * Created by xxxx on 2020/10/09 .
 */
public class SocketWindowWordCountJavaCheckPoint {
    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Start a checkpoint every 1000 ms (checkpoint interval)
        env.enableCheckpointing(1000);
        // Advanced options:
        // Set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Make sure there are at least 500 ms between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoints have to complete within one minute or are discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep checkpoint data after the job is cancelled, so it can later be restored from a chosen checkpoint
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data after cancel
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data after cancel;
        //   checkpoints are only kept when the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new MemoryStateBackend());
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // input:  a a c
        // output: a 1
        //         a 1
        //         c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
          .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, slide interval 1 s
          .sum("count"); // either sum or reduce works here
        /*.reduce(new ReduceFunction<WordWithCount>() {
            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                return new WordWithCount(a.word, a.count + b.count);
            }
        })*/

        // Print the result to the console and set the parallelism
        windowCounts.print().setParallelism(1);

        // This line is mandatory, otherwise the program never runs
        env.execute("Socket window count");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Use a collection as the data source
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class StreamingFromCollection {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // Use the collection as the source
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // Transform the data with map
        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });

        // Print directly
        num.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}

Additional Scala examples:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sliding window computation
 *
 * Every 1 second, aggregate the data of the last 2 seconds and print it to the console.
 *
 * Created by xxxx on 2020/10/09 .
 */
object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {
    // Get the socket port
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port set. use default port 9000--scala")
      }
      9000
    }

    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to the socket to read the input data
    val text = env.socketTextStream("hadoop100", port, '\n')

    // Split the lines, group, compute the window, and aggregate with sum
    // Note: this implicit import is required, otherwise the flatMap call below fails
    import org.apache.flink.api.scala._
    val windowCounts = text.flatMap(line => line.split("\\s")) // split every line into words
      .map(w => WordWithCount(w, 1)) // turn every word into (word, 1)
      .keyBy("word") // group by word
      .timeWindow(Time.seconds(2), Time.seconds(1)) // window size and slide interval
      .sum("count") // either sum or reduce works here
      //.reduce((a, b) => WordWithCount(a.word, a.count + b.count))

    // Print to the console
    windowCounts.print().setParallelism(1)

    // Run the job
    env.execute("Socket window count")
  }

  case class WordWithCount(word: String, count: Long)
}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
object StreamingFromCollectionScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit conversions
    import org.apache.flink.api.scala._

    val data = List(10, 15, 20)
    val text = env.fromCollection(data)

    // Add 1 to every element received by map
    val num = text.map(_ + 1)

    num.print().setParallelism(1)

    env.execute("StreamingFromCollectionScala")
  }
}

1.16.7.Window Aggregation: Incremental Aggregation

Every time a record enters the window, one round of computation is performed.

reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()

1.16.7.1.Incremental Aggregation State Transitions: Running Sum

1.16.7.2.reduce(reduceFunction)

1.16.7.3.aggregate(aggregateFunction)
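
A minimal sketch of aggregate(aggregateFunction), assuming the same Tuple2<Integer, Integer> stream (intData) as in SocketDemoIncrAgg above: the accumulator keeps a running (sum, count) pair per window and the window emits the average when it fires. It needs one extra import, org.apache.flink.api.common.functions.AggregateFunction.

intData.keyBy(0)
        .timeWindow(Time.seconds(5))
        .aggregate(new AggregateFunction<Tuple2<Integer, Integer>, Tuple2<Long, Long>, Double>() {
            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return new Tuple2<>(0L, 0L); // (sum, count)
            }

            @Override
            public Tuple2<Long, Long> add(Tuple2<Integer, Integer> value, Tuple2<Long, Long> acc) {
                // incremental update: called once per element as it enters the window
                return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1);
            }

            @Override
            public Double getResult(Tuple2<Long, Long> acc) {
                // called once when the window fires
                return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
            }

            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                // only needed for merging windows (e.g. session windows)
                return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
            }
        }).print();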

1.16.8.Window Aggregation: Full Aggregation

Full aggregation

  • The computation starts only once all data belonging to the window has arrived (this makes it possible, for example, to sort the data inside the window).
  • apply(windowFunction)
  • process(processWindowFunction)
    processWindowFunction provides more context information than windowFunction.

1.16.8.1.Full Aggregation State Transitions: Computing the Maximum

1.16.8.2.apply(windowFunction)

1.16.8.3.process(processWindowFunction)


1.16.9.Introduction to Time

For the time in stream data, three notions can be distinguished:

  • Event Time: the time at which the event was produced, usually described by a timestamp inside the event.
  • Ingestion Time: the time at which the event enters Flink.
  • Processing Time: the system time of the machine at the moment the event is processed.

    Processing time: processing time refers to the system time of the machine that executes the respective operation.
    When a streaming program runs on processing time, all time-based operations (such as time windows) use the system clock of the machine that runs the respective operator. An hourly processing-time window includes all records that arrived at the specific operator while the system clock indicated that hour. For example, if an application starts running at 9:15 am, the first hourly processing-time window includes the events processed between 9:15 am and 10:00 am, the next window includes the events processed between 10:00 am and 11:00 am, and so on.

Processing time is the simplest notion of time and requires no coordination between the streams and the machines. It gives the best performance and the lowest latency. However, in distributed and asynchronous environments processing time is not deterministic, because it is sensitive to the speed at which records arrive in the system (for example from a message queue), to the speed at which records flow between operators inside the system, and to outages (planned or otherwise).
Event time: event time is the time at which each event occurred on its producing device. This time is typically embedded in the record before it enters Flink, and the event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clock. Event-time programs must specify how event-time watermarks are generated; watermarks are the mechanism that signals progress in event time. The watermark mechanism is described later in this section.

In a perfect world, event-time processing would yield completely consistent and deterministic results, regardless of when events arrive or in which order. However, unless the events are known to arrive in order (by timestamp), event-time processing incurs some latency while waiting for out-of-order events. Because it is only possible to wait for a finite amount of time, this limits how deterministic event-time applications can be.

Assuming all of the data has arrived, event-time operations behave as expected and produce correct and consistent results even when processing out-of-order or late events, or when reprocessing historical data. For example, an hourly event-time window contains all records whose event timestamps fall into that hour, regardless of the order in which they arrive or when they are processed. (See the section on late events for more information.)

Note that sometimes, when event-time programs process live data, they use some processing-time operations in order to guarantee that they make progress in a timely fashion.
Ingestion time: ingestion time is the time at which an event enters Flink. At the source operator, each record gets the source's current time as its timestamp, and time-based operations (such as time windows) refer to that timestamp.
Ingestion time sits conceptually between event time and processing time. Compared to processing time, it is slightly more expensive but gives more predictable results. Because ingestion time uses a stable timestamp (assigned once at the source), different window operations on the same record refer to the same timestamp, whereas with processing time each window operator may assign the record to a different window (based on its local system clock and any transport delay).

Compared to event time, ingestion-time programs cannot handle out-of-order events or late data, but they do not have to specify how watermarks are generated.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.

1.16.9.1.Setting the Time Type

In Flink, the default Time type is ProcessingTime.
It can be set explicitly in code, as in the sketch below.
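
A minimal sketch of setting the time characteristic (the 100 ms interval below is illustrative; once the event-time characteristic is set, Flink's own default for the periodic watermark interval is 200 ms):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// ProcessingTime is the default; switch explicitly if event time or ingestion time is needed
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// Optional: how often periodic watermarks are generated and emitted
env.getConfig().setAutoWatermarkInterval(100);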

1.16.9.2.EventTime and Watermarks

How do we handle out-of-order data when using event time?
Between the moment an event is produced, flows through the source, and reaches an operator, some time passes. In most cases the data arriving at an operator is still ordered by the time the events were produced, but out-of-order data cannot be ruled out, for example because of network delays; with Kafka in particular, order cannot be guaranteed across multiple partitions. When computing windows we cannot wait indefinitely either, so there must be a mechanism that guarantees that after a specific amount of time the window is triggered and computed. That mechanism is the watermark: watermarks exist to handle out-of-order events.
The term watermark is commonly rendered in Chinese as 水位线 ("water level line").
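
A small worked example, using the same numbers as the watermark code later in this section (maxOutOfOrderness = 10 s, 3-second tumbling event-time windows): the assigner emits watermark = currentMaxEventTime - 10 s, and an event-time window [start, end) fires once the watermark reaches end - 1 ms. So the window [00:00:00.000, 00:00:03.000) only fires after an element with an event time of at least 00:00:12.999 has been seen (window end boundary 2999 ms plus 10 000 ms allowed out-of-orderness); until then, any element whose event time falls into [00:00:00, 00:00:03), even one that arrives late and out of order, is still placed into that window.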

1.16.9.3.Watermarks on Ordered Streams

1.16.9.4.Watermarks on Unordered Streams

1.16.9.5.Watermarks with Multiple Parallelism

Note: with multiple parallel subtasks, watermark alignment uses the minimum watermark across all input channels.

1.16.9.6.How Watermarks Are Generated

Usually, a watermark should be generated as soon as the data is received from the source; however, it is also possible to apply a simple map or filter right after the source and only then generate the watermark.
Note: if timestamps/watermarks are assigned more than once, the later assignment overrides the earlier one.
Generation methods:

  • With Periodic Watermarks
    1. Watermark generation and emission is triggered periodically (200 ms by default once the event-time characteristic is set).
    2. A watermark is automatically injected into the stream every N milliseconds; the interval is configured via ExecutionConfig.setAutoWatermarkInterval. Each time getCurrentWatermark is called, the returned watermark is injected into the stream if it is non-null and larger than the previous one.
    3. A maximum allowed out-of-orderness can be defined; this is the most common approach.
    4. Implement the AssignerWithPeriodicWatermarks interface.

  • With Punctuated Watermarks
    1. Watermark generation and emission is triggered by specific events.
    2. A watermark is injected into the stream based on events: every element gets a chance to decide whether a watermark should be generated. The returned watermark is injected into the stream if it is non-null and larger than the previous one.
    3. Implement the AssignerWithPunctuatedWatermarks interface (see the sketch right after this list).
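
A minimal sketch of a punctuated assigner, assuming the same Tuple2<String, Long> stream (inputMap) as in the periodic examples below; the "flag" key used to trigger a watermark is purely illustrative. In addition to the imports used below, it needs org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks.

DataStream<Tuple2<String, Long>> marked = inputMap.assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {

    // Called first for every element: extract its event timestamp
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        return element.f1;
    }

    // Called right after extractTimestamp for every element; returning a non-null watermark
    // that is larger than the previous one injects it into the stream
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
        return "flag".equals(lastElement.f0) ? new Watermark(extractedTimestamp) : null;
    }
});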

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Watermark example
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingWindowWatermark {
    public static void main(String[] args) throws Exception {
        // Define the socket port
        int port = 9000;
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time; the default is processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the parallelism to 1; the default is the number of CPU cores of the machine
        env.setParallelism(1);

        // Connect to the socket to read the input data
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        // Parse the input data
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        // Extract the timestamp and generate watermarks
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness is 10 s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            /**
             * Defines how the watermark is generated.
             * Called periodically (200 ms by default with event time enabled).
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            // Defines how the timestamp is extracted
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                long id = Thread.currentThread().getId();
                System.out.println("currentThreadId:" + id + ",key:" + element.f0
                        + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1)
                        + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
                        + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|"
                        + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
                return timestamp;
            }
        });

        DataStream<String> window = waterMarkStream.keyBy(0)
                // Assign windows based on the message's EventTime; same effect as calling timeWindow
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * Sort the data inside the window to guarantee its order.
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + ","
                                + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        // For testing, simply print the result to the console
        window.print();

        // Note: Flink is lazily evaluated, so execute() must be called for the code above to run
        env.execute("eventtime-watermark");
    }
}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Watermark example
 *
 * sideOutputLateData collects late data
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingWindowWatermark2 {
    public static void main(String[] args) throws Exception {
        // Define the socket port
        int port = 9000;
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time; the default is processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the parallelism to 1; the default is the number of CPU cores of the machine
        env.setParallelism(1);

        // Connect to the socket to read the input data
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        // Parse the input data
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        // Extract the timestamp and generate watermarks
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness is 10 s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            /**
             * Defines how the watermark is generated.
             * Called periodically (200 ms by default with event time enabled).
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            // Defines how the timestamp is extracted
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("key:" + element.f0
                        + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1)
                        + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
                        + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|"
                        + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
                return timestamp;
            }
        });

        // Side output for the data that would otherwise be dropped as late
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};

        // Note: getSideOutput is a method specific to SingleOutputStreamOperator,
        // so the variable type here cannot be its parent type DataStream.
        SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
                // Assign windows based on the message's EventTime; same effect as calling timeWindow
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                //.allowedLateness(Time.seconds(2)) // allow data to be 2 seconds late
                .sideOutputLateData(outputTag)
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * Sort the data inside the window to guarantee its order.
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + ","
                                + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        // For now print the late data to the console; in practice it could be written to other storage
        DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
        sideOutput.print();

        // For testing, simply print the result to the console
        window.print();

        // Note: Flink is lazily evaluated, so execute() must be called for the code above to run
        env.execute("eventtime-watermark");
    }
}

Scala examples:

import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
 * Watermark example
 * Created by xxxx on 2020/10/09
 */
object StreamingWindowWatermarkScala {
  def main(args: Array[String]): Unit = {
    val port = 9000
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    // Use event time; the default is processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val text = env.socketTextStream("hadoop100", port, '\n')

    val inputMap = text.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toLong)
    })

    // Extract the timestamp and generate watermarks
    val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {

      var currentMaxTimestamp = 0L
      var maxOutOfOrderness = 10000L // maximum allowed out-of-orderness is 10 s

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {
        val timestamp = element._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        val id = Thread.currentThread().getId
        println("currentThreadId:" + id + ",key:" + element._1
          + ",eventtime:[" + element._2 + "|" + sdf.format(element._2)
          + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
          + "],watermark:[" + getCurrentWatermark().getTimestamp + "|" + sdf.format(getCurrentWatermark().getTimestamp) + "]")
        timestamp
      }
    })

    val window = waterMarkStream.keyBy(0)
      // Assign windows based on the message's EventTime; same effect as calling timeWindow
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {
        // Sort the data inside the window to guarantee its order
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {
          val keyStr = key.toString
          val arrBuf = ArrayBuffer[Long]()
          val ite = input.iterator
          while (ite.hasNext) {
            val tup2 = ite.next()
            arrBuf.append(tup2._2)
          }
          val arr = arrBuf.toArray
          Sorting.quickSort(arr)
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
          val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) +
            "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
          out.collect(result)
        }
      })

    window.print()

    env.execute("StreamingWindowWatermarkScala")
  }
}
import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
 * Watermark example
 *
 * sideOutputLateData collects late data
 *
 * Created by xxxx on 2020/10/09
 */
object StreamingWindowWatermarkScala2 {
  def main(args: Array[String]): Unit = {
    val port = 9000
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    // Use event time; the default is processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val text = env.socketTextStream("hadoop100", port, '\n')

    val inputMap = text.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toLong)
    })

    // Extract the timestamp and generate watermarks
    val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {

      var currentMaxTimestamp = 0L
      var maxOutOfOrderness = 10000L // maximum allowed out-of-orderness is 10 s

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {
        val timestamp = element._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        val id = Thread.currentThread().getId
        println("currentThreadId:" + id + ",key:" + element._1
          + ",eventtime:[" + element._2 + "|" + sdf.format(element._2)
          + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
          + "],watermark:[" + getCurrentWatermark().getTimestamp + "|" + sdf.format(getCurrentWatermark().getTimestamp) + "]")
        timestamp
      }
    })

    // Side output for the data that would otherwise be dropped as late
    val outputTag = new OutputTag[Tuple2[String, Long]]("late-data"){}

    val window = waterMarkStream.keyBy(0)
      // Assign windows based on the message's EventTime; same effect as calling timeWindow
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      //.allowedLateness(Time.seconds(2)) // allow data to be 2 seconds late
      .sideOutputLateData(outputTag)
      .apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {
        // Sort the data inside the window to guarantee its order
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {
          val keyStr = key.toString
          val arrBuf = ArrayBuffer[Long]()
          val ite = input.iterator
          while (ite.hasNext) {
            val tup2 = ite.next()
            arrBuf.append(tup2._2)
          }
          val arr = arrBuf.toArray
          Sorting.quickSort(arr)
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
          val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) +
            "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
          out.collect(result)
        }
      })

    // Print the late data collected in the side output
    val sideOutput: DataStream[Tuple2[String, Long]] = window.getSideOutput(outputTag)
    sideOutput.print()

    window.print()

    env.execute("StreamingWindowWatermarkScala")
  }
}

1.16.9.7.How Should Flink Set the Maximum Out-of-Orderness?

This has to be decided based on your own business and data characteristics. If maxOutOfOrderness is set too small while the data is heavily out of order or arrives late (for example because of the network), the result is that many windows are triggered with only isolated records in them, and the correctness of the results suffers badly.

For heavily out-of-order data, the actual maximum delay has to be measured carefully, otherwise the computed results are not accurate. Setting the allowed delay too small hurts correctness; setting it too large not only hurts the freshness of the results but also puts extra load on the Flink job. For data that does not strictly require event-time semantics, avoid event-time processing where possible, because it carries the risk of dropping data.

