[Flink] Common Flink DataStream Transformation Operators
Contents
3.1 Map
3.2 FlatMap
3.3 Filter
3.4 KeyBy
3.5 Reduce
3.6 Fold
3.7 Aggregations
3.8 Window
3.9 WindowAll
4.0 Aggregations on windows
4.1 Union
4.2 Split
4.3 Select
3.1 Map
DataStream → DataStream
A one-to-one transformation: each input record produces exactly one output record.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationsMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dsSocket = env.socketTextStream("192.168.23.210", 9000);

        // Anonymous-class style
        DataStream<Tuple2<String, Integer>> dsMap1 = dsSocket.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // dsMap1.print();

        // Lambda style; because of Java type erasure, a lambda that returns a
        // Tuple needs an explicit type hint via returns(...)
        DataStream<Tuple2<String, Integer>> dsMap2 = dsSocket
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        dsMap2.print();

        env.execute("TransformationsMap");
    }
}
Input:
d
Output:
4> (d,1)
3.2 FlatMap
One line in, zero or more lines out: each input record produces zero, one, or many output records.
DataStream → DataStream
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class TransformationsFlatmap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dsSocket = env.socketTextStream("192.168.23.210", 9000);

        // Sample input: spark,hive,hbase
        DataStream<Tuple2<String, Integer>> ds1 = dsSocket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        ds1.print();

        env.execute("TransformationsFlatmap");
    }
}
3.3 Filter
DataStream → DataStream
Evaluates a boolean function for each element and retains only the elements for which the function returns true.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransformationsFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dsSocket = env.socketTextStream("192.168.23.210", 9000);

        // Keep only the records equal to "spark" (case-insensitive)
        DataStream<String> ds1 = dsSocket.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.equalsIgnoreCase("spark");
            }
        });
        ds1.print();

        env.execute("TransformationsFilter");
    }
}
3.4 KeyBy
DataStream → KeyedStream
Logically partitions a stream into disjoint partitions; all records with the same key are assigned to the same partition. Internally, keyBy is implemented with hash partitioning.
Note:
- The partitioning outcome is strongly tied to the parallelism of the operator downstream of keyBy: if that operator runs with a parallelism of 1, all records end up together no matter how they are keyed.
- For POJO types, keyBy(fieldName) partitions by the named field.
- For Tuple types, keyBy(fieldPosition) partitions by the field at that position.
- For general types, keyBy(new KeySelector<...>() {...}) extracts the key, as in the example below.
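The invariant behind hash partitioning can be sketched in plain Java. The sketch below is only an illustration of the guarantee that equal keys always land in the same downstream partition; the partitionFor formula is an assumption for demonstration, not Flink's actual algorithm (which routes keys through murmur-hashed key groups).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyByPartitionSketch {

    // Simplified stand-in for keyBy's hash partitioning: records with the
    // same key always map to the same downstream partition. This is NOT
    // Flink's real implementation, only an illustration of the invariant.
    static int partitionFor(String key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : Arrays.asList("a", "b", "a", "c", "b", "a")) {
            partitions.computeIfAbsent(partitionFor(key, parallelism),
                    p -> new ArrayList<>()).add(key);
        }
        // Every occurrence of "a" lands in the same partition.
        System.out.println(partitions); // {1=[a, a, a], 2=[b, b], 3=[c]}
    }
}
```

This is also why the first note above holds: if parallelism is 1, partitionFor can only ever return 0, so every key maps to the same place.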
public class Person {
    private String id;
    private int sum;

    public Person() {}

    public Person(String id, int sum) {
        this.id = id;
        this.sum = sum;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }
}

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class TransformationsKeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> dsCollection = env.fromCollection(Arrays.asList(
                new Person("a", 1), new Person("a", 2),
                new Person("b", 1), new Person("a", 2)));

        KeyedStream<Person, String> ksCollection1 = dsCollection.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person p) throws Exception {
                return p.getId();
            }
        });

        DataStream<Person> dsSum = ksCollection1.sum("sum");
        dsSum.print();

        env.execute("TransformationsKeyBy");
    }
}
3.5 Reduce
KeyedStream → DataStream
Combines the current element with the last reduced value and emits the new value; reduce is a stateful operator. That is, it performs a rolling aggregation via a ReduceFunction and emits the result of every aggregation step downstream.
public class Person {
    private String id;
    private int sum;

    public Person() {}

    public Person(String id, int sum) {
        this.id = id;
        this.sum = sum;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }

    @Override
    public String toString() {
        return "Person{" + "id='" + id + '\'' + ", sum=" + sum + '}';
    }
}

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class TransformationsReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> dsCollection = env.fromCollection(Arrays.asList(
                new Person("a", 1), new Person("a", 2),
                new Person("b", 1), new Person("a", 2)));

        KeyedStream<Person, String> keyStream01 = dsCollection.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person p) throws Exception {
                return p.getId();
            }
        });

        DataStream<Person> dsReduce = keyStream01.reduce(new ReduceFunction<Person>() {
            @Override
            public Person reduce(Person p1, Person p2) throws Exception {
                return new Person(p1.getId(), p1.getSum() + p2.getSum());
            }
        });
        dsReduce.print();

        env.execute("TransformationsReduce");
    }
}
3.6 Fold
KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value: combines the current element with the last folded value and emits the new value.
How to understand this? fold combines each element of a group with an accumulator (which starts as the initial value) and returns the accumulator; the accumulator's type may differ from the element type. It is similar to reduce, just with an extra initial value. (Note: fold was deprecated in later Flink releases; AggregateFunction-based aggregation is the suggested replacement.)
public class Person {
    private String id;
    private int sum;

    public Person() {}

    public Person(String id, int sum) {
        this.id = id;
        this.sum = sum;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }

    @Override
    public String toString() {
        return "Person{" + "id='" + id + '\'' + ", sum=" + sum + '}';
    }
}

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class TransformationsFold {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> dsCollection = env.fromCollection(Arrays.asList(
                new Person("a", 1), new Person("a", 2),
                new Person("b", 1), new Person("a", 2)));

        // Key by id via a KeySelector
        KeyedStream<Person, String> keyStream01 = dsCollection.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person p) throws Exception {
                return p.getId();
            }
        });

        DataStream<String> dsFold = keyStream01.fold("start", new FoldFunction<Person, String>() {
            @Override
            public String fold(String current, Person p) throws Exception {
                return current + "-" + p.getId();
            }
        });
        dsFold.print();

        env.execute("TransformationsFold");
    }
}
Output:
6> start-a
6> start-a-a
2> start-b
6> start-a-a-a
3.7 Aggregations
KeyedStream → DataStream
Rolling aggregations on a KeyedStream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that holds the minimum value in the given field (the same distinction applies to max and maxBy).
In other words, min takes the minimum of the specified field and stores it in the corresponding position, while the remaining fields keep whatever values were seen first, so the emitted elements are not guaranteed to be real input records; max behaves the same way. minBy, by contrast, returns the whole element whose specified field is minimal, replacing the stored element whenever a smaller value arrives; maxBy likewise.
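The distinction is easier to see outside Flink. The sketch below is hypothetical plain Java (not the Flink API) that replays the rolling state updates for the first few key-0 records of the example that follows, comparing on field 2 of (f0, f1, f2) triples:

```java
import java.util.Arrays;

public class MinVsMinBySketch {

    // min: only the tracked field (index 2) is updated in place; the other
    // fields keep the values of the first record seen.
    static int[] min(int[] state, int[] record) {
        if (state == null) return record.clone();
        int[] s = state.clone();
        s[2] = Math.min(s[2], record[2]);
        return s;
    }

    // minBy: the whole record holding the current minimum is kept.
    static int[] minBy(int[] state, int[] record) {
        if (state == null || record[2] < state[2]) return record.clone();
        return state;
    }

    public static void main(String[] args) {
        int[][] records = {{0, 2, 2}, {0, 1, 1}, {0, 5, 6}};
        int[] m = null, mb = null;
        for (int[] r : records) {
            m = min(m, r);
            mb = minBy(mb, r);
            System.out.println("min=" + Arrays.toString(m)
                    + "  minBy=" + Arrays.toString(mb));
        }
        // min ends at [0, 2, 1]: f2 is minimal but f1 is stale.
        // minBy ends at [0, 1, 1]: the actual record with the minimal f2.
    }
}
```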
Main aggregation methods:
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
Example:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class TransformationsAggregations {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the source data
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 5, 6));
        data.add(new Tuple3<>(0, 3, 5));
        data.add(new Tuple3<>(1, 1, 9));
        data.add(new Tuple3<>(1, 2, 8));
        data.add(new Tuple3<>(1, 3, 10));
        data.add(new Tuple3<>(1, 2, 9));
        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env1.fromCollection(data);

        // items.keyBy(0).min(2).print();
        /* min output:
        6> (0,2,2)
        6> (0,2,1)
        6> (0,2,1)
        6> (0,2,1)
        6> (1,1,9)
        6> (1,1,8)
        6> (1,1,8)
        6> (1,1,8)
        */

        items.keyBy(0).minBy(2).print();
        /* minBy output:
        6> (0,2,2)
        6> (0,1,1)
        6> (0,1,1)
        6> (0,1,1)
        6> (1,1,9)
        6> (1,2,8)
        6> (1,2,8)
        6> (1,2,8)
        */

        env1.execute("TransformationsAggregations");
    }
}
3.8 Window
KeyedStream → WindowedStream
Groups the records of an already keyed KeyedStream into windows as defined by the window assigner and emits results per window; the parallelism of this operator can be set.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TransformationsWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("192.168.23.210", 9000);

        DataStream<Tuple2<String, Integer>> ds1 = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        WindowedStream ds2 = ds1.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        ds2.sum(1).print();

        env.execute();
    }
}
3.9 WindowAll
DataStream → AllWindowedStream
Windows the data stream according to the specified window size and emits results per window. windowAll can be applied to a plain DataStream as well as to a KeyedStream, but its parallelism cannot be changed: it is always 1.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TransformationsWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("192.168.23.210", 9000);
        // text.print();

        DataStream<Tuple2<String, Integer>> ds1 = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        // windowAll on a plain DataStream
        AllWindowedStream ts = ds1.timeWindowAll(Time.seconds(5));
        ts.sum(1).print();

        // windowAll after keyBy
        AllWindowedStream ds2 = ds1.keyBy(0).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        ds2.sum(1).print();

        env.execute();
    }
}
4.0 Aggregations on windows
WindowedStream → DataStream
Window aggregation functions behave like the ordinary aggregation functions above, except that each aggregate only covers the data inside its window.
Common window aggregation functions:
- windowedStream.sum(0);
- windowedStream.sum("key");
- windowedStream.min(0);
- windowedStream.min("key");
- windowedStream.max(0);
- windowedStream.max("key");
- windowedStream.minBy(0);
- windowedStream.minBy("key");
- windowedStream.maxBy(0);
- windowedStream.maxBy("key");
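The per-window scoping can be sketched in plain Java. The sketch below uses assumed names (it is not the Flink API): each timestamped value is bucketed into its 5-second tumbling window and summed only with values in the same window, which is exactly why a window sum never mixes data across windows.

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowAggregationSketch {

    // Window start for a zero-offset tumbling window: align the timestamp
    // down to a multiple of the window size. (Illustrative formula for
    // non-negative timestamps, not taken from Flink's source.)
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second tumbling windows
        // {timestampMs, value} pairs
        long[][] events = {{1_000, 1}, {4_000, 1}, {6_000, 1}, {9_000, 1}};
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            // Sum only with values that fall into the same window bucket
            sums.merge(windowStart(e[0], size), e[1], Long::sum);
        }
        System.out.println(sums); // {0=2, 5000=2}
    }
}
```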
4.1 Union
DataStream* → DataStream
Merges two or more data streams into a single stream containing all their elements.
DataStream<T> dsUnion = ds1.union(ds2, ds3, …);
4.2 Split
DataStream → SplitStream
Splits one data stream into two or more streams. (Note: split/select were deprecated in later Flink versions in favor of side outputs.)
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
4.3 Select
SplitStream → DataStream
Retrieves one or more DataStreams from a SplitStream, by the names assigned in split, as shown in the example above.