第 3 章

3.1.1 环境配置

Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行Flink 安装部署的学习时,需要准备 3 台 Linux 机器。具体要求如下:

  1. 系统环境为 CentOS 7.5 版本。
  2. 安装 Java 8。
  3. 安装 Hadoop 集群,Hadoop 建议选择 Hadoop 2.7.5 以上版本。
  4. 配置集群节点服务器间时间同步以及免密登录,关闭防火墙。

3.1.2 本地启动

最简单的启动方式,其实是不搭建集群,直接本地启动。本地部署非常简单,直接解压安装包就可以使用,不用进行任何配置;一般用来做一些简单的测试。具体安装步骤如下:

  1. 下载安装包
    进入 Flink 官网,下载 1.13.0 版本安装包 flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对应 scala 版本为 scala 2.12 的安装包。
    网址如下:
    https://flink.apache.org/zh/downloads.html#apache-flink-1136
  2. 解压
    在 hadoop102 节点服务器上创建安装目录/opt/module,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。

[liujiahao@hadoop102 software]$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/

  1. 启动
    进入解压后的目录,执行启动命令

$ cd flink-1.13.0/
$ bin/start-cluster.sh

  1. 访问 Web UI
    启动成功后,访问 http://hadoop102:8081,可以对 flink 集群和任务进行监控管理,如图

  2. 关闭集群
    如果想要让 Flink 集群停止运行,可以执行以下命令:

$ bin/stop-cluster.sh

3.1.3 集群启动

  • 可以看到,Flink 本地启动非常简单,直接执行 start-cluster.sh 就可以了。如果我们想要扩展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。
  • Flink 是典型的 Master-Slave 架构的分布式数据处理框架,其中 Master 角色对应着JobManager,Slave 角色则对应 TaskManager。我们对三台节点服务器的角色分配如表 3-1 所示。

  • 具体安装部署步骤如下:
  1. 下载并解压安装包
    具体操作与上节相同。
  2. 修改集群配置
    (1)进入 conf 目录下,修改 flink-conf.yaml 文件,修改jobmanager.rpc.address 参数为hadoop102,如下所示:

$ cd conf/
$ vim flink-conf.yaml

# JobManager 节点地址
jobmanager.rpc.address: hadoop102

这就指定了 hadoop102 节点服务器为 JobManager 节点。
(2)修改 workers 文件,将另外两台节点服务器添加为本 Flink 集群的 TaskManager 节点,具体修改如下:

$ vim workers

hadoop103
hadoop104

这样就指定了 hadoop103 和 hadoop104 为 TaskManager 节点。
(3)另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件
进行优化配置,主要配置项如下:

  • jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
  • taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
  • taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置,默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。
  • parallelism.default:Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
  • 关于 Slot 和并行度的概念,我们会在下一章做详细讲解。
  1. 分发安装目录
    配置修改完毕后,将 Flink 安装目录发给另外两个节点服务器。

$ scp -r ./flink-1.13.0 liujiahao@hadoop103:/opt/module
$ scp -r ./flink-1.13.0 liujiahao@hadoop104:/opt/module

我这边用的是写好的脚本

$ xsync flink-1.13.0

  1. 启动集群
    (1)在 hadoop102 节点服务器上执行 start-cluster.sh 启动 Flink 集群:

$ bin/start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.

(2)查看进程情况:

  1. 访问 Web UI
    启动成功后,同样可以访问 http://hadoop102:8081 对 flink 集群和任务进行监控管理,如图

这里可以明显看到,当前集群的 TaskManager 数量为 2;由于默认每个 TaskManager 的 Slot数量为 1,所以总 Slot 数和可用 Slot 数都为 2。

3.1.4 向集群提交作业

  • 在上一章中,我们已经编写了词频统计的批处理和流处理的示例程序,并在开发环境的模拟集群上做了运行测试。现在既然已经有了真正的集群环境,那接下来我们就要把作业提交上去执行了。
  • 本节我们将以流处理的程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。
  1. 程序打包
    (1)为方便自定义结构和定制依赖,我们可以引入插件 maven-assembly-plugin 进行打包。在 FlinkTutorial 项目的 pom.xml 文件中添加打包插件的配置,具体如下:
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

(2)插件配置完毕后,可以使用 IDEA 的 Maven 工具执行 package 命令,出现如下提示即表示打包成功。
打 包 完 成 后 , 在 target 目 录 下 即 可 找 到 所 需 JAR 包 , JAR 包 会 有 两 个 ,FlinkTutorial-1.0-SNAPSHOT.jar 和 FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,因为集群中已经具备任务运行所需的所有依赖,所以建议使用 FlinkTutorial-1.0-SNAPSHOT.jar。

  1. 在 Web UI 上提交作业
    (1)任务打包完成后,我们打开 Flink 的 WEB UI 页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的 JAR 包,如图

(2)点击该 JAR 包,出现任务配置页面,进行相应配置。
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行

(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况,如图(记得hadoop102那开启socket端口:nc -lk 7777,否则会报错)


(4)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行,如图

hadoop102发送数据

hello flink
hello spark
hello world


在Task Managers 的Stuout中能看到结果(为什么不在jobmanager里看?因为taskmanager才是干活的那个,他负责执行jar包里定义好的操作,所以输出也在这儿看),因为并行度是 2 ,所以分了两个线程输出。



3. 命令行提交作业
除了通过 WEB UI 界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把 jar 包直接上传到目录 flink-1.13.0 下

(1)首先需要启动集群。

$ bin/start-cluster.sh

(2)在 hadoop102 中执行以下命令启动 netcat。

$ nc -lk 7777

(3)进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业。(这里其实在hadoop103节点上提交也可以,因为提交命令中指定了运行的节点和端口号)

[liujiahao@hadoop103 flink-1.13.0]$ bin/flink run -m hadoop102:8081 -c online.liujiahao.flink01.StreamWordCount -p 2 ./FlinkDemo-1.0-SNAPSHOT.jar

这里的参数 –m 指定了提交到的 JobManager,-c 指定了入口类 -p指定并行度。
(4)在浏览器中打开 Web UI,http://hadoop102:8081 查看应用执行情况,如图

用 netcat 输入数据,可以在 TaskManager 的标准输出(Stdout)看到对应的统计结果。

3.2 部署模式

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各种场景提供了不同的部署模式,主要有以下三种:

  • 会话模式(Session Mode)
  • 单作业模式(Per-Job Mode)
  • 应用模式(Application Mode)

它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的 main 方法到底在哪里执行——**客户端(Client)**还是 JobManager。接下来我们就做一个展开说明。

3.2.1 会话模式(Session Mode)

会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业,如图所示。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

  • 这样的好处很明显,我们只需要一个集群,就像一个大箱子,所有的作业提交之后都塞进去;集群的生命周期是超越于作业之上的,铁打的营盘流水的兵,作业结束了就释放资源,集群依然正常运行。当然缺点也是显而易见的:因为资源是共享的,所以资源不够了,提交新的作业就会失败。另外,同一个 TaskManager 上可能运行了很多作业,如果其中一个发生故障导致 TaskManager 宕机,那么所有作业都会受到影响。
  • 我们在 3.1 节中先启动集群再提交作业,这种方式其实就是会话模式。
  • 会话模式比较适合于单个规模小、执行时间短的大量作业

3.2.2 单作业模式(Per-Job Mode)

  • 会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式,如图
  • 单作业模式也很好理解,就是严格的一对一集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业
  • 这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式
  • 需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。

3.2.3 应用模式(Application Mode)

  • 前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
  • 所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式,如图
  • 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。
  • 总结一下,在会话模式下,集群的生命周期独立于集群上运行的任何作业的生命周期,并且提交的所有作业共享资源。而单作业模式为每个提交的作业创建一个集群,带来了更好的资源隔离,这时集群的生命周期与作业的生命周期绑定。最后,应用模式为每个应用程序创建一个会话集群,在 JobManager 上直接调用应用程序的 main()方法。(而客户端模式和但作业模式都是在客户端上调用的程序的main方法)
  • 我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者(Resource Provider)的场景,具体介绍 Flink 的部署方式。

3.3 独立模式(Standalone)

  • 独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件,都只是操作系统上运行的一个 JVM 进程。
  • 独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
  • 另外,我们也可以将独立模式的集群放在容器中运行。Flink 提供了独立模式的容器化部署方式,可以在 Docker 或者 Kubernetes 上进行部署。

3.3.1 会话模式部署

  • 可以发现,独立模式的特点是不依赖外部资源管理平台,而会话模式的特点是先启动集群、后提交作业。所以,我们在第 3.1 节用的就是独立模式(Standalone)的会话模式部署。

3.3.2 单作业模式部署

  • 在 3.2.2 节中我们提到,Flink 本身无法直接以单作业方式启动集群,一般需要借助一些资源管理平台。所以 Flink 的独立(Standalone)集群并不支持单作业模式部署。

3.3.3 应用模式部署

  • 应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在bin 目录下的 standalone-job.sh 来创建一个 JobManager。
    具体步骤如下:
    (1)进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。

$ cp ./FlinkTutorial-1.0-SNAPSHOT.jar lib/

(2)执行以下命令,启动 JobManager。

$ ./bin/standalone-job.sh start --job-classname online.liujiahao.flink01.StreamWordCount

这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。
(3)同样是使用 bin 目录下的脚本,启动 TaskManager。

$ ./bin/taskmanager.sh start

(4)如果希望停掉集群,同样可以使用脚本,命令如下。

$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop

3.3.4 高可用(High Availability )

  • 分布式除了提供高吞吐,另一大好处就是有更好的容错性。对于 Flink 而言,因为一般会有多个 TaskManager,即使运行时出现故障,也不需要将全部节点重启,只要尝试重启故障节点就可以了。但是我们发现,针对一个作业而言,管理它的 JobManager 却只有一个,这同样有可能出现单点故障。为了实现更好的可用性,我们需要 JobManager 做一些主备冗余,这就是所谓的高可用(High Availability,简称 HA)。
  • 我们可以通过配置,让集群在任何时候都有一个主 JobManager 和多个备用 JobManagers,如图 3-13 所示,这样主节点故障时就由备用节点来接管集群,接管后作业就可以继续正常运行。主备 JobManager 实例之间没有明显的区别,每个 JobManager 都可以充当主节点或者备节点。


具体配置如下:
(1)进入 Flink 的安装路径下的 conf 目录下,修改配置文件: flink-conf.yaml,增加如下配置。

high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha
high-availability.zookeeper.quorum:
hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_liujiahao

(2)修改配置文件: masters,配置备用 JobManager 列表。

hadoop102:8081
hadoop103:8081

(3)分发修改后的配置文件到其他节点服务器。
(4)在/etc/profile.d/my_env.sh 中配置环境变量

export HADOOP_CLASSPATH=hadoop classpath

在配置过程中,需要注意以下几点:
⚫ 需要提前保证 HAOOP_HOME 环境变量配置成功
⚫ 分发到其他节点
具体部署方法如下:
(1)首先启动 HDFS 集群和 Zookeeper 集群。
(2)执行以下命令,启动 standalone HA 集群。

$ bin/start-cluster.sh

(3)可以分别访问两个备用 JobManager 的 Web UI 页面。
http://hadoop102:8081
http://hadoop103:8081
(4)在 zkCli.sh 中查看谁是 leader。

[zk: localhost:2181(CONNECTED) 1] get
/flink-standalone/cluster_atguigu/leader/rest_server_lock

杀死 hadoop102 上的 Jobmanager, 再看 leader。

[zk: localhost:2181(CONNECTED) 7] get
/flink-standalone/cluster_atguigu/leader/rest_server_lock

注意: 不管是不是 leader,从 WEB UI 上是看不到区别的, 都可以提交应用。

3.4 YARN 模式

  • 独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。
  • 整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。

3.4.1 相关准备和配置

  • 在 Flink1.8.0 之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的环境支持,需要自行在官网下载Hadoop 相关版本的组件 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar,并将该组件上传至 Flink 的 lib 目录下。在 Flink 1.11.0 版本之后,增加了很多重要新特性,其中就包括增加了对Hadoop3.0.0以及更高版本Hadoop的支持,不再提供“flink-shaded-hadoop-*”jar 包,而是通过配置环境变量完成与 YARN 集群的对接。
  • 在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。
    具体配置步骤如下:
    (1)按照 3.1 节所述,下载并解压安装包,并将解压后的安装包重命名为 flink-1.13.0-yarn,本节的相关操作都将默认在此安装路径下执行。
    (2)配置环境变量,增加环境变量配置如下:

$ sudo vim /etc/profile.d/my_env.sh

HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

这里必须保证设置了环境变量 HADOOP_CLASSPATH。
(3)启动 Hadoop 集群,包括 HDFS 和 YARN。

[liujiahao@hadoop102 ~]$ start-dfs.sh
[liujiahao@hadoop103 ~]$ start-yarn.sh

分别在 3 台节点服务器查看进程启动情况。

[liujiahao@hadoop102 ~]$ jps

5190 Jps
5062 NodeManager
4408 NameNode
4589 DataNode

[liujiahao@hadoop103 ~]$ jps

5425 Jps
4680 ResourceManager
5241 NodeManager
4447 DataNode

[liujiahao@hadoop104 ~]$ jps

4731 NodeManager
4333 DataNode
4861 Jps
4478 SecondaryNameNode

(4)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置,这些配置项的含义在进行 Standalone 模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认配置。

$ cd /opt/module/flink-1.13.0-yarn/conf/

$ vim flink-conf.yaml

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1

3.4.2 会话模式部署

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN session)来启动 Flink 集群。具体步骤如下:

  1. 启动集群
    (1)启动 hadoop 集群(HDFS, YARN)。
    (2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。

$ bin/yarn-session.sh -nm test

可用参数解读:

  • -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session 也可以后台运行。
  • -jm(–jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
  • -nm(–name):配置在 YARN UI 界面上显示的任务名。
  • -qu(–queue):指定 YARN 队列名。
  • -tm(–taskManager):配置每个 TaskManager 所使用内存。

注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也不会把集群资源固定,同样是动态分配的。

YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID,如下所示,用户可以通过 web UI 或者命令行两种方式提交作业。

第 4 章 Flink 运行时架构

4.1 系统架构

  • 对于数据处理系统的架构,最简单的实现方式当然就是单节点。当数据量增大、处理计算更加复杂时,我们可以考虑增加 CPU 数量、加大内存,也就是让这一台机器变得性能更强大,从而提高吞吐量——这就是所谓的 SMP(Symmetrical Multi-Processing,对称多处理)架构。但是这样做问题非常明显:所有 CPU 是完全平等、共享内存和总线资源的,这就势必造成资源竞争;而且随着 CPU 核心数量的增加,机器的成本会指数增长,所以 SMP 的可扩展性是比较差的,无法应对海量数据的处理场景。
  • 于是人们提出了“不共享任何东西”(share-nothing)的分布式架构。从以 Greenplum 为代表的 MPP(Massively Parallel Processing,大规模并行处理)架构,到 Hadoop、Spark 为代表的批处理架构,再到 Storm、Flink 为代表的流处理架构,都是以分布式作为系统架构的基本形态的。
  • 我们已经知道,Flink 就是一个分布式的并行流处理系统。简单来说,它会由多个进程构成,这些进程一般会分布运行在不同的机器上。
  • 正如一个团队,人多了就会难以管理;对于一个分布式系统来说,也需要面对很多棘手的问 题。其中的核心问题有:集群中资源的分配和管理、进程协调调度、持久化和高可用的数据存储,以及故障恢复。
  • 对于这些分布式系统的经典问题,业内已有比较成熟的解决方案和服务。所以 Flink 并不会自己去处理所有的问题,而是利用了现有的集群架构和服务,这样它就可以把精力集中在核心工作——分布式数据流处理上了。Flink 可以配置为独立(Standalone)集群运行,也可以方便地跟一些集群资源管理工具集成使用,比如 YARN、Kubernetes 和 Mesos。Flink 也不会自己去提供持久化的分布式存储,而是直接利用了已有的分布式文件系统(比如 HDFS)或者对象存储(比如 S3)。而对于高可用的配置,Flink 是依靠 Apache ZooKeeper 来完成的。
  • 我们所要重点了解的,就是在 Flink 中有哪些组件、是怎样具体实现一个分布式流处理系统的。如果大家对 Spark 或者 Storm 比较熟悉,那么稍后就会发现,Flink 其实有类似的概念和架构。
  • 4.1.1 整体构成

Flink 的运行时架构中,最重要的就是两大组件:作业管理器(JobManger)和任务管理器(TaskManager)。对于一个提交执行的作业,JobManager 是真正意义上的“管理者(Master),负责管理调度,所以在不考虑高可用的情况下只能有一个;而 TaskManager 是“工作者”(Worker、Slave),负责执行任务处理数据,所以可以有一个或多个。Flink 的作业提交和任务处理时的系统如图 4-1 所示。

这里首先要说明一下“客户端”。其实客户端并不是处理系统的一部分,它只负责作业的提交。具体来说,就是调用程序的 main 方法,将代码转换成“数据流图”(Dataflow Graph),发送给 JobManager。提交之后,任务的执行其实就跟客户端没有关系了;我们可以在客户端选择断开与 JobManager 的连接, 也可以继续保持连接。
之前我们在命令提交作业时,加上的-d 参数,就是表示分离模(detached mode),也就是断开连接。
当然,**客户端可以随时连接到 JobManager,获取当前作业的状态(如:任务执行是否报错之类的等)和执行结果,也可以发送请求取消作业。**我们在上一章中不论通过 Web UI 还是命令行执行“flink run”的相关操作,都是通过客户端实现的。

JobManager在这里会将客户端递交的“数据流图”转换成一个作业(作业图),再对其做一个详细的分析和转换,最终转换成一个可执行的“执行图”,这个可执行的“执行图”就会分发给所有的worker(也就是TaskManager),这里是通过Actor通信系统与TaskManager去做通信,去分发任务执行图等。
JobManager还会做作业的调度和检查点的协调(checkpoint)。
JobManager将所有的任务分发下去之后,所有的事情就都由TaskManager来做了。TaskManager会将所有的任务拆开,放到task slot(任务槽)上去做执行,所以这里也能看得出来,task slot(任务槽)就是执行一个任务所需的最小的一块资源,不同的TaskManager上有多个TaskSlot,也就可以并行执行多个任务,执行的过程中,他们彼此间有数据的交换,也就是说,数据流其实就是在TaskManager之间流动。

JobManager 和 TaskManagers 可以以不同的方式启动:

  • 作为独立(Standalone)集群的进程,直接在机器上启动

  • 在容器中启动

  • 由资源管理平台调度启动,比如 YARN、K8S
    这其实就对应着不同的部署方式。
    TaskManager 启动之后,JobManager 会与它建立连接,并将作业图(JobGraph)转换成可
    执行的“执行图”(ExecutionGraph)分发给可用的 TaskManager,然后就由 TaskManager 具体
    执行任务。接下来,我们就具体介绍一下 JobManger 和 TaskManager 在整个过程中扮演的角色。

  • 这里首先要说明一下“客户端”。其实客户端并不是处理系统的一部分,它只负责作业的提交。具体来说,就是调用程序的 main 方法,将代码转换成“数据流图”(Dataflow Graph),并最终生成作业图(JobGraph),一并发送给 JobManager。提交之后,任务的执行其实就跟客户端没有关系了;我们可以在客户端选择断开与 JobManager 的连接, 也可以继续保持连接。之前我们在命令提交作业时,加上的-d 参数,就是表示分离模式(detached mode),也就是断开连接。

  • 当然,客户端可以随时连接到 JobManager,获取当前作业的状态和执行结果,也可以发送请求取消作业。我们在上一章中不论通过 Web UI 还是命令行执行“flink run”的相关操作,都是通过客户端实现的。

  • JobManager 和 TaskManagers 可以以不同的方式启动:

  1. 作为独立(Standalone)集群的进程,直接在机器上启动
  2. 在容器中启动
  3. 由资源管理平台调度启动,比如 YARN、K8S
  • 这其实就对应着不同的部署方式。
  • TaskManager 启动之后,JobManager 会与它建立连接,并将作业图(JobGraph)转换成可执行的“执行图”(ExecutionGraph)分发给可用的 TaskManager,然后就由 TaskManager 具体执行任务。接下来,我们就具体介绍一下 JobManger 和 TaskManager 在整个过程中扮演的角色。

4.1.2 作业管理器(JobManager)

JobManager 是一个 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的 JobManager 所控制执行。当然,在高可用(HA)的场景下,可能会出现多个 JobManager;这时只有一个是正在运行的领导节点(leader),其他都是备用节点(standby)
JobManger 又包含 3 个不同的组件,下面我们一一讲解。

  1. JobMaster
  • JobMaster 是 JobManager 中最核心的组件负责处理单独的作业(Job)。所以 JobMaster和具体的 Job 是一一对应的,多个 Job 可以同时运行在一个 Flink 集群中, 每个 Job 都有一个自己的 JobMaster。需要注意在早期版本的 Flink 中,没有 JobMaster 的概念;而JobManager的概念范围较小,实际指的就是现在所说的 JobMaster。
  • 在作业提交时,JobMaster 会先接收到要执行的应用。这里所说“应用”一般是客户端提交来的,包括:Jar 包,数据流图(dataflow graph),和作业图(JobGraph)。
  • JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。 JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。
  • 而在运行过程中,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
  1. 资源管理器(ResourceManager)
  • ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个所谓“资源”,主要是指 TaskManager 的任务槽(task slots),所以ResourceManager分配的就是任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行。
  • 这里注意要把 Flink 内置的 ResourceManager 和其他资源管理平台(比如 YARN)的ResourceManager 区分开。
  • Flink 的 ResourceManager,针对不同的环境和资源管理平台(比如 Standalone 部署,或者YARN),有不同的具体实现。在 Standalone 部署时,因为 TaskManager 是单独启动的(没有Per-Job 模式),所以 ResourceManager 只能分发可用 TaskManager 的任务槽,不能单独启动新TaskManager。
  • 而在有资源管理平台时,就不受此限制。当新的作业申请资源时,ResourceManager 会将有空闲槽位的 TaskManager 分配给 JobMaster。如果 ResourceManager 没有足够的任务槽,它还可以向资源提供平台发起会话,请求提供启动 TaskManager 进程的容器。另外,ResourceManager 还负责停掉空闲的 TaskManager,释放计算资源。
  1. 分发器(Dispatcher)
  • Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。
    Dispatcher其实就是客户端向jobmanager提交任务的时候的一个通信接口组件。

4.1.3 任务管理器(TaskManager)

  • TaskManager 是 Flink 中的工作进程,数据流的具体计算就是它来做的,所以也被称为**“Worker”**。Flink 集群中必须至少有一个 TaskManager;当然由于分布式计算的考虑,通常会有多个 TaskManager 运行,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。Slot是资源调度的最小单位,slot 的数量限制了 TaskManager 能够并行处理的任务数量。
  • 启动之后,TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用,JobMaster 就可以分配任务来执行了。
  • 在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager交换数据。(每个TaskManager也可以有自己的buff,去做数据的缓冲)

4.2 作业提交流程

了解了 Flink 运行时的基本组件和系统架构,我们再来梳理一下作业提交的具体流程。

4.2.1 高层级抽象视角

Flink 的提交流程,随着部署模式、资源管理平台的不同,会有不同的变化。首先我们从一个高层级的视角(就是看一下他大概是怎么个工作流程),来做一下抽象提炼,看一看作业提交时宏观上各组件是怎样交互协作的。

如图 4-2 所示,具体步骤如下:
(1) 一般情况下,由客户端(App)通过分发器提供的 REST 接口,将作业提交给JobManager。
(2)由分发器启动 JobMaster
(这里是有作业提交之后才启动JobMaster),并将作业(包含 JobGraph)提交给 JobMaster。
(3)JobMaster 将 JobGraph 解析为可执行的 ExecutionGraph,得到所需的资源数量,然后向资源管理器(ResourceManager)请求资源(slots)。
(4)资源管理器(ResourceManager)判断当前是否由足够的可用资源;如果没有,启动新的 TaskManager。
(5)TaskManager 启动之后,向 ResourceManager 注册自己的可用任务槽(slots)。
(6)资源管理器通知 TaskManager 为新的作业提供 slots(注意,这里TaskManager有的话就直接用(),没有的话就走4和5先去启动TaskManager)。
(7)TaskManager 连接到对应的 JobMaster,提供 slots。
(8)JobMaster 将需要执行的任务分发给 TaskManager。
(9)TaskManager 执行任务,互相之间可以交换数据。

如果部署模式不同,或者集群环境不同(例如 Standalone、YARN、K8S 等),其中一些步骤可能会不同或被省略,也可能有些组件会运行在同一个 JVM 进程中。比如我们在上一章实践过的独立集群环境的会话模式,就是需要先启动集群,如果资源不够,只能等待资源释放,而不会直接启动新的 TaskManager。
接下来我们就具体介绍一下不同部署环境下的提交流程。

4.2.2 独立模式(Standalone)

在独立模式(Standalone)下,只有会话模式和应用模式两种部署方式。两者整体来看流程是非常相似的:TaskManager 都需要手动启动,所以当 ResourceManager 收到 JobMaster的请求时,会直接要求 TaskManager 提供资源。而 **JobMaster 的启动时间点,会话模式是预先启动,应用模式则是在作业提交时启动。**提交的整体流程如图 4-3 所示。


我们发现除去第 4 步不会启动 TaskManager,而且直接向已有的 TaskManager 要求资源(这里客户端提交任务时,JobManager是已经提前有了的,是先起动的集群再提交的任务),其他步骤与上一节所讲抽象流程完全一致。

4.2.3 YARN 集群

接下来我们再看一下有资源管理平台时,具体的提交流程。我们以 YARN 为例,分不同的部署模式来做具体说明。

  1. 会话(Session)模式
    在会话模式下,我们需要先启动一个 YARN session,这个会话会创建一个 Flink 集群

    这里只启动了 JobManager,而 TaskManager 可以根据需要动态地启动。在 JobManager 内部,由于还没有提交作业,所以只有 ResourceManager 和 Dispatcher 在运行,如图 4-4 所示。


接下来就是真正提交作业的流程,如图 4-5 所示:
(客户端提交任务,才触发集群的创建)
(1)客户端通过 REST 接口,将作业提交给分发器。
(2)分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster。
(3)JobMaster 向资源管理器请求资源(slots)。
(4)资源管理器向 YARN 的资源管理器请求 container 资源。
(5)YARN 启动新的 TaskManager 容器。
(6)TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
(7)资源管理器通知 TaskManager 为新的作业提供 slots。
(8)TaskManager 连接到对应的 JobMaster,提供 slots。
(9)JobMaster 将需要执行的任务分发给 TaskManager,执行任务。
可见,整个流程除了请求资源时要“上报”YARN 的资源管理器,其他与 4.2.1 节所述抽象流程几乎完全一样。

  1. 单作业(Per-Job)模式
    在单作业模式下,Flink 集群不会预先启动,而是在提交作业时,才启动新的 JobManager。
    具体流程如图 4-6 所示。


(1)客户端将作业提交给 YARN 的资源管理器,这一步中会同时将 Flink 的 Jar 包和配置上传到 HDFS,以便后续启动 Flink 相关组件的容器。
(2)YARN 的资源管理器分配 Container 资源,启动 Flink JobManager,并将作业提交给
JobMaster。这里省略了 Dispatcher 组件。
(3)JobMaster 向资源管理器请求资源(slots)。
(4)资源管理器向 YARN 的资源管理器请求 container 资源。
(5)YARN 启动新的 TaskManager 容器。
(6)TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
(7)资源管理器通知 TaskManager 为新的作业提供 slots。
(8)TaskManager 连接到对应的 JobMaster,提供 slots。
(9)JobMaster 将需要执行的任务分发给 TaskManager,执行任务。
可见,区别只在于 JobManager 的启动方式,以及省去了分发器。当第 2 步作业提交给JobMaster,之后的流程就与会话模式完全一样了。

  1. 应用(Application)模式
    应用模式与单作业模式的提交流程非常相似,只是初始提交给 YARN 资源管理器的不再是具体的作业,而是整个应用。一个应用中可能包含了多个作业,这些作业都将在 Flink 集群中启动各自对应的 JobMaster。

4.3 一些重要概念

4.3.1 数据流图(Dataflow Graph)

观察我们之前的代码,发现Flink其实是一个输入,转换,输出的一个过程

  • Flink 是流式计算框架。它的程序结构,其实就是定义了一连串的处理操作,每一个数据输入之后都会依次调用每一步计算。在 Flink 代码中,我们定义的每一个处理转换操作都叫作“算子”(Operator),所以我们的程序可以看作是一串算子构成的管道,数据则像水流一样有序地流过。比如在之前的 WordCount 代码中,基于执行环境调用的socketTextStream()方法,就是一个读取文本流的算子;而后面的flatMap()方法,则是将字符串数据进行分词、转换成二元组的算子。
  • 所有的 Flink 程序都可以归纳为由三部分构成:Source、Transformation 和 Sink。
    1. Source 表示“源算子”,负责读取数据源。
    2. Transformation 表示“转换算子”,利用各种算子进行处理加工。
    3. Sink 表示“下沉算子”,负责数据的输出。
  • 在运行时,Flink 程序会被映射成所有算子按照逻辑顺序连接在一起的一张图,这被称为“逻辑数据流”(logical dataflow),或者叫“数据流图”(dataflow graph)(他的底层就是一个数据流(dataflow))。我们提交作业之后,打开 Flink 自带的 Web UI,点击作业就能看到对应的 dataflow,如图 4-7 所示。在数据流图中,可以清楚地看到 Source、Transformation、Sink 三部分。
  • 数据流图类似于任意的有向无环图(DAG),这一点与 Spark 等其他框架是一致的。图中的每一条数据流(dataflow)以一个或多个 source 算子开始,以一个或多个 sink 算子结束。
  • 在大部分情况下,dataflow 中的算子,和程序中的转换运算是一一对应的关系。那是不是说,我们代码中基于 DataStream API 的每一个方法调用,都是一个算子呢?
  • 并非如此。除了 Source 读取数据和 Sink 输出数据,一个中间的转换算子(Transformation Operator)必须是一个转换处理的操作而在代码中有一些方法调用,数据是没有完成转换的。可能只是对属性做了一个设置(比如 .KeyBy().sum(),这个地方keyby就只是对数据的分区做了一个规划,并没有对数据进行转换处理,它要和后面的.sum()合起来才算一个转换操作),也可能定义的是数据的传递方式而非转换,又或者是需要几个方法合在一起才能表达一个完整的转换操作。例如,在之前的代码中,我们用到了定义分组的方法 keyBy,它就只是一个数据分区操作,而并不是一个算子。事实上,代码中我们可以看到调用其他转换操作之后返回的数据类型是 SingleOutputStreamOperator,说明这是一个算子操作;而 keyBy 之后返回的数据类型是 KeyedStream。感兴趣的读者也可以自行提交任务在 Web UI 中查看。
    这里可以简单的理解为,每一次转换,可以看作是一次算子

4.3.2 并行度(Parallelism)

我们已经清楚了算子和数据流图的概念,那最终执行的任务又是什么呢?容易想到,一个算子操作就应该是一个任务。那是不是程序中的算子数量,就是最终执行的任务数呢?

  1. 什么是并行计算
  • 要解答这个问题,我们需要先梳理一下其他框架分配任务、数据处理的过程。对于 Spark而言,是把根据程序生成的 DAG 划分阶段(stage)、进而分配任务的。而对于 Flink 这样的流式引擎,其实没有划分 stage 的必要。因为数据是连续不断到来的,我们完全可以按照数据流图建立一个“流水线”,前一个操作处理完成,就发往处理下一步操作的节点。如果说 Spark基于 MapReduce 架构的思想是“数据不动代码动”,那么 Flink 就类似“代码不动数据流动”,原因就在于流式数据本身是连续到来的、我们不会同时传输所有数据,这其实是更符合数据流本身特点的处理方式。
  • 在大数据场景下,我们都是依靠分布式架构做并行计算,从而提高数据吞吐量的。既然处理完一个操作就可以把数据发往别处,那我们就可以将不同的算子操作任务,分配到不同的节点上执行了。这样就对任务做了分摊,实现了并行处理。
    **- 但是仔细分析会发现,这种“并行”其实并不彻底。因为算子之间是有执行顺序的,对一条数据来说必须依次执行;而一个算子在同一时刻只能处理一个数据。**比如之前 WordCount,一条数据到来之后,我们必须先用 source 算子读进来、再做 flatMap 转换;一条数据被 source读入的同时,之前的数据可能正在被 flatMap 处理,这样不同的算子任务是并行的。但如果多条数据同时到来,一个算子是没有办法同时处理的,我们还是需要等待一条数据处理完、再处理下一条数据——这并没有真正提高吞吐量。
  • 所以相对于上述的“任务并行”,我们真正关心的,是“数据并行”。也就是说,多条数据同时到来,我们应该可以同时读入,同时在不同节点执行 flatMap 操作
  1. 并行子任务和并行度
  • 怎样实现数据并行呢?其实也很简单,我们**把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行(实现同一个算子,同时处理多个数据的功能)。**这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
  • 在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。

并行度:当前并行计算中,某一个特定算子,他做并行计算的子任务数,叫并行度,所以并行度其实是针对的是 算子

  • 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
  • 如图 4-8 所示,当前数据流中有 source、map、window、sink 四个算子,除最后 sink,其他算子的并行度都为 2。整个程序包含了 7 个子任务,至少需要 2 个分区来并行执行。我们可以说,这段流处理程序的并行度就是 2。
  1. 并行度的设置
    在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。

(1)代码中设置

  • 我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
  • 这种方式设置的并行度,只针对当前算子有效。
  • 另外,我们也可以直接调用执行环境的 setParallelism()方法,全局设定并行度:
env.setParallelism(2);
  • 这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。这里要注意的是,由于 keyBy 不是算子,所以无法对 keyBy 设置并行度。

(2)提交应用时设置

  • 在使用 flink run 命令提交应用时,可以增加-p 参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
bin/flink run –p 2 –c online.chenyunde.flink.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
  • 如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。

(3)配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:

parallelism.default: 2
  • 这个设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是提交时的-p 参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数。这也就解释了为什么我们在第二章运行 WordCount 流处理程序时,会看到结果前有 1~4 的分区编号——运行程序的电脑是 4 核 CPU,那么开发环境默认的并行度就是 4。

  • 我们可以总结一下所有的并行度设置方法,它们的优先级如下:

  1. 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先级最高,会覆盖后面所有的设置。
  2. 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。
  3. 如果代码中完全没有设置,那么采用提交时-p 参数指定的并行度。
  4. 如果提交时也未指定-p 参数,那么采用集群配置文件中的默认并行度。
  • 这里需要说明的是,算子的并行度有时会受到自身具体实现的影响。比如之前我们用到的读取 socket 文本流的算子 socketTextStream,它本身就是非并行的 Source 算子,所以无论怎么设置,它在运行时的并行度都是 1,对应在数据流图上就只有一个并行子任务。这一点大家可以自行在 Web UI 上查看验证。
  • 那么实践中怎样设置并行度比较好呢?那就是在代码中只针对算子设置并行度,不设置全局并行度,这样方便我们提交作业时进行动态扩容。

4.3.3 算子链(Operator Chain)

(并行度相同并且是one to one操作的两个算子就可以合并成一个任务,分配同一块资源,就在同一块内存中操作,也就避免了资源的传输和资源的浪费。)

关于“一个作业有多少任务”这个问题,现在已经基本解决了。但如果我们仔细观察 Web UI 上给出的图,如图 4-9 所示,上面的节点似乎跟代码中的算子又不是一一对应的。


很明显,这里的一个节点,会把转换处理的很多个任务都连接在一起,合并成了一个“大任务”。这又是怎么回事呢?

  1. 算子间的数据传输
    回到上一小节的例子,我们先来考察一下算子任务之间数据传输的方式。

    如图 4-10 所示,一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通 (forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。
    (1)一对一(One-to-one,forwarding)
  • 这种模式下,数据流维护着分区以及元素的顺序。比如图中的 source 和 map 算子,source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种one-to-one的对应关系。
    这种关系类似于 Spark 中的窄依赖。

(2)重分区(Redistributing)

  • 在这种模式下,数据流的分区会发生改变。比图中的 map 和后面的 keyBy/window 算子之间(这里的 keyBy 是数据传输算子,后面的 window、apply 方法共同构成了 window 算子),以及 keyBy/window 算子和 Sink 算子之间,都是这样的关系。
    每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为 2 的 window 算子,要传递到并行度为 1 的 Sink 算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程,这一过程类似于 Spark 中的 shuffle。
    总体说来,这种算子间的关系类似于 Spark 中的宽依赖。
  1. 合并算子链
    在 Flink 中,并行度相同的一对一(one to one)算子操作(两者都具备,就可以合并),可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分,如图 4-11 所示。每个 task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。
    (合并操作实际上就是介绍了资源的开销)

  • 比如在图 4-11 中的例子中,Source 和 map 之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为 2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有 5 个任务,由 5 个线程并行执行。
  • Flink 为什么要有算子链这样一个设计呢?这是因为将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
  • Flink 默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

4.3.4 作业图(JobGraph)与执行图(ExecutionGraph)

  • 至此,我们已经彻底了解了由代码生成任务的过程,现在来做个梳理总结。
  • 由 Flink 程序直接映射成的数据流图(dataflow graph),也被称为逻辑流图(logical StreamGraph),因为它们表示的是计算逻辑的高级视图。到具体执行环节时,我们还要考虑并行子任务的分配、数据在任务间的传输,以及合并算子链的优化。为了说明最终应该怎样执行一个流处理程序,Flink 需要将逻辑流图进行解析,转换为物理数据流图。
  • 在这个转换过程中,有几个不同的阶段,会生成不同层级的图,其中最重要的就是作业图(JobGraph)和执行图(ExecutionGraph)。Flink 中任务调度执行的图,按照生成顺序可以分成四层:

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理
图(Physical Graph)

  • 我们可以回忆一下之前处理 socket 文本流的 StreamWordCount 程序:
env.socketTextStream().flatMap(…).keyBy(0).sum(1).print();

如果提交时设置并行度为 2:

bin/flink run –p 2 –c online.chenyunde.flink.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

那么根据之前的分析,除了 socketTextStream()是非并行的 Source 算子,它的并行度始终为 1,其他算子的并行度都为 2。

接下来我们分析一下程序**对应四层调度图(每层都是DAG,有向无环图)**的演变过程,如图 4-12 所示。

  1. 逻辑流图(StreamGraph)
  • 这是根据用户通过 DataStream API 编写的代码生成的最初的 DAG 图,用来表示程序的拓扑结构。这一步一般在客户端完成。
  • 我们可以看到,逻辑流图中的节点,完全对应着代码中的四步算子操作:
  • 源算子 Source(socketTextStream())→扁平映射算子 Flat Map(flatMap()) →分组聚合算子Keyed Aggregation(keyBy/sum()) →输出算子 Sink(print())。
  1. 作业图(JobGraph)
  • StreamGraph 经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为: 将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph 一般也是在客户端生成的,在作业提交时传递给 JobMaster。
  • 在图 4-12 中,分组聚合算子(Keyed Aggregation)和输出算子 Sink(print)并行度都为 2,而且是一对一的关系,满足算子链的要求,所以会合并在一起,成为一个任务节点(算子链)。
  1. 执行图(ExecutionGraph)
  • JobMaster 收到 JobGraph 后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 从图 4-12 中可以看到,与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。
  1. 物理图(Physical Graph)
  • JobMaster 生成执行图后, 会将它分发给 TaskManager;各个 TaskManager 会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。
  • 对应在上图 4-12 中,物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager 就可以对传递来的数据进行处理计算了。
  • 所以我们可以看到,程序里定义了四个算子操作:源(Source)->转换(flatMap)->分组聚合(keyBy/sum)->输出(print);合并算子链进行优化之后,就只有三个任务节点了;再考虑并行度后,一共有 5 个并行子任务,最终需要 5 个线程来执行。

4.3.5 任务(Tasks)和任务槽(Task Slots)

通过前几小节的介绍,我们对任务的生成和分配已经非常清楚了。上一小节中我们最终得到结论:作业划分为 5 个并行子任务,需要 5 个线程并行执行。那在我们将应用提交到 Flink集群之后,到底需要占用多少资源呢?是否需要 5 个 TaskManager 来运行呢?

  1. 任务槽(Task Slots)
  • 之前已经提到过,Flink 中每一个 worker(也就是 TaskManager)都是一个 JVM 进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)。
  • 所以如果想要执行 5 个任务,并不一定非要 5 个 TaskManager,我们可以让 TaskManager多线程执行任务。如果可以同时运行 5 个线程,那么只要一个 TaskManager 就可以满足我们之前程序的运行需求了。
  • 很显然,TaskManager 的计算资源是有限的,并不是所有任务都可以放在一个TaskManager上并行执行。并行的任务越多,每个线程的资源就会越少。那一个 TaskManager 到底能并行处理多少个任务呢?为了控制并发量,我们需要在 TaskManager 上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽(task slots)。
  • slot 的概念其实在分布式框架中并不陌生。所谓的“槽”是一种形象的表达。如果大家见过传说中的“卡带式游戏机”,就会对它有更直观的认识:游戏机上的卡槽提供了可以运行游戏的接口和资源,我们把游戏卡带插入卡槽,就可以占用游戏机的计算资源,执行卡带中的游戏程序了。一台经典的小霸王游戏机(如图 4-13)一般只有一个卡槽,而在 TaskManager 中,我们可以设置多个 slot,只要插入“卡带”——也就是分配好的任务,就可以并行执行了。
    每个任务槽(task slot)其实表示了 TaskManager 拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。

    假如一个 TaskManager 有三个 slot,那么它会将管理的内存平均分成三份,每个 slot 独自占据一份。这样一来,我们在 slot 上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。所以现在我们只要 2 个 TaskManager,就可以并行处理分配好的 5 个任务了,如图 4-14 所示。
  1. 任务槽数量的设置
  • 我们可以通过集群的配置文件来设定 TaskManager 的 slot 数量:
 taskmanager.numberOfTaskSlots: 8
  • 通过调整 slot 的数量,我们就可以控制子任务之间的隔离级别。

  • 具体来说,如果一个 TaskManager 只有一个 slot,那将意味着每个任务都会运行在独立的JVM 中(当然,该 JVM 可能是通过一个特定的容器启动的);而一个 TaskManager 设置多个slot 则意味着多个子任务可以共享同一个 JVM。它们的区别在于:前者任务之间完全独立运行,隔离级别更高、彼此间的影响可以降到最小;而后者在同一个 JVM 进程中运行的任务,将共享 TCP 连接和心跳消息,也可能共享数据集和数据结构,这就减少了每个任务的运行开销,在降低隔离级别的同时提升了性能。

  • 需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争这也是开发环境默认并行度设为机器 CPU 数量的原因

    1. 任务对任务槽的共享
  • 这样看来,一共有多少任务,我们就需要有多少 slot 来并行处理它们。不过实际提交作业进行测试就会发现,我们之前的 WordCount 程序设置并行度为 2 提交,一共有 5 个并行子任务,可集群即使只有 2 个 task slot 也是可以成功提交并运行的。这又是为什么呢?

  • 我们可以基于之前的例子继续扩展。如果我们保持 sink 任务并行度为 1 不变,而作业提交时设置全局并行度为 6,那么前两个任务节点就会各自有 6 个并行子任务,整个流处理程序则有 13 个子任务。那对于 2 个 TaskManager、每个有 3 个 slot 的集群配置来说,还能否正常运行呢?

  • 完全没有问题。这是因为默认情况下,Flink 是允许子任务共享 slot 的。如图 4-15 所示,只要属于同一个作业,那么对于不同任务节点的并行子任务,就可以放到同一个 slot 上执行。所以对于第一个任务节点 source→map,它的 6 个并行子任务必须分到不同的 slot 上(如果在同一 slot 就没法数据并行了),而第二个任务节点 keyBy/window/apply 的并行子任务却可以和第一个任务节点共享 slot。

  • 于是最终结果就变成了:每个任务节点的并行子任务一字排开,占据不同的 slot;而不同的任务节点的子任务可以共享 slot。一个 slot 中,可以将程序处理的所有任务都放在这里执行,我们把它叫作保存了整个作业的运行管道(pipeline)。

  • 这个特性看起来有点奇怪:我们不是希望并行处理、任务之间相互隔离吗,为什么这里又允许共享 slot 呢?

  • 我们知道,一个 slot 对应了一组独立的计算资源。在之前不做共享的时候,每个任务都平等地占据了一个 slot,但其实不同的任务对资源的占用是不同的。例如这里的前两个任务,source/map 尽管是两个算子合并算子链得到的,但它只是基本的数据读取和简单转换,计算耗时极短,一般也不需要太大的内存空间;而 window 算子所做的窗口操作,往往会涉及大量的数据、状态存储和计算,我们一般把这类任务叫作“资源密集型”(intensive)任务。当它们被平等地分配到独立的 slot 上时,实际运行我们就会发现,大量数据到来时 source/map 和 sink任务很快就可以完成,但 window 任务却耗时很久;于是下游的 sink 任务占据的 slot 就会等待闲置,而上游的 source/map 任务受限于下游的处理能力,也会在快速处理完一部分数据后阻塞对应的资源开始等待(相当于处理背压)。这样资源的利用就出现了极大的不平衡,“忙的忙死,闲的闲死”。

  • 解决这一问题的思路就是允许 slot 共享。当我们将资源密集型和非密集型的任务同时放到一个 slot 中,它们就可以自行分配对资源占用的比例,从而保证最重的活平均分配给所有的 TaskManager。

  • slot 共享另一个好处就是允许我们保存完整的作业管道。这样一来,即使某个 TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行。

  • 另外,同一个任务节点的并行子任务是不能共享 slot 的,所以允许 slot 共享之后,运行作业所需的 slot 数量正好就是作业中所有算子并行度的最大值。这样一来,我们考虑当前集群需要配置多少 slot 资源时,就不需要再去详细计算一个作业总共包含多少个并行子任务了,只看最大的并行度就够了。

  • 同一个任务的先后任务是可以共享slot的,也就是说,一个slot上可以有一个完整的任务执行流程。

  • 当然,Flink 默认是允许 slot 共享的,如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,我们也可以通过设置“slot 共享组”(SlotSharingGroup)手动
    指定:

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”);
  • 这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享;不同组之间的任务是完全隔离的,必须分配到不同的 slot 上。在这种场景下,总共需要的 slot 数量,就是各个 slot共享组最大并行度的总和。
  1. 任务槽和并行度的关系
  • 直观上看,slot 就是 TaskManager 为了并行执行任务而设置的,那它和之前讲过的并行度(Parallelism)是不是一回事呢?
  • Slot 和并行度确实都跟程序的并行执行有关,但两者是完全不同的概念。简单来说,task slot 是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力() , 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置;而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。(这里其实就是说,slot是集群有的资源,我有那么多,可以并行执行那么多任务,但是我 可以不用。而并行度是指我实际运行程序的时候,这个程序的并行执行的子任务数量。这个每个程序不一样,所以说他是动态变化的,而slot是集群固定有的资源的,所以说他是静态的。)换句话说,并行度如果小于等于集群中可用 slot 的总数,程序是可以正常执行的,因为 slot 不一定要全部占用,有十分力气可以只用八分;而如果并行度大于可用 slot 总数,导致超出了并行能力上限,那么心有余力不足,程序就只好等待资源管理器分配更多的资源了。
  • 下面我们再举一个具体的例子。假设一共有 3 个 TaskManager,每一个 TaskManager 中的slot 数量设置为 3 个,那么一共有 9 个 task slot,如图 4-16 所示,表示集群最多能并行执行 9个任务。
  • 而我们定义 WordCount 程序的处理操作是四个转换算子:

source→ flatMap→ reduce→ sink

  • 当所有算子并行度相同时,容易看出 source 和 flatMap 可以合并算子链,于是最终有三个任务节点。
  • 如果我们没有任何并行度设置,而配置文件中默认 parallelism.default=1,那么程序运行的默认并行度为 1,总共有 3 个任务。由于不同算子的任务可以共享任务槽,所以最终占用的 slot 只有 1 个。9 个 slot 只用了 1 个,有 8 个空闲,如图 4-17 中的 Example 1 所示。




  • 如果我们更改默认参数,或者提交作业时设置并行度为 2,那么总共有 6 个任务,共享任务槽之后会占用 2 个 slot,如图 4-18 中 Example 2 所示。同样,就有 7 个 slot 空闲,计算资源没有充分利用。所以可以看到,设置合适的并行度才能提高效率。
  • 那对于这个例子,怎样设置并行度效率最高呢?当然是需要把所有的 slot 都利用起来。考虑到 slot 共享,我们可以直接把并行度设置为 9,这样所有 27 个任务就会完全占用 9 个 slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用,如图 4-19 中 Example3 所示。
  • 另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有 19 个子任务。根据 slot 共享的原则,它们最终还是会占用全部的 9 个 slot,而 sink 任务只在其中一个 slot 上执行,如图 4-20 中 Example 4 所示。通过这个例子也可以明确地看到,整个流处理程序的并行度,就应该是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量。

4.4 本章总结

  • 在这一章,我们在之前部署运行的基础上,深入介绍了 Flink 的系统架构和不同组件,并进一步针对不同的部署模式详细讲述了作业提交和任务处理的流程。此外,通过展开讲解架构中的一些重要概念,解答了 Flink 任务调度的核心问题,并对分布式流处理架构的设计做了思
    考分析。
  • 本章内容不仅是 Flink 架构知识的学习,更是分布式处理思想的入门。我们可以通过 Flink这样一个经典框架的学习,触摸到分布式架构的底层原理。
  • Flink 流处理架构设计还涉及事件时间、状态管理以及检查点等重要概念,保证分布式流处理系统的低延迟、时间正确性和状态一致性。我们将在后面的章节对这些内容做详细展开。

4.5 一些拓展

从当前算子开始,断掉算子链, .stopChain
从当前算子开始,开始新的算子链 : .startNewChain
将一个任务放到单独的一个slot上去执行(禁止使用slot共享): .slotSharingGroup(“1”) //设置slot共享组,只有在一个共享组里面的算子,才可以共享这个slot
(加上slotSharingGroup 后,集群所需要的资源就会变多,就不仅仅是当前最大的并行度了,而是所有不同共享组里面最大并行度之和)

Flink学习笔记【巨详细!】(二)相关推荐

  1. 【产品经理学习笔记 | 巨详细】2.规划阶段——2.1需求收集:用户访谈和问卷调查方式

    (从规划阶段的需求收集开始,一般不会接触到市场调研) 2 规划阶段 2.1 需求收集 什么是需求? 需求和需要的区别: 需求:某方面未被满足而引发的感受指向具体的事物 需要:指向具体的事物 例: 我需 ...

  2. 【 产品经理学习笔记 | 巨详细】1.1-1.4 初识产品经理

    课程来源:B站(产品经理基础入门(172集全集已更新完毕)) 第一阶段:产品基础 (需求收集.需求管理.需求分析.结构图.流程图.原型.PRD文档.用户画像.后台角色管理-) 产品经理岗位要求的能力和 ...

  3. BizTalk学习笔记系列之二:实例说明如何使用BizTalk

    BizTalk学习笔记系列之二:实例说明如何使用BizTalk --.BizTalk学习笔记系列之二<?XML:NAMESPACE PREFIX = O /> Aaron.Gao,2006 ...

  4. 观察者模式学习笔记(详细)

    观察者模式学习笔记(详细) 一.什么是观察者模式 观察者模式,是定义对象之间的一对多的关系,主要作用是减少对象之间的耦合度,分为两个角色 被观察者:其实就是发布者,发布消息通知所有的观察者 观察者:接 ...

  5. tensorflow学习笔记(三十二):conv2d_transpose (解卷积)

    tensorflow学习笔记(三十二):conv2d_transpose ("解卷积") deconv解卷积,实际是叫做conv_transpose, conv_transpose ...

  6. Windows保护模式学习笔记(十二)—— 控制寄存器

    Windows保护模式学习笔记(十二)-- 控制寄存器 控制寄存器 Cr0寄存器 Cr2寄存器 Cr4寄存器 控制寄存器 描述: 控制寄存器有五个,分别是:Cr0 Cr1 Cr2 Cr3 Cr4 Cr ...

  7. JUC.Condition学习笔记[附详细源码解析]

    JUC.Condition学习笔记[附详细源码解析] 目录 Condition的概念 大体实现流程 I.初始化状态 II.await()操作 III.signal()操作 3个主要方法 Conditi ...

  8. 汇编入门学习笔记 (十二)—— int指令、port

    疯狂的暑假学习之  汇编入门学习笔记 (十二)--  int指令.port 參考: <汇编语言> 王爽 第13.14章 一.int指令 1. int指令引发的中断 int n指令,相当于引 ...

  9. OpenCV学习笔记(十二):边缘检测:Canny(),Sobel(),Laplace(),Scharr滤波器

    OpenCV学习笔记(十二):边缘检测:Canny(),Sobel(),Laplace(),Scharr滤波器 1)滤波:边缘检测的算法主要是基于图像强度的一阶和二阶导数,但导数通常对噪声很敏感,因此 ...

  10. QT学习笔记(十二):透明窗体设置

    QT学习笔记(十二):透明窗体设置 创建 My_Widget 类 基类为QWidget , My_Widget.cpp 源文件中添加代码 #include "widget.h" # ...

最新文章

  1. useradd 命令详解 - [命令操作]
  2. Yarn已过时!Kubeflow实现机器学习调度平台才是未来
  3. python中str和int区别_Python如何比较string和int?
  4. 我的Go语言学习之旅七:创建一个GUI窗体
  5. java生成pdf加密_java使用iText 生成PDF全攻略(表格,加密)
  6. 10 条真心有趣的 Linux 命令
  7. JVM 内存设置大小
  8. 修改Android Studio默认的gradle配置文件
  9. docker-3-常用命令(下)
  10. Eclipse自动生成get和set方法
  11. 使用Blocs For Mac发布网站的方法
  12. unity byte[]的压缩和解压
  13. Spotfire使用经验-自定义饼图中显示的数据量(Top N分析,排名分析)
  14. css td 宽度百分比设置,css怎么设置td的宽度
  15. crontab 问号_轻松搞定crontab和quartz表达式
  16. 01改变世界:没有计算器的日子怎么过——手动时期的计算工具
  17. 浩方对战平台原理初步分析
  18. 安然数据集分析处理_用自然语言处理分析安然会计丑闻
  19. 无法在发生错误时创建会话,请检查 PHP 或网站服务器日志,并正确配置 PHP 安装
  20. 前后端报文传输加密方案

热门文章

  1. Pascal 过程与函数
  2. 球幕投影中内投球和外投球的区别
  3. JVM 直接内存的使用与回收
  4. 深入理解计算机系统 (第 1 节)
  5. 秋招提前批已来,万字长文教你如何增加面试大厂的成功率
  6. TinyOS平台下一些代码的分析
  7. linux怎么添加拼音输入法,ubuntu如何安装中文输入法
  8. Leetcode_49_Anagrams
  9. Visual Studio 2008的使用技巧
  10. iOS开发--开源库