hive内置函数_flink教程flink modules详解之使用hive函数
modules概念
通过hive module使用hive函数
内置函数
自定义函数
sql 客户端的使用
原理分析和源码解析
实现
modules概念
flink 提供了一个module的概念,使用户能扩展flink的内置对象,比如内置函数。这个功能是插件化的,用户可以方便的加载或者移除相应的module。
flink内置了CoreModule,并且提供了一个hive module,允许用户在加载了hive module之后使用hive的函数,包括内置函数、自定义hive函数等等。如果多个module里有重名的函数,则以先加载的函数为准。
用户还可以自定义module,只需要实现Module接口即可。如果是在sql 客户端使用,还需要实现ModuleFactory接口,因为加载的时候,flink会使用SPI机制去匹配获取相应的ModuleFactory,然后实例化相应的moudule。
通过hive module使用hive函数
我们以hive module为例,讲解一下如何使用flink提供的module功能,使用hive module的一些注意事项:
- 通过 Hive Metastore 将带有 UDF 的 HiveCatalog 设置为当前会话的 catalog。
- 将带有 UDF 的 jar 包放入 Flink classpath 中,并在代码中引入。
- 使用 Blink planner,flink 1.11默认就是,不用显示指定
内置函数
- 引入pom
org.apache.flinkflink-connector-hive_${scala.binary.version}${flink.version}
- 加载module
String name = "myhive"; String version = "3.1.2"; tEnv.loadModule(name, new HiveModule(version));
- 查看module
System.out.println("list modules ------------------ "); String[] modules = tEnv.listModules(); Arrays.stream(modules).forEach(System.out::println);
运行结果我们看到有两个module
list modules ------------------ coremyhive
- 查看函数
System.out.println("list functions (包含hive函数):------------------ "); String[] functions = tEnv.listFunctions(); Arrays.stream(functions).forEach(System.out::println);
我们看到列出来大概300多个函数,包含flink和hive的内置函数。
- hive函数的使用
在hive里有一个常用的解析json的函数get_json_object,这个可以把json字符串解析之后得到想要的字段,但是flink中没有这个函数,所以我们可以通过这种方式来使用hive的函数,就不用我们自己开发UDF了。
System.out.println("hive 函数的使用: ------------------ "); String sql = "SELECT data,get_json_object(data, '$.name') FROM (VALUES ('{\"name\":\"flink\"}'), ('{\"name\":\"hadoop\"}')) AS MyTable(data)";
List results = Lists.newArrayList(tEnv.sqlQuery(sql) .execute() .collect()); results.stream().forEach(System.out::println);
输出结果:
hive 函数的使用: ------------------ {"name":"flink"},flink{"name":"hadoop"},hadoop
自定义函数
前面我们讲了如何使用hive的内置函数,这个比较简单,接在了hive的module之后就可以用了,还有一种就是如何使用hive的udf函数呢?我们接下来简单聊聊。
- 自定义hive函数
首先我们来自定义一个hive的udf函数
- 引入pom
org.apache.hivehive-exec3.1.2
实现一个自定义函数,就是实现两个int类型数字的加和操作
- 定义函数
public class TestHiveUDF extends UDF{
public IntWritable evaluate(IntWritable i,IntWritable j){ return new IntWritable(i.get() + j.get()); }
}
完整代码:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/hive/src/main/java/com/test/TestHiveUDF.java
- 导入
把相应的jar放到hive的classpath下面
定义函数
add jar /home/work/work/hive/lib/hive-1.0-SNAPSHOT.jar; CREATE FUNCTION mysum AS "com.test.TestHiveUDF";
- 测试
boolean b = Arrays.asList(functions1).contains("mysum"); System.out.println("是否包含自定义函数: " + b);
String sqlUdf = "select mysum(1,2)"; List results1 = Lists.newArrayList(tEnv.sqlQuery(sqlUdf) .execute() .collect()); System.out.println("使用自定义函数处理结果: "); results1.stream().forEach(System.out::println);
完整的代码请参考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/modules/HiveModulesTest.java
sql 客户端的使用
在sql-client-defaults.yaml里配置相关的模块,然后就可以使用了.
# Define modules here.
modules: # note the following modules will be of the order they are specified - name: core type: core - name: hive type: hive
原理分析和源码解析
其实相关的源码实现也不难,就是将hive的相关函数转成了flink的函数,我们简单的来看下,主要是在HiveModule类里面。
public class HiveModule implements Module { ............. private final HiveFunctionDefinitionFactory factory; private final String hiveVersion; private final HiveShim hiveShim;
这个里面有三个主要的变量,用于构造函数的factory,hive的版本hiveVersion,以及用于处理不同版本hive的处理类hiveShim。
实现
具体转换函数的方法是getFunctionDefinition,这个方法调用了工厂类的createFunctionDefinitionFromHiveFunction方法,
我们进入 HiveFunctionDefinitionFactory#createFunctionDefinitionFromHiveFunction。
public FunctionDefinition createFunctionDefinitionFromHiveFunction(String name, String functionClassName) { Class clazz; try { clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName); } catch (ClassNotFoundException e) { throw new TableException( String.format("Failed to initiate an instance of class %s.", functionClassName), e); }
if (UDF.class.isAssignableFrom(clazz)) { LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
return new ScalarFunctionDefinition( name, new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim) ); } ..........
我们看到首先会加载相关函数,这个也就是为什么要求我们把hive的udf jar放到flink的classpath的原因。之后是一堆if else判断,Hive UDF 和 GenericUDF 函数会自动转换成 Flink 中的 ScalarFunction,GenericUDTF 会被自动转换成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数(AggregateFunction).这样当我们就可以在flink中使用相应的hive函数了。
参考资料:
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html
更多内容,欢迎关注我的公众号【大数据技术与应用实战】
hive内置函数_flink教程flink modules详解之使用hive函数相关推荐
- python turtle库setpos_Python内置海龟(turtle)库绘图命令详解(二)
继续谈利用海龟库(turtle库)做图.在这篇文章(Python内置海龟(turtle)库绘图命令详解(一))中已经介绍了turtle的一些基本画图命令,包括画布的设计.画笔属性与状态的设置以及画笔的 ...
- python中groupby()函数讲解与示例_详解python中groupby函数通俗易懂
一.groupby 能做什么? python中groupby函数主要的作用是进行数据的分组以及分组后地组内运算! 对于数据的分组和分组运算主要是指groupby函数的应用,具体函数的规则如下: df[ ...
- python函数的使用场景_详解python中strip函数的使用场景
python strip()函数 介绍,需要的朋友可以参考一下 函数原型 声明:s为字符串,rm为要删除的字符序列 s.strip(rm) 删除s字符串中开头.结尾处,位于 rm删除序列的字符 s.l ...
- python中get函数是什么意思_详解python中get函数的用法(附代码)_后端开发
strncmp函数用法详解_后端开发 strncmp函数为字符串比较函数,其函数语法为"int strncmp ( const char * str1, const char * str2, ...
- python序列类型-Python内置序列类型之集合类型详解
1.集合概念 具有某种特定性质的事物的总体,集合里的东西叫作元素.Python中,集合(set)是一个无序不重复元素的序列. 2.集合的创建 可以使用大括号 { } 或者 set() 函数创建集合,注 ...
- Dubbo内置4种负载均衡算法(详解)
1.1 什么是负载均衡 在实际开发中,一个服务基本都是集群模式的,也就是多个功能相同的项目在运行,这样才能承受更高的并发,这时一个请求到这个服务,就需要确定访问哪一个服务器 Dubbo框架内部支持负载 ...
- python内置json模块的作用_python详解json模块
我们在做工作中经常会使用到json模块,今天就简单介绍下json模块 什么是json JSON ,全称为JavaScript Object Notation, 也就是JavaScript 对象标记,它 ...
- sort函数降序排列matlab,详解Matlab中 sort 函数用法
(1)B=sort(A) 对一维或二维数组进行升序排序,并返回排序后的数组,当A为二维时,对数组每一列进行排序. eg: A=[1,5,3],则sort(A)=[1,3,5] A=[1,5,3;2,4 ...
- 大数据入门教程系列之Hive内置函数及自定义函数
本篇文章主要介绍Hive内置函数以及自定义UDF函数和UDFT函数,自定义UDF函数通过一个国际转换中文的例子说明. 操作步骤: ①.准备数据和环境 ②.演示Hive内置函数 ③.自定义UDF函数编写 ...
最新文章
- C++从零实现神经网络(收藏版:两万字长文)
- 在线作图丨高级的微生物分析——在线做Variance Partitioning Analysis(VPA分析)
- 计算机一级判断题2016,2016年12月计算机一级考试WPS判断题及答案
- 10自带sftp服务器_一文讲透FTP和SFTP的区别
- 用计算机来控制飞船在太空中运行,神舟飞船在太空中靠什么动力运行
- Java 下载 Excel模板
- TrueType字体结构
- C++学习笔记(二)——字符 字符串 ASCII码 转义字符
- 建立PCI网卡无盘工作站自动上网(转)
- 香橙派借助语音模块实现语音刷抖音
- WCF学习之旅—WCF第二个示例(五)
- 小米路由器3G建站折腾笔记6 - 总结
- 如何用电脑控制手机屏幕,写工作日志
- html如何自己做一个背景特效,背景效果实现方法总结
- Python:开发语言简介
- 某大型合资企业审批系统上线经验教训总结
- CPU内部结构图和MicroBlaze内部结构图对比
- AJAX——发送GET请求
- DPCM编码算法实现
- 减法公式运算法则_小学数学加减乘除计算运算法则