大家好,我是雷恩Layne,这是《深入浅出flink》系列的第七篇文章,我旨在用最直白的语言写好flink,希望能让所有看到的人一目了然。如果大家喜欢,欢迎点赞、关注,也欢迎留言,共同交流flink的点点滴滴 O(∩_∩)O

文章目录

  • 1. keyBy
  • 2. broadcast
  • 3. rebalance
  • 4. rescale
  • 5. shuffle
  • 6. global
  • 7. partitionCustom

flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。

flink中的重分区算子定义上下游subtask之间数据传递的方式。SubTask之间进行数据传递模式有两种,一种是one-to-one(forwarding)模式,另一种是redistributing的模式。

  • One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如上图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
  • Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。

flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。

现在就让我们来一探究竟。


1. keyBy

keyBy :DataStream -> KeyedStream,按照key的hashcode将一个流划分为不相交的分区。具有相同 Keys 的所有记录在同一分区。

KeyBy主要有三种分区方法:

//1. 根据Tuple第fields个元素的hashcode分区
keyBy(int... fields)//2.根据Bean的fields的hashcode分区
keyBy(String... fields)//3.自定义keySelector提取key来分区,T是传入的数据类型,K是提取key的数据类型
keyBy(KeySelector<T, K> key)

具体示例在之前的文章详细梳理flink中常见的dataSteam算子已介绍过。

2. broadcast

broadcast :DataStream -> DataStream,给下游算子所有的subtask都广播一份数据。

举例:设置并行度为3,打印broadcast下游subtask的数据

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);DataStream<String> dataStream = env.fromElements("hello", "world", "flink");
DataStream<String> broadcast = dataStream.broadcast();broadcast.print();
env.execute();

执行输出:

3> hello
3> world
3> flink
2> hello
2> world
2> flink
1> hello
1> world
1> flink

并行度为3,每一个算子都有三个subtask,所以经过broadcast后,下游每一个subtask就会接收到所有上游任务发送过来的数据。

3. rebalance

rebalance :DataStream -> DataStream,随机轮询发送数据。

举例来说,假如A算子作为上游算子,有3个SubTask,并行度为3;下游B算子,有2个SubTask,并行度为2,数据传递方式是rebalance。数据具体传递形式:首先生成一个随机数,决定第一个数据发往下游的哪个subtask,假如生成随机是i,下游的任务数是n,则A的SubTask1中第一个数据发送到B的第(i+1)%n的subtask,执行i=(i+1)%n, A的SubTask1第二个数据发送到B的第(i+1)%n的subtask,i=(i+1)%n,从而轮询发送数据。同理,A的SubTask2也是如此。

当上下游算子并行度不一样时,默认的数据传递方式是rebalance,当下游算子并行度一样时,默认的数据传递方式是forward。

forward也是flink中的算子,因为它只是让数据在当前的分区进行上下游传递,并没有进行shuffle,所以不属于shuffle类的算子。

看一段简单的代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> dataStream = env.fromElements("hello", "world", "flink");
dataStream.print();
env.execute();

在该代码中,我们设置了程序并行度为3,但实际执行过程中是fromElements算子并行度为1,print并行度3。那为什么fromElements的并行度不是3呢?这是因为这个source没有实现ParallelSourceFunction 接口or 继承RichParallelSourceFunction,具体可参考我的博客:flink常见的单并行度和多并行度Source。

将上述代码打包上传到flink集群,可以查看执行计划如下:

可以看到,上下游subtask的数据传递方式为rebalance,这就我们就能推测数据的执行结果,即Sink的每个subtask都会接收到一个数据:

1> hello
2> world
3> flink

将上述代码修改如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);DataStream<String> dataStream = env.fromElements("hello", "world", "flink");
DataStream<String> rebalance = dataStream.rebalance();rebalance.print();
env.execute();

此时,相当于人为将fromElements和print之间数据传递方式定义为rebalance,也就是说,无论它们的并行度是否相同,数据传递方式都为rebalance。

4. rescale

rescale :DataStream -> DataStream,重新分组,在组内进行rebalance(轮询),数据传输的范围小一点。

如下图所示,假如上游有2个分区(即两个subtask),下游4个分区,rebalance是让每一个上游subtask对下游轮询发送数据,而rescale是将上下游分区的任务平均划分为2组,在每个分组内rebalance发送数据。

5. shuffle

shuffle :DataStream -> DataStream,完全随机发送数据,也就是说,上游任务发送给下游任务的数据是随机发送的。

shuffle的底层是ShufflePartitioner,实现方式如下:

public class ShufflePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private Random random = new Random();@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return random.nextInt(numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return new ShufflePartitioner<T>();}@Overridepublic String toString() {return "SHUFFLE";}
}

selectChannel决定了将数据发往下游哪一个分区中,可以看到,代码中是通过random.nextInt生成的,也就是随机发送数据。

6. global

global :DataStream -> DataStream,数据传递给下游第一个分区(或下游第一个slot或下游算子的第一个并行子任务),一般将所有数据汇总在一起时使用。

示例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);DataStream<String> dataStream = env.fromElements("hello", "world", "flink");
DataStream<String> global = dataStream.global();global.print();
env.execute();

执行输出:

1> hello
1> world
1> flink

可以看到,虽然global的并行度为3,但是只有第一个子任务输出了数据。

7. partitionCustom

partitionCustom :DataStream -> DataStream,用户自定义重分区方式。

示例:自定义分区,按照传入数据的hashcode分区

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<Tuple2<String, Long>> dataStream = env.fromElements(new Tuple2<>("hello", 1L),new Tuple2<>("world", 3L),new Tuple2<>("flink", 5L),new Tuple2<>("world", 99L));DataStream<Tuple2<String, Long>> tuple2DataStream = dataStream.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {//key是分区的字段,numPartitions下游分区数return key.hashCode() % numPartitions;}
}, 0);//0将Tuple2的第一个字段作为分区字段tuple2DataStream.print();
env.execute();

执行输出:

1> (world,3)
1> (world,99)
2> (hello,1)
2> (flink,5)

【深入浅出flink】第7篇:从原理剖析flink中所有的重分区方式keyBy、broadcast、rebalance、rescale、shuffle、global、partitionCustom相关推荐

  1. java基础提升篇:深入剖析Java中的装箱和拆箱

    一.什么是装箱?什么是拆箱? 我们知道 Java为每种基本数据类型都提供了对应的包装器类型,至于为什么会为每种基本数据类型提供包装器类型在此不进行阐述,有兴趣的朋友可以查阅相关资料.在Java SE5 ...

  2. Docker的深入浅出(入门新手篇)

    Docker的深入浅出(入门新手篇) (持续更新中......) 什么是Docker? 这个问题百度的话会有很多标准答案,但是晦涩难懂,鄙人从小语文不好所以在以下内容鄙人会用个人粗俗的语音加通俗的理解 ...

  3. 第三十九篇:Flink 面试基础篇

    你好,欢迎来到第 39 课时,本课时我们主要讲解"Flink 面试-基础篇". 到目前为止,关于 Flink 的学习我们就告一段落了,接下来我们将进入最后一个面试模块的学习.在当前 ...

  4. Elasticsearch分布式一致性原理剖析(一)-节点篇

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: ES目前是最流行的开源分布式搜索引擎系统,其使用Lucene作为单机存储引擎并提供强大的搜索查询能力.学习其搜索原理, ...

  5. Elasticsearch分布式一致性原理剖析(三)-Data篇

    前言 "Elasticsearch分布式一致性原理剖析"系列将会对Elasticsearch的分布式一致性原理进行详细的剖析,介绍其实现方式.原理以及其存在的问题等(基于6.2版本 ...

  6. python原理书籍_python书籍推荐:《深入浅出深度学习:原理剖析与Python实践》

    在过去的这十年,深度学习已经席卷了整个科技界和工业界,2016年谷歌阿尔法狗打败围棋世界冠军李世石,更是使其成为备受瞩目的技术焦点. 今日,小编就为大家推荐一本能让初学者和"老司机" ...

  7. 【es】es 分布式一致性原理剖析(二)-Meta篇

    1.概述 转载:Elasticsearch分布式一致性原理剖析(二)-Meta篇 前言 "Elasticsearch分布式一致性原理剖析"系列将会对Elasticsearch的分布 ...

  8. 【es】es 分布式一致性原理剖析(三)-Data篇

    1.概述 转载:Elasticsearch分布式一致性原理剖析(三)-Data篇 前言 "Elasticsearch分布式一致性原理剖析"系列将会对Elasticsearch的分布 ...

  9. 【es】es 分布式一致性原理剖析 节点篇

    1.概述 好文章:Elasticsearch分布式一致性原理剖析(一)-节点篇 前言 "Elasticsearch分布式一致性原理剖析"系列将会对Elasticsearch的分布式 ...

  10. 原理剖析(第 009 篇)ReentrantReadWriteLock工作原理分析

    2019独角兽企业重金招聘Python工程师标准>>> 原理剖析(第 009 篇)ReentrantReadWriteLock工作原理分析 一.大致介绍 1.在前面章节了解了AQS和 ...

最新文章

  1. Spring事务异常回滚,try catch 捕获异常不回滚
  2. pytho作线性拟合、多项式拟合、对数拟合
  3. Project Tango 的一些应用
  4. TEGer看过来,他二哥带你去看大世界!
  5. House Building HDU - 5538
  6. 长生不死、名人复活?疯狂的AI时代,人类竟要靠IA实现“永生”
  7. 在线求CR,你觉得我这段Java代码还有优化的空间吗?
  8. jdk,Eclipse,SWTDesigner安装【原创】
  9. hive load data外部表报错_从0开始学大数据-Hive基础篇
  10. pmp培训机构哪个好?各pmp培训机构排名如何?
  11. java小项目-房屋出租系统
  12. C++句柄类(智能指针)小结
  13. 参数问题:nested exception is java.lang.NumberFormatException: For input string: “null“,已解决。
  14. 常用信号去噪与信号回归方法的原理及MATLAB实现
  15. c语言挖地雷游戏,c扫雷小游戏
  16. 串口服务器调试助手使用教程,串口调试助手使用教程【操作方式】
  17. qcap 教程_给winpe添加explorer教程(续):文件列表
  18. IPC、Binder及AIDL原理机制
  19. 上线项目 Docker部署项目到服务器总结
  20. vue利用事件委托实现按钮互斥,并传递对应的值

热门文章

  1. 大疆2018网申之机器学习算法工程师笔试题B卷
  2. html打印去掉页码和日期,PPT打印讲义时如何去掉日期页码?
  3. 微信公众号采集之免费采集公众号爆文工具
  4. mysql生成数据字典
  5. 一、Python复习教程(重点)- 基础
  6. JS 中提交表单Form方法
  7. 问题:office应用(word、ppt、excel、oneNote) 您的组织策略阻止我们为您完成此操作 解决办法
  8. 安卓模拟ibeacon_android iBeacon开发模拟实例
  9. switch中使用枚举
  10. mysql 登录 无密码_重置mysql的密码/无密码登录mysql