执行具体任务的TaskManager在要执行向相应的具体的任务之前,都是通过submitTask()方法得到具体所要执行的任务的。

在submitTask()中,部署的任务信息并不包含具体所要执行的目标任务类jar包。

所要任务的抽象是Task类。其实现了Runnnable接口,自然提供了run()方法可提供给线程进行调用。在其构造方法中,以自身为target作为Thread构造函数的参数,得到可执行的线程对象。

executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

但是在Task()的构造方法中,并没有直接直接执行开始线程的调用,而是在submitTask()方法中调用了startTaskThread()方法。

public void startTaskThread() {executingThread.start();
}

在这时,才是开始了Task的run()方法。

在Task()的构造方法,只给出了job的执行类名和和blob类型的jar包名和classpath名,在Task的run()方法中,也将根据这些信息加载得到目标任务的实现类并执行。

首先,会通过createUserCodeClassLoader()方法去得到目标任务类的类加载器。

userCodeClassLoader = createUserCodeClassloader();private ClassLoader createUserCodeClassloader() throws Exception {long startDownloadTime = System.currentTimeMillis();// triggers the download of all missing jar files from the job managerlibraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",executionId, System.currentTimeMillis() - startDownloadTime);ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);if (userCodeClassLoader == null) {throw new Exception("No user code classloader available.");}return userCodeClassLoader;
}

在这个方法中涉及到了libraryCache。

libraryCache是一个LiberaryCacheManager类型的对象,用来缓存下载得到的目标jar包,具体实现类为BlobLibraryCacheManager。

@Override
public void registerTask(JobID jobId,ExecutionAttemptID task,@Nullable Collection<PermanentBlobKey> requiredJarFiles,@Nullable Collection<URL> requiredClasspaths) throws IOException {checkNotNull(jobId, "The JobId must not be null.");checkNotNull(task, "The task execution id must not be null.");if (requiredJarFiles == null) {requiredJarFiles = Collections.emptySet();}if (requiredClasspaths == null) {requiredClasspaths = Collections.emptySet();}synchronized (lockObject) {LibraryCacheEntry entry = cacheEntries.get(jobId);if (entry == null) {URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];int count = 0;try {// add URLs to locally cached JAR filesfor (PermanentBlobKey key : requiredJarFiles) {urls[count] = blobService.getFile(jobId, key).toURI().toURL();++count;}// add classpathsfor (URL url : requiredClasspaths) {urls[count] = url;++count;}cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder, alwaysParentFirstPatterns));} catch (Throwable t) {// rethrow or wrapExceptionUtils.tryRethrowIOException(t);throw new IOException("Library cache could not register the user code libraries.", t);}} else {entry.register(task, requiredJarFiles, requiredClasspaths);}}
}

其中registerTask()方法,正是根据jobId检查是否已经下载得到了相应的jar包,如果没有,则需要前去下载相应的jar包,需要相应的jar包名和classpath名。

在这里新下载得到的jar包转为url作为cacheEnty的一部分,此时,针对这一部分url的类加载器也生成完毕作为cacheEntry的一部分,缓存在LiberaryCacheManager中。

在通过registerTask()方法确保执行任务的对应jar包下载完毕之后,根据jobid得到对应的类加载器,即可准备加载相应的任务类,执行相应的类。

回到run()方法中,通过loadAndInstantiateInvokable()方法加载并实例化目标类。

private static AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader,String className,Environment environment) throws Throwable {final Class<? extends AbstractInvokable> invokableClass;try {invokableClass = Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);} catch (Throwable t) {throw new Exception("Could not load the task's invokable class.", t);}Constructor<? extends AbstractInvokable> statelessCtor;try {statelessCtor = invokableClass.getConstructor(Environment.class);} catch (NoSuchMethodException ee) {throw new FlinkException("Task misses proper constructor", ee);}// instantiate the classtry {//noinspection ConstantConditions  --> cannot happenreturn statelessCtor.newInstance(environment);} catch (InvocationTargetException e) {// directly forward exceptions from the eager initializationthrow e.getTargetException();} catch (Exception e) {throw new FlinkException("Could not instantiate the task's invokable class.", e);}
}

这里通过得到的类加载器加载相应的类,并取得envirment为参数的构造方法,通过该构造方法实例化一个类 ,作为Task中实际被调用的类的实例,接下来将准备被正是调用。

flink 任务执行类的加载相关推荐

  1. Java类的加载和代码执行顺序

    关于类加载和初始化相关的案例 总的顺序是:先父类后子类,先静态后动态,属性和代码块的初始化遵循正常的出场顺序无论是静态还是动态,但是他们总是先于构造器执行.但是还是需要通过题目的学习来加深我们的理解. ...

  2. JVM-01:类的加载机制

    本文从 纯洁的微笑的博客 转载 原地址:http://www.ityouknow.com/jvm.html 类的加载机制 1.什么是类的加载 类的加载指的是将类的.class文件中的二进制数据读入到内 ...

  3. Java虚拟机中 类的加载过程

    Java中 类的加载过程 例如下面的一段简单的代码 public class HelloWorld {public static void main(String[] args) {System.ou ...

  4. Android类动态加载技术

    Android类动态加载技术 Android应用开发在一般情况下,常规的开发方式和代码架构就能满足我们的普通需求.但是有些特殊问题,常常引发我们进一步的沉思.我们从沉思中产生顿悟,从而产生新的技术形式 ...

  5. java虚拟机学习(四)类的加载过程

    2019独角兽企业重金招聘Python工程师标准>>> 类从虚拟机内存加载到从内存卸载,经历的生命周期是:加载,验证,准备,解析,初始化,使用,卸载这几个阶段, 其中验证,解析,初始 ...

  6. php自动加载类与路由,PHP实现路由与类自动加载步骤详解

    这次给大家带来PHP实现路由与类自动加载步骤详解,PHP实现路由与类自动加载步骤详解的注意事项有哪些,下面就是实战案例,一起来看一下. 项目目录如下 入口文件index.php<?php def ...

  7. Java虚拟机 —— 类的加载机制

    我们知道class文件中存储了类的描述信息和各种细节的数据,在运行Java程序时,虚拟机需要先将类的这些数据加载到内存中,并经过校验.转换.解析和初始化过后,最终形成可以直接使用的Java类型. 类从 ...

  8. 初识jvm-1.Java类的加载机制

    转载: jvm系列---纯洁的微笑 地址: http://www.ityouknow.com/jvm.html 1.什么是类的加载 类的加载指的是将类的.class文件中的二进制数据读入到内存中,将其 ...

  9. 从JVM看类的加载过程与对象实例化过程

    一. 类的加载过程 1. 类的加载过程大致是个什么过程? 我们编写产生.java文件,这些.java文件经过Java编译器编译成拓展名为.class的文件,.class文件中保存着Java代码经转换后 ...

最新文章

  1. 石头剪刀布python代码_我的第一个python程序,石头剪刀布猜拳游戏
  2. spark学习13(spark RDD)
  3. 2021HDU多校9 - 7073 Integers Have Friends 2.0(随机数)
  4. 在一个数组中删除另一个数组存在的值
  5. 给数组添加自定义方法
  6. hashCode到底有什么用?
  7. java aes 解密 文件_Java AES文件加解密
  8. 选择排序、插入排序、冒泡排序、希尔排序算法的总结 - 复杂度、实现和稳定性
  9. glide加载gif图不显示动画_用Python绘制会动的柱形竞赛图
  10. sqoop同步时间戳到mysql_在sqoop导入中使用24小时时间戳
  11. 天正坐标标注显示不全_广联达导入CAD图纸不显示怎么办?
  12. android之签名md5
  13. php 压缩及解压文件,php zip文件的解压与压缩
  14. Linux之镜像源篇
  15. java 二进制转换十六进制的方法_Java 中二进制转换成十六进制的两种实现方法...
  16. eclipse中的英文与汉语对照表
  17. 第6章 访问权限控制
  18. 【Rust 笔记】08-枚举与模式
  19. VS插件——番茄助手快捷键的使用教程
  20. ZOJ 1138 Symbolic Derivation

热门文章

  1. JavaScript之判断用户登录信息
  2. SNMP客户端工具MIB Browser
  3. SpringCloud创建Config Client配置读取
  4. Oracle中一般游标与REF游标的区别
  5. vue——路由router
  6. JDK的下载、安装和配置
  7. CentOS 7设置开机启动服务,添加自定义系统服务(Redis为例,绝对有效)
  8. java多线程的安全问题与死锁(面向厕所编程)
  9. Linux运维面试题之--网页打开缓慢如何优化
  10. Golang 学习笔记(08)—— 文件操作