I. Using ProcessFunction

1. Without keyBy

package cn._51doit.flink.day07;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class NonKeyedProcessFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Call process on the non-keyed DataStream
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.process(
                new ProcessFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(String line, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            if (!word.equals("error")) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                    }
                });

        wordAndOne.print();
        env.execute();
    }
}
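A quick way to exercise the job is to start a socket server before launching it (using nc is an assumption about the local setup, not part of the original post):

$ nc -lk 8888
hello error flink

Every word except the literal string "error" is collected with a count of 1, so this input line prints (hello,1) and (flink,1).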

2. With keyBy

package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedProcessFunctionDemo {

    public static void main(String[] args) throws Exception {
        // Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        // Set the restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));

        // Create the DataStream (Source)
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Transformations
        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream = lines.map(
                new MapFunction<String, Tuple3<String, String, Double>>() {
                    @Override
                    public Tuple3<String, String, Double> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
                    }
                });

        KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);

        // Calling process on a KeyedStream gives access to keyed state
        SingleOutputStreamOperator<Tuple3<String, String, Double>> result = keyedStream.process(
                new KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {

                    private transient MapState<String, Double> mapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Define a state descriptor
                        MapStateDescriptor<String, Double> stateDescriptor =
                                new MapStateDescriptor<>("kv-state", String.class, Double.class);
                        // Initialize the state, or restore it from history
                        mapState = getRuntimeContext().getMapState(stateDescriptor);
                    }

                    @Override
                    public void processElement(Tuple3<String, String, Double> value, Context ctx,
                                               Collector<Tuple3<String, String, Double>> out) throws Exception {
                        String city = value.f1;
                        Double money = value.f2;
                        Double historyMoney = mapState.get(city);
                        if (historyMoney == null) {
                            historyMoney = 0.0;
                        }
                        Double totalMoney = historyMoney + money; // accumulate
                        // Write the new total back into the state
                        mapState.put(city, totalMoney);
                        // Emit the result
                        value.f2 = totalMoney;
                        out.collect(value);
                    }
                });

        result.print();
        // Start the job
        env.execute("StreamingWordCount");
    }
}
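Given the key,city,money format the map function assumes, the demo keeps one MapState per key, with the city as the map key, so amounts accumulate per (key, city) pair across records. With hypothetical input values (shop1/Beijing/Shanghai are illustrative, not from the original post):

shop1,Beijing,100.0    ->  (shop1,Beijing,100.0)
shop1,Beijing,200.0    ->  (shop1,Beijing,300.0)
shop1,Shanghai,50.0    ->  (shop1,Shanghai,50.0)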

II. Timers

1. Basic usage

package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessingTimeTimerDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample input:
        // spark,1
        // spark,2
        // hadoop,1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
                    }
                });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        keyed.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {
                // Get the current processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                //System.out.println("current time: " + currentProcessingTime + ", timer fires at: " + (currentProcessingTime + 30000));
                // Register a timer for the current processing time + 30 seconds
                ctx.timerService().registerProcessingTimeTimer(currentProcessingTime + 30000);
            }

            // When the "alarm clock" reaches the registered time, onTimer is called
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx,
                                Collector<Tuple2<String, Integer>> out) throws Exception {
                System.out.println("timer fired: " + timestamp);
            }
        }).print();

        env.execute();
    }
}
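Two properties of these timers are worth knowing: they are scoped to the current key, and a pending processing-time timer can be cancelled before it fires. A minimal sketch of cancelling (deleteProcessingTimeTimer is part of TimerService; where you would slot it into the demo above is an assumption):

long fireTime = ctx.timerService().currentProcessingTime() + 30000;
ctx.timerService().registerProcessingTimeTimer(fireTime);
// Deleting requires the exact timestamp that was registered
ctx.timerService().deleteProcessingTimeTimer(fireTime);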

2. Buffer the data first, then emit once the condition is met

package cn._51doit.flink.day07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessingTimeTimerDemo02 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample input:
        // spark,1
        // spark,2
        // hadoop,1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
                    }
                });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        keyed.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ValueState<Integer> counter;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
                counter = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {
                // Get the current processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                // Round up to the start of the next minute
                long fireTime = currentProcessingTime - currentProcessingTime % 60000 + 60000;
                // Registering a timer for an already-registered timestamp (per key) overwrites
                // the earlier one, so a timer for a given timestamp fires only once
                ctx.timerService().registerProcessingTimeTimer(fireTime);

                Integer currentCount = value.f1;
                Integer historyCount = counter.value();
                if (historyCount == null) {
                    historyCount = 0;
                }
                Integer totalCount = historyCount + currentCount;
                // Update the state
                counter.update(totalCount);
            }

            // When the "alarm clock" reaches the registered time, onTimer is called
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx,
                                Collector<Tuple2<String, Integer>> out) throws Exception {
                // The timer fired: emit the current result
                Integer value = counter.value();
                String currentKey = ctx.getCurrentKey();
                // To mimic a tumbling window (count only the current window's data
                // instead of a running total), clear the state here:
                //counter.update(0);
                out.collect(Tuple2.of(currentKey, value));
            }
        }).print();

        env.execute();
    }
}
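As the commented-out counter.update(0) hints, resetting the state in onTimer turns the running total into a per-minute, tumbling-window-like count. A minimal sketch of that alternative onTimer body, assuming the same counter state as above:

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
    out.collect(Tuple2.of(ctx.getCurrentKey(), counter.value()));
    counter.clear(); // drop the history so the next minute starts from zero
}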

III. Side output (side streams)

1. Tagging different kinds of records with different tags

package cn._51doit.flink.day07;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Odd numbers
        OutputTag<String> oddOutputTag = new OutputTag<String>("odd") {};
        // Even numbers
        OutputTag<String> evenOutputTag = new OutputTag<String>("even") {};
        // Not a number
        OutputTag<String> nanOutputTag = new OutputTag<String>("nan") {};

        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                try {
                    int i = Integer.parseInt(value);
                    if (i % 2 == 0) {
                        // Even
                        ctx.output(evenOutputTag, value);
                    } else {
                        // Odd
                        ctx.output(oddOutputTag, value);
                    }
                } catch (NumberFormatException e) {
                    ctx.output(nanOutputTag, value);
                }
                // Emit every record into the main stream
                out.collect(value);
            }
        });

        // Even numbers
        DataStream<String> evenStream = mainStream.getSideOutput(evenOutputTag);
        // Odd numbers
        DataStream<String> oddStream = mainStream.getSideOutput(oddOutputTag);

        oddStream.print("odd: ");
        evenStream.print("even: ");
        mainStream.print("main: ");

        env.execute();
    }
}
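Two details are easy to miss here. First, the trailing {} on each OutputTag creates an anonymous subclass, which is what lets Flink capture the element type despite generic type erasure. Second, the nan tag is written but never read in this demo; to consume it you would add (a sketch reusing the identifiers above):

DataStream<String> nanStream = mainStream.getSideOutput(nanOutputTag);
nanStream.print("nan: ");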

2. Using side output to capture a window's late data

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

// Use side output to capture a window's late data
public class WindowLateDateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Call keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        OutputTag<Tuple2<String, Integer>> lateDataTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};

        // Non-keyed window: skip keyBy and call windowAll with a WindowAssigner
        // Keyed window: call keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(lateDataTag); // tag the late records

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = windowed.sum(1);
        summed.print();

        // Retrieve the late data from the main stream
        DataStream<Tuple2<String, Integer>> lateDataStream = summed.getSideOutput(lateDataTag);
        lateDataStream.print("late-data: ");

        env.execute();
    }
}
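Side output is not the only way to deal with late records: allowedLateness keeps the window state around and re-fires the window for records that arrive within the extra delay, and anything later than that still goes to the side output. A hedged sketch of combining the two (the 2-second value is an arbitrary choice, not from the original post):

WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(2))  // re-fire the window for records up to 2s late
        .sideOutputLateData(lateDataTag);  // anything later still goes to the side output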

IV. Using WindowFunction

1. Incremental aggregation within the window, combined with historical data

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

// Incrementally aggregate within the window, and also accumulate with historical data
public class WindowReduceFunctionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Call keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: skip keyBy and call windowAll with a WindowAssigner
        // Keyed window: call keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        // Calling sum or reduce directly only aggregates the data inside the window,
        // without accumulating with historical data.
        // Goal: aggregate incrementally inside the window AND accumulate with history.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                windowed.reduce(new MyReduceFunc(), new MyWindowFunc());

        result.print();
        env.execute();
    }

    public static class MyReduceFunc implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            value1.f1 = value1.f1 + value2.f1;
            return value1;
        }
    }

    public static class MyWindowFunc extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {

        private transient ValueState<Integer> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
            sumState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer historyCount = sumState.value();
            if (historyCount == null) {
                historyCount = 0;
            }
            // With incremental pre-aggregation, the iterable holds a single element:
            // the window's aggregated result
            Tuple2<String, Integer> tp = elements.iterator().next();
            Integer windowCount = tp.f1;
            Integer totalCount = historyCount + windowCount;
            // Update the state
            sumState.update(totalCount);
            tp.f1 = totalCount;
            // Emit
            out.collect(tp);
        }
    }
}
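The point of passing both functions to reduce is that MyReduceFunc pre-aggregates incrementally, so the window state holds a single value per key rather than buffering every element, while MyWindowFunc only adds the cross-window history when the window fires. Note that, unlike the neighbouring demos, this one never enables checkpointing, so sumState would not survive a failure; presumably you would add the same call used elsewhere in this post:

env.enableCheckpointing(10000); // same interval as the other demos here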

2. aggregate combined with a WindowFunction

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

// Aggregate the current window's data, and accumulate with historical data
public class WindowAggregateFunctionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(10000);

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Call keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: skip keyBy and call windowAll with a WindowAssigner
        // Keyed window: call keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        // Calling sum or reduce directly only aggregates the data inside the window,
        // without accumulating with historical data.
        // Goal: aggregate incrementally inside the window AND accumulate with history.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                windowed.aggregate(new MyAggFunc(), new MyWindowFunc());

        result.print();
        env.execute();
    }

    private static class MyAggFunc implements AggregateFunction<Tuple2<String, Integer>, Integer, Integer> {

        // Create the initial value
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        // For each incoming record, combine it with the initial value or the running accumulator
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
            return value.f1 + accumulator;
        }

        // The result to emit
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        // Only invoked for session windows, which merge window panes; it must
        // still combine the accumulators correctly rather than return null
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }

    private static class MyWindowFunc extends ProcessWindowFunction<Integer, Tuple2<String, Integer>, String, TimeWindow> {

        private transient ValueState<Integer> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
            sumState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void process(String key, Context context, Iterable<Integer> elements,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer historyCount = sumState.value();
            if (historyCount == null) {
                historyCount = 0;
            }
            // The iterable holds the window's pre-aggregated result
            Integer windowCount = elements.iterator().next();
            Integer totalCount = historyCount + windowCount;
            // Update the state
            sumState.update(totalCount);
            // Emit
            out.collect(Tuple2.of(key, totalCount));
        }
    }
}
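The practical difference from reduce is that AggregateFunction lets the accumulator and output types differ from the input type (here Tuple2 in, Integer accumulator and out). That is what makes aggregations like an average possible at all. A hypothetical AvgAgg, not part of the original post, as a minimal sketch:

public static class AvgAgg implements AggregateFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, Double> {
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0, 0); // (sum, count)
    }
    @Override
    public Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> acc) {
        return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1);
    }
    @Override
    public Double getResult(Tuple2<Integer, Integer> acc) {
        return acc.f0 / (double) acc.f1;
    }
    @Override
    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1); // required for session windows
    }
}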

3. Using ProcessWindowFunction

package cn._51doit.flink.day07;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

// Process the full contents of a window once it fires
public class WindowProcessFunctionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(10000);

        // Sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // The DataStream returned by socketTextStream has a parallelism of 1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Call keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: skip keyBy and call windowAll with a WindowAssigner
        // Keyed window: call keyBy first, then call window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowed.process(
                new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    // process is called only when the window fires; it receives the window's
                    // full contents, which Flink buffers in window state until then
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements,
                                        Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (Tuple2<String, Integer> element : elements) {
                            out.collect(element);
                        }
                    }
                });

        result.print();
        env.execute();
    }
}
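Because ProcessWindowFunction sees the whole window at once, its Context can also hand you window metadata, which the incremental variants cannot easily do. A sketch of a process body that stamps output with the window bounds (context.window() is part of the API; the log format is illustrative):

@Override
public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
    TimeWindow window = context.window(); // the window's [start, end) in ms
    System.out.println("window [" + window.getStart() + ", " + window.getEnd() + ") for key " + key);
    for (Tuple2<String, Integer> element : elements) {
        out.collect(element);
    }
}

The trade-off is cost: buffering every element in window state uses far more space than the single accumulator kept by reduce or aggregate, so the combined forms in the previous two sections are usually preferred when the computation allows it.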
