Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法。Hive将数据组织为表,这就使得HDFS上的数据有了结构,元数据即表的模式,都存储在名为metastore的数据库中。

可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令。

Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:

UDF: 操作单个数据行,产生单个数据行;

UDAF: 操作多个数据行,产生一个数据行。

UDTF: 操作一个数据行,产生多个数据行一个表作为输出。

用户构建的UDF使用过程如下:

第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。

第二步:将写好的类打包为jar。如hivefirst.jar.

第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件

第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。

第五步:在select中使用mylength();

自定义UDF

    1.继承org.apache.hadoop.hive.ql.exec.UDF

2.实现evaluate函数,evaluate函数支持重载

 
  1. package cn.sina.stat.hive.udf;

  2. import java.util.Arrays;

  3. import org.apache.hadoop.hive.ql.exec.UDF;

  4. public final class SortFieldContent extends UDF {

  5. public String evaluate( final String str, String delimiter) {

  6. if (str == null ) {

  7. return null ;

  8. }

  9. if (delimiter == null) {

  10. delimiter = "," ;

  11. }

  12. String[] strs = str.split(delimiter);

  13. Arrays. sort(strs);

  14. String result = "" ;

  15. for (int i = 0; i < strs. length; i++) {

  16. if (result.length() > 0) {

  17. result.concat(delimiter);

  18. }

  19. result.concat(strs[i]);

  20. }

  21. return result;

  22. }

  23. public String evaluate( final String str, String delimiter, String order) {

  24. if (str == null ) {

  25. return null ;

  26. }

  27. if (delimiter == null) {

  28. delimiter = "," ;

  29. }

  30. if (order != null && order.toUpperCase().equals( "ASC" )) {

  31. return evaluate(str, delimiter);

  32. } else {

  33. String[] strs = str.split(delimiter);

  34. Arrays. sort(strs);

  35. String result = "" ;

  36. for (int i = strs. length - 1; i >= 0; i--) {

  37. if (result.length() > 0) {

  38. result.concat(delimiter);

  39. }

  40. result.concat(strs[i]);

  41. }

  42. return result;

  43. }

  44. }

  45. }

注意事项:

1,一个用户UDF必须继承org.apache.hadoop.hive.ql.exec.UDF;

2,一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自己定义的。在使用的时候,Hive会调用UDF的evaluate()方法。

自定义UDAF

1.函数类继承org.apache.hadoop.hive.ql.exec.UDAF

内部类实现接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator

2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数

具体执行过程如图:

 
  1. package cn.sina.stat.hive.udaf;

  2. import java.util.Arrays;

  3. import org.apache.hadoop.hive.ql.exec.UDAF;

  4. import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

  5. public class ConcatClumnGroupByKeyWithOrder extends UDAF {

  6. public static class ConcatUDAFEvaluator implements UDAFEvaluator {

  7. public static class PartialResult {

  8. String result;

  9. String delimiter;

  10. String order;

  11. }

  12. private PartialResult partial;

  13. public void init() {

  14. partial = null;

  15. }

  16. public boolean iterate(String value, String delimiter, String order) {

  17. if (value == null) {

  18. return true;

  19. }

  20. if (partial == null) {

  21. partial = new PartialResult();

  22. partial.result = new String("");

  23. if (delimiter == null || delimiter.equals("")) {

  24. partial.delimiter = new String(",");

  25. } else {

  26. partial.delimiter = new String(delimiter);

  27. }

  28. if (order != null

  29. && (order.toUpperCase().equals("ASC") || order

  30. .toUpperCase().equals("DESC"))) {

  31. partial.order = new String(order);

  32. } else {

  33. partial.order = new String("ASC");

  34. }

  35. }

  36. if (partial.result.length() > 0) {

  37. partial.result = partial.result.concat(partial.delimiter);

  38. }

  39. partial.result = partial.result.concat(value);

  40. return true;

  41. }

  42. public PartialResult terminatePartial() {

  43. return partial;

  44. }

  45. public boolean merge(PartialResult other) {

  46. if (other == null) {

  47. return true;

  48. }

  49. if (partial == null) {

  50. partial = new PartialResult();

  51. partial.result = new String(other.result);

  52. partial.delimiter = new String(other.delimiter);

  53. partial.order = new String(other.order);

  54. } else {

  55. if (partial.result.length() > 0) {

  56. partial.result = partial.result.concat(partial.delimiter);

  57. }

  58. partial.result = partial.result.concat(other.result);

  59. }

  60. return true;

  61. }

  62. public String terminate() {

  63. String[] strs = partial.result.split(partial.delimiter);

  64. Arrays.sort(strs);

  65. String result = new String("");

  66. if (partial.order.equals("DESC")) {

  67. for (int i = strs.length - 1; i >= 0; i--) {

  68. if (result.length() > 0) {

  69. result.concat(partial.delimiter);

  70. }

  71. result.concat(strs[i]);

  72. }

  73. } else {

  74. for (int i = 0; i < strs.length; i++) {

  75. if (result.length() > 0) {

  76. result.concat(partial.delimiter);

  77. }

  78. result.concat(strs[i]);

  79. }

  80. }

  81. return new String(result);

  82. }

  83. }

  84. }

注意事项:

1,用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;

2,用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。

3,一个计算函数必须实现的5个方法的具体含义如下:

init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。

iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。

terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。

merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。

terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

4,部分聚集结果的数据类型和最终结果的数据类型可以不同。

自定义UDTF

1.继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.实现initialize, process, close三个方法
     a.initialize初始化验证,返回字段名和字段类型
     b.初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回
     c.最后调用close()方法进行清理工作

 
  1. package cn.sina.stat.hive.udtf;

  2. import java.util.ArrayList;

  3. import java.util.Arrays;

  4. import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

  5. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

  6. import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

  7. import org.apache.hadoop.hive.ql.metadata.HiveException;

  8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

  9. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

  10. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

  11. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

  12. public class SortFieldExplodeToPair extends GenericUDTF {

  13. @Override

  14. public void close() throws HiveException {

  15. // TODO Auto-generated method stub

  16. }

  17. @Override

  18. public StructObjectInspector initialize(ObjectInspector[] args)

  19. throws UDFArgumentException {

  20. if (args.length != 3) {

  21. throw new UDFArgumentLengthException(

  22. "SortFieldExplodeToPair takes only three argument");

  23. }

  24. if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

  25. throw new UDFArgumentException(

  26. "SortFieldExplodeToPair takes string as first parameter");

  27. }

  28. if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {

  29. throw new UDFArgumentException(

  30. "SortFieldExplodeToPair takes string as second parameter");

  31. }

  32. if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) {

  33. throw new UDFArgumentException(

  34. "SortFieldExplodeToPair takes string as third parameter");

  35. }

  36. if (args[2] == null

  37. || !(args[2].toString().toUpperCase().equals("ASC") || args[2]

  38. .toString().toUpperCase().equals("DESC"))) {

  39. throw new UDFArgumentException(

  40. "SortFieldExplodeToPair third parameter must be \"ASC\" or \"DESC\"");

  41. }

  42. ArrayList<String> fieldNames = new ArrayList<String>();

  43. ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

  44. fieldNames.add("col1");

  45. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

  46. return ObjectInspectorFactory.getStandardStructObjectInspector(

  47. fieldNames, fieldOIs);

  48. }

  49. private final String[] forwardStr = new String[1];

  50. @Override

  51. public void process(Object[] args) throws HiveException {

  52. String input = args[0].toString();

  53. String delimiter = args[1].toString();

  54. String order = args[2].toString();

  55. String[] strList = input.split(delimiter);

  56. Arrays.sort(strList);

  57. if (strList.length > 1) {

  58. if (order.toUpperCase().equals("DESC")) {

  59. for (int i = strList.length - 1; i > 0; i--) {

  60. forwardStr[0] = strList[i].concat(delimiter).concat(

  61. strList[i - 1]);

  62. forward(forwardStr);

  63. }

  64. } else {

  65. for (int i = 0; i < strList.length - 1; i++) {

  66. forwardStr[0] = strList[i].concat(delimiter).concat(

  67. strList[i + 1]);

  68. forward(forwardStr);

  69. }

  70. }

  71. } else {

  72. forward(strList);

  73. }

  74. }

Hive自定义UDF UDAF UDTF相关推荐

  1. hive的udf,udaf,udtf各自依賴兩種class(转载+分析整理)

    Hive自定义函数包括三种UDF.UDAF.UDTF 名稱縮寫 特點 依賴 UDF(User-Defined-Function) 一进一出 org.apache.hadoop.hive.ql.exec ...

  2. Hive 之 用户自定义函数 UDF UDAF UDTF

    一 什么是UDF UDF是UserDefined Function 用户自定义函数的缩写.Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数. 除了UDF 之外,我们 ...

  3. udf,udaf,udtf之间的区别

    1.UDF:用户定义(普通)函数,只对单行数值产生作用: 继承UDF类,添加方法 evaluate() /*** @function 自定义UDF统计最小值* @author John**/publi ...

  4. UDF UDAF UDTF 区别

    UDF UDAF UDTF 区别 UDF 概念: User-Defined-Function 自定义函数 .一进一出:只对单行数据产生作用: 实际使用时,UDF函数以匿名函数的形式进行操作使用 背景: ...

  5. Hive 自定义函数编写(UDF,UDAF,UDTF)

    Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义 UDF来方便的扩展. 当 Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. 1. ...

  6. Hive自定义UDF和聚合函数UDAF

    2019独角兽企业重金招聘Python工程师标准>>> 转自:http://computerdragon.blog.51cto.com/6235984/1288567 Hive是一种 ...

  7. Hive 自定义UDF函数讲解

    目录 一.UDF描述 二.UDF种类 三.自定义实现UDF和UDTF 3.1 需求 3.2 项目pom文件 3.3 Hive建表测试及数据 3.4UDF函数编写 3.5 UDTF函数编写 四:添加到h ...

  8. 自定义UDF、UDTF函数

    自定义步骤 自定义UDF:继承UDF,重写evaluate方法 自定义UDTF:继承GenericUDTF,重写3个方法:initialize(自定义输出数据的列名和类型),process(将结果返回 ...

  9. Hive自定义UDF的JAR包加入运行环境的方法

    Hive开发udf函数打包jar文件后,需将jar文件放入hive的运行环境,方法有三. 先将http://blog.csdn.net/fjssharpsword/article/details/70 ...

最新文章

  1. OpenStack 实现技术分解 (6) 通用库 — oslo_log
  2. 使用C#删除一个字符串数组中的空字符串
  3. javascript学习系列(10):数组中的slice方法
  4. REVERSE-PRACTICE-BUUCTF-18
  5. mongodb$pull数组更新操作符
  6. 中国移动上市第二日逼近破发线
  7. tplink无线受限 服务器无响应,tplink怎么设置密码(tplink服务器无响应)
  8. Android 编码规范
  9. HBase 学习(三) JavaAPI的使用
  10. VB6 如何添加自定义函数 模块 把代码放到一个模块中
  11. 语料库资源————(一)
  12. ivms4200 远程桌面访问测试过程及问题汇总
  13. 计算机机房的维护方案,机房设备维护方案.doc
  14. 增强版唐奇安通道策略
  15. Spatiotemporal Multi-Graph Convolution Network for Ride-Hailing Demand Forecasting
  16. C语言数字图像处理进阶---6 LOMO滤镜
  17. 移动 App 应用测试方法与思路
  18. 全像素双核激光对焦拍照是个什么厉害玩意儿
  19. 激光测距仪系统设计 c语言程序),基于时差法的激光测距方法与应用
  20. CorelDRAWX4的C++插件开发(四十一)纯C++插件开发(5)实现六个纯虚函数

热门文章

  1. 用计算机连接路由器,用路由器怎么连接两台电脑
  2. ngixn+tomcat负载均衡 动静分离配置 (nginx反向代理)
  3. linux mmu的实现的讲解_Linux中的段
  4. eclipse中的WEB项目打包部署到tomcat .
  5. linux 进程映射空间 libc,为什么不能在64位内核的32位Linux进程中映射(MAP_FIXED)最高虚拟页面?...
  6. excel mysql实时交换数据_Excel与数据库的数据交互
  7. redis 连接池_SpringBoot整合redis
  8. 如何判断两个平面相交_数学提高平面与平面垂直的判定方法是什么
  9. java 前端页面传过来的值怎么防止篡改_杂谈!了解一些额外知识,让你的前端开发锦上添花...
  10. python可以做哪些东西_自己总结的一些东西