Requirement: every 5 seconds, count how many times each word appeared in the last 10 seconds.
TimeWindow implementation

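import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each socket line holds comma-separated words
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] fields = line.split(",");
                        for (String word : fields) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)                                       // key by the word (field 0)
                .timeWindow(Time.seconds(10), Time.seconds(5)) // 10 s window, sliding every 5 s
                .sum(1);                                        // add up the counts (field 1)
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }
}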
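The sliding window above means that every event is counted in two windows, because it lies inside two overlapping 10-second windows. The plain-Java sketch below lists the windows that contain a given timestamp. `SlidingWindowMath` and `windowStarts` are hypothetical names that are not part of Flink. The loop follows the logic of Flink's sliding window assigners, assuming offset 0 and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API): start times of all windows of length `size`
// that advance every `slide` ms and contain timestamp `ts` (offset 0 assumed).
final class SlidingWindowMath {
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start <= ts, aligned to a multiple of the slide
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk backwards while the window [start, start + size) still covers ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An event at 13 s belongs to [10 s, 20 s) and [5 s, 15 s)
        System.out.println(windowStarts(13_000L, 10_000L, 5_000L)); // [10000, 5000]
    }
}
```

With a 10-second size and a 5-second slide, `size / slide = 2`, so every timestamp falls into exactly two windows.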

ProcessWindowFunction

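import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] fields = line.split(",");
                        for (String word : fields) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .process(new SumProcessWindowFunction());
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            System.out.println("Current system time: " + dateFormat.format(System.currentTimeMillis()));
            System.out.println("Window processing time: " + dateFormat.format(context.currentProcessingTime()));
            System.out.println("Window start time: " + dateFormat.format(context.window().getStart()));
            System.out.println("Window end time: " + dateFormat.format(context.window().getEnd()));
            int sum = 0;
            for (Tuple2<String, Integer> ele : elements) {
                sum += 1;
            }
            // Emit how many times the word occurred
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }
}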

Kinds of time

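Time in a stream can be interpreted in three ways:
Event Time: the time at which the event occurred, usually given by a timestamp stored in the event itself.
Ingestion Time: the time at which the event entered Flink.
Processing Time: the system time of the machine at the moment the event is processed.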
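The three notions can put the same event into different windows. The plain-Java sketch below attaches all three timestamps to one event and computes its 10-second tumbling window under each notion. The class, the enum, and the example timestamps are all hypothetical. In Flink, only the event time travels inside the data; the other two are taken from clocks.

```java
// Hypothetical illustration: one event carrying its three possible timestamps (ms)
final class TimeNotions {
    enum Notion { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

    final long eventTime;      // when the event happened (stored in the event)
    final long ingestionTime;  // when it entered Flink (stamped at the source)
    final long processingTime; // wall clock of the operator processing it

    TimeNotions(long eventTime, long ingestionTime, long processingTime) {
        this.eventTime = eventTime;
        this.ingestionTime = ingestionTime;
        this.processingTime = processingTime;
    }

    long timestamp(Notion notion) {
        switch (notion) {
            case EVENT_TIME:
                return eventTime;
            case INGESTION_TIME:
                return ingestionTime;
            default:
                return processingTime;
        }
    }

    // Start of the tumbling window of length `size` that contains `ts`
    static long tumblingWindowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    public static void main(String[] args) {
        // Created at 9.5 s, reaches Flink at 10.2 s, processed at 10.4 s
        TimeNotions e = new TimeNotions(9_500L, 10_200L, 10_400L);
        for (Notion n : Notion.values()) {
            System.out.println(n + " -> window start " + tumblingWindowStart(e.timestamp(n), 10_000L));
        }
    }
}
```

Only event time places this event in the window `[0 s, 10 s)`, where it actually happened.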

Processing Time Window

Requirement: every 5 seconds, count how many times each word appeared in the last 10 seconds.

A custom source simulates the following: at second 13, two events are sent back to back; at second 16, one more event is sent.

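import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStream = env.addSource(new TestSource());
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Each event looks like "word,timestamp"; only the word is counted
                        String[] fields = line.split(",");
                        out.collect(new Tuple2<>(fields[0], 1));
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .process(new SumProcessWindowFunction());
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * Simulates: two events back to back at second 13, then one event at second 16.
     */
    public static class TestSource implements SourceFunction<String> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Busy-wait until the wall clock is close to a multiple of 10 seconds
            // (the last 4 digits of the millisecond timestamp are <= 100)
            String currTime = String.valueOf(System.currentTimeMillis());
            while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
                currTime = String.valueOf(System.currentTimeMillis());
            }
            System.out.println("Start sending events at: " + dateFormat.format(System.currentTimeMillis()));
            // Second 13: send two events
            TimeUnit.SECONDS.sleep(13);
            ctx.collect("hadoop," + System.currentTimeMillis());
            ctx.collect("hadoop," + System.currentTimeMillis());
            // Second 16: send one more event
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // Keep the source alive so the remaining windows can fire
            TimeUnit.SECONDS.sleep(300);
        }

        @Override
        public void cancel() {
        }
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            // System.out.println("Current system time: " + dateFormat.format(System.currentTimeMillis()));
            // System.out.println("Window processing time: " + dateFormat.format(context.currentProcessingTime()));
            // System.out.println("Window start time: " + dateFormat.format(context.window().getStart()));
            // System.out.println("Window end time: " + dateFormat.format(context.window().getEnd()));
            int sum = 0;
            for (Tuple2<String, Integer> ele : elements) {
                sum += 1;
            }
            // Emit how many times the word occurred
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }
}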
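`TestSource` busy-waits until `currentTimeMillis() % 10000 <= 100`, so that second 0 of the scenario lines up with a window boundary. A lighter alternative is to compute the remaining time once and sleep. The sketch below shows that design choice. It is not what the original source does, and `BoundaryAligner` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to the busy-wait loop: sleep until the next multiple of periodMillis
final class BoundaryAligner {
    static long millisUntilNextBoundary(long nowMillis, long periodMillis) {
        long remainder = Math.floorMod(nowMillis, periodMillis);
        return remainder == 0 ? 0 : periodMillis - remainder;
    }

    static void sleepUntilNextBoundary(long periodMillis) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(millisUntilNextBoundary(System.currentTimeMillis(), periodMillis));
    }

    public static void main(String[] args) {
        System.out.println(millisUntilNextBoundary(12_345L, 10_000L)); // 7655
    }
}
```

Inside `run()`, `BoundaryAligner.sleepUntilNextBoundary(10_000L)` would replace the `while` loop without keeping a CPU core busy.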

Processing Time Window with a delayed event

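A custom source simulates the following: at second 13, two events are produced back to back. One of them is actually sent at second 13, but the other is held up for some reason (for example, a network problem) and is only sent at second 19. At second 16, one more event is sent.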

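import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStream = env.addSource(new TestSource());
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Each event looks like "word,timestamp"; only the word is counted
                        String[] fields = line.split(",");
                        out.collect(new Tuple2<>(fields[0], 1));
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .process(new SumProcessWindowFunction());
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * Simulates: two events produced at second 13 (one sent at once, one delayed until second 19),
     * plus one event sent at second 16.
     */
    public static class TestSource implements SourceFunction<String> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Busy-wait until the wall clock is close to a multiple of 10 seconds
            String currTime = String.valueOf(System.currentTimeMillis());
            while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
                currTime = String.valueOf(System.currentTimeMillis());
            }
            System.out.println("Start sending events at: " + dateFormat.format(System.currentTimeMillis()));
            // Second 13: the first of two events is sent
            TimeUnit.SECONDS.sleep(13);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // The second event is produced now, but is not sent (e.g. because of a network problem)
            String event = "hadoop," + System.currentTimeMillis();
            // Second 16: send one event
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // Second 19: the delayed event is finally sent
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(event);
            TimeUnit.SECONDS.sleep(300);
        }

        @Override
        public void cancel() {
        }
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            // System.out.println("Current system time: " + dateFormat.format(System.currentTimeMillis()));
            // System.out.println("Window processing time: " + dateFormat.format(context.currentProcessingTime()));
            // System.out.println("Window start time: " + dateFormat.format(context.window().getStart()));
            // System.out.println("Window end time: " + dateFormat.format(context.window().getEnd()));
            int sum = 0;
            for (Tuple2<String, Integer> ele : elements) {
                sum += 1;
            }
            // Emit how many times the word occurred
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }
}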
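To see what goes wrong under processing time, the plain-Java sketch below replays the scenario without Flink. The events have event times 13 s, 13 s and 16 s, but they arrive at 13 s, 19 s and 16 s. The sketch counts them per 10-second window sliding every 5 s, keyed by window start in seconds. The class name is hypothetical. The assignment loop assumes offset 0, as in Flink's sliding windows.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of the delayed-event scenario; all times are in seconds
final class ProcessingVsEventTime {
    static Map<Long, Integer> countPerWindow(long[] timestamps, long size, long slide) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long lastStart = ts - Math.floorMod(ts, slide);
            for (long start = lastStart; start > ts - size; start -= slide) {
                counts.merge(start, 1, Integer::sum); // one more event in window [start, start + size)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] eventTimes = {13, 13, 16};
        long[] arrivalTimes = {13, 19, 16};
        System.out.println("event time:      " + countPerWindow(eventTimes, 10, 5));   // {5=2, 10=3, 15=1}
        System.out.println("processing time: " + countPerWindow(arrivalTimes, 10, 5)); // {5=1, 10=3, 15=2}
    }
}
```

Under processing time, window `[5 s, 15 s)` reports 1 instead of 2, and window `[15 s, 25 s)` reports 2 instead of 1.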

Handling out-of-order events with Event Time

Using Event Time

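import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> dataStream = env.addSource(new TestSource());
        dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        // "word,timestamp" -> (word, timestamp)
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .process(new SumProcessWindowFunction())
                .print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * Simulates: two events produced at second 13 (one sent at once, one delayed until second 19),
     * plus one event sent at second 16.
     */
    public static class TestSource implements SourceFunction<String> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Busy-wait until the wall clock is close to a multiple of 10 seconds
            String currTime = String.valueOf(System.currentTimeMillis());
            while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
                currTime = String.valueOf(System.currentTimeMillis());
            }
            System.out.println("Start sending events at: " + dateFormat.format(System.currentTimeMillis()));
            // Second 13: the first of two events is sent
            TimeUnit.SECONDS.sleep(13);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // The second event is produced now, but is not sent (e.g. because of a network problem)
            String event = "hadoop," + System.currentTimeMillis();
            // Second 16: send one event
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // Second 19: the delayed event is finally sent
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(event);
            TimeUnit.SECONDS.sleep(300);
        }

        @Override
        public void cancel() {
        }
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            // System.out.println("Current system time: " + dateFormat.format(System.currentTimeMillis()));
            // System.out.println("Window processing time: " + dateFormat.format(context.currentProcessingTime()));
            // System.out.println("Window start time: " + dateFormat.format(context.window().getStart()));
            // System.out.println("Window end time: " + dateFormat.format(context.window().getEnd()));
            int sum = 0;
            for (Tuple2<String, Long> ele : elements) {
                sum += 1; // f1 is a timestamp here, so count elements instead of summing it
            }
            // Emit how many times the word occurred
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        // Take the event time of each element
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            return element.f1;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // Watermark = current wall-clock time
            return new Watermark(System.currentTimeMillis());
        }
    }
}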

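The third window (`[15 s, 25 s)`) is now computed correctly, but the problem is not fully solved. The first window (`[5 s, 15 s)`) fires at second 15, before the delayed event arrives at second 19, so it still counts 1 instead of 2. The next step uses the watermark mechanism to fix this.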

Handling out-of-order events with watermarks

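import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> dataStream = env.addSource(new TestSource());
        dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        // "word,timestamp" -> (word, timestamp)
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .process(new SumProcessWindowFunction())
                .print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * Simulates: two events produced at second 13 (one sent at once, one delayed until second 19),
     * plus one event sent at second 16.
     */
    public static class TestSource implements SourceFunction<String> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Busy-wait until the wall clock is close to a multiple of 10 seconds
            String currTime = String.valueOf(System.currentTimeMillis());
            while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
                currTime = String.valueOf(System.currentTimeMillis());
            }
            System.out.println("Start sending events at: " + dateFormat.format(System.currentTimeMillis()));
            // Second 13: the first of two events is sent
            TimeUnit.SECONDS.sleep(13);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // The second event is produced now, but is not sent (e.g. because of a network problem)
            String event = "hadoop," + System.currentTimeMillis();
            // Second 16: send one event
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // Second 19: the delayed event is finally sent
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(event);
            TimeUnit.SECONDS.sleep(300);
        }

        @Override
        public void cancel() {
        }
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            // System.out.println("Current system time: " + dateFormat.format(System.currentTimeMillis()));
            // System.out.println("Window processing time: " + dateFormat.format(context.currentProcessingTime()));
            // System.out.println("Window start time: " + dateFormat.format(context.window().getStart()));
            // System.out.println("Window end time: " + dateFormat.format(context.window().getEnd()));
            int sum = 0;
            for (Tuple2<String, Long> ele : elements) {
                sum += 1;
            }
            // Emit how many times the word occurred
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        // Take the event time of each element
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            return element.f1;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // Let the watermark lag 5 s behind the wall clock, so windows fire 5 s later
            return new Watermark(System.currentTimeMillis() - 5000);
        }
    }
}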
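The simplified model below shows why a 5-second lag fixes the first window. It assumes three things: the watermark is the wall clock minus `delayMillis` and is updated continuously (Flink actually updates it periodically), allowed lateness is 0, and a window `[start, end)` fires once the watermark reaches `end - 1`. The method counts the elements a window holds when it fires. The class name is hypothetical.

```java
// Simplified, hypothetical model of a wall-clock-based watermark with a fixed lag
final class WatermarkDelayModel {
    static int countAtFiring(long start, long end, long[] eventTimes, long[] arrivalTimes, long delayMillis) {
        // Wall-clock moment at which the watermark (wall clock - delay) reaches end - 1
        long firingWallClock = end - 1 + delayMillis;
        int count = 0;
        for (int i = 0; i < eventTimes.length; i++) {
            boolean inWindow = eventTimes[i] >= start && eventTimes[i] < end;
            boolean arrivedInTime = arrivalTimes[i] <= firingWallClock; // later arrivals are dropped as late
            if (inWindow && arrivedInTime) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long[] eventTimes = {13_000, 13_000, 16_000};
        long[] arrivalTimes = {13_000, 19_000, 16_000};
        // Window [5 s, 15 s) with watermark = wall clock, then with watermark = wall clock - 5 s
        System.out.println(countAtFiring(5_000, 15_000, eventTimes, arrivalTimes, 0));     // 1
        System.out.println(countAtFiring(5_000, 15_000, eventTimes, arrivalTimes, 5_000)); // 2
    }
}
```

With the lag, `[5 s, 15 s)` fires at about second 20, after the delayed event (event time 13 s, arrival 19 s) has arrived.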

Watermark interval

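import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;

/**
 * Every 5 seconds, count how many times each word appeared in the last 10 seconds.
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 1 s
        env.getConfig().setAutoWatermarkInterval(1000);
        DataStreamSource<String> dataStream = env.addSource(new TestSource());
        dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        // "word,timestamp" -> (word, timestamp)
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .process(new SumProcessWindowFunction())
                .print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * Simulates: two events produced at second 13 (one sent at once, one delayed until second 19),
     * plus one event sent at second 16.
     */
    public static class TestSource implements SourceFunction<String> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Busy-wait until the wall clock is close to a multiple of 10 seconds
            String currTime = String.valueOf(System.currentTimeMillis());
            while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
                currTime = String.valueOf(System.currentTimeMillis());
            }
            System.out.println("Start sending events at: " + dateFormat.format(System.currentTimeMillis()));
            // Second 13: the first of two events is sent
            TimeUnit.SECONDS.sleep(13);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // The second event is produced now, but is not sent (e.g. because of a network problem)
            String event = "hadoop," + System.currentTimeMillis();
            // Second 16: send one event
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // Second 19: the delayed event is finally sent
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(event);
            TimeUnit.SECONDS.sleep(300);
        }

        @Override
        public void cancel() {
        }
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            int sum = 0;
            for (Tuple2<String, Long> ele : elements) {
                sum += 1;
            }
            // Emit how many times the word occurred
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        // Take the event time of each element; called once for every element
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            return element.f1;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            /*
             * Watermarks are generated periodically, by default every 200 ms.
             * The interval is set to 1000 ms above with:
             *     env.getConfig().setAutoWatermarkInterval(1000);
             */
            System.out.println("water mark...");
            // Let the watermark lag 5 s behind the wall clock, so windows fire 5 s later
            return new Watermark(System.currentTimeMillis() - 5000);
        }
    }
}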
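The periodic pattern can be modelled in a few lines. On every tick (200 ms by default once event time is enabled, 1000 ms in the example above), Flink asks the assigner for its current watermark. As far as we know, Flink forwards that watermark only if it is larger than the last one it emitted, so watermarks never move backwards. The class below is a hypothetical model of that rule, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of periodic watermark emission: forward only increasing watermarks
final class PeriodicWatermarkEmitter {
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Called on every timer tick with the assigner's current watermark
    void onTick(long candidate) {
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicWatermarkEmitter emitter = new PeriodicWatermarkEmitter();
        long[] ticks = {5_000, 6_000, 6_000, 5_500, 7_000};
        for (long t : ticks) {
            emitter.onTick(t);
        }
        System.out.println(emitter.emitted()); // [5000, 6000, 7000]
    }
}
```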

Every 3 seconds, collect and print all events with the same key from the preceding 3 seconds

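import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

/**
 * Every 3 seconds, collect and print all events with the same key from the preceding 3 seconds.
 */
public class WaterMarkWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 1 s
        env.getConfig().setAutoWatermarkInterval(1000);
        // Input lines look like "word,timestampInMillis"
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(3)) // 3 s tumbling window
                .process(new SumProcessWindowFunction())
                .print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) {
            System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
            System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
            List<String> list = new ArrayList<>();
            for (Tuple2<String, Long> ele : elements) {
                list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
            }
            out.collect(list.toString());
            System.out.println("window end time  : " + dateFormat.format(context.window().getEnd()));
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
        private long currentMaxEventTime = 0L;
        private long maxOutOfOrderness = 10000; // maximum allowed out-of-orderness: 10 s

        // Take the event time of each element and track the largest one seen so far
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            long currentElementEventTime = element.f1;
            currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);
            System.out.println("event = " + element
                    + "|" + dateFormat.format(element.f1)                             // event time
                    + "|" + dateFormat.format(currentMaxEventTime)                    // max event time
                    + "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // current watermark
            return currentElementEventTime;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            /*
             * Watermarks are generated periodically, by default every 200 ms.
             * The interval is set to 1000 ms above with:
             *     env.getConfig().setAutoWatermarkInterval(1000);
             */
            System.out.println("water mark...");
            // A window fires only once the max event time is about 10 s past its end
            return new Watermark(currentMaxEventTime - maxOutOfOrderness);
        }
    }
}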
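The firing rule of this example can be checked without a cluster. The plain-Java model below mirrors `EventTimeExtractor`: the watermark is the maximum event time seen so far minus 10 s. It assumes that a 3-second tumbling window `[start, end)` fires once the watermark reaches `end - 1`. In the real job, the window fires only when the next periodic watermark is emitted, up to 1 s later. The class name is hypothetical.

```java
// Hypothetical model of the bounded out-of-orderness extractor above
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long currentMaxEventTime = 0L; // same initial value as in the example

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void onEvent(long eventTime) {
        currentMaxEventTime = Math.max(currentMaxEventTime, eventTime);
    }

    long currentWatermark() {
        return currentMaxEventTime - maxOutOfOrderness;
    }

    // A window [start, windowEnd) fires once the watermark reaches its last millisecond
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel m = new BoundedOutOfOrdernessModel(10_000L);
        m.onEvent(1_000L);                         // belongs to window [0 s, 3 s)
        System.out.println(m.windowFires(3_000L)); // false: watermark is -9000
        m.onEvent(13_000L);                        // max event time 13 s -> watermark 3 s
        System.out.println(m.windowFires(3_000L)); // true
    }
}
```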
Collecting late data
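import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

/**
 * Every 3 seconds, collect and print all events with the same key from the preceding 3 seconds,
 * and collect the events that arrive too late.
 */
public class WaterMarkWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 1 s
        env.getConfig().setAutoWatermarkInterval(1000);
        // Tag for late data that would otherwise be dropped
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<String> result = dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(3))
                // .allowedLateness(Time.seconds(2)) // allow events to be up to 2 s late
                .sideOutputLateData(outputTag)      // keep the data that arrives too late
                .process(new SumProcessWindowFunction());
        // Print the regular results
        result.print();
        // Get the data that arrived too late
        DataStream<String> lateDataStream = result.getSideOutput(outputTag)
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> lateElement) throws Exception {
                        return "Late data: " + lateElement.toString();
                    }
                });
        lateDataStream.print();
        env.execute("TimeWindowWordCount");
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) {
            System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
            System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
            List<String> list = new ArrayList<>();
            for (Tuple2<String, Long> ele : elements) {
                list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
            }
            out.collect(list.toString());
            System.out.println("window end time  : " + dateFormat.format(context.window().getEnd()));
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
        private long currentMaxEventTime = 0L;
        private long maxOutOfOrderness = 10000; // maximum allowed out-of-orderness: 10 s

        // Take the event time of each element and track the largest one seen so far
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            long currentElementEventTime = element.f1;
            currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);
            System.out.println("event = " + element
                    + "|" + dateFormat.format(element.f1)                             // event time
                    + "|" + dateFormat.format(currentMaxEventTime)                    // max event time
                    + "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // current watermark
            return currentElementEventTime;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            /*
             * Watermarks are generated periodically, by default every 200 ms.
             * The interval is set to 1000 ms above with:
             *     env.getConfig().setAutoWatermarkInterval(1000);
             */
            System.out.println("water mark...");
            return new Watermark(currentMaxEventTime - maxOutOfOrderness);
        }
    }
}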
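Which elements end up in the `late-data` side output? As we understand Flink's window operator, an element is treated as too late when the watermark has already passed its window's last timestamp plus the allowed lateness. The plain-Java sketch below applies that rule to a 3-second tumbling window. `LatenessCheck` is a hypothetical name, and the rule is our reading of Flink's behaviour rather than its code.

```java
// Hypothetical sketch: is an element too late for its tumbling window?
final class LatenessCheck {
    // End (exclusive) of the tumbling window of length windowSize that contains eventTime
    static long windowEnd(long eventTime, long windowSize) {
        return eventTime - Math.floorMod(eventTime, windowSize) + windowSize;
    }

    static boolean isLate(long eventTime, long windowSize, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd(eventTime, windowSize) - 1; // last millisecond of the window
        return maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        // An event at 1 s belongs to [0 s, 3 s)
        System.out.println(isLate(1_000L, 3_000L, 0L, 5_000L));     // true: goes to the side output
        System.out.println(isLate(1_000L, 3_000L, 2_000L, 4_000L)); // false: still within allowedLateness(2 s)
    }
}
```

With `allowedLateness` enabled, an element that is late but within the allowance is added to its window, and the window is evaluated again instead of the element being sent to the side output.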

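Watermarks with parallelism greater than 1
With parallelism greater than 1, a window operator receives watermarks from several upstream instances, and it always goes by the smallest of them.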

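import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

/**
 * Every 3 seconds, collect and print all events with the same key from the preceding 3 seconds.
 * Tests parallelism greater than 1.
 */
public class WaterMarkWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 2
        env.setParallelism(2);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 1 s
        env.getConfig().setAutoWatermarkInterval(1000);
        // Tag for late data that would otherwise be dropped
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<String> result = dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(3))
                // .allowedLateness(Time.seconds(2)) // allow events to be up to 2 s late
                .sideOutputLateData(outputTag)      // keep the data that arrives too late
                .process(new SumProcessWindowFunction());
        // Print the regular results
        result.print();
        // Get the data that arrived too late
        DataStream<String> lateDataStream = result.getSideOutput(outputTag)
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> lateElement) throws Exception {
                        return "Late data: " + lateElement.toString();
                    }
                });
        lateDataStream.print();
        env.execute("TimeWindowWordCount");
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) {
            System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
            System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
            List<String> list = new ArrayList<>();
            for (Tuple2<String, Long> ele : elements) {
                list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
            }
            out.collect(list.toString());
            System.out.println("window end time  : " + dateFormat.format(context.window().getEnd()));
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
        private long currentMaxEventTime = 0L;
        private long maxOutOfOrderness = 10000; // maximum allowed out-of-orderness: 10 s

        // Take the event time of each element and track the largest one seen so far
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            long currentElementEventTime = element.f1;
            currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);
            // Print the thread ID to see which parallel instance handled the element
            long id = Thread.currentThread().getId();
            System.out.println("Current thread ID: " + id + " event = " + element
                    + "|" + dateFormat.format(element.f1)                             // event time
                    + "|" + dateFormat.format(currentMaxEventTime)                    // max event time
                    + "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // current watermark
            return currentElementEventTime;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            /*
             * Watermarks are generated periodically, by default every 200 ms.
             * The interval is set to 1000 ms above with:
             *     env.getConfig().setAutoWatermarkInterval(1000);
             */
            System.out.println("water mark...");
            return new Watermark(currentMaxEventTime - maxOutOfOrderness);
        }
    }
}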
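The "smallest watermark wins" rule can be modelled in a few lines of plain Java. Each upstream instance feeds one input channel. The operator's event-time clock is the minimum of the latest watermark on every channel, and that clock only moves forward. This is likely why, at parallelism 2, a window fires only after both extractor instances have seen sufficiently recent events. The class name is hypothetical, and this is a model rather than Flink's implementation.

```java
import java.util.Arrays;

// Hypothetical model: combine watermarks from several parallel upstream channels
final class MinWatermarkCombiner {
    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Returns the operator's watermark after receiving `watermark` on `channel`
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkCombiner c = new MinWatermarkCombiner(2); // two upstream instances
        System.out.println(c.onWatermark(0, 5_000L)); // Long.MIN_VALUE: channel 1 has sent nothing yet
        System.out.println(c.onWatermark(1, 3_000L)); // 3000
        System.out.println(c.onWatermark(1, 8_000L)); // 5000: now limited by channel 0
    }
}
```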

Watermark generation mechanisms

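import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

/**
 * Every 3 seconds, collect and print all events with the same key from the preceding 3 seconds.
 * Also shows a watermark that is generated only when a condition holds.
 */
public class WaterMarkWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 2
        env.setParallelism(2);
        // Step 1: set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Generate a watermark every 1 s
        env.getConfig().setAutoWatermarkInterval(1000);
        // Tag for late data that would otherwise be dropped
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<String> result = dataStream
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
                    }
                })
                // Step 2: extract the event time from the data
                // (pass new EventTimeExtractor2() instead to try the punctuated variant)
                .assignTimestampsAndWatermarks(new EventTimeExtractor())
                .keyBy(0)
                .timeWindow(Time.seconds(3))
                // .allowedLateness(Time.seconds(2)) // allow events to be up to 2 s late
                .sideOutputLateData(outputTag)      // keep the data that arrives too late
                .process(new SumProcessWindowFunction());
        // Print the regular results
        result.print();
        // Get the data that arrived too late
        DataStream<String> lateDataStream = result.getSideOutput(outputTag)
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> lateElement) throws Exception {
                        return "Late data: " + lateElement.toString();
                    }
                });
        lateDataStream.print();
        env.execute("TimeWindowWordCount");
    }

    /**
     * ProcessWindowFunction<IN, OUT, KEY, W>
     * IN:  type of the input elements
     * OUT: type of the output elements
     * KEY: type of the key (keyBy(0) keys by field position, so Flink wraps the String key in a Tuple)
     * W:   type of the window
     */
    public static class SumProcessWindowFunction
            extends ProcessWindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called once when a window fires.
         * @param tuple    the key
         * @param context  the window context
         * @param elements all elements of the window
         * @param out      collector for the results
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) {
            System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
            System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
            List<String> list = new ArrayList<>();
            for (Tuple2<String, Long> ele : elements) {
                list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
            }
            out.collect(list.toString());
            System.out.println("window end time  : " + dateFormat.format(context.window().getEnd()));
        }
    }

    /**
     * Generates a watermark only when a condition holds (punctuated watermarks).
     */
    private static class EventTimeExtractor2 implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
            // Called for every element: the watermark depends on a condition, not on a timer
            if ("000002".equals(lastElement.f0)) { // compare strings with equals(), not ==
                // Only this key triggers a watermark
                return new Watermark(lastElement.f1 - 10000);
            }
            // null means: no watermark for this element
            return null;
        }

        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            return element.f1;
        }
    }

    private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
        private long currentMaxEventTime = 0L;
        private long maxOutOfOrderness = 10000; // maximum allowed out-of-orderness: 10 s

        // Take the event time of each element and track the largest one seen so far
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            long currentElementEventTime = element.f1;
            currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);
            long id = Thread.currentThread().getId();
            System.out.println("Current thread ID: " + id + " event = " + element
                    + "|" + dateFormat.format(element.f1)                             // event time
                    + "|" + dateFormat.format(currentMaxEventTime)                    // max event time
                    + "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // current watermark
            return currentElementEventTime;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            /*
             * Watermarks are generated periodically, by default every 200 ms.
             * The interval is set to 1000 ms above with:
             *     env.getConfig().setAutoWatermarkInterval(1000);
             *
             * Periodic generation is loosely tied to individual events. It suits cases where
             *   1. the watermark value depends on processing time, or
             *   2. watermarks must still be produced while no events arrive for a while.
             */
            System.out.println("water mark...");
            return new Watermark(currentMaxEventTime - maxOutOfOrderness);
        }
    }
}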
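The two generation strategies differ only in when a watermark is produced. The plain-Java sketch below contrasts them. The punctuated rule copies `EventTimeExtractor2`: it is evaluated for every element and emits the element's timestamp minus 10 s only for key `"000002"`. The periodic rule copies `EventTimeExtractor`: it derives the watermark from the maximum event time seen so far, whether or not new elements arrived. The class and method names are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical contrast of punctuated vs periodic watermark rules
final class WatermarkStrategies {
    // Punctuated: decided per element; empty means "no watermark for this element"
    static OptionalLong punctuated(String key, long eventTime) {
        return "000002".equals(key) ? OptionalLong.of(eventTime - 10_000L) : OptionalLong.empty();
    }

    // Periodic: computed on a timer from the largest event time seen so far
    static long periodic(long maxEventTimeSoFar, long maxOutOfOrderness) {
        return maxEventTimeSoFar - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        System.out.println(punctuated("000001", 20_000L)); // OptionalLong.empty
        System.out.println(punctuated("000002", 20_000L)); // OptionalLong[10000]
        System.out.println(periodic(20_000L, 10_000L));    // 10000
    }
}
```

Because the key is compared with `equals`, a key read from the socket (a new `String` instance) still matches. With `==`, as in the original check, it would compare object identity and generally fail.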
