Flink 常用API(2)——转换算子+聚合算子
转换算子(Transformation)
映射(map)
用于将数据流中的数据进行转换,形成新的数据流
“一一映射”,消费一个元素就产出一个元素
参数:接口 MapFunction 的实现
方法:map
返回值类型:DataStream
通过匿名类/实现类实现:
// 传入匿名类,实现 MapFunction
//Event是输入,String是输出
stream.map(new MapFunction<Event, String>() {@Overridepublic String map(Event e) throws Exception {return e.user;}});// 传入 MapFunction 的实现类stream.map(new UserExtractor()).print();env.execute();
public static class UserExtractor implements MapFunction<Event, String> {@Overridepublic String map(Event e) throws Exception {return e.user;}}
源码相关:
调用map方法,可以看到返回的是一个SingleOutputStreamOperator
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);return this.map(mapper, outType);}
而SingleOutputStreamOperator 类本身也继承自 DataStream 类,在DataStream类的基础上对一些参数进行了更详细的设置;
public class SingleOutputStreamOperator<T> extends DataStream<T>
过滤(filter)
参数:FilterFunction 接口的实现
方法:filter
返回值类型:DataStream
// 传入匿名类实现FilterFunctionstream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event e) throws Exception {return e.user.equals("Mary");}});// 传入FilterFunction实现类stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<Event> {@Overridepublic boolean filter(Event e) throws Exception {return e.user.equals("Mary");}}
扁平映射(flatMap)
先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理
参数:FlatMapFunction 接口的实现
方法:flatMap
返回值类型:flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出
同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同
public static class MyFlatMap implements FlatMapFunction<Event, String> {@Overridepublic void flatMap(Event value, Collector<String> out) throws Exception
{if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.url);}}}
聚合算子(Aggregation)
按键分区(keyBy)
对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率,因此要聚合,先分区;
keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区,也就是并行处理的子任务,对应着任务槽(slot);这样具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理
在keyBy()方法的内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 *hashCode() *方法
实现方法:
// 使用 Lambda 表达式KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);// 使用匿名类实现 KeySelectorKeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {@Overridepublic String getKey(Event e) throws Exception {return e.user;}});//结果:将 DataStream 转换为KeyedStream
简单聚合
参数:
传入的参数决定了聚合的依据,
对于元组有两种形式,位置和名称:
stream.keyBy(r -> r.f0).sum(1).print();
(元组中1号位置)
stream.keyBy(r -> r.f0).sum("f1").print();
(元组中名称为f1的数据)
对于POJO,只能传入字段名:
stream.keyBy(e -> e.user).max( "timestamp" ).print();
通过聚合操作,从 KeyedStream 转换成了DataStream
也就是说,先分区,再聚合,得到的依然是一个DataStream
归约聚合(reduce)
reduce 算子是一个一般化的聚合统计操作
实现 ReduceFunction 接口,接口里需要实现 reduce()方法
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 通过reduce算子实现sum和maxBy的功能env.addSource(new ClickSource()).map(new MapFunction<Event, Tuple2<String,Long>>() {@Overridepublic Tuple2<String, Long> map(Event event) throws Exception {return Tuple2.of(event.getUser(),1L);}}).keyBy(e -> e.f0).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {return Tuple2.of(t1.f0, t1.f1+t2.f1);}}).keyBy(e -> true).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {return t1.f1 > t2.f1 ? t1 : t2;}}).print();env.execute();}
用户自定义函数(UDF)
富函数类(Rich Function Classes)
所有的 Flink 函数类都有其Rich 版本
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能
一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接
另外,富函数类提供了 getRuntimeContext()
方法,可以获取到运行时上下文的一些信息,例如程序执行的并行度,任务名称,以及状态
物理分区
keyBy:逻辑分区
物理分区与 keyBy 一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式
- 随机分区(shuffle)
方法:
.shuffle()
每次执行结果都会不相同:
2.轮询分区(Round-Robin)
方法:.rebalance()
;rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
3.重缩放分区(rescale)
方法:.rescale()
重缩放和轮询的区别:
①重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中;也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌
②从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源
示例:
env.addSource(new RichParallelSourceFunction<Integer>() { // 这里使用了并行数据源的富函数版本@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {for (int i = 1; i <= 8; i++) {// 将奇数发送到索引为1的并行子任务// 将偶数发送到索引为0的并行子任务if ( i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {sourceContext.collect(i);}}}@Overridepublic void cancel() {}}).setParallelism(2).rescale().print().setParallelism(4);
结果:
可以看到,奇数全部在3,4分区,偶数全部在1,2分区;
如果使用rebalance
方法:
可以看到,奇数偶数平均分配到所有分区
4.广播(broadcast):经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()
方法,将输入数据复制并发送到下游算子的所有并行任务中去
5.全局分区(global):通过调用.global()
方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1
6. 自定义分区(Custom):使 用partitionCustom()
方法来自定义分区
// 将自然数按照奇偶分区env.fromElements(1, 2, 3, 4, 5, 6, 7, 8).partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer key, int numPartitions) { //分区器return key % 2;}}, new KeySelector<Integer, Integer>() { //KeyBy@Overridepublic Integer getKey(Integer value) throws Exception {return value;}}).print().setParallelism(2);env.execute();
Flink 常用API(2)——转换算子+聚合算子相关推荐
- [Flink]Flink常用的DataStream转换算子
目录 3.1 Map 3.2 FlatMap 3.3 Filter 3.4 KeyBy 3.5 Reduce 3.6 Fold 3.7 Aggregations 3.8 Window 3.9 Wind ...
- 大数据之flink常用算子
Flink分为: DataSet(批处理),DataStream(流处理),他们的方法都分别为Source.Transformation.Sink: Source:负责数据的读取 Transforma ...
- 大数据_Flink_Java版_数据处理_流处理API_Transform(3)_Reduce聚合算子---Flink工作笔记0031
然后我们上面一节,说了滚动聚合算子,实现了 传感器数据,我们从流数据中,实时获取温度最大值,并且,对一条数据,我们通过maxBy,也更新了 除了温度值的,其他的字段,维持了一行数据原来的样子 最开始我 ...
- 看完就会flink基础API
文章目录 一.执行环境(Execution Environment) 1.创建执行环境 2.执行模式(Execution Mode) 3.触发程序执行 二.源算子(Source) 1.数据源类准备 2 ...
- Flink Table API和SQL(下)
传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...
- 【基础】Flink -- DataStream API
Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...
- Flink DataStream API(基础版)
概述 DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...
- Fink DataStream 常用API
Fink DataStream 常用API 一.DataSource 二.Transformation 三.Sink Flink DataStream 常用API主要分为3部分: DataSource ...
- 学习笔记Flink(四)—— Flink基础API及核心数据结构
一.Flink基础API-Flink编程的基本概念 1.1.Flink程序 Flink 程序是实现了分布式集合转换(例如过滤.映射.更新状态.join.分组.定义窗口.聚合)的规范化程序. 集合初始创 ...
最新文章
- module.exports 和 export default
- tc溜溜865手机投屏卡_下半年发布新品手机盘点:骁龙865+是性能之王 红米抢入门市场...
- python 访问需要HTTP Basic Authentication认证的资源
- ASP.NET跨页面传值技巧总结【转】
- has_a php,PHP has encountered a Stack overflow问题解决方法
- python网络爬虫(5)BeautifulSoup的使用示范
- 一个长文档里,包括封面、不同的章节,如果我想封面不设置页眉页脚,每个章节的页眉都不同,请问应该如何设置页眉页脚?
- (2021) 23 [持久化] I/O设备与驱动
- 渗透测试 已学课时 1 个_我14岁上创业课时学到的东西
- 【杂记】我为什么要坚持写博客
- 利用Python实现一个感知机学习算法
- LeetCode--042--接雨水(java版)
- 计算机配色故障,计算机配色模型中存在的限制点
- 由内而外全面造就自己
- 广州物流展相关的个人总结
- 写一个iOS复杂表单的正确姿势
- 小牛采购管理系统 v3.01 bt
- 2022年,有哪些小本生意可以做
- web前端学习笔记——选择器
- 常见web漏洞及防范(转)