  • "Flink Tutorial (01): Flink Knowledge Graph"
  • "Flink Tutorial (02): Getting Started with Flink"


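The physical deployment layer in the previous post, "Flink Tutorial (01): Flink Knowledge Graph", showed that Flink has several deployment modes. Split by local versus cluster, they are:

  • Local (single-machine mode): for learning and testing
  • Standalone (standalone cluster mode): Flink's built-in cluster, for development and test environments
  • StandaloneHA (standalone cluster with high availability): Flink's built-in cluster, for development and test environments
  • On Yarn (compute resources managed centrally by Hadoop YARN): for production environments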


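2.1 How it works


  1. A Flink program is submitted by the JobClient.
  2. The JobClient submits the job to the JobManager.
  3. The JobManager coordinates resource allocation and job execution; once resources are allocated, the tasks are submitted to the appropriate TaskManagers.
  4. Each TaskManager starts a thread to begin execution and reports state changes to the JobManager, such as started, in progress, or finished.
  5. When the job finishes, the result is sent back to the client (JobClient).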

2.2 Installation and deployment


  • https://archive.apache.org/dist/flink/



tar -zxvf flink-1.12.0-bin-scala_2.12.tgz


chown -R root:root /export/server/flink-1.12.0


# Either rename the directory ...
mv flink-1.12.0 flink
# ... or create a symlink instead (pick one of the two)
ln -s /export/server/flink-1.12.0 /export/server/flink

2.3 Test and verify

1. Prepare the file /root/words.txt

vim /root/words.txt


hello me you her
hello me you
hello me

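2. Start the Flink local "cluster"

/export/server/flink/bin/start-cluster.sh

3. Use jps to confirm that these two processes are running

 - TaskManagerRunner
 - StandaloneSessionClusterEntrypoint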
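To check for the two process names above without reading the jps output by eye, a small helper can be used. This is a minimal sketch: the function name `missing_procs` and the sample PIDs are made up for illustration, and it only assumes that jps prints one "<pid> <main class>" pair per line.

```shell
# missing_procs JPS_OUTPUT NAME...
# Prints every NAME that does not appear as a main class in JPS_OUTPUT.
# (Hypothetical helper, not part of Flink.)
missing_procs() {
  mp_output=$1
  shift
  for mp_name in "$@"; do
    # jps prints "<pid> <main class>", so compare against column 2.
    printf '%s\n' "$mp_output" |
      awk -v n="$mp_name" '$2 == n { found = 1 } END { exit !found }' ||
      echo "$mp_name"
  done
}

# Sample output with made-up PIDs; on a real node use: sample=$(jps)
sample='4321 TaskManagerRunner
4100 StandaloneSessionClusterEntrypoint
4599 Jps'

missing=$(missing_procs "$sample" StandaloneSessionClusterEntrypoint TaskManagerRunner)
echo "missing: ${missing:-none}"     # missing: none
```

If either process is absent, its name is printed, which is usually the cue to look at the files under the Flink log directory.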

4. Open the Flink Web UI: http://node1:8081/#/overview


5. Run the official example:

/export/server/flink/bin/flink run \
/export/server/flink/examples/batch/WordCount.jar \
--input /root/words.txt --output /root/out
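As a sanity check on the WordCount result, the same counts can be computed with standard Unix tools. The sketch below uses the three sample lines from words.txt inline; `count_words` is a hypothetical helper name, and the ordering of lines in Flink's own output file may differ from the sorted order shown here.

```shell
# Split on spaces (one word per line), then tally each distinct word.
count_words() {
  tr -s ' ' '\n' | sort | uniq -c | awk '{ print $2, $1 }'
}

printf '%s\n' 'hello me you her' 'hello me you' 'hello me' | count_words
# hello 3
# her 1
# me 3
# you 2
```

To run it against the real file instead, use `count_words < /root/words.txt`.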

6. Stop Flink

/export/server/flink/bin/stop-cluster.sh


Start the interactive shell (note: none of the Scala 2.12 distributions currently support the Scala Shell)

/export/server/flink/bin/start-scala-shell.sh local


benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()



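3.1 How it works


  1. The client submits the job to the JobManager.
  2. The JobManager requests the resources the job needs and manages both the job and those resources.
  3. The JobManager distributes tasks to the TaskManagers for execution.
  4. The TaskManagers report their status to the JobManager periodically.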

3.2 Installation and deployment


  • Server: node1 (Master + Slave): JobManager + TaskManager
  • Server: node2 (Slave): TaskManager
  • Server: node3 (Slave): TaskManager


vim /export/server/flink/conf/flink-conf.yaml


jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
# History server
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
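Instead of editing the file by hand in vim, the same keys can be set from a script. The following is a minimal sketch under the assumption that the file is a flat list of "key: value" lines; `set_conf` is a hypothetical helper (not a Flink tool), and the demo runs against a scratch file rather than the real conf/flink-conf.yaml.

```shell
# set_conf FILE KEY VALUE
# Drops any existing uncommented "KEY: ..." line, then appends "KEY: VALUE".
set_conf() {
  conf_file=$1; conf_key=$2; conf_value=$3
  conf_tmp=$(mktemp)
  # index(...) == 1 means the line starts with "KEY:"; keep every other line.
  awk -v k="$conf_key:" 'index($0, k) != 1' "$conf_file" > "$conf_tmp"
  printf '%s: %s\n' "$conf_key" "$conf_value" >> "$conf_tmp"
  # Write back through redirection so the original file keeps its permissions.
  cat "$conf_tmp" > "$conf_file"
  rm -f "$conf_tmp"
}

# Demo on a scratch file that starts with two default-style entries.
demo_conf=$(mktemp)
printf '%s\n' 'jobmanager.rpc.address: localhost' 'taskmanager.numberOfTaskSlots: 1' > "$demo_conf"
set_conf "$demo_conf" jobmanager.rpc.address node1
set_conf "$demo_conf" taskmanager.numberOfTaskSlots 2
set_conf "$demo_conf" web.submit.enable true
cat "$demo_conf"
# jobmanager.rpc.address: node1
# taskmanager.numberOfTaskSlots: 2
# web.submit.enable: true
```

Because a replaced key is moved to the end of the file, the helper is safe to run repeatedly, but it does not preserve the original ordering of entries.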


vim /export/server/flink/conf/masters

node1:8081


vim /export/server/flink/conf/workers

node1
node2
node3


vim /etc/profile


export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop


scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp  /etc/profile node2:/etc/profile
scp  /etc/profile node3:/etc/profile

# Or, equivalently, copy the flink directory with a loop (run from /export/server)
for i in {2..3}; do scp -r flink node$i:$PWD; done
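Before copying anything to the other nodes, it can help to print the commands first and review them. The sketch below is a dry run only: `print_dist_cmds` is a hypothetical helper that echoes scp commands for a given path and node list without executing them.

```shell
# print_dist_cmds SRC NODE...
# Prints (does not run) one scp command per node, copying SRC to the same path.
print_dist_cmds() {
  dist_src=$1
  shift
  for dist_node in "$@"; do
    echo "scp -r $dist_src $dist_node:$dist_src"
  done
}

print_dist_cmds /export/server/flink node2 node3
# scp -r /export/server/flink node2:/export/server/flink
# scp -r /export/server/flink node3:/export/server/flink
```

Once the printed commands look right, piping the output to `sh` executes them.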


source /etc/profile

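3.3 Test and verify

1. Start the cluster by running the following on node1

/export/server/flink/bin/start-cluster.sh

Alternatively, start the components one at a time; the two scripts accept these arguments:

/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all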

2. Start the history server

/export/server/flink/bin/historyserver.sh start

3. Open the Flink UI or check with jps

  • http://node1:8081/#/overview
  • http://node1:8082/#/overview

TaskManager page: shows how many TaskManagers the current Flink cluster has, and the slots, memory, and CPU cores of each TaskManager

4. Run the official test example

/export/server/flink/bin/flink run --parallelism 2 \
/export/server/flink/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt \
--output hdfs://node1:8020/wordcount/output/result.txt

5. View the job history

  • http://node1:50070/explorer.html#/flink/completed-jobs
  • http://node1:8082/#/overview

6. Stop the Flink cluster

/export/server/flink/bin/stop-cluster.sh


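4.1 How it works

The architecture described so far has an obvious single point of failure (SPOF): the JobManager. The JobManager is responsible for both task scheduling and resource allocation, so if it goes down unexpectedly, nothing is left to schedule tasks or allocate resources.


  • With the help of Zookeeper, a Standalone Flink cluster keeps several JobManagers alive at the same time; only one of them is active, and the others are on standby.
  • When the active JobManager loses its connection (for example, the machine goes down or the process crashes), Zookeeper elects a new JobManager from the standby ones to take over the Flink cluster.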
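The active/standby behaviour described above can be illustrated with a toy model. This is only a sketch of the idea, not how ZooKeeper actually elects a leader: here the leader is simply the first entry in a list of live JobManagers, and `pick_leader` and `remove_node` are hypothetical helper names.

```shell
# pick_leader ALIVE_LIST
# Toy rule: the first entry of a space-separated list of live JobManagers
# is the active one; every entry after it is on standby.
pick_leader() {
  set -- $1
  echo "${1:-none}"
}

# remove_node LIST NODE
# Prints LIST without NODE, which simulates that JobManager crashing.
remove_node() {
  rn_out=
  for rn_n in $1; do
    [ "$rn_n" = "$2" ] || rn_out="$rn_out $rn_n"
  done
  echo "${rn_out# }"
}

alive="node1 node2"
echo "leader: $(pick_leader "$alive")"      # leader: node1
alive=$(remove_node "$alive" node1)         # node1 crashes
echo "leader: $(pick_leader "$alive")"      # leader: node2
```

The point of the model is that a standby takes over only after the active JobManager disappears from the set of live nodes; with no live JobManager left, the sketch reports `none`.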

4.2 Installation and deployment


  • Server: node1 (Master + Slave): JobManager + TaskManager
  • Server: node2 (Master + Slave): JobManager + TaskManager
  • Server: node3 (Slave): TaskManager


zkServer.sh status
zkServer.sh stop
zkServer.sh start






vim /export/server/flink/conf/flink-conf.yaml


state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
# Store the JobManager metadata in HDFS
high-availability.storageDir: hdfs://node1:8020/flink/ha/
# Address of the ZK ensemble
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
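A quick way to confirm that a configuration file sets all three high-availability keys shown above is sketched below. `missing_ha_keys` is a hypothetical helper, the demo uses a scratch file, and the check assumes flat "key: value" lines with no leading whitespace.

```shell
# missing_ha_keys FILE
# Prints each of the three HA keys that FILE does not set.
missing_ha_keys() {
  for ha_key in high-availability high-availability.storageDir high-availability.zookeeper.quorum; do
    # A key counts as set only if an uncommented line starts with "KEY:".
    awk -v k="$ha_key:" 'index($0, k) == 1 { found = 1 } END { exit !found }' "$1" ||
      echo "$ha_key"
  done
}

# Scratch file that deliberately omits the ZooKeeper quorum.
ha_conf=$(mktemp)
printf '%s\n' 'high-availability: zookeeper' \
  'high-availability.storageDir: hdfs://node1:8020/flink/ha/' > "$ha_conf"
missing_ha_keys "$ha_conf"
# high-availability.zookeeper.quorum
```

On a real node the same function can be pointed at /export/server/flink/conf/flink-conf.yaml; empty output means all three keys are present.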


vim /export/server/flink/conf/masters

node1:8081
node2:8081

scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/


On node2 only, change the JobManager address in flink-conf.yaml:

vim /export/server/flink/conf/flink-conf.yaml


jobmanager.rpc.address: node2





cat /export/server/flink/log/flink-root-standalonesession-0-node1.log




  • Download: https://flink.apache.org/downloads.html
  • Put it in the lib directory (cd /export/server/flink/lib)
  • Distribute it (for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done)




4.3 Test and verify

1. Open the Web UI

  • http://node1:8081/#/job-manager/config
  • http://node2:8081/#/job-manager/config

2. Run the word count (wc) job

/export/server/flink/bin/flink run

3. Kill one of the masters


/export/server/flink/bin/flink run

5. Stop the cluster


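5.1 Advantages of using Yarn

In real-world development, Flink is most often run in Flink On Yarn mode, for the following reasons:



Reason 3: Because it builds on the Yarn scheduling system, failover (fault tolerance) for each role is handled automatically

  • Both the JobManager and the TaskManager processes are monitored by the Yarn NodeManager
  • If the JobManager process exits abnormally, the Yarn ResourceManager reschedules the JobManager onto another machine
  • If a TaskManager process exits abnormally, the JobManager is notified and requests resources from the Yarn ResourceManager again to restart the TaskManager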

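5.2 How it works


  1. The Client uploads the jar and the configuration files to the HDFS cluster
  2. The Client submits the job to the Yarn ResourceManager and requests resources
  3. The ResourceManager allocates Container resources and starts the AppMaster
  4. The AppMaster then loads the Flink jar and configuration, builds the environment, and starts the JobManager. The JobManager and the ApplicationMaster run in the same container.
  5. Once they have started successfully, the AppMaster knows the JobManager's address (its own host). It generates a new Flink configuration file for the TaskManagers so that they can connect to the JobManager; this file is also uploaded to HDFS.
  6. In addition, the AppMaster container serves Flink's web interface. All ports allocated by YARN are ephemeral, which allows users to run multiple Flink YARN sessions in parallel.
  7. The ApplicationMaster requests worker resources from the ResourceManager; the NodeManagers load the Flink jar and configuration, build the environment, and start the TaskManagers
  8. After starting, each TaskManager sends heartbeats to the JobManager and waits for the JobManager to assign it tasks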

5.3 Two modes

5.3.1 Session mode


5.3.2 Per-Job mode


5.4 Installation and deployment


vim /export/server/hadoop/etc/hadoop/yarn-site.xml


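<!-- Disable YARN memory checks -->


  • This setting (yarn.nodemanager.vmem-check-enabled) controls whether a thread is started to check how much virtual memory each task is using; a task that exceeds its allocation is killed outright. The default is true
  • We turn it off here because Flink in yarn mode can easily exceed the memory limit, at which point yarn would automatically kill the job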


scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml



5.5 Test and verify

5.5.1 Session mode

yarn-session.sh (allocate resources) + flink run (submit jobs)

1. Start a Flink session on yarn by running the following on node1

/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d


# -n   requests 2 containers, i.e. the number of TaskManagers
# -tm  memory per TaskManager (in MB by default)
# -s   number of slots per TaskManager
# -d   run detached, in the background
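Taking the flags at face value as described above, the capacity this session asks for is simple arithmetic. The sketch below only multiplies the values from the command; the variable names are made up for illustration, and the JobManager container is not included in the total.

```shell
# Values taken from the yarn-session.sh command above.
task_managers=2      # -n
tm_memory_mb=800     # -tm
slots_per_tm=1       # -s

total_slots=$((task_managers * slots_per_tm))
total_tm_memory_mb=$((task_managers * tm_memory_mb))

echo "total slots: $total_slots"                            # total slots: 2
echo "total TaskManager memory: ${total_tm_memory_mb} MB"   # total TaskManager memory: 1600 MB
```

Under that reading, the session offers 2 task slots backed by 1600 MB of TaskManager memory in total.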

WARN org.apache.hadoop.hdfs.DFSClient - Caught exception

2. Open the UI: http://node1:8088/cluster

3. Submit jobs with flink run

/export/server/flink/bin/flink run


/export/server/flink/bin/flink run

4. The ApplicationMaster link shown above leads to Flink's management UI

5. Shut down the yarn-session:

yarn application -kill application_1599402747874_0001

rm -rf /tmp/.yarn-properties-root

5.5.2 Per-Job (detached) mode

1. Submit the job directly

/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024
# -m        JobManager address; set to yarn-cluster to run on YARN
# -yjm 1024 JobManager memory
# -ytm 1024 TaskManager memory

2. Open the UI: http://node1:8088/cluster


In earlier versions, if you had been running Flink on Yarn and hit an error after switching back to standalone mode, delete /tmp/.yarn-properties-root, i.e.: rm -rf /tmp/.yarn-properties-root

06 Parameter summary

[root@node1 bin]# /export/server/flink/bin/flink --help
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>   Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
     -C,--classpath <url>   Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
     -d,--detached   If present, runs the job in detached mode
     -n,--allowNonRestoredState   Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
     -p,--parallelism <parallelism>   The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
     -py,--python <pythonFile>   Python script with the program entry point. The dependent resources can be configured with the `--pyFiles` option.
     -pyarch,--pyArchives <arg>   Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a name can directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g.: --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
     -pyexec,--pyExecutable <arg>   Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
     -pyfs,--pyFiles <pythonFiles>   Attach custom python files for job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. Comma (',') could be used as the separator to specify multiple files (e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
     -pym,--pyModule <pythonModule>   Python module with the program entry point. This option must be used in conjunction with `--pyFiles`.
     -pyreq,--pyRequirements <arg>   Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>   The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -d,--detached   If present, runs the job in detached mode
     -m,--jobmanager <arg>   Set to yarn-cluster to use YARN execution mode.
     -yat,--yarnapplicationType <arg>   Set a custom application type for the application on YARN
     -yD <property=value>   use value for given property
     -yd,--yarndetached   If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -yh,--yarnhelp   Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -yj,--yarnjar <arg>   Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>   Memory for JobManager Container with optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>   Specify YARN node label for the YARN application
     -ynm,--yarnname <arg>   Set a custom name for the application on YARN
     -yq,--yarnquery   Display available YARN resources (memory, cores)
     -yqu,--yarnqueue <arg>   Specify YARN queue.
     -ys,--yarnslots <arg>   Number of slots per TaskManager
     -yt,--yarnship <arg>   Ship files in the specified directory (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>   Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "run-application" runs an application in Application Mode.

  Syntax: run-application [OPTIONS] <jar-file> <arguments>
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>   The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".

Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>   Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.

Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -a,--all   Show all programs and their JobIDs
     -r,--running   Show only running programs and their JobIDs
     -s,--scheduled   Show only scheduled programs and their JobIDs
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>   The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>   Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>   Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "stop" stops a running program with a savepoint (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
     -d,--drain   Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.
     -p,--savepointPath <savepointPath>   Path to the savepoint (for example hdfs:///flink/savepoint-1537). If no directory is specified, the configured default will be used ("state.savepoints.dir").
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>   The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>   Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>   Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -s,--withSavepoint <targetDirectory>   **DEPRECATION WARNING**: Cancelling a job with savepoint is deprecated. Use "stop" instead. Trigger savepoint and cancel job. The target directory is optional. If no directory is specified, the configured default directory (state.savepoints.dir) is used.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>   The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>   Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>   Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
     -d,--dispose <arg>   Path of savepoint to dispose.
     -j,--jarfile <jarfile>   Flink program JAR file.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>   The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>   Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>   Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

