Yarn运行Flink作业

link支持多种部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。Standalone部署模式与Spark类似,这里,我们看一下Flink on YARN的部署模式,如下图所示:

实际Flink也实现了满足在YARN集群上运行的各个组件:Flink YARN Client负责与YARN RM通信协商资源请求,Flink JobManager和Flink TaskManager分别申请到Container去运行各自的进程。通过上图可以看到,YARN AM与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理。
在YARN上启动一个Flink主要有两种方式:(1)、启动一个YARN session(Start a long-running Flink cluster on YARN);(2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)。

Flink YARN Session

首先,看下yarn-session.sh脚本参数

yarn-session.sh脚本参数
用法:必须:-n,--container <arg> 要分配的YARN容器数(=任务管理器数)可选的-D <property=value> 使用给定属性的值-d,--detached 如果存在,则以分离模式运行作业,不启动客户端进程,不打印YARN返回信息-h,--help -id,--applicationId <arg> 附加到正在运行的YARN会话-j,--jar <arg> Flink jar文件的路径-jm,--jobManagerMemory <arg> 具有可选单元的JobManager容器的内存(默认值:MB)-m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到指定地址的JobManager配置中的:-n,--container <arg> 要分配的YARN容器数(=任务管理器数)-nl,--nodeLabel <arg> 为YARN应用程序指定YARN节点标签-nm,--name <arg> 在YARN上为应用程序设置自定义名称-q,--query 显示可用的YARN资源(内存,内核)-qu,--queue <arg> 指定YARN队列-s,--slots <arg> 每个TaskManager的槽-sae,--shutdownOnAttachedExit 如果作业以附加模式提交,请在CLI突然终止时执行尽力而为的群集关闭,例如,响应用户中断,例如键入Ctrl + C.-st,--streaming 流模式启动flink-t,--ship <arg> 在指定目录中发送文件(t用于传输)-tm,--taskManagerMemory <arg> 没taskmanager内存数-yd,--yarndetached 如果存在,则以分离模式运行作业(不建议使用;请改为使用非YARN特定选项)-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径

在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个。好了,我们开启动一个YARN session:./bin/yarn-session.sh -n 10 -tm 8192 -s 32
上面命令启动了10个TaskManager,每个管理器具有8 GB内存和32个处理插槽(是每个TaskManager,默认是1个核)。
注:以上命令实际启动了11个容器(即使只请求了10个容器),因为ApplicationMaster和Job Manager还有一个额外的容器。
上述命令一直在终端中运行着的,此时可以通过停止unix进程(使用CTRL + C)或在客户端输入“stop”来停止yarn session。
如果想启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached 在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。
附着到一个已存在的flink yarn session,可以用./bin/yarn-session.sh -id <applicationId>
如果关闭一个已存在的flink yarn session,可以用yarn application -kill <applicationId>

启动了YARN session之后我们如何运行作业呢?很简单,我们可以使用./bin/flink脚本提交作业,同样我们来看看这个脚本支持哪些参数:

flink 命令脚本参数说明操作“run”编译并运行程序。run 操作选项-c,--class <classname> 具有程序入口点的类“main”方法或“getPlan()”方法。仅在JAR文件未在其清单中指定类时才需要。-C,--classpath <url> 向集群中所有节点上的每个用户代码类加载器添加URL。路径必须指定协议(例如file://)并且可以在所有节点上访问(例如,通过NFS共享)。您可以多次使用此选项来指定多个URL。该协议必须由{@link java.net.URLClassLoader}支持。-d,--detached 如果存在,则以分离模式运行作业(不启动客户端,集群提交方式,不在客户端打印返回信息)-n,--allowNonRestoredState 允许跳过无法恢复的保存点状态。如果在触发保存点时从程序中删除了作为程序一部分的运算符,则需要允许此操作。-p,--parallelism <parallelism> 运行程序的并行性。可选标志,用于覆盖配置中指定的默认值。-q,--sysoutLogging 如果存在,则将日志记录输出抑制为标准输出-s,--fromSavepoint <savepointPath> 保存点的路径,用于从中恢复作业(例如hdfs:///flink/savepoint-1537)。-sae,--shutdownOnAttachedExit 如果作业以附加模式提交,请在CLI突然终止时执行尽力而为的群集关闭,例如,响应用户中断,例如键入Ctrl + C.YARN集群模式选项:-d,--detached 如果存在,则以分离模式运行作业-m,--jobmanager <arg> 要连接的JobManager(主站)的地址。使用此标志连接到与配置中指定的JobManager不同的JobManager。-yD <property=value> 使用给定属性的值-yd,--yarndetached 如果存在,则以分离模式运行作业(不建议使用;请改为使用非YARN特定选项)-yh,--yarnhelp yarn session cli帮助(“-yh”不是有效的操作)-yid,--yarnapplicationId <arg> 附加到正在运行的YARN会话-yj,--yarnjar <arg> Flink jar文件的路径-yjm,--yarnjobManagerMemory <arg> 有可选单元的JobManager容器的内存(默认值:MB)-yn,--yarncontainer <arg> 要分配的YARN容器数(=任务管理器数)ynl,--yarnnodeLabel <arg> 为YARN应用程序指定YARN节点标签-ynm,--yarnname <arg> 在YARN上为应用程序设置自定义名称-yq,--yarnquery 显示可用的YARN资源(内存,内核)-yqu,--yarnqueue <arg> 指定YARN队列-ys,--yarnslots <arg> 每个TaskManager的插槽数-yst,--yarnstreaming 以流模式启动Flink-yt,--yarnship <arg> 在指定目录中发送文件(t用于传输)-ytm,--yarntaskManagerMemory <arg> 具有可选单元的每个TaskManager容器的内存(默认值:MB)-yz,--yarnzookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径默认模式的选项:-m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到与配置文件中指定的JobManager不同的JobManager。-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径操作“info”显示程序的优化执行计划(JSON)。语法:info [OPTIONS] <jar-file> <arguments>
“info”动作选项: -c,--class <classname> 具有程序入口点的类(“main”方法或“getPlan()”方法。仅在JAR文件未在其清单中指定类时才需要。
-p,--parallelism <parallelism> 运行程序的并行性。 可选标志,用于覆盖配置中指定的默认值。操作“list”列出了运行和计划的程序。
语法: list [OPTIONS]
"list" 操作选项-r,--running 仅显示正在运行的程序及其JobID-s,--scheduled Show only scheduled programs and their JobIDs
yarn-cluster 模式选项-m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到与配置中指定的JobManager不同的JobManager。-yid,--yarnapplicationId <arg> 附加到正在运行的YARN会话-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
默认模式的选项:-m,--jobmanager <arg>-z,--zookeeperNamespace <arg>操作“stop”会停止正在运行的程序(仅限流式处理作业)。
语法:stop [OPTIONS] <Job ID>
"stop"操作选项:
yarn-cluster 模式选项-m,--jobmanager <arg> 指定需要操作的非默认的jobmanager地址-yid,--yarnapplicationId <arg> 追加到指定的yarn容器-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
默认选项-m,--jobmanager <arg>-z,--zookeeperNamespace <arg>操作“cancel”取消正在运行的程序。
语法:cancel [OPTIONS] <Job ID>
"cancel" 操作选项-s,--withSavepoint <targetDirectory> 触发保存点并取消作业。 目标目录是可选的。 如果未指定目录,则使用配置的缺省目录(state.savepoints.dir)。yarn-cluster 模式选项-m,--jobmanager <arg> 指定需要操作的非默认的jobmanager地址-yid,--yarnapplicationId <arg> 追加到指定的yarn容器-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
默认模式的选项:-m,--jobmanager <arg>-z,--zookeeperNamespace <arg>操作"savepoint" 触发正在运行的作业的保存点或处置现有作业。
语法:savepoint [OPTIONS] <Job ID> [<target directory>]
"savepoint"操作选项-d,--dispose <arg> 处置的保存点的路径。-j,--jarfile <jarfile> flink程序jar文件
yarn-cluster 模式选项-m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到与配置中指定的JobManager不同的JobManager。-yid,--yarnapplicationId <arg> 附加到正在运行的YARN会话-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
默认模式的选项:-m,--jobmanager <arg>-z,--zookeeperNamespace <arg>操作"modify"修改正在运行的作业(例如,并行性的改变)。
语法:modify <Job ID> [OPTIONS]
"modify" 操作选项-h,--help -p,--parallelism <newParallelism> 指定作业的新并行性。-v,--verbose 不推荐使用此选项。
yarn-cluster 模式选项-m,--jobmanager <arg> 指定需要操作的非默认的jobmanager地址-yid,--yarnapplicationId <arg> 追加到指定的yarn容器-z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
默认模式的选项:-m,--jobmanager <arg>-z,--zookeeperNamespace <arg>

可以自动获取到YARN session的地址,然后我们以WordCount程序启动程序:

./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///user/iteblog/LICENSE --output hdfs:///user/iteblog/result.txt

Run a single Flink job on YARN(推荐)

我们也可以不需要事先启动YARN session,而直接启动一个Flink作业,在这个作业运行完session也就结束了。

#命令行启动示例:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input hdfs:///user/iteblog/LICENSE --output hdfs:///user/iteblog/result.txt

上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须指定。

本地提交到yarn_Yarn运行Flink作业 0449相关推荐

  1. 本地提交spark_spark快速入门(三)-------spark部署及运行模式

    spark支持多种部署方案,包括spark自带的standalone资源调度模式(StandAlone):运行在hadoop的yarn资源调度框架中(SparkOnYARN):local本地模式:可以 ...

  2. flink source 同步_如何生成 Flink 作业的交互式火焰图?

    原标题:如何生成 Flink 作业的交互式火焰图? 简介:Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的 DataStream 接口或者标准 ...

  3. MapReduce Job本地提交过程源码跟踪及分析

    MapReduce Job作业的提交过程可以分为本地提交模式与集群模式提交,这两种提交模式与org.apache.hadoop.mapred.LocalJobRunner.org.apache.had ...

  4. 如何生成 Flink 作业的交互式火焰图?

    简介: Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的 DataStream 接口或者标准 SQL 语言来快速实现一个分布式高可用的流式应用, ...

  5. 如何处理分析Flink作业反压的问题?

    本文分享自华为云社区<一个Flink作业反压的问题分析>,原文作者:Yunz Bao . 反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题.反压意味着 ...

  6. Blink/Flink作业 性能优化配置及原理

    高性能作业指南 本文通过代码和一些配置信息来优化blink/flink作业的性能. 共分为几部分: 1.group aggregate优化: 开启minibatch,开启localglobal,开启p ...

  7. 如何撤消Git中的最新本地提交?

    我不小心将错误的文件提交给Git ,但是我还没有将提交推送到服务器. 如何撤消本地存储库中的那些提交? #1楼 我想撤消我们共享存储库中的最新五次提交. 我查找了要回滚的修订版ID. 然后我输入以下内 ...

  8. 在Apache Hadoop(多节点群集)中运行Map-Reduce作业

    我们将在这里描述在多节点集群中的Apache Hadoop中运行MapReduce Job的过程. 要在多节点群集中设置Apache Hadoop ,可以阅读设置Apache Hadoop多节点群集 ...

  9. Java 远程mapduce_java – 如何远程运行mapreduce作业

    当我尝试远程运行map-reduce作业(字数计数示例)时遇到了一些问题.我搜索谷歌后仍然无法实现我的目标.我刚刚看到很少关于远程调用map-reduce作业的主题. 以下是问题: >首先,我遇 ...

最新文章

  1. Spring思维导图(MVC篇)
  2. 特斯拉上海超级工厂开工 预计今夏完成初期建设...
  3. 全国计算机二级考试c语言指针,全国计算机二级考试C语言 指针精讲课件.ppt
  4. Http协议中的方法
  5. Microsoft Teams:团队Owner离开公司后,我们该怎么做?
  6. Docker简介和安装
  7. onsize里获取的对话框大小有时会包含滚动条_Python实战分析:获取数据
  8. FileStream构造函数
  9. 爬虫-----自定义框架
  10. python代码_Python发送邮件基础知识与代码讲解!
  11. 2022Android SDK下载与安装
  12. abaqus与python后处理_abaqus用Python批量后处理教程!如何从abaqus导出python
  13. NYOJ 1132 promise me a medal (判断两线段是否相交)
  14. 淘宝网nbsp;E客服帐号nbsp;登录阿里旺旺时nbsp;…
  15. Shell 循环检查的格式
  16. 学习Linux命令:关于ssh命令
  17. 在Power BI中用DAX新建列的方式进行累计求和
  18. java po vo bo是什么以及_JAVA中(PO,VO,TO,BO,DAO,POJO)分别是指什么
  19. FormData是什么
  20. MATLAB得到贝塞尔函数零点

热门文章

  1. Oracle 安装报错 [INS-06101] IP address of localhost could not be determined 解决方法[转]
  2. Oculus关于Internal Error:OVR53225466报错解决方法
  3. 【ZeroClipboard is not defined】的解决方法
  4. 阿里云windows server 2012安装.net framework3.5失败解决方案
  5. 【干货】300余份数字化资料包免费下载(报告、白皮书、方案、政策等)
  6. pytorch中tensor、numpy.array、list三者互相转换
  7. 《统计学习方法》代码全解析——第一部分统计学习方法概论
  8. SGPN: Similarity Group Proposal Network for 3D Point Cloud Instance Segmentation
  9. 浙大PAT乙级1004. 成绩排名 (20)
  10. bst java_图解:二叉搜索树算法(BST)