04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等
1.4.Flink集群安装部署standalone+yarn
1.4.1.Standalone集群模式
1.4.2.Flink-Standalone集群重要参数详解
1.4.3.集群节点重启及扩容
1.4.3.1.启动jobmanager
1.4.3.2.启动taskmanger
1.4.3.3.Flink standalone集群中job的容错
1.4.4.Flink on Yarn
1.4.4.1.原理介绍
1.4.4.2.FLINK on yarn集群部署
1.4.5.Flink on Yarn的两种运行方式
1.4.5.1.第一种[yarn-session.sh(开辟资源)+flink run(提交任务)]
1.4.5.2.第二种[flink run -m yarn-cluster(开辟资源 + 提交任务)]
1.4.5.3…/bin/yarn-session.sh命令分析
1.4.5.4./bin/flink run命令分析
1.4.5.5.Flink在yarn上的分布
1.4.5.6.Flink on yarn内部实现
1.4.Flink集群安装部署standalone+yarn
1.4.1.Standalone集群模式
Standalone集群架构展示:Client客户端提交任务给JobManager,JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行,TaskManager定期向JobManager汇报状态。
依赖环境
Jdk1.8及以上[配置JAVA_HOME环境变量]
ssh免密登录[集群内节点之间免密登录]
集群规划
master(JobManager) + slave/worker(TaskManager)
hadoop4(master)
hadoop5(salve)
hadoop6(salve)
集群安装
A:修改conf/flink-conf.yaml
jobmanager.rpc.address: hadoop4
taskmanager.numberOfTaskSlots: 32 (由于服务器上的CPU核数是32核,所以,此处写成32)
B:修改conf/masters文件内容:
hadoop4:8081
C:修改conf/workers (这里将三台机器都作为worker节点使用)
hadoop4
hadoop5
hadoop6
D:拷贝到其它节点
cd /home/admin/installed/flink-1.11.1 (在hadoop4机器上)
scp -r * root@hadoop5:$PWD
scp -r * root@hadoop6:$PWD
E:在hadoop4节点启动
[root@hadoop4 flink-1.11.1]# pwd
/home/admin/installed/flink-1.11.1
[root@hadoop4 flink-1.11.1]# bin/start-cluster.sh
F:访问集群:http://xxx.xxx.xxxx.xxx:8081/#/overview
1.4.2.Flink-Standalone集群重要参数详解
jobmanager.memory.process.size: 1600m The total process memory size for the JobManager.
taskmanager.memory.process.size: 1728m The total process memory size for the TaskManager
taskmanager.numberOfTaskSlots: 20 每台机器上可用CPU数量
parallelism.default: 1 The parallelism used for programs that did not specify and other parallelism
slot和parallelism总结
1.slot是静态的概念,是指taskmanager具有的并发执行能力。
2.parallelism是动态的概念,是指程序运行时实际使用的并发能力。
3.设置合适的parallelism能提高运算效率,太多了和太少了都不行。
1.4.3.集群节点重启及扩容
1.4.3.1.启动jobmanager
如果集群中的jobmanager进程挂了,执行下面命令启动。
bin/jobmanager.sh start
bin/jobmanager.sh stop
1.4.3.2.启动taskmanger
添加新的taskmanager节点或者重启taskmanager节点
bin/taskmanager.sh start
bin/taskmanager.sh stop
1.4.3.3.Flink standalone集群中job的容错
jobmanager挂掉
正在执行的任务会失败
存在单点故障,(Flink支持HA)
taskmanager挂掉
如果有多余的taskmanger节点,flink会自动把任务调度到其它节点执行。
1.4.4.Flink on Yarn
FLINK on yarn模式的原理是依靠YARN来调度FLINK任务,目前在企业中使用较多。这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要1套Hadoop集群,就可以执行MapReduce、Spark和FLINK任务,操作非常方便,运维方面也很轻松。
1.4.4.1.原理介绍
1)当启动一个新的 Flink YARN Client 会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传 Flink 配置和 JAR 文件到 HDFS。。
2)客 户 端 请 求 一个 YARN 容 器 启动 ApplicationMaster 。 JobManager 和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM 就能够知道JobManager 的地址,它会为 TaskManager 生成一个新的 Flink 配置文件(这样它才能连上 JobManager),该文件也同样会被上传到 HDFS。另外,AM 容器还提供了 Flink 的Web 界面服务。Flink 用来提供服务的端口是由用户和应用程序 ID 作为偏移配置的,这使得用户能够并行执行多个 YARN 会话。
3)之后,AM 开始为 Flink 的 TaskManager 分配容器(Container),从 HDFS 下载 JAR 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。
Flink n on n Yarn 模式在使用的时候又可以分为两Session-Cluster和Per-Job-Cluster
Session-Cluster
这种模式是在 YARN 中提前初始化一个 Flink 集群(称为 Flinkyarn-session),开辟指定的资源,以后的 Flink 任务都提交到这里。这个 Flink 集群会常驻在 YARN 集群中,除非手工停止。这种方式创建的 Flink 集群会独占资源,不管有没有 Flink 任务在执行,YARN 上面的其他任务都无法使用这些资源。
Per-Job-Cluster
这种模式,每次提交 Flink 任务都会创建一个新的 Flink 集群,每个 Flink 任务之间相互独立、互不影响,管理方便。任务执行完成之后创建的 Flink集群也会消失,不会额外占用资源,按需使用,这使资源利用率达到最大,在工作中推荐使用这种模式。
1.4.4.2.FLINK on yarn集群部署
依赖环境
本次部署hadoop3.1.0版本
Hdfs & yarn
Flink on Yarn的两种使用方式
第一种: 在Yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在Yarn集群中,除非手工停止。
第二中(推荐): 每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
1.4.5.Flink on Yarn的两种运行方式
1.4.5.1.第一种[yarn-session.sh(开辟资源)+flink run(提交任务)]
启动一个一直运行的flink集群
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
附着到一个已存在的flink yarn session
./bin/yarn-session.sh -id application_1463870264508_0029
执行任务
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt
停止任务 【web界面或者命令行执行cancel命令】
关于命令含义:
[root@hadoop4 flink-1.11.1]# ./bin/yarn-session.sh --help
Usage:必选:-n,--container <arg> 表示分配容器的数量(也就是 TaskManager 的数量)可选-at,--applicationType <arg> Set a custom application type for the application on YARN-D <property=value> use value for given property (动态属性)-d,--detached If present, runs the job in detached mode (独立运行)-h,--help Help for the Yarn session CLI.-id,--applicationId <arg> Attach to running YARN session (附着到一个已存在的flink yarn session)-j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) (JobManager容器的内存大小)-m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.-nl,--nodeLabel <arg> Specify YARN node label for the YARN application-nm,--name <arg> Set a custom name for the application on YARN (在YARN上为一个自定义的应用设置一个名字)-q,--query Display available YARN resources (memory, cores) 显示yarn中可用的资源 (内存, cpu核数)-qu,--queue <arg> Specify YARN queue. 指定YARN队列-s,--slots <arg> taskmanager分配多少个slots(处理进程)。建议设置为每个机器的CPU核数-t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) 每个TaskManager容器的内存 [in MB]-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode 针对HA模式在zookeeper上创建NameSpace
1.4.5.2.第二种[flink run -m yarn-cluster(开辟资源 + 提交任务)]
启动集群,执行任务
$FLINK_HOME/bin/flink run -d -m yarn-cluster \
-p 1 \
-yjm 1024m \
-ytm 1024m \
-ynm IssuePassComplete \
-c com.tianque.issue.flink.handler.IssuePassCompleteFlinkHandlerByCustomRedisSink \
/home/admin/installed/flink-jars/IssuePassCompleteFlinkHandler.jar \
--port 38092
注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败
1.4.5.3…/bin/yarn-session.sh命令分析
用法: 必选 -n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量) 可选 -D <arg> 动态属性 -d,--detached 独立运行 -jm,--jobManagerMemory <arg> JobManager的内存 [in MB] -nm,--name 在YARN上为一个自定义的应用设置一个名字 -q,--query 显示yarn中可用的资源 (内存, cpu核数) -qu,--queue <arg> 指定YARN队列. -s,--slots <arg> 每个TaskManager使用的slots数量 -tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB] -z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
1.4.5.4./bin/flink run命令分析
run [OPTIONS] <jar-file> <arguments> "run" 操作参数:
-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1连接指定host和port的jobmanager:
./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1启动一个新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
再如以下运行案例:
如果一直报akka,AskTimeoutException错误,可以尝试添加akka.ask.timeout=120000s, 依然显示该错误。
注意这里数字和时间单位之间,必须有个空格。
flink run \-m yarn-cluster \-ynm applicaiton-name \-yqu queue \-p 4000 \-ys 4 \-yjm 10g \-ytm 10g \-yD akka.client.timeout="120000 s" \-yD akka.ask.timeout="120000 s" \-yD web.timeout=120000 \-yD rest.retry.max-attempts=100 \-c your-class \your-jar your-input your-output
1.4.5.5.Flink在yarn上的分布
Flink on Yarn
ResourceManager
NodeManager
AppMaster(jobmanager和它运行在一个Container中)
Container(taskmanager运行在上面)
使用on-yarn的好处
提高集群机器的利用率
一套集群,可以执行MR任务,spark任务,flink任务等…
1.4.5.6.Flink on yarn内部实现
步骤如下:
1、上传flink jar包和配置
2、申请资源和请求AppMaster容器
3、分配AppMaster容器
分配worker
04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等相关推荐
- 蚁群算法--旅行商(TSP)问题详解
蚁群算法--旅行商(TSP)问题详解 蚁群算法 问题与分析 结果显示与分析 蚁群算法 蚁群算法(ant colony optimization)最早是由Marco Dorigo等人在1991年提出,他 ...
- 大数据实操篇 No.11-Flink on Yarn集群HA高可用部署及使用
第1章 简介 1.1 概要介绍 Flink on Yarn的HA高可用模式,首先依赖于Yarn自身的高可用机制(ResourceManager高可用),并通过Yarn对JobManager进行管理,当 ...
- Centos7.6+Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建
本文转自https://mshk.top/2019/03/centos-hadoop-zookeeper-hbase-hive-spark-high-availability/,因为原链接打不开,故在 ...
- Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建
目录 目录 1.前言 1.1.什么是 Hadoop? 1.1.1.什么是 YARN? 1.2.什么是 Zookeeper? 1.3.什么是 Hbase? 1.4.什么是 Hive 1.5.什么是 Sp ...
- HA高可用集群与RHCS集群套件
一.HA基本概念 linux高可用集群(HA)原理详解:https://blog.csdn.net/xiaoyi23000/article/details/80163344 负载均衡.集群.高可用(H ...
- Hadoop HA 高可用集群搭建
Hadoop HA 高可用集群搭建 一.首先配置集群信息 1 vi /etc/hosts 二.安装zookeeper 1.解压至/usr/hadoop/下 1 tar -zxvf zookeeper- ...
- Kubeadm 1.9 HA 高可用集群本地离线镜像部署【已验证】
k8s介绍 k8s 发展速度很快,目前很多大的公司容器集群都基于该项目,如京东,腾讯,滴滴,瓜子二手车,易宝支付,北森等等. kubernetes1.9版本发布2017年12月15日,每三个月一个迭代 ...
- [K8s 1.9实践]Kubeadm 1.9 HA 高可用 集群 本地离线镜像部署
Kubeadm HA 1.9 高可用 集群 本地离线部署 k8s介绍 k8s 发展速度很快,目前很多大的公司容器集群都基于该项目,如京东,腾讯,滴滴,瓜子二手车,易宝支付,北森等等. kubernet ...
- 基于Kubeadm部署Kubernetes1.13.3 HA 高可用集群
Table of Contents 目录 基于Kubeadm部署Kubernetes1.13.3 HA 高可用集群 01. 部署目的 1.1 Kubernetes的特性 1.2 贴微服务,开发环境快速 ...
最新文章
- ubuntu下wps无法使用搜狗输入法输入中文
- MAC能登录微信,浏览器连不上网
- 冒泡排序--通过冒泡算法让数组中最大的值成为数组中最后一个值
- Keepalived配置日志文件
- 解方程(codevs 3732)
- jquery css 定义背景不重复
- SAP UI5 resource servlet
- Another way to define Angular controller
- 【IT笔试面试题整理】位操作
- 如何打造组织级敏捷,你想知道的都在这里!
- asp.net ajax的学习第一篇
- git 拉取远程其他分支代码_git切换远程分支并拉取远程分支代码
- Visual studio Code的C/C++开发环境搭建
- 【Python】元组和列表相关知识总结
- vim中实现javascript代码自动完成功能
- axivion和astree_基于LabVIEW的IVI编程 IVI Programme Based on LabVIEW.pdf
- ATA/SATA/SCSI/SAS/FC总线简介
- Entity Framework 无法加载指定的元数据资源。
- 学堂云大学计算机答案,学堂云的答案哪里找?
- windows下搭建voip服务器
热门文章
- 深度学习英文文献_文献速递 | 预测术后30天死亡率的深度学习模型
- MYSQL数据库的优化(二)
- Python使用远程仓库时建议忽略的文件
- Django框架(21.Django中设置cookie以及获取cookie)
- VTK:颜色字形用法实战
- wxWidgets:wxWindowCreateEvent类用法
- boost::sort模块实现spreadsort 反向字符串排序示例
- boost::tuple用法的测试程序
- boost::geometry:::detail::overlay::get_clusters用法的测试程序
- 测试core :: demangled_name