1.概述

转载: Flink1.10基于工厂模式的任务提交与SPI机制

Flink任务执行模式包含了yarn-session、standalone、per-job、local, 在1.10中又增加k8s的执行模式,那么在任务提交过程中如何根据不同的执行模式进行任务提交呢?

主要通过两个接口来实现:PipelineExecutorFactoryPipelineExecutorPipelineExecutorFactory用于在不同模式下创建不同的PipelineExecutor, 用于提交任务,PipelineExecutorFactory表示的一个创建执行器工厂接口,PipelineExecutor 表示一个执行器接口,正如你所想这里使用的就是经典的工厂设计模式,在任务提交过程中会根据不同的提交模式, 使用不同的PipelineExecutorFactory创建不同的PipelineExecutor。

public interface PipelineExecutorFactory {/*** Returns the name of the executor that this factory creates.*/String getName();/**根据configuration判断是否满足当前的factory*/boolean isCompatibleWith(final Configuration configuration);/*** 获取对应模式下的executor*/PipelineExecutor getExecutor(final Configuration configuration);
}

PipelineExecutorFactory几个实现分别为:

  1. LocalExecutorFactory(local)

  2. RemoteExecutorFactory(standalone)

  3. YarnJobClusterExecutorFactory(per-job)

  4. YarnSessionClusterExecutorFactory(yarn-session)

public interface PipelineExecutor {/*** 执行任务*/CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
}

PipelineExecutor对应实现:

  1. RemoteExecutor(standalone)

  2. LocalExecutor(local)

  3. YarnJobClusterExecutor(per-job)

  4. YarnSessionClusterExecutor(yarn-session)


那么具体是如何选择factory呢?由PipelineExecutorServiceLoader接口来完成,其只有一个实现类DefaultExecutorServiceLoader, 透过命名你可能会才想到这里面用到了ServiceLoader,你的猜想是正确的,它就是通过SPI机制去加载flink所提供的不同factory,在META-INF.services 下可以找到其对应的配置:


DefaultExecutorServiceLoader.java部分源码

//SPI机制
private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);//获取对应的factory
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {checkNotNull(configuration);final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();while (factories.hasNext()) {try {final PipelineExecutorFactory factory = factories.next();//判断标准 根据任务启动配置if (factory != null && factory.isCompatibleWith(configuration)) {compatibleFactories.add(factory);}} catch (Throwable e) {if (e.getCause() instanceof NoClassDefFoundError) {LOG.info("Could not load factory due to missing dependencies.");} else {throw e;}}}//只能有一个factory符合要求if (compatibleFactories.size() > 1) {final String configStr =configuration.toMap().entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("\n"));throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");}return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
}

ServiceLoader.load(PipelineExecutorFactory.class) 会从类路径的META-INF.services下找到PipelineExecutorFactory的全路径文件,然后实例化出所有的factory,通过PipelineExecutorFactory.isCompatibleWith找到匹配的factory。

此处的PipelineExecutor体系参考:【Flink】Flink PipelineExecutor 体系

【Flink】FLink PipelineExecutorFactory 基于工厂模式的任务提交与SPI机制相关推荐

  1. php实现工厂模式,PHP基于工厂模式实现的计算器实例

    本文实例讲述了PHP基于工厂模式实现的计算器.分享给大家供大家参考.具体如下: abstract class Calculator { private $number1; private $numbe ...

  2. Python 设计模式之工厂模式

    工厂模式是一个在软件开发中用来创建对象的设计模式. 工厂模式包涵一个超类.这个超类提供一个抽象化的接口来创建一个特定类型的对象,而不是决定哪个对象可以被创建. 为了实现此方法,需要创建一个工厂类并返回 ...

  3. 二、工厂模式——在工厂里能找到你的对象

    文章目录 工厂模式 1. 简单工厂 2. 缓存方式 3. 工厂模式 4. 工厂的工厂 5. 抽象工厂 6. 工厂模式与IOC容器 总结 设计模式是面向问题.场景而总结产生的设计思路.是解决问题的套路. ...

  4. 策略模式+工厂模式(反射)+枚举代替 大量 if..else if..

    实际项目中我们经常碰到需要使用if-else-if的分支判断这种情况. 这种写法带来一些弊端. 一旦分支多太多,逻辑复杂,会导致代码十分冗长,增加阅读难度. 如果需要增加或减少分支,需要改动if-el ...

  5. 浅谈Spring框架应用的设计模式(一)——工厂模式

    文章目录 前言 一.工厂模式介绍 1.简单工厂模式 (1)静态工厂模式 (2)利用反射机制实现的简单工厂 2.工厂方法模式 3.抽象工厂模式 二.Spring框架中工厂模式的重要应用 1.BeanFa ...

  6. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yarn 提交过程解析

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [二] 请大家看原文去. 接上文Flink 1.12.2 源码分析 : yarn-per-job模式浅析 [一 ...

  7. 2021年大数据Flink(三):​​​​​​​Flink安装部署 Local本地模式

    目录 Flink安装部署 Local本地模式 原理 操作 测试 Flink安装部署 Flink支持多种安装模式 - Local-本地单机模式,学习测试时使用 - Standalone-独立集群模式,F ...

  8. Flink运行时架构及各部署模式下作业提交流程

    1.运行时架构 1.1 核心组件 1.1.1 JobManager 作业管理器,对于一个提交执行的作业,JobManager 是真正意义上的"管理者"(Master),负责管理调度 ...

  9. 【设计模式-手写源码-附1】-简单工厂模式-基于魔兽争霸冰封王座

    1:主题拆解 ①依赖倒置原则-SimpleFactory ②简单工厂+ 配置文件=可配置 ③简单工厂+ 配置文件+反射=可配置可扩展 ④简单工厂升级IOC控制反转 2:基本介绍 ①学习设计模式的套路: ...

最新文章

  1. 浅谈话题模型:LSA、PLSA、LDA
  2. 新登月计划!阿里云ET城市大脑成为国家AI开放创新平台
  3. Cordova error:npm install -g ios-deploy
  4. File转化为MultipartFile
  5. oracle tax 中国税,oracle_TAX_税基础设置操作手册.doc
  6. Mysql学习(三)之数据库管理工具Navicat
  7. VTK:Shaders之BozoShader
  8. PRICAI 2016 论文精选 | 基于车辆优先级优化交通系统的道路分布
  9. apache 配置虚拟域名默认站点问题
  10. 《纳什均衡与博弈论》纳什博弈论及对自然法则的研究
  11. 解决pr调用麦克风的问题
  12. 李四光预测地震 中国60年内将有4次特大地震
  13. 一学就会的虚拟化技术之hyper-v桌面虚拟化
  14. mysql jion on 三表_MySQL 三表连接(join)
  15. rails rjs select method help
  16. CMake I 设置编译器标志
  17. 对付不良商家,恶补攒机知识
  18. 最长公共子序列、最长连续公共子序列、最长递增子序列
  19. Linux编程定时执行某函数
  20. 痞子衡嵌入式:大话双核i.MXRT1170之单独在线调试从核工程的方法(IAR篇)

热门文章

  1. Reno7系列全球首发IMX709超感光猫眼镜头:OPPO/索尼联合打造
  2. 小米平板5有望8月发布:骁龙870+120Hz刷新率屏
  3. 百度APP月活跃用户达5.6亿,日登录用户占比超75%
  4. 闲鱼的真正用法,其实是找对象
  5. 首发联发科天玑820!Redmi 10X发布:售价1599元起
  6. iOS 14代码泄露iPhone 12系列细节:有且仅有两款配备ToF 3D镜头
  7. 快手春晚10亿元红包玩法来了:最高得2020元现金
  8. 倒了血霉!先是贾跃亭后有罗永浩,被拖欠4400多万,最惨公司无疑了
  9. 荣耀9X将搭载麒麟810处理器:全球四大7nm芯片之一无法低调
  10. 盖茨庆祝万维网诞生30周年 庆幸自己有机会影响数字革命