点击上方 "zhisheng"关注, 星标或置顶一起成长

Flink 从入门到精通 系列文章

背景

项目中想要把flink做到平台化,只需要编辑sql便能把任务跑起来,开发过程中遇到一个问题,就是如何能够自动的加载自定义的函数包,因为项目中已经把main打包成一个通用的jar, 使用时只需要把sql信息用参数形式传入就可以. 但是如果sql中需要使用到udf,那么就需要实现flink的动态加载jar

先说结论

  • 在通用的jar main中通过反射使用类加载器,加载对应的jar包

  • 通过反射设置StreamExecutionEnvironment中的configuration的confData中的pipeline.classpaths

具体代码例子如下

public static void main(final String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);String path = "https://...est/template-core-0.0.1-shaded.jar";loadJar(new URL(path));Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");configuration.setAccessible(true);Configuration o = (Configuration)configuration.get(env);Field confData = Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map<String,Object> temp = (Map<String,Object>)confData.get(o);List<String> jarList = new ArrayList<>();jarList.add(path);temp.put("pipeline.classpaths",jarList);tableEnvironment.executeSql("CREATE FUNCTION ReturnSelf2 AS 'flinksql.function.udf.ReturnSelf2'");tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +" f_sequence INT,\n" +" f_random INT,\n" +" f_random_str STRING,\n" +" ts AS localtimestamp,\n" +" WATERMARK FOR ts AS ts\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second'='5',\n" +"\n" +" 'fields.f_sequence.kind'='sequence',\n" +" 'fields.f_sequence.start'='1',\n" +" 'fields.f_sequence.end'='1000',\n" +"\n" +" 'fields.f_random.min'='1',\n" +" 'fields.f_random.max'='1000',\n" +"\n" +" 'fields.f_random_str.length'='10'\n" +")");tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +"    f_random_str STRING" +") WITH (\n" +"    'connector' = 'print'\n" +")");tableEnvironment.executeSql("insert into sinktable " +"select ReturnSelf2(f_random_str) " +"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}
}

再说解决过程

idea中loadjar的尝试

首先是loadJar方法是需要,在idea中,只是用了loadJar是可以加载外部udf包的,正常运行,但是当我打包放到集群上,则会提是找不到对应的class,猜测是因为idea开发中,启动的是一个minicluster, 所以我在idea中启动的时候, main调用loadJar方法, 把udf加载到minicluster JVM进程中, JM跟TM都是在同个JVM进程中运行的, 所以是可以正常启动. 但是如果是在集群环境, 即使是local standalone , 执行jps命令可以发现是JM跟TM是不同的进程的

所以是main即使在JM中 加载了,但是TM进程中没有加载

-C 参数的发现

后面发现flink命令行启动的时候可以添加-C参数, 可以指定其他的jar文件

执行命令

flink --help

所以可以使用如下命令, 我的jar是上传到阿里云的oss上使用的

flink run -C "https://oss-cn-hangzhou.aliyuncs.com/test/cxytest/tanwan-function-0.1.jar..." -c cn.xuying.flink.stream.SimpleTest /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar

但是我们项目平台是调用了集群的restapi来管理job的,而flink的restapi对应的接口是没有提供-C的之类的参数的, 也在flink中文邮件列表提了对应的问题,不过没有一个很好的答案

含泪跑源码pipeline.classpaths的发现

中途大佬的参与,发现了flink 客户端跑flinkjob的一些细节,

checkout源码,install,然后可以修改测试代码

找到测试的job打断点

运行test方法,可以发现env的里面的属性中多了-C参数url

然后我们不使用CliFrontendRunTest, 而是直接的执行那个TestJob的main函数, 断点可以发现没有了-C的配置

所以推测, flink 的client主要是解析你的命令行, 加一些参数配置, 然后执行你编写的flink main函数中的代码, 所以我们可以在我们自己的job代码中通过反射强制加入我们自己需要-C的配置 , 如文章开头的代码例子所示

基于 Apache Flink 的实时监控告警系统关于数据中台的深度思考与总结(干干货)日志收集Agent,阴暗潮湿的地底世界
2020 继续踏踏实实的做好自己

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug ????

Flink 1.11 中的动态加载 udf jar 包相关推荐

  1. android jar 加入图片,Android动态加载外部jar包及jar包中图片等资源文件

    Android动态加载外部jar包及jar包中图片等资源文件 Android应用程序由Java开发,因此Java中许多实用的特性,在Android中也有体现.动态加载Class,也就是外部jar包,在 ...

  2. flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  3. flinksql获取系统当前时间搓_FlinkSQL 动态加载 UDF 实现思路

    导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...

  4. vue+elementui 中src动态加载图片的时候不起作用

    vue+elementui 中src动态加载图片的时候不起作用 代码如下: <el-table-column align="center" label="宠物图片& ...

  5. Android插件化开发之动态加载本地皮肤包进行换肤

    Android插件化开发之动态加载本地皮肤包进行换肤 前言: 本文主要讲解如何用开源换肤框架 android-skin-loader-lib来实现加载本地皮肤包文件进行换肤,具体可自行参考框架原理进行 ...

  6. Android逆向:某薇直播通过ClassLoader加载的jar包解密

    声明:案例分析仅供学习交流使用,勿用于任何非法用途.如学习者进一步逆向并对版权方造成损失,请自行承担法律后果,本人概不负责. 简介 热修复和插件化是目前比较热门的技术,它们都是通过ClassLoade ...

  7. maven加载本地jar包,无法将本地jar包打进项目

    加载本地jar包,无法将jar打进包 1.问题介绍 ​ 项目依赖某个私有jar包,由于各种原因,没有将jar包打进本地maven库,选择使用放到项目lib下引用,但是打包时lib文件夹下的依赖包打不进 ...

  8. springboot加载第三方jar包淘宝sdk进行打包编译

    开发工具idea,开发springBoot电商项目,需要加载淘宝的sdk,以下有几种方式加载 通过这种方式加载第三方jar包,可以正常开发,但是打包部署服务器运行的时候会报错缺少jar包,无法打包的时 ...

  9. C#中如何动态加载DockPanel

    在WinForm项目中要求实现动态加载DockPanel. 简单研究了下,演示代码如下: 1 DockPanel runPanel = dockManager1.AddPanel(DockingSty ...

  10. 关于c#中 的动态加载程序集

    最近在写一个解析分析程序,需要动态加载卸载程序集(其实就是一个简单的插件框架),我的 思路是在主程序的目录下,创建一个assemblis目录,用来存放插件目录,如果加载插件时将其复制到 此目录,然后主 ...

最新文章

  1. 如何使用 Mmcv.exe 工具来管理群集消息队列资源[转]
  2. android xml png,android - 使用.png文件中的形状创建xml聊天气泡 - 堆栈内存溢出
  3. SAP标准成本核算重要环节详解
  4. .NET Core 控制台应用程序使用异步(Async)Main方法
  5. SpringBoot使用JdbcTemplate案例(学习笔记)
  6. 【短语学习】盈余量分析(earned value analysis)
  7. POST请求传入中文参数,接收端乱码
  8. JMail 应用实践(一)--- 简介用Java程序发送email
  9. win10系统自带的计算机无法使用吗,Win10如何使用系统自带的硬件设备检测工具?...
  10. Java 基础 匿名对象
  11. 安卓加密视频播放器使用教程
  12. delphi android 音乐播放器,Mcool音乐播放器
  13. 基于JAVA的旅游企业财务管理系统源码【包调试】
  14. CIS-Linux Centos7最新基线标准进行系统层面基线检测
  15. 翟菜花:5G时代的顺风车,智能音箱该怎么坐?
  16. 页面跳转的两种方式(转发和重定向)区别详解:
  17. 选购家用安防摄像头,考虑室内还是室外,无线还是有线
  18. vue+ elementui合并单元格后 checkbox多选单选取值问题
  19. 上传码云遇到git did not exit cleanly 的问题
  20. 在线旅游市场分析2014年数据

热门文章

  1. linux系统玩什么游戏,linux系统可以玩什么网游
  2. 秦九韶算法(java实现)
  3. Android 集成谷歌地图
  4. Windows10 添加打印机
  5. linux计算与检查md5值
  6. 《Adobe Photoshop CS6中文版经典教程》目录—导读
  7. 大学计算机应用基础第二版习题答案,大学计算机应用基础练习题库及答案.docx...
  8. java 加减乘除_加减乘除运算(Java)
  9. Java和Spring:发送邮件(以QQ邮箱为例)
  10. GPS 入门 7 —— GPS定位、LSB基站定位、wifi定位区别