转换算子(Transformation)

映射(map)

用于将数据流中的数据进行转换,形成新的数据流

“一一映射”,消费一个元素就产出一个元素

参数:接口 MapFunction 的实现

方法:map

返回值类型:DataStream

通过匿名类/实现类实现:

// 传入匿名类,实现 MapFunction
//Event是输入,String是输出
stream.map(new MapFunction<Event, String>() {@Overridepublic String map(Event e) throws Exception {return e.user;}});// 传入 MapFunction 的实现类stream.map(new UserExtractor()).print();env.execute();
public static class UserExtractor implements MapFunction<Event, String> {@Overridepublic String map(Event e) throws Exception {return e.user;}}

源码相关:

调用map方法,可以看到返回的是一个SingleOutputStreamOperator

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);return this.map(mapper, outType);}

而SingleOutputStreamOperator 类本身也继承自 DataStream 类,在DataStream类的基础上对一些参数进行了更详细的设置;

public class SingleOutputStreamOperator<T> extends DataStream<T>

过滤(filter)

参数:FilterFunction 接口的实现

方法:filter

返回值类型:DataStream

 // 传入匿名类实现FilterFunctionstream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event e) throws Exception {return e.user.equals("Mary");}});// 传入FilterFunction实现类stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<Event> {@Overridepublic boolean filter(Event e) throws Exception {return e.user.equals("Mary");}}

扁平映射(flatMap)

先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理

参数:FlatMapFunction 接口的实现

方法:flatMap

返回值类型:flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出

同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同

public static class MyFlatMap implements FlatMapFunction<Event, String> {@Overridepublic void flatMap(Event value, Collector<String> out) throws Exception
{if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.url);}}}

聚合算子(Aggregation)

按键分区(keyBy)

对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率,因此要聚合,先分区;

keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区,也就是并行处理的子任务,对应着任务槽(slot);这样具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理

在keyBy()方法的内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 *hashCode() *方法

实现方法:

// 使用 Lambda 表达式KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);// 使用匿名类实现 KeySelectorKeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {@Overridepublic String getKey(Event e) throws Exception {return e.user;}});//结果:将 DataStream 转换为KeyedStream

简单聚合

参数:

传入的参数决定了聚合的依据,

对于元组有两种形式,位置和名称:

stream.keyBy(r -> r.f0).sum(1).print();(元组中1号位置)

stream.keyBy(r -> r.f0).sum("f1").print();(元组中名称为f1的数据)

对于POJO,只能传入字段名:

stream.keyBy(e -> e.user).max( "timestamp" ).print();

通过聚合操作,从 KeyedStream 转换成了DataStream

也就是说,先分区,再聚合,得到的依然是一个DataStream

归约聚合(reduce)

reduce 算子是一个一般化的聚合统计操作

实现 ReduceFunction 接口,接口里需要实现 reduce()方法

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 通过reduce算子实现sum和maxBy的功能env.addSource(new ClickSource()).map(new MapFunction<Event, Tuple2<String,Long>>() {@Overridepublic Tuple2<String, Long> map(Event event) throws Exception {return Tuple2.of(event.getUser(),1L);}}).keyBy(e -> e.f0).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {return Tuple2.of(t1.f0, t1.f1+t2.f1);}}).keyBy(e -> true).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {return t1.f1 > t2.f1 ? t1 : t2;}}).print();env.execute();}

用户自定义函数(UDF)

富函数类(Rich Function Classes)

所有的 Flink 函数类都有其Rich 版本

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接

另外,富函数类提供了 getRuntimeContext()方法,可以获取到运行时上下文的一些信息,例如程序执行的并行度,任务名称,以及状态

物理分区

keyBy:逻辑分区

物理分区与 keyBy 一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式


  1. 随机分区(shuffle)

    方法:.shuffle()

每次执行结果都会不相同:

2.轮询分区(Round-Robin)

方法:.rebalance();rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去

3.重缩放分区(rescale)

方法:.rescale()

重缩放和轮询的区别:

①重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中;也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌

②从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源

示例:

 env.addSource(new RichParallelSourceFunction<Integer>() {  // 这里使用了并行数据源的富函数版本@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {for (int i = 1; i <= 8; i++) {// 将奇数发送到索引为1的并行子任务// 将偶数发送到索引为0的并行子任务if ( i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {sourceContext.collect(i);}}}@Overridepublic void cancel() {}}).setParallelism(2).rescale().print().setParallelism(4);

结果:

可以看到,奇数全部在3,4分区,偶数全部在1,2分区;

如果使用rebalance方法:

可以看到,奇数偶数平均分配到所有分区

4.广播(broadcast):经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去

5.全局分区(global):通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1

6. 自定义分区(Custom):使 用partitionCustom()方法来自定义分区

// 将自然数按照奇偶分区env.fromElements(1, 2, 3, 4, 5, 6, 7, 8).partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer key, int numPartitions) { //分区器return key % 2;}}, new KeySelector<Integer, Integer>() { //KeyBy@Overridepublic Integer getKey(Integer value) throws Exception {return value;}}).print().setParallelism(2);env.execute();

Flink 常用API(2)——转换算子+聚合算子相关推荐

  1. [Flink]Flink常用的DataStream转换算子

    目录 3.1 Map 3.2 FlatMap 3.3 Filter 3.4 KeyBy 3.5 Reduce 3.6 Fold 3.7 Aggregations 3.8 Window 3.9 Wind ...

  2. 大数据之flink常用算子

    Flink分为: DataSet(批处理),DataStream(流处理),他们的方法都分别为Source.Transformation.Sink: Source:负责数据的读取 Transforma ...

  3. 大数据_Flink_Java版_数据处理_流处理API_Transform(3)_Reduce聚合算子---Flink工作笔记0031

    然后我们上面一节,说了滚动聚合算子,实现了 传感器数据,我们从流数据中,实时获取温度最大值,并且,对一条数据,我们通过maxBy,也更新了 除了温度值的,其他的字段,维持了一行数据原来的样子 最开始我 ...

  4. 看完就会flink基础API

    文章目录 一.执行环境(Execution Environment) 1.创建执行环境 2.执行模式(Execution Mode) 3.触发程序执行 二.源算子(Source) 1.数据源类准备 2 ...

  5. Flink Table API和SQL(下)

    传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性) Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询) Flink Table A ...

  6. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  7. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

  8. Fink DataStream 常用API

    Fink DataStream 常用API 一.DataSource 二.Transformation 三.Sink Flink DataStream 常用API主要分为3部分: DataSource ...

  9. 学习笔记Flink(四)—— Flink基础API及核心数据结构

    一.Flink基础API-Flink编程的基本概念 1.1.Flink程序 Flink 程序是实现了分布式集合转换(例如过滤.映射.更新状态.join.分组.定义窗口.聚合)的规范化程序. 集合初始创 ...

最新文章

  1. module.exports 和 export default
  2. tc溜溜865手机投屏卡_下半年发布新品手机盘点:骁龙865+是性能之王 红米抢入门市场...
  3. python 访问需要HTTP Basic Authentication认证的资源
  4. ASP.NET跨页面传值技巧总结【转】
  5. has_a php,PHP has encountered a Stack overflow问题解决方法
  6. python网络爬虫(5)BeautifulSoup的使用示范
  7. 一个长文档里,包括封面、不同的章节,如果我想封面不设置页眉页脚,每个章节的页眉都不同,请问应该如何设置页眉页脚?
  8. (2021) 23 [持久化] I/O设备与驱动
  9. 渗透测试 已学课时 1 个_我14岁上创业课时学到的东西
  10. 【杂记】我为什么要坚持写博客
  11. 利用Python实现一个感知机学习算法
  12. LeetCode--042--接雨水(java版)
  13. 计算机配色故障,计算机配色模型中存在的限制点
  14. 由内而外全面造就自己
  15. 广州物流展相关的个人总结
  16. 写一个iOS复杂表单的正确姿势
  17. 小牛采购管理系统 v3.01 bt
  18. 2022年,有哪些小本生意可以做
  19. web前端学习笔记——选择器
  20. 常见web漏洞及防范(转)

热门文章

  1. 2、星光STM32F03串口通信调试
  2. 如何在Hexo博客发布文章
  3. 2021年熔化焊接与热切割考试题及熔化焊接与热切割模拟考试题
  4. H - Hangar Hurdles
  5. Linux提权,吃透这篇文章就够了
  6. 保姆级Red Hat没有yum命令、报错This system is not registered to Red Hat Subscription Management.
  7. ASP.NET 2.0 中的主版頁面
  8. 【音乐随想】道,流浪者之歌 与神思者
  9. Miscellaneos:ISV
  10. 学习心得,书读百遍其义自见!