执行具体任务的TaskManager在要执行向相应的具体的任务之前,都是通过submitTask()方法得到具体所要执行的任务的。

在submitTask()中,部署的任务信息并不包含具体所要执行的目标任务类jar包。

所要任务的抽象是Task类。其实现了Runnnable接口,自然提供了run()方法可提供给线程进行调用。在其构造方法中,以自身为target作为Thread构造函数的参数,得到可执行的线程对象。

executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

但是在Task()的构造方法中,并没有直接直接执行开始线程的调用,而是在submitTask()方法中调用了startTaskThread()方法。

public void startTaskThread() {executingThread.start();
}

在这时,才是开始了Task的run()方法。

在Task()的构造方法,只给出了job的执行类名和和blob类型的jar包名和classpath名,在Task的run()方法中,也将根据这些信息加载得到目标任务的实现类并执行。

首先,会通过createUserCodeClassLoader()方法去得到目标任务类的类加载器。

userCodeClassLoader = createUserCodeClassloader();private ClassLoader createUserCodeClassloader() throws Exception {long startDownloadTime = System.currentTimeMillis();// triggers the download of all missing jar files from the job managerlibraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",executionId, System.currentTimeMillis() - startDownloadTime);ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);if (userCodeClassLoader == null) {throw new Exception("No user code classloader available.");}return userCodeClassLoader;
}

在这个方法中涉及到了libraryCache。

libraryCache是一个LiberaryCacheManager类型的对象,用来缓存下载得到的目标jar包,具体实现类为BlobLibraryCacheManager。

@Override
public void registerTask(JobID jobId,ExecutionAttemptID task,@Nullable Collection<PermanentBlobKey> requiredJarFiles,@Nullable Collection<URL> requiredClasspaths) throws IOException {checkNotNull(jobId, "The JobId must not be null.");checkNotNull(task, "The task execution id must not be null.");if (requiredJarFiles == null) {requiredJarFiles = Collections.emptySet();}if (requiredClasspaths == null) {requiredClasspaths = Collections.emptySet();}synchronized (lockObject) {LibraryCacheEntry entry = cacheEntries.get(jobId);if (entry == null) {URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];int count = 0;try {// add URLs to locally cached JAR filesfor (PermanentBlobKey key : requiredJarFiles) {urls[count] = blobService.getFile(jobId, key).toURI().toURL();++count;}// add classpathsfor (URL url : requiredClasspaths) {urls[count] = url;++count;}cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder, alwaysParentFirstPatterns));} catch (Throwable t) {// rethrow or wrapExceptionUtils.tryRethrowIOException(t);throw new IOException("Library cache could not register the user code libraries.", t);}} else {entry.register(task, requiredJarFiles, requiredClasspaths);}}
}

其中registerTask()方法,正是根据jobId检查是否已经下载得到了相应的jar包,如果没有,则需要前去下载相应的jar包,需要相应的jar包名和classpath名。

在这里新下载得到的jar包转为url作为cacheEnty的一部分,此时,针对这一部分url的类加载器也生成完毕作为cacheEntry的一部分,缓存在LiberaryCacheManager中。

在通过registerTask()方法确保执行任务的对应jar包下载完毕之后,根据jobid得到对应的类加载器,即可准备加载相应的任务类,执行相应的类。

回到run()方法中,通过loadAndInstantiateInvokable()方法加载并实例化目标类。

private static AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader,String className,Environment environment) throws Throwable {final Class<? extends AbstractInvokable> invokableClass;try {invokableClass = Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);} catch (Throwable t) {throw new Exception("Could not load the task's invokable class.", t);}Constructor<? extends AbstractInvokable> statelessCtor;try {statelessCtor = invokableClass.getConstructor(Environment.class);} catch (NoSuchMethodException ee) {throw new FlinkException("Task misses proper constructor", ee);}// instantiate the classtry {//noinspection ConstantConditions  --> cannot happenreturn statelessCtor.newInstance(environment);} catch (InvocationTargetException e) {// directly forward exceptions from the eager initializationthrow e.getTargetException();} catch (Exception e) {throw new FlinkException("Could not instantiate the task's invokable class.", e);}
}

这里通过得到的类加载器加载相应的类,并取得envirment为参数的构造方法,通过该构造方法实例化一个类 ,作为Task中实际被调用的类的实例,接下来将准备被正是调用。

flink 任务执行类的加载相关推荐

  1. Java类的加载和代码执行顺序

    关于类加载和初始化相关的案例 总的顺序是:先父类后子类,先静态后动态,属性和代码块的初始化遵循正常的出场顺序无论是静态还是动态,但是他们总是先于构造器执行.但是还是需要通过题目的学习来加深我们的理解. ...

  2. JVM-01:类的加载机制

    本文从 纯洁的微笑的博客 转载 原地址:http://www.ityouknow.com/jvm.html 类的加载机制 1.什么是类的加载 类的加载指的是将类的.class文件中的二进制数据读入到内 ...

  3. Java虚拟机中 类的加载过程

    Java中 类的加载过程 例如下面的一段简单的代码 public class HelloWorld {public static void main(String[] args) {System.ou ...

  4. Android类动态加载技术

    Android类动态加载技术 Android应用开发在一般情况下,常规的开发方式和代码架构就能满足我们的普通需求.但是有些特殊问题,常常引发我们进一步的沉思.我们从沉思中产生顿悟,从而产生新的技术形式 ...

  5. java虚拟机学习(四)类的加载过程

    2019独角兽企业重金招聘Python工程师标准>>> 类从虚拟机内存加载到从内存卸载,经历的生命周期是:加载,验证,准备,解析,初始化,使用,卸载这几个阶段, 其中验证,解析,初始 ...

  6. php自动加载类与路由,PHP实现路由与类自动加载步骤详解

    这次给大家带来PHP实现路由与类自动加载步骤详解,PHP实现路由与类自动加载步骤详解的注意事项有哪些,下面就是实战案例,一起来看一下. 项目目录如下 入口文件index.php<?php def ...

  7. Java虚拟机 —— 类的加载机制

    我们知道class文件中存储了类的描述信息和各种细节的数据,在运行Java程序时,虚拟机需要先将类的这些数据加载到内存中,并经过校验.转换.解析和初始化过后,最终形成可以直接使用的Java类型. 类从 ...

  8. 初识jvm-1.Java类的加载机制

    转载: jvm系列---纯洁的微笑 地址: http://www.ityouknow.com/jvm.html 1.什么是类的加载 类的加载指的是将类的.class文件中的二进制数据读入到内存中,将其 ...

  9. 从JVM看类的加载过程与对象实例化过程

    一. 类的加载过程 1. 类的加载过程大致是个什么过程? 我们编写产生.java文件,这些.java文件经过Java编译器编译成拓展名为.class的文件,.class文件中保存着Java代码经转换后 ...

最新文章

  1. 虚拟现实大会ChinaVR2015报告之-From Visual Content to Virtual Reality Data-driven Intelligence Production
  2. java销售_销售转向java编程的开始之路
  3. php cgi路径解析,php.ini中的cgi.fix_pathinfo选项
  4. JDBC连接MySQL数据库代码模板
  5. NumPy Beginner's Guide 2e 带注释源码 二、NumPy 基础入门
  6. c语言程序设计万年历的显示,C语言程序设计万年历
  7. ubuntu16 下 源码配置Lnmp环境
  8. Chrome Frame
  9. Linux中文档与目录的特殊权限
  10. Java 程序设计基础知识
  11. Java代码审计: ClassLoader应用
  12. 强化学习平台安装 Mujoco、mujoco-py、gym、baseline
  13. 科赫雪花java_科赫雪花的Java递归实现
  14. 艾尔米特插值的MATLAB实现,埃尔米特(Hermite)插值
  15. Linux scp远程文件/目录传输 用ps和grep命令寻找僵尸进程
  16. Cisco(62)——PBR策略路由案例
  17. Python安装失败0x80070642错误解决方法
  18. DirectX、Direct3D、OpenGL的区别(DX、D3D、OpenGL)
  19. MARA常规物料数据
  20. antlr (updating)

热门文章

  1. Kerberos加密级别不支持的问题
  2. C# Windows Workflow Fundation之状态机
  3. xss绕过字符过滤_XSS绕过实战练习
  4. 宁夏公安打传销端窝点为春节保平安
  5. 看看async,await 是如何简化异步的调用WCF!
  6. HDU 5752 Sqrt Bo【枚举,大水题】
  7. Oracle技术之索引与Null值对于Hints及执行计划的影响
  8. java json格式化工具类
  9. shell 将两行内容合并到同一行
  10. 更新FreeBSD Ports的方法