写在前面

在 【Flink源码】再谈 Flink 程序提交流程(中) 一文中,笔者后来发现谬误颇多,且随着 Flink 版本的更迭,部分方法实现方式已发生较大改变。因此,思虑再三决定针对 JobManager 相关源码根据最新的 Flink 版本(1.17)单独成文。


JobManager 是什么?

Flink 的主节点 JobManager 是一个逻辑上的主节点,针对不同的部署模式,主节点的实现类也不一样
JobManager 有三大核心内容:ResourceManager、Dispatcher 和 WebMonitorEndpoint

  • ResourceManager:Flink集群的资源管理器,只有一个,负责 Slot 的管理和申请等工作
  • Dispatcher
    • 负责接收用户提交 JobGraph,然后启动一个 JobMaster,类似于 Yarn 中的 AppMaster 和 Spark 中的 Driver
    • 内有一个持久服务:JobGraphStore,负责存储 JobGraph。当构建执行图或物理执行图时主节点宕机并恢复,则可以从这里重新拉取作业 JobGraph
  • WebMonitorEndpoint:Rest 服务,内部有一个 Netty 服务,客户端的所有请求都由该组件接收处理

当 Client 提交一个 Job 到集群时(Client 会把 Job 构建成一个 JobGraph),主节点接收到提交的 Job 的 Rest 请求后,WebMonitorEndpoint 会通过 Router 进行解析找到对应的 Handler 来执行处理,处理完毕后交由 Dispatcher,Dispatcher 负责搭起 JobMaster 来负责这个 Job 内部的 Task 的部署执行,执行 Task 所需的资源由 JobMaster 向 ResourceManager 申请

JobManager 启动源码

JobManager 启动流程

JobManager 的启动流程分为三个部分:

  • 初始化 8 个基础服务
  • 创建工厂实例
  • 通过不同的工厂实例创建三大核心组件 ResourceManager、Dispatcher、WebMonitorEndpoint

主节点准备工作

我们以 Standalone 模式为例,下同
找到主节点启动类 StandaloneSessionClusterEntrypoint

StandaloneSessionClusterEntrypoint.java

public static void main(String[] args) {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);// 解析 flink run 命令的参数final EntrypointClusterConfiguration entrypointClusterConfiguration =ClusterEntrypointUtils.parseParametersOrExit(args,new EntrypointClusterConfigurationParserFactory(),StandaloneSessionClusterEntrypoint.class);// 解析 flink-conf.yaml 配置文件Configuration configuration = loadConfiguration(entrypointClusterConfiguration);// 创建主节点StandaloneSessionClusterEntrypoint entrypoint =new StandaloneSessionClusterEntrypoint(configuration);// 启动主节点ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

在这个入口类主要做了四件事:

  1. 解析提交作业命令的参数
  2. 解析 flink-conf.yaml 配置文件
  3. 创建主节点
  4. 启动主节点

首先来看解析 flink-conf.yaml 的过程

public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {if (configDir == null) {throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");}final File confDirFile = new File(configDir);if (!(confDirFile.exists())) {throw new IllegalConfigurationException("The given configuration directory name '"+ configDir+ "' ("+ confDirFile.getAbsolutePath()+ ") does not describe an existing directory.");}// get Flink yaml configuration file// TODO 读取flink-conf.yaml文件final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);// 文件不存在则报错if (!yamlConfigFile.exists()) {throw new IllegalConfigurationException("The Flink config file '"+ yamlConfigFile+ "' ("+ yamlConfigFile.getAbsolutePath()+ ") does not exist.");}// TODO 解析flink-conf.yaml文件Configuration configuration = loadYAMLResource(yamlConfigFile);if (dynamicProperties != null) {configuration.addAll(dynamicProperties);}return configuration;
}

首先根据 conf 路径将文件读进来,再通过 loadYAMLResource() 方法解析文件中的配置,并将 configuration 返回出去

主节点启动过程

ClusterEntrypoint.java

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();try {// 启动主类clusterEntrypoint.startCluster();} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);}int returnCode;Throwable throwable = null;try {returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();} catch (Throwable e) {throwable = ExceptionUtils.stripExecutionException(e);returnCode = RUNTIME_FAILURE_RETURN_CODE;}LOG.info("Terminating cluster entrypoint process {} with exit code {}.",clusterEntrypointName,returnCode,throwable);System.exit(returnCode);
}public void startCluster() throws ClusterEntrypointException {LOG.info("Starting {}.", getClass().getSimpleName());try {FlinkSecurityManager.setFromConfiguration(configuration);PluginManager pluginManager =PluginUtils.createPluginManagerFromRootFolder(configuration);configureFileSystems(configuration, pluginManager);SecurityContext securityContext = installSecurityContext(configuration);ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);securityContext.runSecured((Callable<Void>)() -> {runCluster(configuration, pluginManager);return null;});} catch (Throwable t) {final Throwable strippedThrowable =ExceptionUtils.stripException(t, UndeclaredThrowableException.class);try {// clean up any partial stateshutDownAsync(ApplicationStatus.FAILED,ShutdownBehaviour.GRACEFUL_SHUTDOWN,ExceptionUtils.stringifyException(strippedThrowable),false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),TimeUnit.MILLISECONDS);} catch (InterruptedException | ExecutionException | TimeoutException e) {strippedThrowable.addSuppressed(e);}throw new ClusterEntrypointException(String.format("Failed to initialize the cluster entrypoint %s.",getClass().getSimpleName()),strippedThrowable);}
}

我们继续进入 runCluster 方法
该方法是主节点启动的核心方法,主要做了三件事:

  1. 初始化主节点对外提供服务的时候所需要的三大核心组件启动时所需的基础服务
  2. 初始化一个 DispatcherResourceManagerComponentFactory 工厂实例,内部初始化了三大核心组件的工厂实例
  3. 根据工厂类和基础环境,创建三大核心组件

首先来看初始化八大基础服务

protected void initializeServices(Configuration configuration, PluginManager pluginManager)throws Exception {LOG.info("Initializing cluster services.");synchronized (lock) {resourceId =configuration.getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID).map(value ->DeterminismEnvelope.deterministicValue(new ResourceID(value))).orElseGet(() ->DeterminismEnvelope.nondeterministicValue(ResourceID.generate()));LOG.debug("Initialize cluster entrypoint {} with resource id {}.",getClass().getSimpleName(),resourceId);workingDirectory =ClusterEntrypointUtils.createJobManagerWorkingDirectory(configuration, resourceId);LOG.info("Using working directory: {}.", workingDirectory);rpcSystem = RpcSystem.load(configuration);// 初始化和启动 AkkaRpcService,内部包装了 ActorSystem// 创建一个 AkkaRpc 服务,基于 Akka 的 RpcService 实现// commonRpcService 是一个基于 Akka 的 ActorSystem,其实就是一个 TCP 的 RPC 服务,端口:6123commonRpcService =RpcUtils.createRemoteRpcService(rpcSystem,configuration,configuration.getString(JobManagerOptions.ADDRESS),getRPCPortRange(configuration),configuration.getString(JobManagerOptions.BIND_HOST),configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));// 启动一个 JMXService,用于客户端连接 JobManager,JVM 监控JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));// update the configuration used to create the high availability servicesconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());// 初始化 IO 线程池,大小为当前节点 CPU 核心数 * 4ioExecutor =Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),new ExecutorThreadFactory("cluster-io"));// 初始化一个基于 Zookeeper 的 HA 服务:ZookeeperHaServiceshaServices = createHaServices(configuration, ioExecutor, rpcSystem);// 初始化大文件存储 BlobServer 服务端// 所谓大文件例如上传 Flink-job 的 jar 时所依赖的一些需要一起上传的 jar,或者 TaskManager 上传的 log 文件等blobServer =BlobUtils.createBlobServer(configuration,Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),haServices.createBlobStore());blobServer.start();configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));// 心跳服务heartbeatServices = createHeartbeatServices(configuration);delegationTokenManager =KerberosDelegationTokenManagerFactory.create(getClass().getClassLoader(),configuration,commonRpcService.getScheduledExecutor(),ioExecutor);// 启动 Metric(性能监控)相关服务,内部也是启动一个 ActorSystemmetricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration,commonRpcService.getAddress(),configuration.getString(JobManagerOptions.BIND_HOST),rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, null);final String hostname = RpcUtils.getHostname(commonRpcService);processMetricGroup =MetricUtils.instantiateProcessMetricGroup(metricRegistry,hostname,ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));// 初始化一个用来存储 ExecutionGraph 的 Store,实现是 FileArchivedExecutionGraphStore// JobGraphStore 会在 Dispatcher 启动时启动executionGraphInfoStore =createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());}
}

初始化的服务:

  1. commonRPCService:基于 Akka 的 RpcService 实现。内部包装了 ActorSystem,这个服务其实就是一个 TCP 的 RPC 服务,端口为 6123
  2. JMXService:启动一个 JMXService,用于客户端连接 JobManager JVM 监控
  3. IOExecutor:启动一个线程池,大小为当前节点 CPU 核心数 * 4
  4. haServices:初始化一个基于 Zookeeper 的 HA 服务 ZookeeperHaServices,提供对高可用性的所有服务的访问注册,分布式计数器和领导人选举
  5. BlobServer:初始化大文件存储 BlobServer 服务端,所谓大文件例如上传 Flink-job 的 jar 时所依赖的一些需要一起上传的 jar,或者 TaskManager 上传的 log 文件等
  6. heartbeatServices:提供心跳所需的所有服务,包括创建心跳接收器和心跳发送者
  7. metricRegistry:启动 Metric(性能监控)相关服务,内部也是启动一个 ActorSystem,跟踪所有已注册的 Metric,作为连接 MetricGroup 和 MetricReporter
  8. archivedExecutionGraphStore:存储执行图 ExecutionGraph 的可序列化形式。注意此处不是 JobGraphStore,JobGraphStore 会在 Dispatcher 启动时启动

接下来创建核心工厂类

找到 StandaloneSessionClusterEntrypoint 类

StandaloneSessionClusterEntrypoint.java

protected DefaultDispatcherResourceManagerComponentFactorycreateDispatcherResourceManagerComponentFactory(Configuration configuration) {// 创建第一个工厂 StandaloneResourceManagerFactoryreturn DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
}

进入 createSessionComponentFactory 方法

DefaultDispatcherResourceManagerComponentFactory.java

public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(ResourceManagerFactory<?> resourceManagerFactory) {// 构建工厂return new DefaultDispatcherResourceManagerComponentFactory(// 第二个工厂DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE),// 第一个工厂resourceManagerFactory,// 第三个工厂SessionRestEndpointFactory.INSTANCE);
}

可见,主节点一共创建了三个核心组件的工厂实例:

  1. 生产 DefaultDispatcherRunner
  2. 生产 StandaloneResourceManager
  3. 生产 DispatcherRestEndpoint

接下来通过工厂实例创建 ResourceManager、DispatcherRunner、WebMonitorEndpoint

  1. DispatcherRunner,实现是:DefaultDispatcherRunner
  2. ResourceManager,实现是:StandaloneResourceManager
  3. WebMonitorEndpoint,实现是:DispatcherRestEndpoint

我们从 dispatcherResourceManagerComponentFactory.create 开始看

第一步:首先初始化一些监控服务

DefaultDispatcherResourceManagerComponentFactory.java

// 监控 Dispatcher
dispatcherLeaderRetrievalService =highAvailabilityServices.getDispatcherLeaderRetriever();
// 监控 ResourceManager
resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();
// ResourceManager 的 GatewayRetriever
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));

第二步:构建一个线程池用于执行 WebMonitorEndpoint 所接收到的 client 发送过来的请求

// 创建线程池用于执行 WebMonitorEndpoint 所接收到的 client 发送过来的请求
final ScheduledExecutorService executor =WebMonitorEndpoint.createExecutorService(configuration.getInteger(RestOptions.SERVER_NUM_THREADS),configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),"DispatcherRestEndpoint");

第三步:初始化 MetricFetcher,刷新间隔 10s

// 初始化 MetricFetcher,刷新间隔 10s
final long updateInterval =configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =updateInterval == 0? VoidMetricFetcher.INSTANCE: MetricFetcherImpl.fromConfiguration(configuration,metricQueryServiceRetriever,dispatcherGatewayRetriever,executor);

第四步:创建 WebMonitorEndpoint 实例,并启动,在Standalone 模式下为:DispatcherRestEndpoint 该实例内部会启动一个 Netty 服务端,绑定了一堆 Handler

// 创建 WebMonitorEndpoint 实例,在 Standalone 模式下为:DispatcherRestEndpoint
// 该实例内部会启动一个 Netty 服务端,绑定了一堆 Handler
webMonitorEndpoint =restEndpointFactory.createRestEndpoint(configuration,dispatcherGatewayRetriever,resourceManagerGatewayRetriever,blobServer,executor,metricFetcher,highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),fatalErrorHandler);
// 启动 WebMonitorEndpoint
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();

第五步:创建 ResourceManager 对象

  1. ResourceManager 是一个 RpcEndpoint(Actor),当构建好对象后启动时会触发 onStart(Actor 的 perStart 生命周期方法)方法
  2. ResourceManager 也是一个 LeaderContender,也会执行竞选,会执行竞选结果方法
  3. ResourceManagerService 具有两个心跳服务和两个定时服务:
    • 两个心跳服务:从节点和主节点之间的心跳,Job 的主控程序和主节点之间的心跳
    • 两个定时服务:TaskManager 的超时检查服务 Slot 申请的超时检查服务
// 创建 ResourceManager 对象
resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);

第六步:构建了一个 DispatcherRunner,注意不是 Dispatcher,Dispatcher 的构建和启动时再 DispatcherRunner 内部实现的

// 创建 dispatcherRunner 对象并启动
log.debug("Starting Dispatcher.");
dispatcherRunner =dispatcherRunnerFactory.createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(),fatalErrorHandler,new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),ioExecutor,rpcService,partialDispatcherServices);

第七步:启动 ResourceManager

// 启动 ResourceManager
log.debug("Starting ResourceManagerService.");
resourceManagerService.start();

至此,JobManager 启动完毕

关于 ResourceManager、WebMonitorEndpoint、Dispatcher 的启动流程留待后文讨论

【Flink源码】JobManager启动流程相关推荐

  1. 渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

    关注我 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/11/es-code02/ 前提 上篇文章写了 ElasticSearch 源码解析 -- ...

  2. Python源码学习:启动流程简析

    Python源码分析 本文环境python2.5系列 参考书籍<<Python源码剖析>> Python简介: python主要是动态语言,虽然Python语言也有编译,生成中 ...

  3. Android4.0源码Launcher启动流程分析【android源码Launcher系列一】

    最近研究ICS4.0的Launcher,发现4.0和2.3有稍微点区别,但是区别不是特别大,所以我就先整理一下Launcher启动的大致流程. Launcher其实是贯彻于手机的整个系统的,时时刻刻都 ...

  4. Android中ICS4.0源码Launcher启动流程分析【android源码Launcher系列一】

    最近研究ICS4.0的Launcher,发现4.0和2.3有稍微点区别,但是区别不是特别大,所以我就先整理一下Launcher启动的大致流程.Launcher其实是贯彻于手机的整个系统的,时时刻刻都在 ...

  5. Android 源码 PackageManagerService 启动流程分析

    <Android 源码 installPackage 流程分析>一节着重分析了 apk 安装流程,接下来我们分析 PackageManagerService 启动时都做了些什么? 执行 P ...

  6. uboot 2021.10源码分析(启动流程)

    uboot版本:2021.10 平台:armv8  rk3399  eMMC 16G  LPDDR4 4G 本文主要基于uboot的执行流程进行分析而忽略了相关细节,从uboot的基本框架结构着手,新 ...

  7. redhad环境android源码编译,启动流程  |  Android 开源项目  |  Android Open Source Project...

    建议的设备启动流程如下所示: 图 1. 启动时验证流程 适用于 A/B 设备的流程 如果设备使用的是 A/B 系统,则启动流程略有不同.必须先使用启动控件 HAL 将要启动的槽位标记为 SUCCESS ...

  8. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread 流程分析 二 )

    文章目录 前言 一.ActivityManagerService.attachApplicationLocked 二.ActivityStackSupervisor.attachApplication ...

  9. 【Flink】 Flink 源码之 SQL 执行流程

    1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...

最新文章

  1. python描边_【基础】学习笔记52-Python3 matplotlib绘图-热力图1
  2. Javascript中的树结构
  3. 数学建模——K-means聚类模型Python代码
  4. pid调节软件_非常实用的PID算法和PID控制原理
  5. c语言中Gretchen函数的功能,听过很多的歌的音乐达人给我推荐一下
  6. Deep Learning 【Nature review】
  7. 剑指offer面试题32 - I. 从上到下打印二叉树(二叉树)(BFS)
  8. python零基础自学教材-零基础的小白怎么学python?
  9. 阿里负载均衡,配置中间证书问题(在starcom申请免费DV ssl)
  10. 如何由 XRD 图谱确定所做的样品是准晶结构
  11. Java暴力破解Wifi
  12. CentOS 8系统时间校准
  13. 三十六计珍藏版(下)
  14. P5.JS绘制动态图形
  15. 计算机毕业设计Java教育培训机构信息管理系统(源码+系统+mysql数据库+lW文档)
  16. VPCS使用教程:模拟GNS3虚拟PC
  17. UE5自定义编辑器Slate插件
  18. 2022R2移动式压力容器充装考试题及答案
  19. 计算机论文 游戏,计算机游戏论文3000字_计算机游戏毕业论文范文模板.doc
  20. unity3d游戏场景制作

热门文章

  1. android系统广播有哪些
  2. html页面透明度属性,css透明度属性是什么?
  3. Blender中实现一个Mobius环
  4. 五个月的时间视力从0.4-1.2,她究竟用了什么“好方法”?
  5. R语言中的遗传算法详细解析
  6. c语言程序设计编程题库,C语言程序设计习题库.doc
  7. C++ STL源码剖析——P1、P2、P3、P4、P5、P6、P7
  8. SpringCloud精妙的设计,你还不知道?
  9. 帮忙设计一个数字电子时钟的课程设计
  10. 黑色炫酷网址安全跳转源码 GO跳转PHP页面