UDAF是Hive中用户自定义的聚集函数,Hive内置UDAF函数包括有sum()与count(),UDAF实现有简单与通用两种方式,简单UDAF因为使用Java反射导致性能损失,而且有些特性不能使用,已经被弃用了;在这篇博文中我们将关注Hive中自定义聚类函数-GenericUDAF,UDAF开发主要涉及到以下两个抽象类:

点击(此处)折叠或打开

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

如果你想浏览代码:fork it on Github:https://github.com/rathboma/hive-extension-examples

示例数据准备

首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。

点击(此处)折叠或打开

~$ cat ./people.txt

John Smith

John and Ann White

Ted Green

Dorothy

把该文件上载到HDFS目录/user/matthew/people中:

点击(此处)折叠或打开

hadoop fs -mkdir people

hadoop fs -put ./people.txt people

下面要创建Hive外部表,在Hive shell中执行

点击(此处)折叠或打开

CREATE EXTERNAL TABLE people (name string)

ROW FORMAT DELIMITED FIELDS

TERMINATED BY '\t'

ESCAPED BY ''

LINES TERMINATED BY '\n'

STORED AS TEXTFILE

LOCATION '/user/matthew/people';

相关抽象类介绍

创建一个GenericUDAF必须先了解以下两个抽象类:

点击(此处)折叠或打开

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

为了更好理解上述抽象类的API,要记住hive只是mapreduce函数,只不过hive已经帮助我们写好并隐藏mapreduce,向上提供简洁的sql函数,所以我们要结合Mapper、Combiner与Reducer来帮助我们理解这个函数。要记住在hadoop集群中有若干台机器,在不同的机器上Mapper与Reducer任务独立运行。

所以大体上来说,这个UDAF函数读取数据(mapper),聚集一堆mapper输出到部分聚集结果(combiner),并且最终创建一个最终的聚集结果(reducer)。因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果。

AbstractGenericUDAFResolver

Resolver很简单,要覆盖实现下面方法,该方法会根据sql传人的参数数据格式指定调用哪个Evaluator进行处理。

点击(此处)折叠或打开

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;

GenericUDAFEvaluator

UDAF逻辑处理主要发生在Evaluator中,要实现该抽象类的几个方法。

在理解Evaluator之前,必须先理解objectInspector接口与GenericUDAFEvaluator中的内部类Model。

ObjectInspector

作用主要是解耦数据使用与数据格式,使得数据流在输入输出端切换不同的输入输出格式,不同的Operator上使用不同的格式。可以参考这两篇文章:first post on Hive UDFs、Hive中ObjectInspector的作用,里面有关于objectinspector的介绍。

Model

Model代表了UDAF在mapreduce的各个阶段。

点击(此处)折叠或打开

public static enum Mode {

/**

* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合

* 将会调用iterate()和terminatePartial()

*/

PARTIAL1,

/**

* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:

* 将会调用merge() 和 terminatePartial()

*/

PARTIAL2,

/**

* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合

* 将会调用merge()和terminate()

*/

FINAL,

/**

* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合

* 将会调用 iterate()和terminate()

*/

COMPLETE

};

一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

点击(此处)折叠或打开

GenericUDAFEvaluator的方法

// 确定各个阶段输入输出参数的数据格式ObjectInspectors

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// 保存数据聚集结果的类

abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// 重置聚集结果

public void reset(AggregationBuffer agg) throws HiveException;

// map阶段,迭代处理输入sql传过来的列数据

public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// map与combiner结束返回结果,得到部分数据聚集结果

public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。

public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// reducer阶段,输出最终结果

public Object terminate(AggregationBuffer agg) throws HiveException;

图解Model与Evaluator关系

实例

下面将讲述一个聚集函数UDAF的实例,我们将计算people这张表中的name列字母的个数。

下面的函数代码是计算指定列中字符的总数(包括空格)

pom文件如下:

点击(此处)折叠或打开

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

TotalNumOfLetters

com.xxxx.udaf

1.0-SNAPSHOT

org.apache.hive

hive-exec

2.6.0

org.apache.hadoop

hadoop-client

2.6.0

org.apache.maven.plugins

maven-jar-plugin

com.xxxx.udaf.xxxx

com.jolira

onejar-maven-plugin

1.4.4

true

onejar

one-jar

org.apache.maven.plugins

maven-compiler-plugin

7

7

代码

点击(此处)折叠或打开

package com.xxxx.udaf;

import org.apache.hadoop.hive.ql.exec.Description;

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.parse.SemanticException;

import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@Description(name = "letters", value = "__FUNC__(expr) - return the total count chars of the column(返回该列中所有字符串的字符总数)")

public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

@Override

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {

if (parameters.length != 1) { // 判断参数长度

throw new UDFArgumentLengthException("Exactly one argument is expected, but " +

parameters.length + " was passed!");

}

ObjectInspector objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

if (objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE) { // 是不是标准的java Object的primitive类型

throw new UDFArgumentTypeException(0, "Argument type must be PRIMARY. but " +

objectInspector.getCategory().name() + " was passed!");

}

// 如果是标准的java Object的primitive类型,说明可以进行类型转换

PrimitiveObjectInspector in putOI = (PrimitiveObjectInspector) objectInspector;

// 如果是标准的java Object的primitive类型,判断是不是string类型,因为参数只接受string类型

if (in putOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {

throw new UDFArgumentTypeException(0, "Argument type must be Strig, but " +

in putOI.getPrimitiveCategory().name() + " was passed!");

}

return new TotalNumOfLettersEvaluator();

}

public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

PrimitiveObjectInspector in putIO;

ObjectInspector outputIO;

PrimitiveObjectInspector IntegerIO;

int total = 0;

@Override

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {

assert (parameters.length == 1);

super.init(m, parameters);

/**

* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合

* 将会调用iterate()和terminatePartial()

* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:

* 将会调用merge() 和 terminatePartial()

* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合

* 将会调用merge()和terminate()

* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合

* 将会调用 iterate()和terminate()

*/

//map阶段读取sql列,输入为String基础数据格式

if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {

in putIO = (PrimitiveObjectInspector) parameters[0];

} else { //其余阶段,输入为Integer基础数据格式

IntegerIO = (PrimitiveObjectInspector) parameters[0];

}

// 指定各个阶段输出数据格式都为Integer类型

outputIO = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,

ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

return outputIO;

}

/**

* 存储当前字符总数的类

*/

static class LetterSumAgg implements AggregationBuffer {

int sum = 0;

void add(int num) {

sum += num;

}

}

@Override

public AggregationBuffer getNewAggregationBuffer() throws HiveException {

LetterSumAgg result = new LetterSumAgg();

return result;

}

@Override

public void reset(AggregationBuffer aggregationBuffer) throws HiveException {

LetterSumAgg myAgg = new LetterSumAgg();

}

private boolean warned = false;

@Override

public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {

assert (parameters.length == 1);

if (parameters[0] != null) {

LetterSumAgg myAgg = (LetterSumAgg) agg;

Object p = in putIO.getPrimitiveJavaObject(parameters[0]);

myAgg.add(String.valueOf(p).length());

}

}

@Override

public Object terminatePartial(AggregationBuffer agg) throws HiveException {

LetterSumAgg myAgg = (LetterSumAgg) agg;

total += myAgg.sum;

return total;

}

@Override

public void merge(AggregationBuffer agg, Object partial) throws HiveException {

if (partial != null) {

LetterSumAgg myAgg1 = (LetterSumAgg) agg;

Integer partialSum = (Integer) IntegerIO.getPrimitiveJavaObject(partial);

LetterSumAgg myAgg2 = new LetterSumAgg();

myAgg2.add(partialSum);

myAgg1.add(myAgg2.sum);

}

}

@Override

public Object terminate(AggregationBuffer agg) throws HiveException {

LetterSumAgg myAgg = (LetterSumAgg) agg;

total = myAgg.sum;

return myAgg.sum;

}

}

}

使用自定义函数

点击(此处)折叠或打开

ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;

CREATE TEMPORARY FUNCTION letters as 'com.xxxx.udaf.TotalNumOfLettersGenericUDAF';

SELECT letters(name) FROM people;

OK

44

Time taken: 20.688 seconds

hive udaf_Hive UDAF 函数的编写相关推荐

  1. hive udaf_Hive自定义函数

    为什么需要自定义函数 hive的内置函数满足不了所有的业务需求. hive提供很多的模块可以自定义功能,比如:自定义函数.serde.输入输出格式等. 常见自定义函数UDF分三种: UDF(User ...

  2. Hive 之 用户自定义函数 UDF UDAF UDTF

    一 什么是UDF UDF是UserDefined Function 用户自定义函数的缩写.Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数. 除了UDF 之外,我们 ...

  3. 【大数据开发】SparkSQL——Spark对接Hive、Row类、SparkSQL函数、UDF函数(用户自定义函数)、UDAF函数、性能调优、SparkSQL解决数据倾斜

    文章目录 一.Spark对接Hive准备工作 1.1 集群文件下载 1.2 导入依赖 1.3 打开集群metastore服务 二.Spark对接Hive 2.1 查询Hive 2.2 读取MySQL中 ...

  4. Hive 内置函数及自定义函数

    1.内置函数 使用如下命令查看当前hive版本支持的所有内置函数 show functions; 部分截图: 可以使用如下命令查看某个函数的使用方法及作用,比如查看 upper函数 desc func ...

  5. 大数据入门教程系列之Hive内置函数及自定义函数

    本篇文章主要介绍Hive内置函数以及自定义UDF函数和UDFT函数,自定义UDF函数通过一个国际转换中文的例子说明. 操作步骤: ①.准备数据和环境 ②.演示Hive内置函数 ③.自定义UDF函数编写 ...

  6. hive 的udf 函数使用

    1)依据课程讲解UDF编程案例,完成练习,总结开发UDF步骤,代码贴图,给予注释,重点 2)更改emp 表中名字的大写给为小写. 一:hive 的udf 函数: 1.1 hive UDF 函数概述: ...

  7. hive内置函数_flink教程flink modules详解之使用hive函数

    modules概念 通过hive module使用hive函数 内置函数 自定义函数 sql 客户端的使用 原理分析和源码解析 实现 modules概念 flink 提供了一个module的概念,使用 ...

  8. UDF函数和UDTF函数的图解举例,追加UDAF函数

    简述UDF/UDAF/UDTF是什么,各自解决问题及应用场景 - 玩转大数据 - 博客园 自定义UDF和UDTF函数的两个作用点: 1.埋点log打印日志,方便任务出现问题后进行调试 2.有一些SQL ...

  9. hive的UDF函数的使用。常见UDF函数

    UDF的话一般是hive提供的函数功能满足不了业务需要,我们就会自己来写UDF函数来辅助完成,对于我们常用的函数而言还是哪些常见的聚合函数,如:count.sum.avg.max.min等,其他的话就 ...

最新文章

  1. python中函数和方法的区别?Python编程判断当前获取的对象是函数还是方法
  2. (转载)linux下输入输出重定向和管道符
  3. 【每日算法】C语言8大经典排序算法(2)
  4. Linux基金会宣布开发区块链技术的新团队
  5. excel的宏与VBA入门(三)——流程控制
  6. SAP Spartacus index.html 里的 occ-backend-base-url 如何被解析的?
  7. Nginx编译安装和平滑升级
  8. 全数字实时仿真平台SkyEye的同步数据流语言可信编译器的构造
  9. 机器学习——数据预处理
  10. HITS 算法(Hypertext Induced TopicSelection)
  11. java中判断字符串是否为纯数字
  12. STM(Software Transactional Memory Systems)是什么 怎么用
  13. map集合和javabean对象之间相互转换方法
  14. Highcharts 隐藏右下角的官网链接
  15. 计算机如何取消还原卡,如何关闭硬件还原卡?
  16. 安卓手机安装谷歌框架
  17. matlab interp插值函数
  18. 储存profiles是什么意思_save profile是什么意思
  19. swift tabbar 自定义+号 底部34导航栏适配
  20. 随笔之javamail邮件发送(阿里云企业邮箱)

热门文章

  1. C语言之字符串探究(十):递归逆置字符串
  2. oracle10g em 产生log,如何创建Oracle10G EM dbcontrol
  3. .form文件_含文件上传的form表单AJAX提交小结
  4. html表单实现ajax登陆,node.js+jQuery实现用户登录注册AJAX交互
  5. c语言可以调用汇编语言吗,C语言与汇编语言混编方式
  6. 插件代码_我们开源了一款 SonarQube iOS 代码扫描插件
  7. html5技术英文论文参考文献,英文论文的参考文献范例(精选8篇)
  8. win10 mysql 远程访问_win10 docker部署mysql并启动远程连接
  9. flask 加载配置文件
  10. javascript arraybuffer