Flink 1.11 中的动态加载 udf jar 包
点击上方 "zhisheng"关注, 星标或置顶一起成长
Flink 从入门到精通 系列文章
背景
项目中想要把flink做到平台化,只需要编辑sql便能把任务跑起来,开发过程中遇到一个问题,就是如何能够自动的加载自定义的函数包,因为项目中已经把main打包成一个通用的jar, 使用时只需要把sql信息用参数形式传入就可以. 但是如果sql中需要使用到udf,那么就需要实现flink的动态加载jar
先说结论
在通用的jar main中通过反射使用类加载器,加载对应的jar包
通过反射设置StreamExecutionEnvironment中的configuration的confData中的pipeline.classpaths
具体代码例子如下
public static void main(final String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);String path = "https://...est/template-core-0.0.1-shaded.jar";loadJar(new URL(path));Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");configuration.setAccessible(true);Configuration o = (Configuration)configuration.get(env);Field confData = Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map<String,Object> temp = (Map<String,Object>)confData.get(o);List<String> jarList = new ArrayList<>();jarList.add(path);temp.put("pipeline.classpaths",jarList);tableEnvironment.executeSql("CREATE FUNCTION ReturnSelf2 AS 'flinksql.function.udf.ReturnSelf2'");tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +" f_sequence INT,\n" +" f_random INT,\n" +" f_random_str STRING,\n" +" ts AS localtimestamp,\n" +" WATERMARK FOR ts AS ts\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second'='5',\n" +"\n" +" 'fields.f_sequence.kind'='sequence',\n" +" 'fields.f_sequence.start'='1',\n" +" 'fields.f_sequence.end'='1000',\n" +"\n" +" 'fields.f_random.min'='1',\n" +" 'fields.f_random.max'='1000',\n" +"\n" +" 'fields.f_random_str.length'='10'\n" +")");tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +" f_random_str STRING" +") WITH (\n" +" 'connector' = 'print'\n" +")");tableEnvironment.executeSql("insert into sinktable " +"select ReturnSelf2(f_random_str) " +"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}
}
再说解决过程
idea中loadjar的尝试
首先是loadJar方法是需要,在idea中,只是用了loadJar是可以加载外部udf包的,正常运行,但是当我打包放到集群上,则会提是找不到对应的class,猜测是因为idea开发中,启动的是一个minicluster, 所以我在idea中启动的时候, main调用loadJar方法, 把udf加载到minicluster JVM进程中, JM跟TM都是在同个JVM进程中运行的, 所以是可以正常启动. 但是如果是在集群环境, 即使是local standalone , 执行jps命令可以发现是JM跟TM是不同的进程的
所以是main即使在JM中 加载了,但是TM进程中没有加载
-C 参数的发现
后面发现flink命令行启动的时候可以添加-C参数, 可以指定其他的jar文件
执行命令
flink --help
所以可以使用如下命令, 我的jar是上传到阿里云的oss上使用的
flink run -C "https://oss-cn-hangzhou.aliyuncs.com/test/cxytest/tanwan-function-0.1.jar..." -c cn.xuying.flink.stream.SimpleTest /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
但是我们项目平台是调用了集群的restapi来管理job的,而flink的restapi对应的接口是没有提供-C的之类的参数的, 也在flink中文邮件列表提了对应的问题,不过没有一个很好的答案
含泪跑源码pipeline.classpaths的发现
中途大佬的参与,发现了flink 客户端跑flinkjob的一些细节,
checkout源码,install,然后可以修改测试代码
找到测试的job打断点
运行test方法,可以发现env的里面的属性中多了-C参数url
然后我们不使用CliFrontendRunTest, 而是直接的执行那个TestJob的main函数, 断点可以发现没有了-C的配置
所以推测, flink 的client主要是解析你的命令行, 加一些参数配置, 然后执行你编写的flink main函数中的代码, 所以我们可以在我们自己的job代码中通过反射强制加入我们自己需要-C的配置 , 如文章开头的代码例子所示
基于 Apache Flink 的实时监控告警系统关于数据中台的深度思考与总结(干干货)日志收集Agent,阴暗潮湿的地底世界
2020 继续踏踏实实的做好自己
公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug ????
Flink 1.11 中的动态加载 udf jar 包相关推荐
- android jar 加入图片,Android动态加载外部jar包及jar包中图片等资源文件
Android动态加载外部jar包及jar包中图片等资源文件 Android应用程序由Java开发,因此Java中许多实用的特性,在Android中也有体现.动态加载Class,也就是外部jar包,在 ...
- flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路
导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...
- flinksql获取系统当前时间搓_FlinkSQL 动态加载 UDF 实现思路
导读: 最近在对 Flink 进行平台化,基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job.尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar,这样方 ...
- vue+elementui 中src动态加载图片的时候不起作用
vue+elementui 中src动态加载图片的时候不起作用 代码如下: <el-table-column align="center" label="宠物图片& ...
- Android插件化开发之动态加载本地皮肤包进行换肤
Android插件化开发之动态加载本地皮肤包进行换肤 前言: 本文主要讲解如何用开源换肤框架 android-skin-loader-lib来实现加载本地皮肤包文件进行换肤,具体可自行参考框架原理进行 ...
- Android逆向:某薇直播通过ClassLoader加载的jar包解密
声明:案例分析仅供学习交流使用,勿用于任何非法用途.如学习者进一步逆向并对版权方造成损失,请自行承担法律后果,本人概不负责. 简介 热修复和插件化是目前比较热门的技术,它们都是通过ClassLoade ...
- maven加载本地jar包,无法将本地jar包打进项目
加载本地jar包,无法将jar打进包 1.问题介绍 项目依赖某个私有jar包,由于各种原因,没有将jar包打进本地maven库,选择使用放到项目lib下引用,但是打包时lib文件夹下的依赖包打不进 ...
- springboot加载第三方jar包淘宝sdk进行打包编译
开发工具idea,开发springBoot电商项目,需要加载淘宝的sdk,以下有几种方式加载 通过这种方式加载第三方jar包,可以正常开发,但是打包部署服务器运行的时候会报错缺少jar包,无法打包的时 ...
- C#中如何动态加载DockPanel
在WinForm项目中要求实现动态加载DockPanel. 简单研究了下,演示代码如下: 1 DockPanel runPanel = dockManager1.AddPanel(DockingSty ...
- 关于c#中 的动态加载程序集
最近在写一个解析分析程序,需要动态加载卸载程序集(其实就是一个简单的插件框架),我的 思路是在主程序的目录下,创建一个assemblis目录,用来存放插件目录,如果加载插件时将其复制到 此目录,然后主 ...
最新文章
- 如何使用 Mmcv.exe 工具来管理群集消息队列资源[转]
- android xml png,android - 使用.png文件中的形状创建xml聊天气泡 - 堆栈内存溢出
- SAP标准成本核算重要环节详解
- .NET Core 控制台应用程序使用异步(Async)Main方法
- SpringBoot使用JdbcTemplate案例(学习笔记)
- 【短语学习】盈余量分析(earned value analysis)
- POST请求传入中文参数,接收端乱码
- JMail 应用实践(一)--- 简介用Java程序发送email
- win10系统自带的计算机无法使用吗,Win10如何使用系统自带的硬件设备检测工具?...
- Java 基础 匿名对象
- 安卓加密视频播放器使用教程
- delphi android 音乐播放器,Mcool音乐播放器
- 基于JAVA的旅游企业财务管理系统源码【包调试】
- CIS-Linux Centos7最新基线标准进行系统层面基线检测
- 翟菜花:5G时代的顺风车,智能音箱该怎么坐?
- 页面跳转的两种方式(转发和重定向)区别详解:
- 选购家用安防摄像头,考虑室内还是室外,无线还是有线
- vue+ elementui合并单元格后 checkbox多选单选取值问题
- 上传码云遇到git did not exit cleanly 的问题
- 在线旅游市场分析2014年数据