Contents

3.1 Map

3.2 FlatMap

3.3 Filter

3.4 KeyBy

3.5 Reduce

3.6 Fold

3.7 Aggregations

3.8 Window

3.9 WindowAll

4.0 Aggregations on windows

4.1 Union

4.2 Split

4.3 Select


3.1 Map

DataStream → DataStream

A one-to-one transformation: each input record produces exactly one output record.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationsMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dsSocket = env.socketTextStream("192.168.23.210", 9000);

        // Anonymous-class style
        DataStream<Tuple2<String, Integer>> dsMap1 = dsSocket.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // dsMap1.print();

        // Lambda style; the returns(...) hint is needed because Java erases
        // the Tuple2 generic types from the lambda
        DataStream<Tuple2<String, Integer>> dsMap2 = dsSocket
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        dsMap2.print();

        env.execute("TransformationsMap");
    }
}

Input:

d

Output:

4> (d,1)

(The "4>" prefix is the index of the parallel subtask that printed the record.)

3.2 FlatMap

One row in, zero or more rows out: each input record can produce 0, 1, or many output records.

DataStream → DataStream

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationsFlatmap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dsSocket = env.socketTextStream("192.168.23.210", 9000);

        // Anonymous-class style; for input "spark,hive,hbase" this emits
        // (spark,1), (hive,1), (hbase,1)
        DataStream<Tuple2<String, Integer>> ds1 = dsSocket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        ds1.print();

        env.execute("TransformationsFlatmap");
    }
}

3.3 Filter

DataStream → DataStream

Evaluates a boolean function for each element and retains only the elements for which the function returns true.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationsFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dsSocket = env.socketTextStream("192.168.23.210", 9000);

        // Keep only the lines that equal "spark" (case-insensitive)
        DataStream<String> ds1 = dsSocket.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.equalsIgnoreCase("spark");
            }
        });
        ds1.print();

        env.execute("TransformationsFilter");
    }
}
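The same filter can also be written as a lambda (a sketch replacing the anonymous class above; unlike the map example, no returns(...) hint is needed because the output type String is not generic):

DataStream<String> ds1 = dsSocket.filter(value -> value.equalsIgnoreCase("spark"));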

3.4 KeyBy

DataStream → KeyedStream

Logically partitions a stream into disjoint partitions; all records with the same key are assigned to the same partition. Internally, keyBy is implemented with hash partitioning.

Note:

  1. The partitioning result is strongly tied to the parallelism of the operator downstream of keyBy: if the downstream operator has a parallelism of 1, all records end up together no matter how the stream is keyed.
  2. For POJO types, keyBy can partition on a named field via keyBy(fieldName).
  3. For Tuple types, keyBy can partition on a field position via keyBy(fieldPosition).
  4. For arbitrary types, as in the example below, keyBy can partition via keyBy(new KeySelector {...}). All three call styles are sketched after the example.
public class Person {
    private String id;
    private int sum;

    public Person() {}

    public Person(String id, int sum) {
        this.id = id;
        this.sum = sum;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }
}

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class TransformationsKeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> dsCollection = env.fromCollection(Arrays.asList(
                new Person("a", 1),
                new Person("a", 2),
                new Person("b", 1),
                new Person("a", 2)));

        // Key by the id field via a KeySelector
        KeyedStream<Person, String> ksCollection1 = dsCollection.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person p) throws Exception {
                return p.getId();
            }
        });

        // Rolling sum of the "sum" field per key
        DataStream<Person> dsSum = ksCollection1.sum("sum");
        dsSum.print();

        env.execute("TransformationsKeyBy");
    }
}
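For reference, here is a minimal sketch (not from the original post; the class name KeyByStyles is made up) showing the three keyBy call styles listed in the notes above, reusing the Person POJO:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByStyles {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dsTuple =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Person> dsPerson =
                env.fromElements(new Person("a", 1), new Person("b", 2));

        // Tuple type: key by field position
        KeyedStream<Tuple2<String, Integer>, Tuple> byPosition = dsTuple.keyBy(0);
        // POJO type: key by field name
        KeyedStream<Person, Tuple> byName = dsPerson.keyBy("id");
        // Any type: key by KeySelector (here a method reference)
        KeyedStream<Person, String> bySelector = dsPerson.keyBy(Person::getId);

        byPosition.sum(1).print();
        env.execute("KeyByStyles");
    }
}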

3.5 Reduce

KeyedStream → DataStream

Combines the current element with the last reduced value and emits the new value; reduce is a stateful operator. That is, it performs a rolling aggregation based on a ReduceFunction and emits the result of every rolling step to the downstream operator.

public class Person {
    private String id;
    private int sum;

    public Person() {}

    public Person(String id, int sum) {
        this.id = id;
        this.sum = sum;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }

    @Override
    public String toString() {
        return "Person{" + "id='" + id + '\'' + ", sum=" + sum + '}';
    }
}

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class TransformationsReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> dsCollection = env.fromCollection(Arrays.asList(
                new Person("a", 1),
                new Person("a", 2),
                new Person("b", 1),
                new Person("a", 2)));

        // Key by the id field via a KeySelector
        KeyedStream<Person, String> keyStream01 = dsCollection.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person p) throws Exception {
                return p.getId();
            }
        });

        // Rolling sum per key: each incoming Person is merged with the
        // previously reduced value for its key
        DataStream<Person> dsReduce = keyStream01.reduce(new ReduceFunction<Person>() {
            @Override
            public Person reduce(Person person, Person t1) throws Exception {
                return new Person(person.getId(), person.getSum() + t1.getSum());
            }
        });
        dsReduce.print();

        env.execute("TransformationsReduce");
    }
}

3.6 Fold

KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

How should we understand fold? Each element in the group is merged into an accumulator (which starts out as the initial value) and the accumulator is returned; the accumulator's type may differ from the element type. It is similar to reduce, just with an extra initial-value parameter. Note that fold has since been deprecated in newer Flink releases; a stateful function achieves the same effect, as sketched after the output below.

public class Person {
    private String id;
    private int sum;

    public Person() {}

    public Person(String id, int sum) {
        this.id = id;
        this.sum = sum;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }

    @Override
    public String toString() {
        return "Person{" + "id='" + id + '\'' + ", sum=" + sum + '}';
    }
}

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class TransformationsFold {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> dsCollection = env.fromCollection(Arrays.asList(
                new Person("a", 1),
                new Person("a", 2),
                new Person("b", 1),
                new Person("a", 2)));

        // Key by the id field via a KeySelector
        KeyedStream<Person, String> keyStream01 = dsCollection.keyBy(new KeySelector<Person, String>() {
            @Override
            public String getKey(Person p) throws Exception {
                return p.getId();
            }
        });

        // Fold each key's elements into a String accumulator starting at "start";
        // note the accumulator type (String) differs from the element type (Person)
        DataStream<String> dsFold = keyStream01.fold("start", new FoldFunction<Person, String>() {
            @Override
            public String fold(String current, Person p) throws Exception {
                return current + "-" + p.getId();
            }
        });
        dsFold.print();

        env.execute("TransformationsFold");
    }
}

Output:
6> start-a
6> start-a-a
2> start-b
6> start-a-a-a
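Since fold is deprecated in newer Flink versions, the same rolling fold can be expressed with keyed state. A minimal sketch under that assumption (not from the original post; FoldWithState is a made-up name), reusing the Person class above:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Rolling fold re-implemented with ValueState: the state is the accumulator,
// starting at "start" for each new key
public class FoldWithState extends RichFlatMapFunction<Person, String> {
    private transient ValueState<String> acc;

    @Override
    public void open(Configuration parameters) throws Exception {
        acc = getRuntimeContext().getState(new ValueStateDescriptor<>("acc", String.class));
    }

    @Override
    public void flatMap(Person p, Collector<String> out) throws Exception {
        String current = acc.value();
        if (current == null) {
            current = "start";      // initial value, as in fold("start", ...)
        }
        current = current + "-" + p.getId();
        acc.update(current);
        out.collect(current);
    }
}

Applied as keyStream01.flatMap(new FoldWithState()).print(); it should produce the same output as the fold version.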

3.7 Aggregations

KeyedStream → DataStream

Rolling aggregations on a KeyedStream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same holds for max and maxBy).

In other words, min takes the minimum of the specified field and keeps that minimum in the corresponding position, while the remaining fields keep the values seen first, so the emitted record is not guaranteed to be an actual input element; max behaves the same way. minBy instead returns the whole element whose specified field is the minimum, replacing the current result whenever an element with a smaller value in that field arrives; maxBy is analogous.

The main aggregation methods:

keyedStream.sum(0);

keyedStream.sum("key");

keyedStream.min(0);

keyedStream.min("key");

keyedStream.max(0);

keyedStream.max("key");

keyedStream.minBy(0);

keyedStream.minBy("key");

keyedStream.maxBy(0);

keyedStream.maxBy("key");

Example:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class TransformationsAggregations {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the source data
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 5, 6));
        data.add(new Tuple3<>(0, 3, 5));
        data.add(new Tuple3<>(1, 1, 9));
        data.add(new Tuple3<>(1, 2, 8));
        data.add(new Tuple3<>(1, 3, 10));
        data.add(new Tuple3<>(1, 2, 9));
        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env1.fromCollection(data);

        // items.keyBy(0).min(2).print();
        /* min output:
        6> (0,2,2)
        6> (0,2,1)
        6> (0,2,1)
        6> (0,2,1)
        6> (1,1,9)
        6> (1,1,8)
        6> (1,1,8)
        6> (1,1,8)
        */

        items.keyBy(0).minBy(2).print();
        /* minBy output:
        6> (0,2,2)
        6> (0,1,1)
        6> (0,1,1)
        6> (0,1,1)
        6> (1,1,9)
        6> (1,2,8)
        6> (1,2,8)
        6> (1,2,8)
        */

        env1.execute("TransformationsAggregations");
    }
}

3.8 Window

KeyedStream → WindowedStream

Groups a KeyedStream that has already been partitioned by key into the windows defined by the window assigner; the parallelism can be set.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TransformationsWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("192.168.23.210", 9000);

        DataStream<Tuple2<String, Integer>> ds1 = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        // Key by the word, then collect each key's records into
        // 5-second tumbling processing-time windows
        WindowedStream ds2 = ds1.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        ds2.sum(1).print();

        env.execute();
    }
}
 

3.9 WindowAll

DataStream → AllWindowedStream

Groups the stream's data into windows of the specified size. It can be applied to a plain DataStream or to a KeyedStream, but the parallelism cannot be set: it is always 1.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TransformationsWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("192.168.23.210", 9000);
        // text.print();

        DataStream<Tuple2<String, Integer>> ds1 = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        // windowAll on a plain DataStream (timeWindowAll is shorthand for a
        // tumbling processing-time window here)
        AllWindowedStream ts = ds1.timeWindowAll(Time.seconds(5));
        ts.sum(1).print();

        // windowAll after keyBy: the keying is effectively ignored and
        // everything is processed with parallelism 1
        AllWindowedStream ds2 = ds1.keyBy(0).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        ds2.sum(1).print();

        env.execute();
    }
}

4.0 Aggregations on windows

WindowedStream → DataStream

Window aggregation functions are similar to the ordinary aggregation functions; the difference is that a window aggregation only covers the data inside the window.

Commonly used window aggregation functions (a minimal example follows the list):

  1. windowedStream.sum(0);
  2. windowedStream.sum("key");
  3. windowedStream.min(0);
  4. windowedStream.min("key");
  5. windowedStream.max(0);
  6. windowedStream.max("key");
  7. windowedStream.minBy(0);
  8. windowedStream.minBy("key");
  9. windowedStream.maxBy(0);
  10. windowedStream.maxBy("key");

4.1 Union

DataStream* → DataStream

Merges two or more data streams into a single stream containing all the elements of all the inputs.

DataStream dsUnion = ds1.union(ds2, ds3, …);
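A minimal runnable sketch (not from the original post; the class name and element values are made up for illustration):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationsUnion {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> ds1 = env.fromElements(1, 2);
        DataStream<Integer> ds2 = env.fromElements(3, 4);
        DataStream<Integer> ds3 = env.fromElements(5, 6);

        // union requires all input streams to have the same element type;
        // the result interleaves the elements of all three streams
        DataStream<Integer> dsUnion = ds1.union(ds2, ds3);
        dsUnion.print();

        env.execute("TransformationsUnion");
    }
}

Note that union only works on streams of the same element type; combining streams of different types would require connect and a CoMapFunction instead.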

4.2 Split

DataStream → SplitStream

Splits one data stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

SplitStream<Integer> split;

DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");

4.3 Select

SplitStream → DataStream

Selects one or more DataStreams from a SplitStream, as shown in the select snippet at the end of 4.2. Note that split/select were deprecated in later Flink versions; side outputs are the recommended replacement, sketched below.
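As a minimal sketch of that replacement (not the original post's code; SideOutputSplit and the element values are made up), the even/odd split above can be expressed with a ProcessFunction and an OutputTag:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplit {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        // Tag for the side output; the anonymous subclass preserves the type information
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

        // Even numbers go to the main output, odd numbers to the side output
        SingleOutputStreamOperator<Integer> even = numbers.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    out.collect(value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        DataStream<Integer> odd = even.getSideOutput(oddTag);

        even.print();
        odd.print();
        env.execute("SideOutputSplit");
    }
}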
