Flink数据流DataStream
准备
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerJobListener(new JobListener() {@Overridepublic void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {Logger.getLogger("test").info("onJobSubmitted");}@Overridepublic void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {Logger.getLogger("test").info("onJobExecuted");}
});
1. 运算
1)map(MapFunction<T, R> mapper)
输入一个数据,输出一个数据,中间可以做任意变换,下面例子中输入流是TestObj类型,最终输出的是String类型
List<TestObj> testObjs=new ArrayList<>();
testObjs.add(new TestObj(1,"苹果,梨"));
testObjs.add(new TestObj(2,"柚子,橘子"));
testObjs.add(new TestObj(3,"猫,虎"));
testObjs.add(new TestObj(4,"狗,狼"));
DataStream<TestObj> data=env.fromCollection(testObjs);
data.map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getValue();}
}).print();
try {env.execute();
} catch (Exception e) {e.printStackTrace();
}
结果
7> 苹果,梨
8> 柚子,橘子
1> 猫,虎
2> 狗,狼
>号前面的7、8、1、2是子任务序号,后面是输出结果,这里输出了testObj的value值
MapFunction<T, O>
Function的子接口,泛型T是输入值类型,O是输出值类型,包含唯一方法 O map(T var1) throws Exception,输入T类型数据返回O类型数据,中间可以自定义处理。
2)flatMap(FlatMapFunction<T, R> flatMapper)
输入一个数据,输出一个或多个数据,下面例子是把testObj的value值用逗号分割后存入集合,最终输出了2倍的数据
data.flatMap(new FlatMapFunction<TestObj, String>() {@Overridepublic void flatMap(TestObj testObj, Collector<String> collector) throws Exception {String[] ss=testObj.getValue().split(",");for (String s:ss){collector.collect(testObj.getKey()+":"+s);}}
}).print();
结果:
4> 1:苹果
6> 3:猫
7> 4:狗
6> 3:虎
5> 2:柚子
5> 2:橘子
4> 1:梨
7> 4:狼
冒号前面的数字是testObj的key值,相同key的两个数据来自于一个初始数据,可以看到一个初始数据的testObj依然是一个子任务执行的
FlatMapFunction<T, O>
Function的子接口,泛型T是输入值类型,O是输出值类型,包含唯一方法 void flatMap(T var1, Collector<O> var2) throws Exception,输入T类型数据处理后存入Collector<O>
3)filter(FilterFunction<T> filter)
输入一个数据,根据自定义判断是否保留该数据,下面例子数据key值为2的倍数的testObj,为了方便看结果用map做了一下转换,不然输出的是object地址。
data.filter(new FilterFunction<TestObj>() {@Overridepublic boolean filter(TestObj testObj) throws Exception {return testObj.getKey()%2==0;}
}).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
结果:
1> 2:柚子,橘子
3> 4:狗,狼
仅输出了key为2和4的数据
FilterFunction<T>
Function的子接口,泛型T是输入值类型,输出值布尔类型,包含唯一方法 boolean filter(T var1) throws Exception,返回true保留数据,false去掉数据。
4)assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
水位线,一般用于处理乱序事件
data.assignTimestampsAndWatermarks(new WatermarkStrategy<TestObj>() {@Overridepublic WatermarkGenerator<TestObj> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<TestObj>() {@Overridepublic void onEvent(TestObj testObj, long l, WatermarkOutput watermarkOutput) {Logger.getLogger("test").info("onEvent: "+testObj.getKey());//时间触发时执行,检查并记忆时间戳或生成watermark}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {Logger.getLogger("test").info("onPeriodicEmit: ");//周期性执行,可能会生成新的Watermark}};}
}).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
九月 30, 2021 4:17:03 下午 com.test.flink.Test$1 onJobSubmitted
信息: onJobSubmitted
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 1
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 2
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 3
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onEvent
信息: onEvent: 4
九月 30, 2021 4:17:03 下午 com.test.flink.Test$3$1 onPeriodicEmit
信息: onPeriodicEmit:
8> 4:狗,狼
6> 2:柚子,橘子
7> 3:猫,虎
5> 1:苹果,梨
九月 30, 2021 4:17:03 下午 com.test.flink.Test$1 onJobExecuted
信息: onJobExecuted
可以看出会给每个数据都打上一个watermark
5) process(ProcessFunction<T, R> processFunction)
ProcessFunction比FlatMapFunction多了一个Context参数,context可以获得时间戳和watermark,当然前提是之前设置过,不然返回null
data.assignTimestampsAndWatermarks(new WatermarkStrategy<TestObj>() {@Overridepublic WatermarkGenerator<TestObj> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<TestObj>() {@Overridepublic void onEvent(TestObj testObj, long l, WatermarkOutput watermarkOutput) {}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {}};}
}).process(new ProcessFunction<TestObj, String>() {@Overridepublic void processElement(TestObj testObj, Context context, Collector<String> collector) throws Exception {long ts = context.timestamp();long cpt = context.timerService().currentProcessingTime();long cw = context.timerService().currentWatermark();collector.collect(testObj.getKey()+":"+ts+"-"+cpt+"-"+cw);}
}).print();
输出
1> 4:-9223372036854775808-1633654899323--9223372036854775808
6> 1:-9223372036854775808-1633654899323--9223372036854775808
7> 2:-9223372036854775808-1633654899324--9223372036854775808
8> 3:-9223372036854775808-1633654899323--9223372036854775808
2. 分区
1)keyBy(KeySelector<T, K> key)
data.keyBy(v->v.getKey()).print();data.keyBy(TestObj::getKey).print();data.keyBy(new KeySelector<TestObj, Integer>() {@Overridepublic Integer getKey(TestObj testObj) throws Exception {return testObj.getKey();}
}).print();
这几种写法都是一个意思,大概翻了一下源码,好像是用key分了个区。
DataSream.class
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {Preconditions.checkNotNull(key);return new KeyedStream(this, (KeySelector)this.clean(key));
}protected <F> F clean(F f) {return this.getExecutionEnvironment().clean(f);
}
KeyedStream.class
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
}public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {this(dataStream, new PartitionTransformation(dataStream.getTransformation(), new KeyGroupStreamPartitioner(keySelector, 128)), keySelector, keyType);
}@Internal
KeyedStream(DataStream<T> stream, PartitionTransformation<T> partitionTransformation, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {super(stream.getExecutionEnvironment(), partitionTransformation);this.keySelector = (KeySelector)this.clean(keySelector);this.keyType = this.validateKeyType(keyType);
}
PartitionTransformation.class 的官方说明
This transformation represents a change of partitioning of the input elements.
这个transformation代表输入数据的分区变化
This does not create a physical operation, it only affects how upstream operations are connected to downstream operations.
它不会生成一个实际上的算子,仅影响上游算子如何连接下游算子
也就是说keyBy本身其实没有执行运算,因此如果把后面的print()去掉,仅保留一个keyBy是不能执行的,会报错:java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. 但是map等方法是可以的。
稍微修改一下数据
List<TestObj> testObjs=new ArrayList<>();
testObjs.add(new TestObj(1,"苹果,梨"));
testObjs.add(new TestObj(1,"柚子,橘子"));
testObjs.add(new TestObj(3,"猫,虎"));
testObjs.add(new TestObj(3,"狗,狼"));
DataStream<TestObj> data=env.fromCollection(testObjs);
data.keyBy(new KeySelector<TestObj, Integer>() {@Overridepublic Integer getKey(TestObj testObj) throws Exception {return testObj.getKey();}
}).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
8> 3:猫,虎
6> 1:苹果,梨
8> 3:狗,狼
6> 1:柚子,橘子
可以看出分区之后同一个分区的数据由一个子任务执行
keyBy方法返回的是KeyedSream,DataStream的一个子类,可以使用DataSteam除了分区方法之外的所有方法。
2)forward()
直接保留上游的分区,下面例子先用keyBy分区,然后运算,forward()之后再次运算
data.keyBy(TestObj::getKey).map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return testObj;}
}).forward().map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
8> 3:猫,虎
8> 3:狗,狼
6> 1:苹果,梨
6> 1:柚子,橘子
依然是两两一组
3)rebalence()
把上游数据循环分区到下游
data.keyBy(TestObj::getKey).map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return testObj;}
}).rebalance().map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
1> 3:狗,狼
7> 1:柚子,橘子
8> 3:猫,虎
6> 1:苹果,梨
重新分配使用了不同的子任务
4)shuffle()
随机分配上游数据到下游
data.keyBy(TestObj::getKey).map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return testObj;}
}).shuffle().map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
7> 3:树袋熊
8> 1:苹果,梨
8> 1:葡萄
8> 3:猫,虎
8> 3:羊,牛
5> 1:柚子,橘子
5> 3:狗,狼
为了看出随机效果加了些数据,key还是1,3两个
5)rescale()
把上游分区数据分别循环到下游分区中
data.keyBy(TestObj::getKey).map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return testObj;}
}).rescale().map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print().setParallelism(4);
输出
1> 1:柚子,橘子
2> 1:葡萄
4> 1:苹果,梨
1> 3:猫,虎
2> 3:狗,狼
3> 3:羊,牛
4> 3:树袋熊
keyBy之后是两个分区,每个分区分别有3个和4个数据,之后设置为四个分区,用rescale重分区后,之前的两个分区各自循环到新分区上
6)global()
上游所有数据分配到下游第一个分区
data.keyBy(TestObj::getKey).map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return testObj;}
}).global().map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
1> 1:苹果,梨
1> 1:柚子,橘子
1> 1:葡萄
1> 3:猫,虎
1> 3:狗,狼
1> 3:羊,牛
1> 3:树袋熊
7)broadcast()
将上游的数据分配到下游的每个分区上
data.keyBy(TestObj::getKey).map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return testObj;}
}).broadcast().map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print().setParallelism(3);
输出
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
3> 3:羊,牛
3> 1:柚子,橘子
3> 3:羊,牛
3> 1:柚子,橘子
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
3> 3:羊,牛
3> 1:柚子,橘子
1> 3:狗,狼
3> 3:狗,狼
3> 1:苹果,梨
2> 3:羊,牛
2> 1:柚子,橘子
2> 3:羊,牛
2> 1:柚子,橘子
2> 3:狗,狼
2> 1:苹果,梨
2> 3:狗,狼
2> 1:苹果,梨
2> 3:羊,牛
2> 1:柚子,橘子
2> 3:狗,狼
2> 1:苹果,梨
2> 3:猫,虎
2> 3:树袋熊
2> 1:葡萄
3> 3:猫,虎
3> 3:树袋熊
3> 1:葡萄
1> 1:苹果,梨
1> 3:狗,狼
1> 1:苹果,梨
1> 3:猫,虎
1> 3:树袋熊
1> 1:葡萄
1> 3:猫,虎
1> 3:树袋熊
1> 1:葡萄
1> 3:狗,狼
1> 1:苹果,梨
1> 3:猫,虎
1> 3:树袋熊
1> 1:葡萄
2> 3:羊,牛
2> 1:柚子,橘子
1> 3:羊,牛
1> 1:柚子,橘子
1> 3:狗,狼
1> 1:苹果,梨
8)partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector)
自定义分区,改了下数据,下面例子是按照key值对2取余进行分区的
List<TestObj> testObjs=new ArrayList<>();
testObjs.add(new TestObj(1,"苹果,梨"));
testObjs.add(new TestObj(2,"柚子,橘子"));
testObjs.add(new TestObj(3,"猫,虎"));
testObjs.add(new TestObj(4,"狗,狼"));
testObjs.add(new TestObj(5,"羊,牛"));
testObjs.add(new TestObj(6,"葡萄"));
testObjs.add(new TestObj(7,"树袋熊"));
DataStream<TestObj> data=env.fromCollection(testObjs);
data.partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer integer, int i) {return integer%2;}
},TestObj::getKey).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
1> 2:柚子,橘子
1> 4:狗,狼
1> 6:葡萄
2> 1:苹果,梨
2> 3:猫,虎
2> 5:羊,牛
2> 7:树袋熊
最终奇数在一个分区,偶数在一个分区
3. 输出
1)print()
已经用过很多次了,输出到控制台
2)writeToSocket(String hostName, int port, SerializationSchema<T> schema)
输出到特定地址
hostName - 主机地址
port - 端口
schema - 序列化方法
3)addSink(SinkFunction<T> sinkFunction)
输出到其他位置
data.map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).addSink(new SinkFunction<String>() {@Overridepublic void invoke(String value, Context context) throws Exception {//实际输出方法//e.g. 输出到文件 FileUtils.writeFileUtf8(file,value);}
});
4. window相关在这里 >>> Flink 窗口 Window
5. 合并
1)union(DataStream<T>... streams)
把两个以上数据流合并成一个新的数据流,这些数据流的类型需要相同
List<TestObj> testObjs=new ArrayList<>();
testObjs.add(new TestObj(1,"苹果,梨"));
testObjs.add(new TestObj(1,"柚子,橘子"));
testObjs.add(new TestObj(3,"猫,虎"));
testObjs.add(new TestObj(3,"狗,狼"));
testObjs.add(new TestObj(3,"羊,牛"));
testObjs.add(new TestObj(1,"葡萄"));
testObjs.add(new TestObj(3,"树袋熊"));
List<TestObj> testObjs2=new ArrayList<>();
testObjs2.add(new TestObj(2,"白菜,油菜"));
testObjs2.add(new TestObj(2,"茄子"));
testObjs2.add(new TestObj(4,"桦树,杉树"));
testObjs2.add(new TestObj(4,"海棠"));
DataStream<TestObj> data=env.fromCollection(testObjs);
DataStream<TestObj> data2=env.fromCollection(testObjs2);
data.union(data2).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}
}).print();
输出
3> 3:树袋熊
2> 1:葡萄
1> 3:羊,牛
8> 3:狗,狼
5> 1:苹果,梨
8> 4:海棠
5> 2:白菜,油菜
7> 3:猫,虎
7> 4:桦树,杉树
6> 1:柚子,橘子
6> 2:茄子
2)join(DataStream<T2> otherStream)
两个数据流中的数据之间按照某个条件遍历合并,有点类似与数据库的连接操作,两个数据流中的数据类型可以不同
List<TestObj> testObjs=new ArrayList<>();
testObjs.add(new TestObj(1,"苹果,梨"));
testObjs.add(new TestObj(1,"柚子,橘子"));
testObjs.add(new TestObj(3,"猫,虎"));
testObjs.add(new TestObj(3,"狗,狼"));
testObjs.add(new TestObj(3,"羊,牛"));
testObjs.add(new TestObj(1,"葡萄"));
testObjs.add(new TestObj(3,"树袋熊"));
List<TestObj> testObjs2=new ArrayList<>();
testObjs2.add(new TestObj(1,"白菜,油菜"));
testObjs2.add(new TestObj(1,"茄子"));
testObjs2.add(new TestObj(3,"桦树,杉树"));
testObjs2.add(new TestObj(3,"海棠"));
DataStream<TestObj> data=env.fromCollection(testObjs).assignTimestampsAndWatermarks(new TestWatermarkStrategy());
DataStream<TestObj> data2=env.fromCollection(testObjs2).assignTimestampsAndWatermarks(new TestWatermarkStrategy());
data.join(data2).where(TestObj::getKey).equalTo(TestObj::getKey).window(TumblingEventTimeWindows.of(Time.minutes(1))).apply(new JoinFunction<TestObj, TestObj, TestObj>() {@Overridepublic TestObj join(TestObj testObj, TestObj testObj2) throws Exception {return new TestObj(testObj.getKey(),testObj.getValue()+","+testObj2.getValue());}}).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}}).print();
输出
8> 3:猫,虎,桦树,杉树
8> 3:猫,虎,海棠
8> 3:狗,狼,桦树,杉树
8> 3:狗,狼,海棠
8> 3:羊,牛,桦树,杉树
8> 3:羊,牛,海棠
8> 3:树袋熊,桦树,杉树
8> 3:树袋熊,海棠
6> 1:苹果,梨,白菜,油菜
6> 1:苹果,梨,茄子
6> 1:柚子,橘子,白菜,油菜
6> 1:柚子,橘子,茄子
6> 1:葡萄,白菜,油菜
6> 1:葡萄,茄子
3)intervalJoin(KeyedStream<T1, KEY> otherStream)
根据key相等合并两个KeyedStream,between(Time lowerBound, Time upperBound) 设置时间差范围,时间戳在testObject1的时间戳+lowerBound到testObject1的时间戳+upperBound内的testObject2会和testObject1合并,process是合并的方法
data.keyBy(TestObj::getKey).intervalJoin(data2.keyBy(TestObj::getKey)).between(Time.milliseconds(-2),Time.milliseconds(2)).process(new ProcessJoinFunction<TestObj, TestObj, TestObj>() {@Overridepublic void processElement(TestObj testObj, TestObj testObj2, Context context, Collector<TestObj> collector) throws Exception {collector.collect(new TestObj(testObj.getKey(),testObj.getValue()+","+testObj2.getValue()));}}).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}}).print();
输出
6> 1:苹果,梨,白菜,油菜
6> 1:柚子,橘子,茄子
6> 1:葡萄,茄子
8> 3:猫,虎,桦树,杉树
8> 3:狗,狼,桦树,杉树
8> 3:猫,虎,海棠
8> 3:狗,狼,海棠
8> 3:羊,牛,桦树,杉树
8> 3:羊,牛,海棠
8> 3:树袋熊,桦树,杉树
8> 3:树袋熊,海棠
和全部合并比少了一些结果,全部合并的结果参照join方法的输出
4)coGroup(DataStream<T2> otherStream)
根据设置的key合并两个DataSream,直接对key相同的两个列表进行处理,可以更加灵活的处理数据,下面例子将所有key相同的数据合并为一个
data.coGroup(data2).where(TestObj::getKey).equalTo(TestObj::getKey).window(TumblingEventTimeWindows.of(Time.minutes(1))).apply(new CoGroupFunction<TestObj, TestObj, TestObj>() {@Overridepublic void coGroup(Iterable<TestObj> iterable1, Iterable<TestObj> iterable2, Collector<TestObj> collector) throws Exception {int key=0;StringBuilder stringBuilder=new StringBuilder();for (TestObj obj:iterable1){if (key==0){key=obj.getKey();}stringBuilder.append(",").append(obj.getValue());}for (TestObj obj:iterable2){stringBuilder.append(",").append(obj.getValue());}collector.collect(new TestObj(key, stringBuilder.substring(1)));}}).map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getKey()+":"+testObj.getValue();}}).print();
输出
8> 3:猫,虎,狗,狼,羊,牛,树袋熊,桦树,杉树,海棠
6> 1:苹果,梨,柚子,橘子,葡萄,白菜,油菜,茄子
5) connect(DataStream<R> dataStream)
两个数据流可以按照不同的方式统一类型最后成为同一个数据流,有两种统一格式的方式
data.connect(data2).map(new CoMapFunction<TestObj, TestObj, String>() {@Overridepublic String map1(TestObj testObj) throws Exception {return testObj.getKey()+"";}@Overridepublic String map2(TestObj testObj) throws Exception {return testObj.getValue();}}).print();
data.connect(data2).flatMap(new CoFlatMapFunction<TestObj, TestObj, String>() {@Overridepublic void flatMap1(TestObj testObj, Collector<String> collector) throws Exception {collector.collect(testObj.getKey()+"");}@Overridepublic void flatMap2(TestObj testObj, Collector<String> collector) throws Exception {collector.collect(testObj.getValue());}}).print();
输出
4> 3
5> 茄子
4> 白菜,油菜
6> 1
7> 1
7> 海棠
3> 1
8> 3
2> 3
1> 3
6> 桦树,杉树
6.iterate
把官网的例子缩了个水,把非必要的部分都去掉后其实就下面代码中的上面两句(剩下的是打印,为了看结果)
IterativeStream<TestObj> iterativeStream=data.iterate();
iterativeStream.closeWith(iterativeStream);
iterativeStream.map(new MapFunction<TestObj, String>() {@Overridepublic String map(TestObj testObj) throws Exception {return testObj.getValue();}
}).print();
输出
6> 柚子,橘子
4> 狗,狼
2> 葡萄
1> 柚子,橘子
1> 猫,虎
1> 狗,狼
1> 羊,牛
... ...
会无限循环数据,不会自己停止
可以使用iterate(long maxWaitTimeMillis)方法增加限制,这种情况下如果maxWaitTimeMillis毫秒内没有新数据加入,就自动停止
根据官网,iterate()方法非常适合做算法驯练
大致就是这个意思
IterativeStream<TestObj> iterativeStream=data.iterate();
//数据预处理
DataStream<TestObj> iterativeBody=iterativeStream.map(new MapFunction<TestObj, TestObj>() {@Overridepublic TestObj map(TestObj testObj) throws Exception {return new TestObj(testObj.getKey()*2,testObj.getValue());}
});
//驯练的函数方法
DataStream<TestObj> feedback=iterativeBody.filter(new FilterFunction<TestObj>() {@Overridepublic boolean filter(TestObj testObj) throws Exception {return testObj.getValue().indexOf(",")>0;}
}).setParallelism(1);
//开始循环
iterativeStream.closeWith(feedback);
Flink数据流DataStream相关推荐
- Flink基于 DataStream API 实现欺诈检测
目录 系列文章目录 文章目录 前言 一.Flink基于 DataStream API 实现欺诈检测 二.使用步骤 1.引入pom.xml 2.主类 3.欺诈逻辑判断类 4.运行结果: 总结 前言 在当 ...
- 【2】flink数据流转换算子
[README] 本文记录了flink对数据的转换操作,包括 基本转换,map,flatMap,filter: 滚动聚合(min minBy max maxBy sum): 规约聚合-reduce: ...
- Flink数据流编程模型(Dataflow Programming Model)
目录 抽象层次(Levels of Abstraction) 程序与数据流(Programs and Dataflows) Parallel Dataflows(并行数据流) 窗口(Windows) ...
- 学习笔记Flink(二)—— Flink数据流模型、时间窗口和核心概念
一.Flink编程数据流模型 1.1.Flink – API封装 Flink 提供不同级别的API封装来支持流/批处理应用程序. 1.2.Flink-编程数据流 Source:一个不会结束的数据记录流 ...
- flink中datastream和dataset各自print()的异同
根据[1] 负责输出的进程 默认能打印到提交任务的终端吗? print()结果 输出位置 在WEB UI查看位置 批处理(DataSet) Job Manager 能 Master节点的 提交任务 ...
- flink的datastream输出没有结果
代码如下: import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.ap ...
- 0006-Flink原理(Flink数据流 执行图)
一.程序与数据流转换(DataFlow) • 所有的Flink程序都是由三部分组成的: Source .Transformation 和 Sink. • Source 负责读取数据源,Transfor ...
- Flink:DataStream Connectors 之 Kafka
本文主要介绍 Kafka 在 Apache Flink 中的使用,以一个简单的示例,向大家介绍在 Apache Flink 中如何使用 Kafka. 版本: kafka_2.11-2.1.0.tgz ...
- 大数据系列教程(4)Flink 使用 DataStream API 进行欺诈检测
目录 使用 DataStream API 进行欺诈检测 **版本1** 版本2 版本3 使用 DataStream API 进行欺诈检测 Apache Flink 提供了一个 DataStream A ...
最新文章
- ios9定位服务的app进入后台三分钟收不到经纬度,应用被挂起问题及解决方案
- 宿主如何访问虚拟机中的web服务器
- (DNS被劫持所导致的)QQ音乐与视频网页打开很慢的解决方法
- Swift5之网易云音乐页面搭建
- RxJava之PublishSubject、BehaviorSubject、ReplaySubject和AsyncSubject
- 膨胀卷积的缺点_膨胀卷积与IDCNN
- 为什么大多数人是穷人?
- linux杂谈(十七):iscsi存储分离技术
- tomcat7源代码Bootstrap
- 课堂经验值管理小程序_济南小程序开发,微信小程序应用开发实现单店管理
- 黑客为什么不攻击支付宝?
- 交接文档怎么写_怎么写一篇实用的需求说明文档
- RT-Thread : IEEE1588/PTP 协议的实现
- Windows进程管理
- cad线段总和lisp_autocadlisp统计多段线方法
- 三菱FX5U传送指令
- VisualSVN的使用
- 论文阅读——(邬江兴院士) 网络空间内生安全综述 Cyberspace Endogenous Safety and Security
- 基于springboot学生公寓管理系统-计算机毕业设计源码+LW文档
- openssl之C++实现私(公)钥生成、转换
热门文章
- Dao层,Mapper层,controller层,service层,model层都有什么作用
- 春节家宴必不可少的解腻凉菜【山楂白菜心】
- 外贸邮件群发,如何群发邮件?
- 88是python语言的整数类型_Python开发【第二篇】:Python基本数据类型
- Artanis: 工作日志自动收发系统
- 大屏可视化!2022新趋势!
- 计算机主机型号查询,如何查询电脑硬件的型号
- 解决ora-01861文字与格式字符串不匹配
- 【20210919】LaTex入门:overleaf使用
- springboot毕设项目基于springboot的模拟面试平台 7tch0(java+VUE+Mybatis+Maven+Mysql)