第1章 简介

1.1 概要介绍

Flink on Yarn的HA高可用模式,首先依赖于Yarn自身的高可用机制(ResourceManager高可用),并通过Yarn对JobManager进行管理,当JobManager失效时,Yarn将重新启动JobManager。其次Flink Job在恢复时,需要依赖Checkpoint进行恢复,而Checkpoint的快照依赖于远端的存储:HDFS,所以HDFS也必须是高可用,同时JobManager的元数据信息也依赖于HDFS的高可用(namenode的高可用,和多副本机制),再者JobManager元数据的指针信息要依赖于Zookeeper的高可用。本章重点介绍Flink本身的高可用,其他框架的高可用请参考笔者之前的文章。

1.2 Flink on Yarn的优势

相对于 Standalone 模式,在Yarn 模式下有以下几点好处:

1.资源按需使用,提高集群的资源利用率;

2.任务有优先级,根据优先级运行作业;

3.基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover:

JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;

如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器;

如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager。

第2章 Flink on Yarn模式运行的方式

2.1 Per-Job

Per-Job模式:简答的说就是直接run job,每次提交的任务Yarn都会分配一个JobManager,执行完之后整个资源会释放,包括JobManager和TaskManager。

Per-Job模式适合比较大的任务、执行时间比较长的任务。

2.2 Session

Session模式:在Session模式中, Dispatcher 和 ResourceManager 是可以复用的;当执行完Job之后JobManager并不会释放,Session 模式也称为多线程模式,其特点是资源会一直存在不会释放。使用时先启动yarn-session,然后再提交job,每次提交job,也都会分配一个JobManager。
Session模式适合比较小的任务、执行时间比较短的任务。该模式不用频繁的申请资源和释放资源。

注:本章先简单的了解这两种模式和使用上的区别,笔者会在后续的章节中对其原理进行较为详细的剖析。敬请期待!

第3章 集群规划

笔者是在原有的3台机器名为Hadoop10*的机器上安装的Flink,所以这里机器名都是Hadoop100、Hadoop101、Hadoop102。

Hadoop100(Flink)

Hadoop101(Flink)

Hadoop102(Flink)

JobManager

TaskManager

第4章 下载安装

Flink官网:https://flink.apache.org/

大家到官网进行下载,先参考我之前的文章进行环境变量的配置:大数据实操篇 No.9-Flink Standalone模式部署及使用

Flink on Yarn部署使用前一定要先安装好Zookeeper和Hadoop(HDFS和Yarn)。

第5章 部署和使用

5.1 修改Hadoop配置

5.1.1 修改yarn-site.xml

修改hadoop配置文件/etc/hadoop/yarn-site.xml,设置application master重启时,尝试的最大次数。

<property><name>yarn.resourcemanager.am.max-attempts</name><value>10</value><description>The maximum number of application master execution attempts.</description>
</property>

5.2 修改masters

修改conf目录下masters文件:

hadoop100:8081
hadoop101:8081
hadoop102:8081

5.3 修改workers

修改conf目录下workers文件:

hadoop100
hadoop101
hadoop102

接下来,笔者用2种模式单独进行部署演示。

5.4 Per-Job模式

5.4.1 修改flink-conf.yaml

修改flink配置文件/conf/flink-conf.yaml

# jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
jobmanager.rpc.address: localhost# jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
jobmanager.rpc.port: 6123# jobmanager JVM heap 内存大小
jobmanager.memory.process.size: 1024m# taskmanager JVM heap 内存大小
taskmanager.memory.process.size: 1024m# 每个taskmanager提供的任务slots数量
# 并行度等于TM 数量乘以每个TM 的Solts 数量 TM=并行度/Solts数量 如果slots数量大于8 则只会起一个TM
taskmanager.numberOfTaskSlots: 3# 并行计算个数
parallelism.default: 6# 高可用模式
high-availability: zookeeper# JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/# Zookeeper集群
high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181# 在zookeeper下的根目录
high-availability.zookeeper.path.root: /flink_yarn# zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID,官方建议这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
#high-availability.cluster-id: /default_yarn# 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
yarn.application-attempts: 10#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
state.backend: rocksdb
# 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
# 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
# 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
state.backend.incremental: false
# jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务,region在目前(flink1.11)只对批处理有效,实时计算任然时full
jobmanager.execution.failover-strategy: region
# 全局检查点的保留数量
state.checkpoints.num-retained: 3
# 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
state.backend.local-recovery: true
# 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
taskmanager.state.local.root-dirs: /opt/flink-tm-state

修改完所有配置后,注意将配置分发到其他机器上。

注意:Hadoop中的yarn.resourcemanager.am.max-attemps和Flink中的yarn.application-attempts配置含义:

flink job失败重启次数,尝试重新启动9次(9次重试+ 1次初始尝试)
flink job(在yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,flink中的yarn.application-attempts配置不能超过yarn中的yarn.resourcemanager.am.max-attemps。

5.4.2 运行作业

Flink各版本的运行命令可能存在差异,可以先通过-h查看帮助。

$ bin/flink run -h

运行Job

$ bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-job-example

5.4.3 查看运行情况

资源已经受Yarn的管理,可以直接从Yarn集群的Web UI界面打开ApplicationMaster,从而进入到Flink的Web UI控制台:

运行中的作业:

运行完成的作业:

运行完成后,JobManager和TaskManager就被释放了,此时Yarn中显示的运行结果如下:

Linux系统中显示的运行信息:

5.5 Session模式

5.5.1 修改flink-conf.yaml

修改flink配置文件/conf/flink-conf.yaml

# jobmanager和taskmanager、其他client的RPC通信IP地址,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
jobmanager.rpc.address: localhost# jobmanager和taskmanager、其他client的RPC通信端口,TaskManager用于连接到JobManager/ResourceManager 。HA模式不用配置此项,在master文件中配置,由zookeeper选出leader与standby
jobmanager.rpc.port: 6123# jobmanager JVM heap 内存大小
jobmanager.memory.process.size: 1024m# taskmanager JVM heap 内存大小
taskmanager.memory.process.size: 1024m# 每个taskmanager提供的任务slots数量
taskmanager.numberOfTaskSlots: 3# 并行计算个数
parallelism.default: 3# 高可用模式
high-availability: zookeeper# JobManager元数据保留在文件系统storageDir中 指向此状态的指针存储在ZooKeeper中
high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/# Zookeeper集群
high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181# 在zookeeper下的根目录
high-availability.zookeeper.path.root: /flink_yarn# zookeeper节点下的集群ID 该节点下放置了集群所需的所有协调数据 多个flink集群连接同一套zookeeper集群需要配置各自不同的集群ID,官方建议这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。
#high-availability.cluster-id: /default_yarn# 单个flink job重启次数 必须小于等于yarn-site.xml中Application Master配置的尝试次数
yarn.application-attempts: 10#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
state.backend: rocksdb
# 检查点的默认目录。Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须从所有参与的进程/节点(即所有TaskManager和JobManager)访问存储路径。
state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
# 保存点的默认目录。由状态后端用于将保存点写入文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
# 是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点并忽略此选项。
state.backend.incremental: false
# jobmanager故障恢复策略,指定作业计算如何从任务失败中恢复 full重新启动所有任务以恢复作业 region重新启动可能受任务失败影响的所有任务,region在目前(flink1.11)只对批处理有效,实时计算任然时full
jobmanager.execution.failover-strategy: region
# 全局检查点的保留数量
state.checkpoints.num-retained: 3
# 本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖keyed state backends。当前,MemoryStateBackend不支持本地恢复。
state.backend.local-recovery: true
# 存储基于文件的状态以进行本地恢复的根目录。本地恢复当前仅涵盖keyed state backends
taskmanager.state.local.root-dirs: /opt/flink-tm-state

修改完所有配置后,注意将配置分发到其他机器上。

5.5.2 运行/停止

这介绍Session的两种运行模式,根据自己的实际业务场景使用:

  • 客户端模式

对于客户端模式而言,可以启动多个yarn session,一个yarn session模式对应一个JobManager,并按照需求提交作业,同一个Session中可以提交多个Flink作业。

  • 分离模式

只能启动一个yarn-sission,如果启动多个,后面的session会一直处于等待,同一个yarn-session中可以提交多个Flink作业,通过-d参数指定,表示即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。

$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -d

启动yarn-session前可以先了解一下参数的含义,查看参数说明

$ bin/yarn-session.sh -h

启动yarn-session

$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024

启动yarn-session后,检查进程:

[zihao@zookeeper110 ~]$ xsh jps
-----------zookeeper110-----------
7252 QuorumPeerMain
7958 Jps
-----------zookeeper111-----------
7714 Jps
7130 QuorumPeerMain
-----------zookeeper112-----------
7747 Jps
7116 QuorumPeerMain
-----------hadoop100-----------
8162 DataNode
8549 DFSZKFailoverController
8758 NodeManager
8375 JournalNode
14167 FlinkYarnSessionCli
14375 Jps
8031 NameNode
-----------hadoop101-----------
8002 DataNode
8163 JournalNode
8259 DFSZKFailoverController
7910 NameNode
8838 NodeManager
18536 YarnSessionClusterEntrypoint
18633 Jps
8701 ResourceManager
-----------hadoop102-----------
7666 ResourceManager
7523 JournalNode
10405 Jps
7769 NodeManager
7434 DataNode

YarnSessionClusterEntrypoint可以理解为Flink在Yarn上启动的ApplicationMaster,其内部就运行着三各组件:Dispatcher、ResourceManager和 JobManager。它们是在同一个jvm进程中。

停止Yarn-session
hadoop路径查看yarn上运行的任务

$ bin/yarn application –list

停止对应Application-Id对应的任务

$ bin/yarn application -kill application_1598166743428_0001

向Yarn上提交作业

Yarn-session启动后,界面上会提示一个地址(主机和端口),这个地址就是JobManager(也是ApplicationMaster)的地址。

在提交job的时候,可以在yarn-session启动的机器上直接提交,或者在其他机器上通过-m指定jobmanager的地址进行提交:

$ bin/flink run -m hadoop101:39998 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-example

注:笔者这里运行的是wordcount示例,将之前HDFS上已有的数据进行了wordcount分析,并将分析结果输出到HDFS上。

5.5.3 查看运行情况

启动yarn-session后:

 提交作业,作业开始运行:

 作业运行完成:

 

在HDFS中查看运行结果:

第6章 高可用演示

在Flink on Yarn模式中,Session方式的高可用是针对JobManager(ApplicationMaster)的高可用,而session模式中启动yarn-session后,该进程常驻在Yarn中,其高可用依赖于Yarn自身的资源管理机制,在JobManager(ApplicationMaster)挂掉之后,Yarn会在集群中重启该进程。

Per-Job方式,内部复用的是standalone JobManager 进程的HA,本章就不再介绍了,详见笔者上一篇文章:大数据实操篇 No.10-Flink Standalone集群HA高可用部署

6.1 启动yarn-session

先按前几章介绍的,把Zookeeper、HDFS、Yarn、Flink全部启动起来。并启动yarn-session。

此时,我们查看以下进程,发现YarnSessionClusterEntrypoint这个进程启动再hadoop100这台机器上:

[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
-----------zookeeper110-----------
7142 QuorumPeerMain
7192 Jps
-----------zookeeper111-----------
7156 Jps
7103 QuorumPeerMain
-----------zookeeper112-----------
7133 Jps
7087 QuorumPeerMain
-----------hadoop100-----------
7872 DFSZKFailoverController
7652 JournalNode
8260 FlinkYarnSessionCli
7285 NameNode
8006 NodeManager
8630 YarnSessionClusterEntrypoint
8712 Jps
7402 DataNode
-----------hadoop101-----------
7217 NameNode
8017 NodeManager
8356 Jps
7579 JournalNode
7867 ResourceManager
7293 DataNode
7869 DFSZKFailoverController
-----------hadoop102-----------
7795 Jps
7319 ResourceManager
7209 JournalNode
7099 DataNode
7439 NodeManager

6.2 查看Flink Web UI

通过Yarn集群管理界面打开Flink Web UI:

6.3 Kill掉ApplicationMaster

Kill掉YarnSessionClusterEntrypoint(ApplicationMaster)进程,此时Flink Web UI界面会短暂的不可用,Yarn正在尝试重新启动ApplicationMaster。重试次数由Yarn配置中的yarn.resourcemanager.am.max-attempts和Flink配置中的yarn.application-attempts决定。

稍等片刻后我们再重新再yarn集群管理界面打开Flink Web UI:

在JobManager的log中我们也能发现这次leader选举的日志信息:

6.4 查看恢复后的进程

再重新查看进程,发现YarnSessionClusterEntrypoint已经重新在hadoop102上启动了:

[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
-----------zookeeper110-----------
7237 Jps
7142 QuorumPeerMain
-----------zookeeper111-----------
7188 Jps
7103 QuorumPeerMain
-----------zookeeper112-----------
7165 Jps
7087 QuorumPeerMain
-----------hadoop100-----------
7872 DFSZKFailoverController
7652 JournalNode
8260 FlinkYarnSessionCli
7285 NameNode
8006 NodeManager
7402 DataNode
8875 Jps
-----------hadoop101-----------
7217 NameNode
8017 NodeManager
7579 JournalNode
7867 ResourceManager
7293 DataNode
7869 DFSZKFailoverController
8447 Jps
-----------hadoop102-----------
8084 YarnSessionClusterEntrypoint
7319 ResourceManager
8151 Jps
7209 JournalNode
7099 DataNode
7439 NodeManager

这里就演示完了Flink on Yarn Session模式下,JobManager(ApplicaionMaster)的高可用。

至此,Flink on Yarn集群高可用模式的部署及使用就完成了。 后续的章节我们将开始进入实时计算整条链路的实战环节!


欢迎大家扫描下面的二维码,关注笔者的公众号:知数知理,文章会逐步同步到微信公众号(CSDN依然持续更新),方便更多的朋友进行学习:

最后推荐一个课程给大家,也是笔者在学习Flink的过程中发现的一个优质课程《Apache Flink 知其然,知其所以然》,Apache Flink PMC主讲,欢迎大家扫描二维码,加入钉钉群进行学习交流:

大数据实操篇 No.11-Flink on Yarn集群HA高可用部署及使用相关推荐

  1. Flink on yarn集群HA配置

    1.集群规划 Flink on yarn 的HA其实是利用yarn自己的恢复机制. 在这里需要用到ZK,主要是因为虽然flink-on-yarn cluster HA 依赖于Yarn自己的集群机制,但 ...

  2. Flink on yarn 集群HA 配置

    1. HA 集群环境规划     flink on yarn 的HA 其实是利用yarn 自己的恢复机制.在这需要用到zk,主要是因为虽然flink-on-yarn cluster HA 依赖于Yar ...

  3. 【Flink实战系列】Flink 1.11.1 on yarn 集群搭建教程

    前面一篇博客中已经搭建了flink Standalone的集群,需要的可以进去看一下,今天主要来说一下flink on yarn 集群的搭建以及怎么提交任务,之前搭建Flink on yarn的时候用 ...

  4. 10-Flink集群的高可用(搭建篇补充)

    戳更多文章: 1-Flink入门 2-本地环境搭建&构建第一个Flink应用 3-DataSet API 4-DataSteam API 5-集群部署 6-分布式缓存 7-重启策略 8-Fli ...

  5. flink on yarn集群搭建

    环境需求 CentOS7.5.1804.jdk1.8.0_181.zookeeper3.6.2.hadoop3.2.2.flink1.12.2 关于hadoop的安装细节请查看<hadoop3. ...

  6. 20-400-040-高可用-Flink集群的高可用搭建

    1.视界 概述 JobManager 协调每个 Flink 部署.它负责调度和资源管理. 默认情况下,每个 Flink 集群只有一个 JobManager 实例. 这会产生单点故障(SPOF):如果 ...

  7. 大数据实操篇 No.17-Flink State介绍和使用(Datastream API)

    第1章 Flink State介绍 Flink有两种基本类型的状态:Managed State(托管状态).Raw State(原生状态). Managed State是Flink Runtime托管 ...

  8. 文件服务器高可用群集,fastDFS文件服务器(三):集群和高可用环境篇

    解决两个组轮询存储文件问题 上篇文章中我们搭建了单机版的fastDFS,其中storage有两个group,分别为head和other,我们在测试的时候发现文件每次上传都会到head目录下,如果你希望 ...

  9. 2021年大数据Flink(五):Standalone-HA高可用集群模式

    目录 Standalone-HA高可用集群模式 原理 操作 1.集群规划 2.启动ZooKeeper 3.启动HDFS 4.停止Flink集群 5.修改flink-conf.yaml 6.修改mast ...

最新文章

  1. Nginx 挂了怎么办?怎么实现高可用?
  2. 天翼云从业认证课后习题(第四章云场景化解决方案综合应用)
  3. [故障公告]14:40-15:00博客站点web服务器雪崩似的CPU 100%
  4. 搜索文献_【大牛经验分享】如何高效快捷搜索文献?
  5. 学校门口的树C语言算法,C语言校园导游程序设计汇报.doc
  6. 币氪研报|BNB(Binance Coin)
  7. 传感器信号处理仿真实验(c语言实现),均值滤波,滑动滤波
  8. 4月份全球新注册39.2万辆电动汽车 榜首并非Model 3
  9. 基于springboot+vue的旅游信息(旅游线路)网站(前后端分离)
  10. php楼梯有n级台阶,楼梯问题的一些解决方法
  11. 【MDCC 2016】产品与设计峰会现场实录(下)
  12. 米扑代理:爬虫代理IP哪家好
  13. Linux平台提取DSDT,关于DSDT修改-提取软件以及使用方法【详解】
  14. 9.Rust错误处理
  15. 基于Simulink的高速跳频通信系统抗干扰性能
  16. 解决 还原SqlServer时提示文件正在使用
  17. vue .sync 用法
  18. 盲源分离(BSS, Blind Source Separation)
  19. 工程流体力学笔记暂记21 (几种简单的平面势流及其叠加)
  20. 由国内到国外 我的个人软件推广成功之路

热门文章

  1. 检测计量与计算机技术,测试计量技术及仪器
  2. 计算机软著评职称,软著对评副高级职称的作用
  3. 共享系统开发异业联盟怎么做?
  4. Xiyou linux 2017 面试题基本知识点
  5. 小学算术运算测试c语言,用C语言制作一个小学算术运算测试
  6. matlab生成exe执行doc太快,matlab编写的程序生成exe可执行文件的方法.doc
  7. win7 开机自动登录设置
  8. \r \r\n \t的区别,是什么意思
  9. 集成墙面室内装修 四海八荒美出新高度
  10. 10分钟带你了解HTML5