导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job。尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方便提交。但我们在开发的过程中想对用户自定义 UDF Jar 进行管理,想将 UDF Jar 存储管理在阿里云 OSS ,在 Job 中通过动态加载的方式将 UDF Jar 加载进来,取代之前将 UDF 和 Job 打成一个 fat jar 的方式。下面将从几点展开讨论:

  • 将 UDF 写到 Job 中并打成一个 fat jar 的实现方式
  • 动态加载 UDF Jar 代码调整
  • 代码调整后存在的问题
  • 解决 UDF Jar URL 分发的思路

环境

  • Flink 1.11.2
  • 部署方式:Flink on Kubernetes
  • 部署模式: Session Cluster

将 UDF 写到 Job 中并打成一个 fat jar 的方式

下面是一个简单采用 FlinkSQL 编写 Job 的例子。使用 datagen 连接器作为 Source 生成数据, print 作为 Sink 将结果打印到控制台。自定义的一个简单 UDF自定义函数(returnSelf)。

   public static void main(String[] args) throws Exception {        //创建流运行时环境        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();        //采用BlinkPlanner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        //创建StreamTable环境        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);        bsEnv.setParallelism(1);        bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");        bsTableEnv.executeSql("CREATE TABLE sourceTable (" +                "    f_sequence INT," +                "    f_random INT," +                "    f_random_str STRING," +                "    ts AS localtimestamp," +                "    WATERMARK FOR ts AS ts" +                "    ) WITH (" +                "    'connector' = 'datagen'," +                "    'rows-per-second'='5'," +                "    'fields.f_sequence.kind'='sequence'," +                "    'fields.f_sequence.start'='1'," +                "    'fields.f_sequence.end'='1000'," +                "    'fields.f_random.min'='1'," +                "    'fields.f_random.max'='1000'," +                "    'fields.f_random_str.length'='10'" +                ")");        bsTableEnv.executeSql("CREATE TABLE sinktable (" +                "   f_random_str STRING" +                ") WITH (" +                "   'connector' = 'print'" +                ")");        bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");    }

要将该 Job 提交给远程 Flink 集群时,我们需要将 Job(包括自定义 UDF) 打成一个 fat Jar。但这并不是我们期望的操作,由于打成 fat jar 会显得比较臃肿,同时不方便管理 UDF Jar ,有些 UDF 具有通用性,可复用。所以我们希望将自定义的UDF Jar 独立出来保存管理,并在 Job 中通过动态加载的方式使用,如下图:

动态加载 UDF Jar 代码调整

  • 将 returnSelf 并独立打成一个 UDF Jar 上传到阿里云OSS。
  • 在 Job 的 main() 方法中新增动态加载的代码
 public static void main(String[] args) throws Exception {        //创建流运行时环境        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();        //采用BlinkPlanner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        //创建StreamTable环境        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);        bsEnv.setParallelism(1);        // 动态加载        String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";        loadJar(new URL(funJarPath));        bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");        bsTableEnv.executeSql("CREATE TABLE sourceTable (" +                "    f_sequence INT," +                "    f_random INT," +                "    f_random_str STRING," +                "    ts AS localtimestamp," +                "    WATERMARK FOR ts AS ts" +                "    ) WITH (" +                "    'connector' = 'datagen'," +                "    'rows-per-second'='5'," +                "    'fields.f_sequence.kind'='sequence'," +                "    'fields.f_sequence.start'='1'," +                "    'fields.f_sequence.end'='1000'," +                "    'fields.f_random.min'='1'," +                "    'fields.f_random.max'='1000'," +                "    'fields.f_random_str.length'='10'" +                ")");        bsTableEnv.executeSql("CREATE TABLE sinktable (" +                "   f_random_str STRING" +                ") WITH (" +                "   'connector' = 'print'" +                ")");        bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");    }    //动态加载Jar    public static void loadJar(URL jarUrl) {        //从URLClassLoader类加载器中获取类的addURL方法        Method method = null;        try {            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);        } catch (NoSuchMethodException | SecurityException e1) {            e1.printStackTrace();        }        // 获取方法的访问权限        boolean accessible = method.isAccessible();        try {            //修改访问权限为可写            if (accessible == false) {                method.setAccessible(true);            }            // 获取系统类加载器            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();            //jar路径加入到系统url路径里            method.invoke(classLoader, jarUrl);        } catch (Exception e) {            e.printStackTrace();        } finally {            method.setAccessible(accessible);        }    }

修改后,我们将 UDF jar 存放到 OSS 中进行管理。当 Job 需要依赖某个 UDF 时,只需要通过动态加载就可以完成。动态加载使用 URLClassLoader 实现,使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中,并取得 returnSelf 类。

代码调整后存在的问题

运行结果:代码调整后,在本地 IDEA 运行程序(即,启动了 Mini Cluster集群)是可以成功运行的。但是当发布到远程 Flink 集群上时(采用 Flink on K8S , Session Cluster 部署模式),会出现找不到 UDF 异常,如下:

Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf

分析:这是由于 Flink 的部署方式有多种。在本地运行的启动的是 MiniCluster,即 JobManager 和 TaskManager 在同一个JVM 进程中。而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程。

在 Session 模式下,客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情

  • 获取作业所需的依赖项
  • 通过执行环境分析并取得逻辑计划,即StreamGraph→JobGraph
  • 将依赖项和JobGraph上传到集群中

只有在这些都完成之后,才会通过env.execute() 方法触发 Flink 运行时真正地开始执行作业。所以在本地运行的 Mini Cluster,因为都处于同一个 JVM 进程,客户端运行 main() 方法进行动态加载后将依赖项和 JobGraph 提交给 JobMananger 再由 TaskManager 执行 Job。

而当在远程集群时,客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger,但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中,且没有对自定义 UDF Jar URL 进行分发,这会让 TaskMananger 在运行任务时出现 Class Not Found 异常,这是因为 TaskMananger 没有进行类加载,JVM 中没有 returnSelf 类所导致。

解决 UDF Jar 分发的思路

基于以上问题我们查阅了一些相关资料及阅读源码,以以下三点为条件

  • 基于采用 Session 模式部署
  • 基于 REST API 提交 Job 而不采用命令行方式
  • 不改动 Flink 源码

分析:官网提供了一个 -C 参数,大致用法就是把用户自定义 Jar 放到一个 JobMananger 和 TaskMananger 都能访问到的存储地方,然后通过命令行方式启动 Job 时使用 -C 参数,后面加上自定义 Jar 的URLs 就可以实现分发。

但是我们平台由于采用 REST API,而提交 Job 的 API 并没有提供该参数,所以在不改变 Flink 源码的前提下进行源码研究,最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中,也就是曲线救国实现了 -C 的效果。在 main 中增加以下代码片段:

Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");        configurationField.setAccessible(true);        Configuration o = (Configuration)configurationField.get(bsEnv);        Field confData = Configuration.class.getDeclaredField("confData");        confData.setAccessible(true);        Map temp = (Map)confData.get(o);        List jarList = new ArrayList<>();        jarList.add(funJarPath);        temp.put("pipeline.classpaths",jarList);

完整代码

public static void main(String[] args) throws Exception {        //创建流运行时环境        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();        //采用BlinkPlanner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        //创建StreamTable环境        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);        bsEnv.setParallelism(1);        // 动态加载        String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";        loadJar(new URL(funJarPath));        Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");        configurationField.setAccessible(true);        Configuration o = (Configuration)configurationField.get(bsEnv);        Field confData = Configuration.class.getDeclaredField("confData");        confData.setAccessible(true);        Map temp = (Map)confData.get(o);        List jarList = new ArrayList<>();        jarList.add(funJarPath);        temp.put("pipeline.classpaths",jarList);        bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");        bsTableEnv.executeSql("CREATE TABLE sourceTable (" +                "    f_sequence INT," +                "    f_random INT," +                "    f_random_str STRING," +                "    ts AS localtimestamp," +                "    WATERMARK FOR ts AS ts" +                "    ) WITH (" +                "    'connector' = 'datagen'," +                "    'rows-per-second'='5'," +                "    'fields.f_sequence.kind'='sequence'," +                "    'fields.f_sequence.start'='1'," +                "    'fields.f_sequence.end'='1000'," +                "    'fields.f_random.min'='1'," +                "    'fields.f_random.max'='1000'," +                "    'fields.f_random_str.length'='10'" +                ")");        bsTableEnv.executeSql("CREATE TABLE sinktable (" +                "   f_random_str STRING" +                ") WITH (" +                "   'connector' = 'print'" +                ")");        bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");    }    //动态加载Jar    public static void loadJar(URL jarUrl) {        //从URLClassLoader类加载器中获取类的addURL方法        Method method = null;        try {            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);        } catch (NoSuchMethodException | SecurityException e1) {            e1.printStackTrace();        }        // 获取方法的访问权限        boolean accessible = method.isAccessible();        try {            //修改访问权限为可写            if (accessible == false) {                method.setAccessible(true);            }            // 获取系统类加载器            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();            //jar路径加入到系统url路径里            method.invoke(classLoader, jarUrl);        } catch (Exception e) {            e.printStackTrace();        } finally {            method.setAccessible(accessible);        }    }

最后

以上就是在 Flink on K8S 集群 Session 模式下, FlinkSQL 动态加载 Jar 的解决方案。由于 REST API 没有提供 -C 效果,自定义 Jar URL 没有分发到 TaskMananger,导致 TaskMananger 没有进行类加载到其 JVM 中。通过在 Job 的 main 方法中增加动态加载方法及配置 pipeline.classpaths,可以达到不改动 Flink 源码的情况下实现 -C 效果。以上方案刚实现不久,还不保证是否有其他未知的问题,如果有更好的解决方案或者该方案中存在错误或者疏漏也欢迎提出共同讨论。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

flinksql获取系统当前时间搓_FlinkSQL 动态加载 UDF 实现思路相关推荐

  1. flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  2. flinksql获取系统当前时间搓_sql 时间戳

    SQL 时间戳转换为日期 原文:SQL 时间戳转换为日期 DATEADD(s, Timestamp + 8*3600, '1970-01-01 00:00:00') 其中Timestamp为10位的时 ...

  3. flinksql获取系统当前时间搓_DNF:从剑魂角度看工作服,不仅不是地摊货,更是超越了手搓套...

    工作服一直以来都得不到认可,被很多玩家调侃为"地摊货",主要原因还是在于,可以直接升级获取,好像牌面是有那么一点低,但从另一方面来说,工作服也只能从升级获取,就算欧皇也拿它没办法, ...

  4. QQ 玩一玩获取用户图像昵称以及CocosCreator动态加载网络图片

    文章目录 1.CocosCreator 加载图片的几种方式 2.QQ 玩一玩通过openId获取用户图像.昵称 3. 源码 QQ 玩一玩获取用户图像.昵称以及CocosCreator加载图片的几种方式 ...

  5. ceisum 加载geojson,使用 Cesium 动态加载 GeoJSON 数据

    前言 需求是这样的,我需要在地图中显示 08 年到现在的地震情况,地震都是发生在具体的时间点的,那么问题就来了,如何实现地震情况按照时间动态渲染而不是一次全部加载出来. 一. 方案分析 这里面牵扯到两 ...

  6. android jar 加入图片,Android动态加载外部jar包及jar包中图片等资源文件

    Android动态加载外部jar包及jar包中图片等资源文件 Android应用程序由Java开发,因此Java中许多实用的特性,在Android中也有体现.动态加载Class,也就是外部jar包,在 ...

  7. Android安全与逆向之Dex动态加载

     Dex动态加载是为了解决什么问题? 在Android系统中,一个App的所有代码都在一个Dex文件里面. Dex是一个类似Jar的存储了多个Java编译字节码的归档文件. 因为Android系统 ...

  8. python 反射和动态加载_Python的反射

    什么是反射 反射是一个很重要的概念,它可以把字符串映射到实例的变量或者实例的方法然后可以去执行调用.修改等操作.它有四个重要的方法: getattr 获取指定字符串名称的对象属性 setattr 为对 ...

  9. 十九、动态加载脚本和样式

    十九.动态加载脚本和样式 本章主要讲解上一章剩余的获取位置的DOM方法.动态加载脚本和样式.   1.元素位置 上一章已经通过几组属性可以获取元素所需的位置,那么这节课补充一个DOM的方法:getBo ...

最新文章

  1. Linux工业嵌入式应用
  2. VLC 关键模块结构分析
  3. 网络上的FreeBSD在线文档
  4. SAP ABAP规划 使用LOOP READ TABLE该方法取代双LOOP内部表的方法
  5. 微型计算机选用要点,微型计算机原理以及应用考试_new要点分析.doc
  6. vue控制元素的隐藏和显示
  7. python selenium chrome获取每个请求内容_python+selenium调用chrome打开网址获取内容
  8. wget在线扒站程序php源码
  9. iOS 对arc的一点深入理解
  10. 上下求索——基于双向推理的多跳知识库问答技术
  11. 麻将游戏软件————附带核心算法
  12. php error unexpected,PHP错误syntax error unexpected T-FUNCTION的解决方案-深圳做网站-创络...
  13. Linux c 地址空间 堆栈 数据段 代码段 变量存储位置
  14. 以太坊 solidity msg对象
  15. Spring Boot项目 Spring Configuration Check Unmapped Spring configuration files found
  16. [Maven进阶]多环境配置与应用
  17. 编程语言-2-处理器架构、指令集和汇编语言
  18. jquery中的$()是什么
  19. Torch 入门教程
  20. oracle增加表空间大小

热门文章

  1. 2017.3.22 小z的袜子 思考记录
  2. Intel Sandy Bridge/Ivy Bridge架构/微架构/流水线 (9) - 流水线前端/微指令队列循环流侦测器LSD
  3. C# winform程序怎么打包成安装项目(图解)
  4. python tornado websocket_Python Tornado实现WEB服务器Socket服务器共存并实现交互的方法...
  5. 一个简单限速器的java实现[1]
  6. 调用另外一个文件_从零开始学Python-Day52-文件读写
  7. 迭代终止准则的三种形式_一种经验模态分解筛选迭代过程终止准则的方法与流程...
  8. 广度优先搜索_快速入门广度优先搜索
  9. 坏消息!FCC默许美国ISP在未经批准之情况下出售用户数据!
  10. 内存问题排查手段及相关文件介绍