文章目录

  • 一、前言
  • 二、TaskManagerRunner
    • 2.1、创建 TaskManagerRunner
      • 2.1.1、创建 TaskExecutorService, 用于创建 TaskExecutor
    • 2.2、启动 TaskManagerRunner
      • 2.2.1、基础服务的初始化, 构建 TaskExecutorService
        • 2.2.1.1、BlobCacheService的初始化
    • 2.3、TaskExecutor的初始化

一、前言

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子

二、TaskManagerRunner

从 taskmanager.sh 脚本我们 知道 taskmanager的启动类是 org.apache.flink.runtime.taskexecutor.TaskManagerRunner

入口 main(), 前几行是检验参数,日志记录,然后调用 runTaskManagerProcessSecurely

    public static void main(String[] args) throws Exception {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();if (maxOpenFileHandles != -1L) {LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);} else {LOG.info("Cannot determine the maximum number of open file descriptors");}runTaskManagerProcessSecurely(args);}

接下来我们查看 runTaskManagerProcessSecurely 方法, 就是加载 flink 的配置文件 flink-config.yaml.

    public static void runTaskManagerProcessSecurely(String[] args) {Configuration configuration = null;try {configuration = loadConfiguration(args);} catch (FlinkParseException fpe) {LOG.error("Could not load the configuration.", fpe);System.exit(FAILURE_EXIT_CODE);}runTaskManagerProcessSecurely(checkNotNull(configuration));}

然后将解析完的配置传入 runTaskManagerProcessSecurely

    public static void runTaskManagerProcessSecurely(Configuration configuration) {// 加载配置FlinkSecurityManager.setFromConfiguration(configuration);// 启动插件管理器final PluginManager pluginManager =PluginUtils.createPluginManagerFromRootFolder(configuration);FileSystem.initialize(configuration, pluginManager);StateChangelogStorageLoader.initialize(pluginManager);int exitCode;Throwable throwable = null;ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);try {// 这个地方我们见了很多次,组件启动,都是通过 SecurityUtils// 安装 安全模块SecurityUtils.install(new SecurityConfiguration(configuration));// 通过安全上下文 启动 taskManagerexitCode =SecurityUtils.getInstalledContext().runSecured(() -> runTaskManager(configuration, pluginManager));} catch (Throwable t) {throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);exitCode = FAILURE_EXIT_CODE;}if (throwable != null) {LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);} else {LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);}System.exit(exitCode);}

runTaskManager 通过名字我们不难看出,离TaskManager的构建越来越近了。我们点进runTaskManager方法:

public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {// 创建 TaskManagerRunnertaskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService); // 创建 TaskManagerRunner 相关服务// 启动 TaskManagerRunnertaskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}// 返回结果码try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}}

在这个方法里做了两件事:

  • 1、构建了一个TaskManagerRunner

  • 2、启动TaskManagerRunner

    • 基础服务初始化

实际上,TaskManager启动的所有准备工作,都是在这个TaskManagerRunner中完成的。

2.1、创建 TaskManagerRunner

            // 创建 TaskManagerRunnertaskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService); // 创建 TaskManagerRunner 相关服务

2.1.1、创建 TaskExecutorService, 用于创建 TaskExecutor

TaskManagerRunner::createTaskExecutorServicepublic static TaskExecutorService createTaskExecutorService(Configuration configuration,ResourceID resourceID,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,HeartbeatServices heartbeatServices,MetricRegistry metricRegistry,BlobCacheService blobCacheService,boolean localCommunicationOnly,ExternalResourceInfoProvider externalResourceInfoProvider,WorkingDirectory workingDirectory,FatalErrorHandler fatalErrorHandler)throws Exception {// 构建  TaskExecutorfinal TaskExecutor taskExecutor =startTaskManager(configuration,resourceID,rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,localCommunicationOnly,externalResourceInfoProvider,workingDirectory,fatalErrorHandler);return TaskExecutorToServiceAdapter.createFor(taskExecutor);}

2.2、启动 TaskManagerRunner

    public void start() throws Exception {synchronized (lock) {// startTaskManagerRunnerServices();taskExecutorService.start();}}

2.2.1、基础服务的初始化, 构建 TaskExecutorService

 private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {rpcSystem = RpcSystem.load(configuration);//  TaskManager 内部线程池,用来处理从节点内部各个组件的Io的线程池this.executor =Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), // 线程池大小为当前节点的cpu核心数new ExecutorThreadFactory("taskmanager-future"));// 高可用服务highAvailabilityServices =HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,executor,AddressResolution.NO_ADDRESS_RESOLUTION,rpcSystem,this);// flink1.12 引入新功能 JMX服务,提供监控信息JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));// 启动RPC服务,内部为Akka模型的ActorSystemrpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);// 为TaskManager生成了一个ResourceIDthis.resourceId =getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());this.workingDirectory =ClusterEntrypointUtils.createTaskManagerWorkingDirectory(configuration, resourceId);LOG.info("Using working directory: {}", workingDirectory);// 初始化心跳服务,主要是初始化心跳间隔和心跳超时参数配置HeartbeatServices heartbeatServices =HeartbeatServices.fromConfiguration(configuration);// 启动Metric(性能监控) 相关服务metricRegistry =new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration,rpcSystem.getMaximumMessageSizeInBytes(configuration)),ReporterSetup.fromConfiguration(configuration, pluginManager));final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration,rpcService.getAddress(),configuration.getString(TaskManagerOptions.BIND_HOST),rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());// 在主节点启动的时候,事实上已经启动了有个BolbServer,// 从节点启动的时候,会启动一个BlobCacheService,做文件缓存的服务blobCacheService =BlobUtils.createBlobCacheService(configuration,Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),highAvailabilityServices.createBlobStore(),null);final ExternalResourceInfoProvider externalResourceInfoProvider =ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(configuration, pluginManager);//  创建得到一个TaskExecutorService,内部封装了TaskExecutor,同时TaskExecutor的构建也在内部完成taskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this);handleUnexpectedTaskExecutorServiceTermination();MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture.thenAccept(ignored -> {}));}}

这里所做的工作和JobManager启动时一样,是一些基础服务的构建和启动,在这里一共做了以下这些工作:

  • 1、初始化了一个TaskManager内部线程池,用来处理从节点内部各个组件的IO,该线程池的大小为当前节点CPU的核心数。

  • 2、构建了一个高可用服务。

  • 3、初始化JMX服务,用于提供监控信息。

  • 4、启动RPC服务,内部为Akka模型的ActorSystem(点此查看Flink 1.13 源码解析前导——Akka通信模型)

  • 4、为TaskManager生成了一个ResourceID。

  • 5、初始化心跳服务,根据配置文件获取心跳间隔时间参数以及心跳超时参数

  • 6、初始化metric服务

  • 7、启动BlobCacheService服务,做文件缓存的服务。

  • 8、构建了一个TaskExecutorService,内部封装了TaskExecutor。

2.2.1.1、BlobCacheService的初始化

在这个构造方法里,主要做了两件事:

  • 1、初始化了一个持久化Blob缓存服务

  • 2、初始化了一个临时Blob缓存服务

在这两个服务的内部,都会在启动的时候启动一个定时服务,就是将过期的某个Job的对应资源都删除掉。

    public BlobCacheService(final Configuration blobClientConfig,final Reference<File> storageDir,final BlobView blobView,@Nullable final InetSocketAddress serverAddress)throws IOException {this(// 持久化Blob缓存服务new PermanentBlobCache(blobClientConfig, storageDir, blobView, serverAddress),// 临时Blob缓存服务new TransientBlobCache(blobClientConfig, storageDir, serverAddress));}

以 PermanentBlobCache 为例

    @VisibleForTestingpublic PermanentBlobCache(final Configuration blobClientConfig,final Reference<File> storageDir,final BlobView blobView,@Nullable final InetSocketAddress serverAddress,BlobCacheSizeTracker blobCacheSizeTracker)throws IOException {super(blobClientConfig,storageDir,blobView,LoggerFactory.getLogger(PermanentBlobCache.class),serverAddress);// Initializing the clean up taskthis.cleanupTimer = new Timer(true);this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;// TODO 启动定时任务this.cleanupTimer.schedule(new PermanentBlobCleanupTask(), // 任务cleanupInterval, cleanupInterval);// TODO 为永久BLOB文件提供缓存,包括每个作业的引用计数和分阶段清理。this.blobCacheSizeTracker = blobCacheSizeTracker;registerDetectedJobs();}

我们可以看到有以下操作:

  • 1、首先在方法里通过引用计数的方式,获取所有job引用的资源文件。

  • 2、遍历这些文件,并判断是否过期。

  • 3、如果过期则删除该资源文件夹。

在临时缓存blob服务中也是一样的工作

接下来到了重要环节

2.3、TaskExecutor的初始化

// org.apache.flink.runtime.taskexecutor.TaskManagerRunner#createTaskExecutorServicepublic static TaskExecutorService createTaskExecutorService(Configuration configuration,ResourceID resourceID,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,HeartbeatServices heartbeatServices,MetricRegistry metricRegistry,BlobCacheService blobCacheService,boolean localCommunicationOnly,ExternalResourceInfoProvider externalResourceInfoProvider,WorkingDirectory workingDirectory,FatalErrorHandler fatalErrorHandler)throws Exception {final TaskExecutor taskExecutor =startTaskManager(configuration,resourceID,rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,localCommunicationOnly,externalResourceInfoProvider,workingDirectory,fatalErrorHandler);/*TODO 封装了一下TaskExecutorTaskExecutor是TaskExecutorToServiceAdapter的成员变量TaskExecutorToServiceAdapter是TaskManagerRunner的成员变量*/return TaskExecutorToServiceAdapter.createFor(taskExecutor);}

可以看到在这里真正初始化了一个TaskExecutor,并将TaskExecutor封装了一下,我们首先来看TaskExecutor的初始化,我们进入startTaskManager方法:

在该方法内部依然是初始化了一些基础服务:

返回Flink1.15源码解析-总目录

Flink1.15源码解析--启动TaskManager相关推荐

  1. flink1.15源码笔记(run模式简单带过,主要看run-application)

    Flink1.15源码 Flink1.15源码 Flink1.15源码 flink入口类(bin/flink) parseAndRun(args) A.run(per-job&yarn-ses ...

  2. 渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

    关注我 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/11/es-code02/ 前提 上篇文章写了 ElasticSearch 源码解析 -- ...

  3. APISIX源码解析-启动篇【ops.lua - start】

    APISIX源码解析-启动篇[ops.lua - start] 命令开始入口 ops.execute(env, arg) 命令参数校验 目前仅支持:help.version.init.init_etc ...

  4. Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)

    Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧 ...

  5. Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)...

    Flink 学习 github.com/zhisheng17/- 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! 本项目结构 博客 1.Flink 从0 ...

  6. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  7. webpack那些事:浅入深出-源码解析构建优化

    基础知识回顾 入口(entry) module.exports = {entry: './path/to/my/entry/file.js' }; //或者 module.exports = {ent ...

  8. .Net Core 源码解析

    .Net core 源码解析 启动代码 创建并配置主机Builder CreateDefaultBuilder分析 Host类-用于产生初始的builder静态类 IHostBuilder转变成IWe ...

  9. datax源码解析-JobContainer的初始化阶段解析

    datax源码解析-JobContainer的初始化阶段解析 写在前面 此次源码分析的版本是3.0.因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大 ...

最新文章

  1. golang go build 报错 import cycle not allowed
  2. springcloud config 分布式配置中心
  3. JavaScript.The.Good.Parts阅读笔记(一)假值与===运算符
  4. QT的QDomAttr类的使用
  5. 开启sql server2008的1433端口
  6. Git commit/pull/push的操作步骤
  7. 论破坏计算机信息系统罪,论破坏计算机信息系统罪
  8. Python简单GUI(录音机)
  9. 最普通IT男-苦逼coder杂谈
  10. 2.6一个小工具的使用snipaste
  11. java opts tomcat,tomcat JAVA_OPTS配备
  12. SQL学习笔记(03)_BETWEEN 操作符
  13. html幸运数字游戏,十二生肖的幸运数字
  14. 在线小蝌蚪匿名聊天室源码 用于网站引流
  15. 【笔记】autoCAD无法显示文字解决方案
  16. 【CUDA开发】CUDA的安装、Nvidia显卡型号及测试
  17. 1553B总线基础知识及扩展
  18. C#连接数据库自动生成实体类
  19. java socket send_Socket send函数和recv函数详解
  20. linux统计文件单词数,Linux怎么统计文本的的行数/单词数和字符数?

热门文章

  1. 1028: 积少成多(2级)小林和小树兄弟俩相约存零钱。眼看到年底了,兄弟俩决定算算一共存了多少钱,请帮他们算出来。
  2. HTML 表格标签、列表标签、表单标签(案例: 注册页面)
  3. 为什么【网上邻居】中找不到计算机,都是NetBIOS名惹得祸。
  4. oj1029: 顺时针逆时针
  5. 原生js计时器(学习篇)
  6. 纯JS实现左右滑动布局和滑动
  7. 解决git取消ss代理后仍然访问代理端口的问题
  8. python读取txt文件时,报错【utf-8 codec cant decode byte 0xcc】的解决办法
  9. apache 302重定向
  10. CSS优先级算法如何计算