从《Apache Flink本地部署》这篇文章中可以看到,我们启动集群都是通过脚本start-cluster.sh开始执行。
我们的源码解析之路就从flink的bash脚本入手。

start-cluster.sh
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Start the JobManager instance(s)
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode
    readMasters

echo "Starting HA cluster with ${#MASTERS[@]} masters."

for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

# Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# Start TaskManager instance(s)
TMSlaves start
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
从代码上看,不管是启动HA模式,还是非HA模式,脚本都会调用jobmanager.sh

###jobmanager.sh

STARTSTOP=$1
...
ENTRYPOINT=standalonesession
...
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
1
2
3
4
5
6
7
8
9
这里的STARTSTOP=$1,其实就是STARTSTOP=start,所以这里会走flink-daemon.sh脚本这条线。

###flink-daemon.sh脚本
核心分为以下三个代码块

# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array

1
2
3
4
5
6
7
从脚本的使用定义来看flink-daemon.sh可以启动taskexecutor|zookeeper|historyserver|standalonesession|standalonejob
这里的DAEMON=$2从上一个脚本传递的值可以的值DAEMON=standalonesession

case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

(zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

(historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

(standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

(standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

(*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
根据$DAEMON来指定所需要运行的类名称,这里是
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClust

case $STARTSTOP in

(start)
        # Rotate log files
        rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"

# Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
          active=()
          while IFS='' read -r p || [[ -n "$p" ]]; do
            kill -0 $p >/dev/null 2>&1
            if [ $? -eq 0 ]; then
              active+=($p)
            fi
          done < "${pid}"

count="${#active[@]}"

if [ ${count} -gt 0 ]; then
            echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
          fi
        fi

# Evaluate user options for local variable expansion
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

echo "Starting $DAEMON daemon on host $HOSTNAME."
        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

mypid=$!

# Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;
    ...
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
case $STARTSTOP in程序块中包含三个功能:start、stop、stop-all。
我们这里通过start功能进行启动

StandaloneSessionClusterEntrypoint.java
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {

public StandaloneSessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

@Override
    protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }

public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

EntrypointClusterConfiguration entrypointClusterConfiguration = null;
        final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

try {
            entrypointClusterConfiguration = commandLineParser.parse(args);
        } catch (FlinkParseException e) {
            LOG.error("Could not parse command line arguments {}.", args, e);
            commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
            System.exit(1);
        }

Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
StandaloneSessionClusterEntrypoint的main方法的执行流程如下:

通过commandLineParser对象解析参数信息
loadConfiguration加载配置
通过配置实例化StandaloneSessionClusterEntrypoint对象
最终通过ClusterEntrypoint的runClusterEntrypoint方法运行StandaloneSessionClusterEntrypoint实例
ClusterEntrypoint.java
private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            initializeServices(configuration);

// write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new AkkaQueryServiceRetriever(
                    metricQueryServiceActorSystem,
                    Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                this);

clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
runCluster方法中流程如下:

protected void initializeServices(Configuration configuration) throws Exception {

LOG.info("Initializing cluster services.");

synchronized (lock) {
            final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
            final String portRange = getRPCPortRange(configuration);

commonRpcService = createRpcService(configuration, bindAddress, portRange);

// update the configuration used to create the high availability services
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

haServices = createHaServices(configuration, commonRpcService.getExecutor());
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();
            heartbeatServices = createHeartbeatServices(configuration);
            metricRegistry = createMetricRegistry(configuration);

// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
            // Start actor system for metric query service on any available port
            metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
            metricRegistry.startQueryService(metricQueryServiceActorSystem, null);

archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());

transientBlobCache = new TransientBlobCache(
                configuration,
                new InetSocketAddress(
                    commonRpcService.getAddress(),
                    blobServer.getPort()));
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
initializeServices(configuration); 初始化服务
创建RPC服务
创建HA服务
创建blob服务
创建心跳服务
创建metrice注册
创建ActorSystem
创建ArchivedExecutionGraphStore
创建TransientBlobCache
创建dispatcherResourceManagerComponentFactory对象
dispatcherResourceManagerComponentFactory的create方法参数中我们可以看到很多服务相关的信息:
configuration,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
AkkaQueryServiceRetriever
这块主要是Flink UI上所要展示的内容相关信息。

总结
到这里我们就了解了Apache Flink是如何通过start-cluster.sh脚本执行到最后的程序运行启动的全流程。从下一篇文章开始会根据ScoketWindowWorkCount这个示例,开始讲解任务运行后相关的一些内容。
————————————————
版权声明:本文为CSDN博主「Jonathan-Wei」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/WeiJonathan/article/details/84979831

Flink源码解析 | 从Example出发:理解Flink启动流程相关推荐

  1. [源码解析] 当 Java Stream 遇见 Flink

    [源码解析] 当 Java Stream 遇见 Flink 文章目录 [源码解析] 当 Java Stream 遇见 Flink 0x00 摘要 0x01 领域 1.1 Flink 1.2 Java ...

  2. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  3. 【Flink】Flink 源码阅读笔记(15)- Flink SQL 整体执行框架

    1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势.尽管 SQL ...

  4. 【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

    1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:[Flink]Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl ...

  5. 【Flink】Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

    1.概述 转载:Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

  6. 【Flink】Flink 源码阅读笔记(16)- Flink SQL 的元数据管理

    1.概述 转载:Flink 源码阅读笔记(16)- Flink SQL 的元数据管理 Flink 源码阅读笔记(17)- Flink SQL 中的时间属

  7. Flink 源码解析2--JobGraph的生成

    上一节介绍了StreamGraph的生成,这个实际上只对应 Flink 作业在逻辑上的执行计划图.Flink 会进一步对 StreamGraph 进行转换,得到另一个执行计划图,即JobGraph.然 ...

  8. springboot启动源码解析(三):初始化启动上下文、初始化监听器列表、发布开始启动事件

    此章节主要对springboot启动过程中,发生的[初始化启动上下文].[初始化监听器列表].[发布springboot开始启动事件]进行源码解析,对应的代码如图1所示: 图1: // 首先初始化一个 ...

  9. Flink 源码解析1--StreamGraph的生成

    1. 先来简单看一下入门的WordCount程序 1.首先数据源会产生随机的数字数据流(0-10内的数字)形式,然后通过flink的transformation将数据进行单词计数,再print输出 / ...

最新文章

  1. SpringMVC之context-dispatcher.xml,了解基本的控制器
  2. Java当中的HashSet
  3. 信息学奥赛一本通 1034:计算三角形面积 | OpenJudge NOI 1.3 17
  4. 【读书札记】《怦然心动——情感化交互设计指南》
  5. 目标检测数据集制作常用脚本集合
  6. 关于突然不能上网的问题的解决
  7. 概率论与环境数理统计 20210222
  8. php 留言回复,PHP 留言板后台管理回复及删除留言处理
  9. PDF如何转Excel?学会这3个方法,1分钟就能实现转换
  10. 计算机分屏解决方案,电脑一机多屏显示解决方案
  11. mysql tmp mysql.sock_MySQL搭建过程中的“/tmp/mysql.sock错误解决
  12. Padding Oracle攻击(POODLE)技术分析
  13. Linux种修改用户主目录命令,usermod命令怎么修改用户主目录
  14. python中的列表
  15. 28.QT-QPainter介绍
  16. 技能梳理6@NODEMCU+BH1750+DS18B20+DHT11+PWM电机+ONENET
  17. 常用飞轮构型的角动量包络
  18. 什么是java的关键字_java中常见的关键字
  19. vscode调整界面字体大小方法
  20. https抓包工具httpsMon

热门文章

  1. 用jQuery打造个性网站
  2. 新书上市|一位家长的忠告:长大后不成才的孩子,父母都忽视了这个点!
  3. 微信小程序真机调试常见问题汇总
  4. chatgpt国内能用吗?详细解读gpt的使用方法
  5. 电游入侵传统教育,用练级学习
  6. Chrome开发者工具-阅读列表
  7. sql server 2016不能全部用到CPU的逻辑核心数的问题
  8. 华为平远程windows电脑
  9. 如何提高团队管理能力
  10. ug900-vivado-logic-simulation中文文档 | Xilinx