• modules概念

  • 通过hive module使用hive函数

    • 内置函数

    • 自定义函数

  • sql 客户端的使用

  • 原理分析和源码解析

    • 实现

modules概念

flink 提供了一个module的概念,使用户能扩展flink的内置对象,比如内置函数。这个功能是插件化的,用户可以方便的加载或者移除相应的module。

flink内置了CoreModule,并且提供了一个hive module,允许用户在加载了hive module之后使用hive的函数,包括内置函数、自定义hive函数等等。如果多个module里有重名的函数,则以先加载的函数为准。

用户还可以自定义module,只需要实现Module接口即可。如果是在sql 客户端使用,还需要实现ModuleFactory接口,因为加载的时候,flink会使用SPI机制去匹配获取相应的ModuleFactory,然后实例化相应的moudule。

通过hive module使用hive函数

我们以hive module为例,讲解一下如何使用flink提供的module功能,使用hive module的一些注意事项:

  • 通过 Hive Metastore 将带有 UDF 的 HiveCatalog 设置为当前会话的 catalog。
  • 将带有 UDF 的 jar 包放入 Flink classpath 中,并在代码中引入。
  • 使用 Blink planner,flink 1.11默认就是,不用显示指定

内置函数

  • 引入pom
   org.apache.flinkflink-connector-hive_${scala.binary.version}${flink.version}
  • 加载module
  String name = "myhive";  String version = "3.1.2";  tEnv.loadModule(name, new HiveModule(version));
  • 查看module
  System.out.println("list modules ------------------ ");  String[] modules = tEnv.listModules();  Arrays.stream(modules).forEach(System.out::println);

运行结果我们看到有两个module

list modules ------------------ coremyhive
  • 查看函数
 System.out.println("list functions (包含hive函数):------------------  ");  String[] functions = tEnv.listFunctions();  Arrays.stream(functions).forEach(System.out::println);

我们看到列出来大概300多个函数,包含flink和hive的内置函数。

  • hive函数的使用

在hive里有一个常用的解析json的函数get_json_object,这个可以把json字符串解析之后得到想要的字段,但是flink中没有这个函数,所以我们可以通过这种方式来使用hive的函数,就不用我们自己开发UDF了。

 System.out.println("hive 函数的使用:  ------------------  ");  String sql = "SELECT data,get_json_object(data, '$.name')  FROM (VALUES ('{\"name\":\"flink\"}'), ('{\"name\":\"hadoop\"}')) AS MyTable(data)";

  List results = Lists.newArrayList(tEnv.sqlQuery(sql)                                             .execute()                                             .collect());  results.stream().forEach(System.out::println);

输出结果:

hive 函数的使用:  ------------------  {"name":"flink"},flink{"name":"hadoop"},hadoop

自定义函数

前面我们讲了如何使用hive的内置函数,这个比较简单,接在了hive的module之后就可以用了,还有一种就是如何使用hive的udf函数呢?我们接下来简单聊聊。

  • 自定义hive函数

首先我们来自定义一个hive的udf函数

  1. 引入pom
    org.apache.hivehive-exec3.1.2

实现一个自定义函数,就是实现两个int类型数字的加和操作

  1. 定义函数
public class TestHiveUDF extends UDF{

 public IntWritable evaluate(IntWritable i,IntWritable j){  return new IntWritable(i.get() + j.get()); }

}

完整代码:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/hive/src/main/java/com/test/TestHiveUDF.java

  1. 导入

把相应的jar放到hive的classpath下面

定义函数

add jar /home/work/work/hive/lib/hive-1.0-SNAPSHOT.jar; CREATE  FUNCTION mysum AS "com.test.TestHiveUDF"; 
  1. 测试
  boolean b = Arrays.asList(functions1).contains("mysum");  System.out.println("是否包含自定义函数: " + b);

  String sqlUdf = "select mysum(1,2)";  List results1 = Lists.newArrayList(tEnv.sqlQuery(sqlUdf)                                         .execute()                                         .collect());  System.out.println("使用自定义函数处理结果: ");  results1.stream().forEach(System.out::println);

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/modules/HiveModulesTest.java

sql 客户端的使用

在sql-client-defaults.yaml里配置相关的模块,然后就可以使用了.

# Define modules here.

modules: # note the following modules will be of the order they are specified  - name: core    type: core  - name: hive    type: hive

原理分析和源码解析

其实相关的源码实现也不难,就是将hive的相关函数转成了flink的函数,我们简单的来看下,主要是在HiveModule类里面。

public class HiveModule implements Module {    ............. private final HiveFunctionDefinitionFactory factory; private final String hiveVersion; private final HiveShim hiveShim;

这个里面有三个主要的变量,用于构造函数的factory,hive的版本hiveVersion,以及用于处理不同版本hive的处理类hiveShim。

实现

具体转换函数的方法是getFunctionDefinition,这个方法调用了工厂类的createFunctionDefinitionFromHiveFunction方法,

我们进入 HiveFunctionDefinitionFactory#createFunctionDefinitionFromHiveFunction。

public FunctionDefinition createFunctionDefinitionFromHiveFunction(String name, String functionClassName) {  Class clazz;  try {   clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);

   LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);  } catch (ClassNotFoundException e) {   throw new TableException(    String.format("Failed to initiate an instance of class %s.", functionClassName), e);  }

  if (UDF.class.isAssignableFrom(clazz)) {   LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);

   return new ScalarFunctionDefinition(    name,    new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim)   );  }  ..........

我们看到首先会加载相关函数,这个也就是为什么要求我们把hive的udf jar放到flink的classpath的原因。之后是一堆if else判断,Hive UDF 和 GenericUDF 函数会自动转换成 Flink 中的 ScalarFunction,GenericUDTF 会被自动转换成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数(AggregateFunction).这样当我们就可以在flink中使用相应的hive函数了。

参考资料:
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html

更多内容,欢迎关注我的公众号【大数据技术与应用实战】

image

hive内置函数_flink教程flink modules详解之使用hive函数相关推荐

  1. python turtle库setpos_Python内置海龟(turtle)库绘图命令详解(二)

    继续谈利用海龟库(turtle库)做图.在这篇文章(Python内置海龟(turtle)库绘图命令详解(一))中已经介绍了turtle的一些基本画图命令,包括画布的设计.画笔属性与状态的设置以及画笔的 ...

  2. python中groupby()函数讲解与示例_详解python中groupby函数通俗易懂

    一.groupby 能做什么? python中groupby函数主要的作用是进行数据的分组以及分组后地组内运算! 对于数据的分组和分组运算主要是指groupby函数的应用,具体函数的规则如下: df[ ...

  3. python函数的使用场景_详解python中strip函数的使用场景

    python strip()函数 介绍,需要的朋友可以参考一下 函数原型 声明:s为字符串,rm为要删除的字符序列 s.strip(rm) 删除s字符串中开头.结尾处,位于 rm删除序列的字符 s.l ...

  4. python中get函数是什么意思_详解python中get函数的用法(附代码)_后端开发

    strncmp函数用法详解_后端开发 strncmp函数为字符串比较函数,其函数语法为"int strncmp ( const char * str1, const char * str2, ...

  5. python序列类型-Python内置序列类型之集合类型详解

    1.集合概念 具有某种特定性质的事物的总体,集合里的东西叫作元素.Python中,集合(set)是一个无序不重复元素的序列. 2.集合的创建 可以使用大括号 { } 或者 set() 函数创建集合,注 ...

  6. Dubbo内置4种负载均衡算法(详解)

    1.1 什么是负载均衡 在实际开发中,一个服务基本都是集群模式的,也就是多个功能相同的项目在运行,这样才能承受更高的并发,这时一个请求到这个服务,就需要确定访问哪一个服务器 Dubbo框架内部支持负载 ...

  7. python内置json模块的作用_python详解json模块

    我们在做工作中经常会使用到json模块,今天就简单介绍下json模块 什么是json JSON ,全称为JavaScript Object Notation, 也就是JavaScript 对象标记,它 ...

  8. sort函数降序排列matlab,详解Matlab中 sort 函数用法

    (1)B=sort(A) 对一维或二维数组进行升序排序,并返回排序后的数组,当A为二维时,对数组每一列进行排序. eg: A=[1,5,3],则sort(A)=[1,3,5] A=[1,5,3;2,4 ...

  9. 大数据入门教程系列之Hive内置函数及自定义函数

    本篇文章主要介绍Hive内置函数以及自定义UDF函数和UDFT函数,自定义UDF函数通过一个国际转换中文的例子说明. 操作步骤: ①.准备数据和环境 ②.演示Hive内置函数 ③.自定义UDF函数编写 ...

最新文章

  1. C++从零实现神经网络(收藏版:两万字长文)
  2. 在线作图丨高级的微生物分析——在线做Variance Partitioning Analysis(VPA分析)
  3. 计算机一级判断题2016,2016年12月计算机一级考试WPS判断题及答案
  4. 10自带sftp服务器_一文讲透FTP和SFTP的区别
  5. 用计算机来控制飞船在太空中运行,神舟飞船在太空中靠什么动力运行
  6. Java 下载 Excel模板
  7. TrueType字体结构
  8. C++学习笔记(二)——字符 字符串 ASCII码 转义字符
  9. 建立PCI网卡无盘工作站自动上网(转)
  10. 香橙派借助语音模块实现语音刷抖音
  11. WCF学习之旅—WCF第二个示例(五)
  12. 小米路由器3G建站折腾笔记6 - 总结
  13. 如何用电脑控制手机屏幕,写工作日志
  14. html如何自己做一个背景特效,背景效果实现方法总结
  15. Python:开发语言简介
  16. 某大型合资企业审批系统上线经验教训总结
  17. CPU内部结构图和MicroBlaze内部结构图对比
  18. AJAX——发送GET请求
  19. DPCM编码算法实现
  20. 减法公式运算法则_小学数学加减乘除计算运算法则

热门文章

  1. Python爬虫百度搜索
  2. Acwing第 29 场周赛【完结】
  3. Acwing第 26 场周赛【完结】
  4. 深度优先搜索(DFS)相关习题
  5. SQL语言之数据定义语言(Oracle)
  6. python中uss的用法_使用不同内存ussag管理Python多进程进程进程
  7. 【数据结构】对排序的综合总结
  8. 三面腾讯,竟然挂在了JVM上…
  9. 程序员入职国企,1周上班5小时,晒出薪资感叹:腾讯当CEO也不去
  10. 「starter推荐」简单高效 Excel 导出工具