1. Environment

Previous sessions covered setting up the Flink development environment and deploying and running applications; this session focuses on Flink's client operations.

This session is hands-on. It is based on community Flink 1.12.1, running on macOS, with Google Chrome as the browser.

2. Overview

Flink offers a rich set of clients for submitting and interacting with jobs: the command line, the Scala Shell, the SQL Client, the RESTful API, and the web UI. The most important is the command line; next come the SQL Client, used to submit SQL jobs, and the Scala Shell, used to submit Table API jobs. Flink also provides a RESTful service that can be called over HTTP, and jobs can likewise be submitted through the web UI.

In the bin directory of the Flink installation you can find files such as flink, start-scala-shell.sh and sql-client.sh; these are the entry points for client operations.

3. Flink Command Line

Flink's command line takes many parameters. Run flink -h to see the full usage:

./bin/flink -h

To see the parameters of a specific command, such as run, enter:

./bin/flink run -h

Official documentation:

[root@xxx flink-1.12.1]# ./bin/flink -h
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
     -C,--classpath <url>                 Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached mode
     -m,--jobmanager <arg>                Set to yarn-cluster to use YARN execution mode.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN application
     -ynm,--yarnname <arg>                Set a custom name for the application on YARN
     -yq,--yarnquery                      Display available YARN resources (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "run-application" runs an application in Application Mode.

  Syntax: run-application [OPTIONS] <jar-file> <arguments>
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".

Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>           Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.

Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -a,--all         Show all programs and their JobIDs
     -r,--running     Show only running programs and their JobIDs
     -s,--scheduled   Show only scheduled programs and their JobIDs
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "stop" stops a running program with a savepoint (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
     -d,--drain                           Send MAX_WATERMARK before taking the savepoint and stopping the pipeline.
     -p,--savepointPath <savepointPath>   Path to the savepoint (for example hdfs:///flink/savepoint-1537). If no directory is specified, the configured default will be used ("state.savepoints.dir").
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -s,--withSavepoint <targetDirectory>   **DEPRECATION WARNING**: Cancelling a job with savepoint is deprecated. Use "stop" instead. Trigger savepoint and cancel job. The target directory is optional. If no directory is specified, the configured default directory (state.savepoints.dir) is used.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
     -d,--dispose <arg>       Path of savepoint to dispose.
     -j,--jarfile <jarfile>   Flink program JAR file.
  Options for Generic CLI mode:
     -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
     -D <property=value>             Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

3.1. Standalone

  • First start a Standalone cluster:

BoYi-Pro:flink-1.12.1 sysadmin$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host BoYi-Pro.lan.
Starting taskexecutor daemon on host BoYi-Pro.lan.
  • Open http://127.0.0.1:8081 to see the web UI.

3.1.1. Run

Run a job, using the bundled TopSpeedWindowing example:

BoYi-Pro:flink-1.12.1 sysadmin$ bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 02fa782293dc0c01baf9b4902236b4dd
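In scripts it is handy to capture the JobID that `flink run` prints, so that later `list`/`stop`/`cancel` calls can reference it. A minimal sketch, using the captured sample line from the run above so it works without a running cluster:

```shell
# In a real script you would capture the actual command output, e.g.:
#   OUT=$(bin/flink run -d examples/streaming/TopSpeedWindowing.jar)
# Here a sample submission line keeps the sketch self-contained.
OUT='Job has been submitted with JobID 02fa782293dc0c01baf9b4902236b4dd'

# Strip everything around the hex JobID.
JOB_ID=$(printf '%s\n' "$OUT" | sed -n 's/.*JobID \([0-9a-f]*\).*/\1/p')
echo "$JOB_ID"
```

The extracted `$JOB_ID` can then be passed straight to `bin/flink stop` or `bin/flink cancel`.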
  • Once running, the job has a default parallelism of 1:

  • Click "Task Manager" on the left, then "Stdout", to see the output log:

    Or check the *.out file under the local log directory:

tail -f flink-sysadmin-taskexecutor-0-BoYi-Pro.lan.out

3.1.2. List

List the jobs:

BoYi-Pro:log sysadmin$ flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.02.2021 01:09:08 : 02fa782293dc0c01baf9b4902236b4dd : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
BoYi-Pro:log sysadmin$
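For scripting, the JobIDs of running jobs can be pulled out of the `flink list` output with awk. A sketch using the captured sample output above, so it runs stand-alone:

```shell
# Normally: LIST_OUT=$(bin/flink list -m 127.0.0.1:8081)
# A captured sample of the output keeps the sketch self-contained.
LIST_OUT='------------------ Running/Restarting Jobs -------------------
16.02.2021 01:09:08 : 02fa782293dc0c01baf9b4902236b4dd : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.'

# Each job line has the form "<date> : <JobID> : <name> (<state>)";
# split on " : " and print the second field for RUNNING jobs.
RUNNING_IDS=$(printf '%s\n' "$LIST_OUT" | awk -F' : ' '/\(RUNNING\)$/ {print $2}')
echo "$RUNNING_IDS"
```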

3.1.3. Stop

Stop a job, using -m to specify the host and port of the target JobManager.

  • A stop always takes a savepoint, so a savepoint directory must be configured or passed on the command line; otherwise the command fails:
BoYi-Pro:log sysadmin$ flink stop -m 127.0.0.1:8081  02fa782293dc0c01baf9b4902236b4dd
Suspending job "02fa782293dc0c01baf9b4902236b4dd" with a savepoint.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "02fa782293dc0c01baf9b4902236b4dd".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:585)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1006)
    at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:573)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1073)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided.

The log shows that the Stop command failed: no savepoint target directory was available, because state.savepoints.dir was not set in the configuration and no path was passed with -p. (In Flink versions before 1.9, stopping a job additionally required every Source to implement the StoppableFunction interface shown below; that interface has since been removed in favor of stop-with-savepoint.)

/**
 * Functions that can be stopped must implement this interface, e.g. the
 * sources of streaming jobs. The stop() method is called when the job
 * receives the STOP signal. On receiving it, the source must stop emitting
 * new data and shut down gracefully.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. Unlike cancel(), this is a request for the source
     * to shut down gracefully. Pending data may still be emitted; it does
     * not have to stop immediately.
     */
    void stop();
}

3.1.4. Cancel

Cancel a job. A plain cancel takes no savepoint; when cancelling with -s, a savepoint is triggered first, and if no directory is given the state.savepoints.dir configured in conf/flink-conf.yaml is used.

BoYi-Pro:log sysadmin$ flink cancel -m 127.0.0.1:8081 02fa782293dc0c01baf9b4902236b4dd
Cancelling job 02fa782293dc0c01baf9b4902236b4dd.
Cancelled job 02fa782293dc0c01baf9b4902236b4dd.

A savepoint directory can also be specified explicitly when cancelling:

BoYi-Pro:flink-1.12.1 sysadmin$ flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint e145eea8f0ff0a696af5fefa0a259846
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e145eea8f0ff0a696af5fefa0a259846 with savepoint to /tmp/savepoint.
Cancelled job e145eea8f0ff0a696af5fefa0a259846. Savepoint stored in file:/tmp/savepoint/savepoint-e145ee-061f7da6c990.
BoYi-Pro:flink-1.12.1 sysadmin$ ls /tmp/savepoint
savepoint-e145ee-061f7da6c990
  • The difference between cancelling and stopping a (streaming) job:

A cancel() call immediately invokes the cancel() method of the job's operators, to cancel them as quickly as possible. If an operator does not stop after the cancel() call, Flink starts periodically interrupting the operator's thread until all operators have stopped.

A stop() call is the more graceful way to end a running streaming job. In older versions it applied only to jobs whose sources implemented the StoppableFunction interface: when a stop was requested, every Source received a stop() call, and the job ended only after all Sources had shut down cleanly, letting the job finish processing its in-flight data. Since Flink 1.9, stop instead takes a savepoint and then shuts the job down gracefully, achieving the same effect for any job.

3.1.5. Savepoint

Trigger a savepoint.

BoYi-Pro:flink-1.12.1 sysadmin$ bin/flink savepoint -m 127.0.0.1:8081 e6665485734cf8e91d0a924f07eeb302 /tmp/savepoint
Triggering savepoint for job e6665485734cf8e91d0a924f07eeb302.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-e66654-85b071592103
You can resume your program from this savepoint with the run command.
BoYi-Pro:flink-1.12.1 sysadmin$ ls -l /tmp/savepoint
total 0
drwxr-xr-x  3 sysadmin  wheel  96  2 16 02:09 savepoint-e66654-85b071592103
BoYi-Pro:flink-1.12.1 sysadmin$
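When automating this, the savepoint path can be parsed from the `flink savepoint` output and handed to a later `flink run -s`. A sketch using the sample output from the run above, so it needs no live cluster:

```shell
# Normally: SP_OUT=$(bin/flink savepoint -m 127.0.0.1:8081 <JobID> /tmp/savepoint)
# The captured sample output keeps the sketch self-contained.
SP_OUT='Triggering savepoint for job e6665485734cf8e91d0a924f07eeb302.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-e66654-85b071592103
You can resume your program from this savepoint with the run command.'

# Keep only the path from the "Savepoint completed." line.
SP_PATH=$(printf '%s\n' "$SP_OUT" | sed -n 's/^Savepoint completed\. Path: //p')
echo "$SP_PATH"
# Resume later (not executed here):
#   bin/flink run -d -s "$SP_PATH" examples/streaming/TopSpeedWindowing.jar
```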

Note: the differences between Savepoint and Checkpoint (see the documentation for details):

  • Checkpoints are made incrementally; each one is quick and small. Once enabled in the program they are triggered automatically, with no user involvement, and they are used automatically when a job fails over; the user does not need to specify them.

  • Savepoints are full snapshots; each one takes longer and is larger, and must be triggered by the user. Savepoints are typically used for program version upgrades (see the documentation), bug fixes, A/B tests and similar scenarios, and must be specified explicitly by the user.

  • Start from a given savepoint with the -s parameter:

BoYi-Pro:flink-1.12.1 sysadmin$ flink run -d -s /tmp/savepoint/savepoint-e66654-ca80b929a31a ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 45c617bcd9e20f99ccc80dc81f94370b

In the JobManager log you can see entries like this:

3.1.6. Modify

Modify the job parallelism. As of the current 1.12.1 release the modify command is not supported; the default parallelism can only be changed via the configuration file. To change the parallelism of a running job, first stop it with cancel + savepoint (or stop), then set the new parallelism when restarting the job from that savepoint.
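Since there is no modify command, rescaling boils down to two CLI calls. A minimal sketch of that workflow; the JobID and the savepoint path are placeholders for illustration (on a live cluster the path would be parsed from the `flink stop` output), and the commands are only assembled and printed here:

```shell
# Rescale workflow: stop with a savepoint, then resubmit at a new parallelism.
# JOB_ID and SP_PATH are placeholder values for illustration only.
JOB_ID='02fa782293dc0c01baf9b4902236b4dd'
NEW_PARALLELISM=4

STOP_CMD="bin/flink stop -m 127.0.0.1:8081 -p /tmp/savepoint $JOB_ID"
# Suppose stop reported: Savepoint completed. Path: file:/tmp/savepoint/savepoint-02fa78-xxxxxxxxxxxx
SP_PATH='file:/tmp/savepoint/savepoint-02fa78-xxxxxxxxxxxx'
RUN_CMD="bin/flink run -d -p $NEW_PARALLELISM -s $SP_PATH examples/streaming/TopSpeedWindowing.jar"

echo "$STOP_CMD"
echo "$RUN_CMD"
```

Note that -p means the savepoint path for stop but the parallelism for run, as the help output above shows.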

In conf/flink-conf.yaml, change the number of task slots from the default 1 to 4, and configure a savepoint directory:

taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint

Start command: bin/start-cluster.sh
Stop command: bin/stop-cluster.sh

Restart the cluster for the changed settings to take effect, then start the job again:

bin/stop-cluster.sh && bin/start-cluster.sh
bin/flink run -d examples/streaming/TopSpeedWindowing.jar

The web UI now shows 4 task slots; at this point the job's default parallelism is still 1.


3.1.7. Info

The Info command shows the execution plan (StreamGraph) of a Flink job.

BoYi-Pro:flink-1.12.1 sysadmin$ flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Timestamps/Watermarks",
    "pact" : "Operator",
    "contents" : "Timestamps/Watermarks",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)",
    "pact" : "Operator",
    "contents" : "Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
--------------------------------------------------------------
No description provided.

Copy the JSON output and paste it into this site: http://flink.apache.org/visualizer/
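Instead of copying by hand, the JSON plan can be isolated from the separator lines around it and piped into a formatter. A sketch using a trimmed sample of the `flink info` output, so it runs stand-alone:

```shell
# Normally: INFO_OUT=$(bin/flink info examples/streaming/TopSpeedWindowing.jar)
# A trimmed sample of the output keeps the sketch self-contained.
INFO_OUT='----------------------- Execution Plan -----------------------
{"nodes" : [ {"id" : 1,"type" : "Source: Custom Source","parallelism" : 1} ]}
--------------------------------------------------------------'

# The plan is the line that starts with "{"; the rest is decoration.
PLAN=$(printf '%s\n' "$INFO_OUT" | grep '^{')
echo "$PLAN"
# For readability, pipe it through a formatter, e.g.:
#   echo "$PLAN" | python -m json.tool      (or: | jq .)
```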


Click "Draw".

You can compare this with the physical execution plan of the job actually running:

3.2. Yarn per-job

3.2.1. Single-Job Attach Mode

Attach mode is the default: the client waits until the program finishes before exiting.

  • Specify YARN mode with -m yarn-cluster.

  • YARN shows the application name as Flink session cluster; this batch WordCount job transitions to FINISHED when it completes.

  • The client can see the result output.

[root@master01 flink-1.12.1]#  ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 15:54:42,222 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 15:54:42,397 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 15:54:42,653 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 15:54:42,782 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 15:54:42,801 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 15:54:42,960 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 15:54:43,029 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 15:54:43,030 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-02-16 15:54:43,030 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-02-16 15:54:43,467 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 15:54:48,109 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 15:54:48,152 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0002
2021-02-16 15:54:48,396 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0002
2021-02-16 15:54:48,397 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 15:54:48,401 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-02-16 15:54:53,460 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 15:54:53,461 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-030:33895 of application 'application_1613318015145_0002'.
Job has been submitted with JobID c0fb5c95eff1a1d6447a2b5e975b6d5f
Program execution finished
Job with JobID c0fb5c95eff1a1d6447a2b5e975b6d5f has finished.
Job Runtime: 13486 ms
Accumulator Results:
- 9ddb483b217c1caa5e24b9eccbe384a5 (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)

If you run a streaming job in attach mode, the client keeps waiting and does not exit. You can try it with the following example:

./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

3.2.2. Single-Job Detached Mode

  • In detached mode, the client exits as soon as the job is submitted.
  • YARN shows the application as Flink per-job cluster.


[root@master01 flink-1.12.1]# ./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:00:33,224 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:00:33,410 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:00:33,670 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 17:00:33,806 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:00:33,819 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:00:33,974 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 17:00:34,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 17:00:34,040 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-02-16 17:00:34,040 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-02-16 17:00:34,464 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 17:00:39,197 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:00:39,242 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0003
2021-02-16 17:00:39,483 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0003
2021-02-16 17:00:39,484 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 17:00:39,487 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-02-16 17:00:46,077 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 17:00:46,079 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-030:42671 of application 'application_1613318015145_0003'.
Job has been submitted with JobID 66d59ef65f870013dd8f6f195efa6258
[root@master01 flink-1.12.1]# ./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:04:23,375 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:04:23,551 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:04:23,801 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 17:04:23,927 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:04:23,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:04:24,096 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 17:04:24,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 17:04:24,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-02-16 17:04:24,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-02-16 17:04:24,579 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 17:04:29,160 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:04:29,202 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0004
2021-02-16 17:04:29,443 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0004
2021-02-16 17:04:29,443 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 17:04:29,447 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-02-16 17:04:36,029 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 17:04:36,031 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1613318015145_0004
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1613318015145_0004
Note that killing Flink might not clean up all job artifacts and temporary files.
2021-02-16 17:04:36,032 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-030:39693 of application 'application_1613318015145_0004'.
Job has been submitted with JobID 81b5ad3509dea923e686a0a4a962f3b1
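The YarnClusterDescriptor messages above show YARN rounding each memory request up to a multiple of `yarn.scheduler.minimum-allocation-mb` (1024 MB here), so the 1600 MB JobManager and 1728 MB TaskManager requests both become 2048 MB containers. A minimal sketch of that arithmetic (plain Python, not Flink code):

```python
import math

# YARN rounds each container request up to a multiple of
# yarn.scheduler.minimum-allocation-mb; the remainder may go unused by Flink.
def yarn_allocation(requested_mb, min_allocation_mb=1024):
    allocated = math.ceil(requested_mb / min_allocation_mb) * min_allocation_mb
    return allocated, allocated - requested_mb  # (allocated, unused extra)

print(yarn_allocation(1600))  # JobManager:  (2048, 448)
print(yarn_allocation(1728))  # TaskManager: (2048, 320)
```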

3.3. yarn session

./bin/yarn-session.sh -tm 2048 -s 3

This starts a YARN session cluster in which each TaskManager has 2 GB of memory and 3 slots.

[root@master01 flink-1.12.1]#
[root@master01 flink-1.12.1]# ./bin/yarn-session.sh -tm 2048 -s 3
2021-02-16 17:11:16,097 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-02-16 17:11:16,102 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2021-02-16 17:11:16,103 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-02-16 17:11:16,340 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-02-16 17:11:16,401 INFO  org.apache.flink.runtime.security.modules.HadoopModule       [] - Hadoop user set to hdfs (auth:SIMPLE)
2021-02-16 17:11:16,414 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-6503414141525855669.conf.
2021-02-16 17:11:16,455 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:11:16,682 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:11:17,018 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at master01/192.168.0.23:8050
2021-02-16 17:11:17,174 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:11:17,216 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-02-16 17:11:17,371 INFO  org.apache.hadoop.conf.Configuration                         [] - found resource resource-types.xml at file:/etc/hadoop/3.1.5.0-152/0/resource-types.xml
2021-02-16 17:11:17,435 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-02-16 17:11:17,436 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=2048, slotsPerTaskManager=3}
2021-02-16 17:11:17,886 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2021-02-16 17:11:22,799 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-02-16 17:11:22,818 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://master01:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:11:22,858 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1613318015145_0006
2021-02-16 17:11:23,099 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1613318015145_0006
2021-02-16 17:11:23,099 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-02-16 17:11:23,102 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-02-16 17:11:28,156 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-02-16 17:11:28,158 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface salve02:32875 of application 'application_1613318015145_0006'.
JobManager Web Interface: http://salve02:32875
  • By default the client runs in attached mode and does not exit

  • The application shows up on YARN as "Flink session cluster"


  • A properties file is generated under /tmp
[root@master01 tmp]# cat /tmp/.yarn-properties-root
#Generated YARN properties file
#Tue Feb 16 17:11:28 CST 2021
dynamicPropertiesString=
applicationID=application_1613318015145_0006
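The flink CLI later reads this file back to locate the session. A minimal sketch (plain Python, file format taken from the content shown above) of parsing out the application ID:

```python
# Parse the .yarn-properties-* file that yarn-session.sh drops under /tmp.
def read_yarn_properties(text):
    """Parse 'key=value' lines, skipping comments and blank lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key] = value
    return props

sample = """\
#Generated YARN properties file
#Tue Feb 16 17:11:28 CST 2021
dynamicPropertiesString=
applicationID=application_1613318015145_0006
"""
print(read_yarn_properties(sample)["applicationID"])
# application_1613318015145_0006
```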
  • Submit a job

./bin/flink run ./examples/batch/WordCount.jar

The job will be submitted to the session that was just started, based on the contents of the /tmp/.yarn-properties-root file:

[root@henghe-023 flink-1.12.1]# ./bin/flink run ./examples/batch/WordCount.jar
2021-02-16 17:22:20,526 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-02-16 17:22:20,526 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:22:21,029 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:22:21,214 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:22:21,470 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 17:22:21,594 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:22:21,606 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:22:21,703 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
Job has been submitted with JobID b0a19e30545d1a41d80b7727a98cb217
Program execution finished
Job with JobID b0a19e30545d1a41d80b7727a98cb217 has finished.
Job Runtime: 6485 ms
Accumulator Results:
- e840feeb46f0be541988fc925e9f41a9 (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)

3.4. Submitting to a specific session

Use the -yid option to submit a job to a specific session.

./bin/flink run -d -p 30 -m yarn-cluster -yid application_1613318015145_0006 ./examples/streaming/TopSpeedWindowing.jar

[root@master flink-1.12.1]# ./bin/flink run -d -p 30 -m yarn-cluster -yid application_1613318015145_0006 ./examples/streaming/TopSpeedWindowing.jar
2021-02-16 17:25:12,364 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-02-16 17:25:12,364 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-02-16 17:25:12,847 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 17:25:13,025 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 17:25:13,267 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 17:25:13,389 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 17:25:13,401 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 17:25:13,499 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
Job has been submitted with JobID 2a02b50e550b05259344c417a71dc641

3.5. SQL Client

3.5.1. Basic usage

  • Start

./bin/sql-client.sh embedded

  • Help

help;

  • Execute a query

SELECT 'Hello World';

[root@henghe-023 flink-1.12.1]# ./bin/sql-client.sh embedded
2021-02-16 18:32:59,319 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-02-16 18:32:59,319 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
No default environment specified.
Searching for '/opt/flink-1.12.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/flink-1.12.1/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history

[ASCII-art Flink logo omitted]

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL> help;
The following commands are available:

CLEAR       Clears the current terminal.
CREATE TABLE        Create table under current catalog and database.
DROP TABLE      Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW     Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE        Describes the schema of a table with the given name.
DROP VIEW       Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN     Describes the execution plan of a query or table with the given name.
HELP        Prints the available commands.

SQL Query Result (Table)
Refresh: 1 s    Page: Last of 1    Updated: Unknown

EXPR$0

3.5.2. SELECT queries

Flink SQL> SELECT 'Hello World';


Press "Q" to exit the result view.
Open http://127.0.0.1:8081 and you can see that the query job produced by this SELECT statement has already finished. The query reads a fixed data set through a Custom Source, writes its output through a Stream Collect Sink, and emits only one row.

Note: if a file such as .yarn-properties-baoniu exists in the local temporary directory, the job will be submitted to YARN.

3.5.3. Explain

The EXPLAIN command shows the execution plan of a SQL statement.

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Optimized Logical Plan ==
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Physical Execution Plan ==
Stage 13 : Data Source
    content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

Stage 15 : Operator
    content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
    ship_strategy : HASH

Flink SQL>

3.5.4. Displaying results

SQL Client supports two modes for maintaining and displaying query results:

  • table mode: materializes query results in memory and displays them in a paginated table. It can be enabled with:

SET execution.result-mode=table

  • changelog mode: does not materialize query results; instead it directly displays the additions and retractions produced by the continuous query. It can be enabled with:

SET execution.result-mode=changelog

The following examples demonstrate both modes.

  • Table mode

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
2021-02-16 18:41:37,976 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 18:41:37,986 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 18:41:38,033 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 18:41:38,034 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 18:41:38,034 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 18:41:38,040 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
[INFO] Result retrieval cancelled.


  • Changelog mode

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
2021-02-16 18:43:50,969 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.12.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-02-16 18:43:50,981 INFO  org.apache.hadoop.yarn.client.api.impl.TimelineReaderClientImpl [] - Initialized TimelineReader URI=http://henghe-023:8198/ws/v2/timeline/, clusterId=yarn_cluster
2021-02-16 18:43:51,037 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at henghe-023/192.168.100.23:8050
2021-02-16 18:43:51,038 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at henghe-030/192.168.101.30:10200
2021-02-16 18:43:51,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-02-16 18:43:51,046 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface henghe-031:32875 of application 'application_1613318015145_0006'.
[INFO] Result retrieval cancelled.


Here '-' denotes a retraction message.
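The add/retract behavior can be illustrated with a small simulation (plain Python, not Flink code): for the input Bob, Alice, Greg, Bob, the running COUNT(*) first emits Bob with count 1, then retracts that row before emitting Bob with count 2.

```python
from collections import defaultdict

# Simulates how a continuous GROUP BY COUNT(*) query emits add ('+')
# and retract ('-') messages as rows arrive one by one.
def changelog(rows):
    counts = defaultdict(int)
    out = []
    for name in rows:
        if counts[name]:
            out.append(("-", name, counts[name]))  # retract the stale count
        counts[name] += 1
        out.append(("+", name, counts[name]))      # emit the updated count
    return out

for msg in changelog(["Bob", "Alice", "Greg", "Bob"]):
    print(*msg)
```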


3.5.5. Other commands

Flink SQL> help;
The following commands are available:

CLEAR       Clears the current terminal.
CREATE TABLE        Create table under current catalog and database.
DROP TABLE      Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW     Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE        Describes the schema of a table with the given name.
DROP VIEW       Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN     Describes the execution plan of a query or table with the given name.
HELP        Prints the available commands.
INSERT INTO     Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE        Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT        Quits the SQL CLI client.
RESET       Resets all session configuration properties.
SELECT      Executes a SQL SELECT query on the Flink cluster.
SET     Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS      Shows all user-defined and built-in functions.
SHOW TABLES     Shows all registered tables.
SOURCE      Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG     Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE     Sets the current default database. Experimental! Syntax: 'USE <name>;'

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

3.6. Restful API

This section shows how to upload a jar and run a job through the REST API.
For more details, see the Flink REST API documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html

➜  flink-1.7.2 curl http://127.0.0.1:8081/overview
{"taskmanagers":1,"slots-total":4,"slots-available":0,"jobs-running":3,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"flink-version":"1.7.2","flink-commit":"ceba8af"}

➜  flink-1.7.2 curl -X POST -H "Expect:" -F "jarfile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
{"filename":"/var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/flink-web-124c4895-cf08-4eec-8e15-8263d347efc2/flink-web-upload/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","status":"success"}

➜  flink-1.7.2 curl http://127.0.0.1:8081/jars
{"address":"http://localhost:8081","files":[{"id":"6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","name":"TopSpeedWindowing.jar","uploaded":1553743438000,"entry":[{"name":"org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","description":null}]}]}

➜  flink-1.7.2 curl http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/plan
{"plan":{"jid":"41029eb3feb9132619e454ec9b2a89fb","name":"CarTopSpeedWindowingExample","nodes":[{"id":"90bea66de1c231edf33913ecd54406c1","parallelism":1,"operator":"","operator_strategy":"","description":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out","inputs":[{"num":0,"id":"cbc357ccb763df2852fee8c4fc7d55f2","ship_strategy":"HASH","exchange":"pipelined_bounded"}],"optimizer_properties":{}},{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: Custom Source -> Timestamps/Watermarks","optimizer_properties":{}}]}}

➜  flink-1.7.2 curl -X POST http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/run
{"jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad"}
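The same flow can be scripted. A minimal sketch (plain Python, base address assumed to be the local cluster) that parses the /jars listing shown above and builds the URL used to start the job:

```python
import json

BASE = "http://127.0.0.1:8081"  # assumed Flink REST endpoint

def run_url(jars_response):
    """Pick the first uploaded jar from a /jars response and build
    the /jars/<id>/run URL that a POST request would hit."""
    files = json.loads(jars_response)["files"]
    jar_id = files[0]["id"]
    return f"{BASE}/jars/{jar_id}/run"

# The /jars response from the transcript above:
listing = ('{"address":"http://localhost:8081","files":[{"id":'
           '"6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar",'
           '"name":"TopSpeedWindowing.jar","uploaded":1553743438000,'
           '"entry":[{"name":"org.apache.flink.streaming.examples.'
           'windowing.TopSpeedWindowing","description":null}]}]}')
print(run_url(listing))
# http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/run
```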

四. Web

On the left-hand side of the Flink Dashboard there is a "Submit New Job" entry, where users can upload a jar, view the execution plan, and submit the job.
The Web submission feature is mainly intended for beginners and demos.

Original article:
https://ververica.cn/developers/apache-flink-zero-basic-introduction-iv-client-operation/
