一 什么是UDF

UDF是UserDefined Function 用户自定义函数的缩写。Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数。

除了UDF 之外,我们还可以定义聚合函数UDAF 和 Table-Generating函数

二 如何创建UDF函数

2.1编写JAVA类,需要继承UDF类或者GenericUDF

一般需要返回简单数据类型的,继承UDF就可以,然后实现evaluate方法;如果类型稍微复杂的可以使用GenericUDF,然后实现initializegetDisplayString evaluate方法。

publicclass UDFStripDoubleQuotes extends UDF{

private staticfinal String DOUBLE_QUOTES ="\"";

private staticfinal String BLANK_SYMBOL ="";

public  Text evaluate (Text text) throws UDFArgumentException{

if (null ==text || BLANK_SYMBOL.equals(text)) {

throw new UDFArgumentException("The function STRIP_DOUBLE_QUOTES(s) takes exactly 1arguments.");

}

Stringtemp = text.toString().trim();

if (temp.startsWith(DOUBLE_QUOTES) ||temp.endsWith(DOUBLE_QUOTES)) {

temp = temp.replace(DOUBLE_QUOTES,BLANK_SYMBOL);

}

return new Text(temp);

}

}

2.2编译这个java类并打成jar包

2.3在hive中添加jar包

hive(hadoop)> add jar /opt/data/UDFStripDoubleQuotes.jar;

Added/opt/data/UDFStripDoubleQuotes.jar to class path

Addedresource: /opt/data/UDFStripDoubleQuotes.jar

2.4创建临时函数和永久函数

2.4.1创建临时函数

语法:CREATETEMPORARY FUCNTION strip_double_quotes

AS' com.hive.udf.UDFStripDoubleQuotes';

2.4.2创建永久函数

CREATEFUNCTION [db_name.]function_name AS class_name

[USINGJAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];

说白了其实就是把jar放到HDFS上,然后指定这个函数是哪一个数据库的,然后跟一个URL,这一个url就是你jar包所放的那个HDFS目录

举个例子:

CREATEFUNCTION hadoop09.lu_str AS 'com.hive.udf.LowerAnd

UpperUDF'USING JAR 'hdfs:/var/hive/udf/lowerOrUpper.jar';

2.5测试

准备测试数据:/opt/data/quotes.txt

"10"    "ACCOUNTING"    "NEW YORK"

"20"    "RESEARCH"      "DALLAS"

"30"    "SALES" 'CHICAGO'

"40"    "OPERATIONS"    'BOSTON'

Hive这边:

CREATETABLE t_dept LIKE dept;

LOADDATA LOCAL INPATH '/opt/data/quotes.txt' INTO TABLE t_dept;

SELECTstrip_double_quotes(dname) name, strip_double_quotes(loc) loc FROM t_dept;

运行结果:

name                   loc

ACCOUNTING  NEW YORK

RESEARCH        DALLAS

SALES                 'CHICAGO'

OPERATIONS    'BOSTON'

三 如何创建UDAF函数

继承AbstractGenericUDAFAverageEvaluator,并且继承Generic

UDAFEvaluator。GenericUDAFEvaluator就是根据job不同的阶段执行不同的方法。Hive通过GenericUDAFEvaluator.Model来确定job的执行阶段。

那有哪些阶段呢?

PARTIAL1:从原始数据到部分聚合,会调用iterate,terminatePartial方法 -->map的输入 到 输出

PARTIAL2:从部分数据聚合和部分数据聚合,会调用merge和terminatePartial--> map的输出 到reduce输入

FINAL: 从部分数据聚合到全部数据聚合,调用merge和 terminate方法 -->Reduce输入到输出

COMPLETE: 从原始数据到全部数据聚合,会调用iterate和 terminate方法;没有reduce阶段,只有map阶段

有几个注意点:

如果要聚合的数据量比较大,我们需要注意内存是否够,很容易出现内存溢出的问题;

尽可能重用对象,尽量避免new对象,尽量减轻JVM垃圾回收的过程。

publicclass UDAFAdd extendsAbstractGenericUDAFResolver{

static final LogLOG = LogFactory.getLog(UDAFAdd.class.getName());

@Override

publicGenericUDAFEvaluator getEvaluator(TypeInfo[]arguments) throws SemanticException {

//check arguments length

if (arguments.length != 1) {

throw new UDFArgumentException("Exactly one argument is expected.");

}

//check if arguments data type is primitive

if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

throw new UDFArgumentException("Argument is not expected.");

}

switch(((PrimitiveTypeInfo)arguments[0]).getPrimitiveCategory()) {

case BYTE:

case SHORT:

case INT:

case LONG:

return new UDAFAddLong();

case FLOAT:

case DOUBLE:

return new UDAFAddDouble();

default:

throw new UDFArgumentException("Only numeric or string type argumentsare accepted but "

+arguments[0].getTypeName() + " is passed.");

}

}

public staticclass UDAFAddDouble extendsGenericUDAFEvaluator{

privatePrimitiveObjectInspectorinputOI;

privateDoubleWritable result;

//invoke the INIT method on the each stage

@Override

publicObjectInspector init(Modemode, ObjectInspector[] arguments) throws HiveException {

super.init(mode,arguments);

//INIT double value

result = new DoubleWritable(0);

inputOI =(DoubleObjectInspector)arguments[0];

returnPrimitiveObjectInspectorFactory.writableDoubleObjectInspector;

}

/**

* it is used for storing aggregation resultduring the process of aggregation

*@authornickyzhang

*

*/

static class AddDoubleAggextendsAbstractAggregationBuffer{

boolean empty;

double sum;

@Override

public int estimate() {

return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2;

}

}

/**Get a new aggregation object*/

@Override

public AggregationBuffergetNewAggregationBuffer()throws HiveException {

AddDoubleAggaddDoubleAgg = new AddDoubleAgg();

reset(addDoubleAgg);

return addDoubleAgg;

}

/** Reset the aggregation. This is useful if we want toreuse the same aggregation. */

@Override

public void reset(AggregationBufferagg) throws HiveException {

AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;

addDoubleAgg.empty = Boolean.TRUE;

addDoubleAgg.sum = 0;

}

/** Iterate through original data.*/

@Override

public void iterate(AggregationBufferagg, Object[] arguments)throwsHiveException {

if (arguments.length != 1) {

throw new UDFArgumentException("Just one argument expected!");

}

this.merge(agg,arguments);

}

/** Get partial aggregation result.*/

@Override

public ObjectterminatePartial(AggregationBufferagg) throws HiveException {

returnterminate(agg);

}

/**Combiner or Reduce merge the mapper*/

@Override

public void merge(AggregationBufferagg, Object partial)throwsHiveException {

if (partial ==null) {

return;

}

AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;

addDoubleAgg.empty =false;

addDoubleAgg.sum += PrimitiveObjectInspectorUtils.getDouble(partial,inputOI);

}

/** Get final aggregation result */

@Override

public Objectterminate(AggregationBufferagg) throws HiveException {

AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;

if (addDoubleAgg.empty) {

return null;

}

result.set(addDoubleAgg.sum);

return result;

}

}

public staticclass UDAFAddLong extendsGenericUDAFEvaluator{

privatePrimitiveObjectInspectorinputOI;

private LongWritableresult;

//invoke the INIT method on the each stage

@Override

publicObjectInspector init(Modemode, ObjectInspector[] arguments) throws HiveException {

super.init(mode,arguments);

//INIT double value

result = new LongWritable(0);

inputOI =(LongObjectInspector)arguments[0];

returnPrimitiveObjectInspectorFactory.writableLongObjectInspector;

}

/**

* it is used for storing aggregation resultduring the process of aggregation

*@authornickyzhang

*

*/

static class AddLongAggextends AbstractAggregationBuffer{

boolean empty;

long sum;

@Override

public int estimate() {

return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2;

}

}

/**Get a new aggregation object*/

@Override

public AggregationBuffergetNewAggregationBuffer()throws HiveException {

AddLongAggaddLongAgg = new AddLongAgg();

reset(addLongAgg);

return addLongAgg;

}

/** Reset the aggregation. This is useful if we want toreuse the same aggregation. */

@Override

public void reset(AggregationBufferagg) throws HiveException {

AddLongAggaddLongAgg = (AddLongAgg)agg;

addLongAgg.empty = Boolean.TRUE;

addLongAgg.sum = 0;

}

/** Iterate through original data.*/

@Override

public void iterate(AggregationBufferagg, Object[] arguments)throwsHiveException {

if (arguments.length != 1) {

throw new UDFArgumentException("Just one argument expected!");

}

this.merge(agg,arguments);

}

/** Get partial aggregation result.*/

@Override

public ObjectterminatePartial(AggregationBufferagg) throws HiveException {

returnterminate(agg);

}

/**Combiner or Reduce merge the mapper*/

@Override

public void merge(AggregationBufferagg, Object partial)throws HiveException{

if (partial ==null) {

return;

}

AddLongAggaddLongAgg = (AddLongAgg)agg;

addLongAgg.empty =false;

addLongAgg.sum += PrimitiveObjectInspectorUtils.getDouble(partial,inputOI);

}

/** Get final aggregation result */

@Override

public Objectterminate(AggregationBufferagg) throws HiveException {

AddLongAggaddLongAgg = (AddLongAgg)agg;

if (addLongAgg.empty) {

return null;

}

result.set(addLongAgg.sum);

return result;

}

}

}

四 如何创建UDTF函数

一般用于解析工作,比如说解析url,然后获取url信息,需要继承GenericUDTF.

publicclass UDTFEmail extendsGenericUDTF{

@Override

publicStructObjectInspector initialize(StructObjectInspectorinspector) throws UDFArgumentException {

if (inspector ==null) {

throw new UDFArgumentException("arguments is null");

}

List args = inspector.getAllStructFieldRefs();

if(CollectionUtils.isEmpty(args) ||args.size()!= 1){

throw new UDFArgumentException("UDF tables only one argument");

}

List<String>fields = new ArrayList<String>();

fields.add("name");

fields.add("email");

List<ObjectInspector>fieldIOList = new ArrayList<ObjectInspector>();

fieldIOList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

fieldIOList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

returnObjectInspectorFactory.getStandardStructObjectInspector(fields,fieldIOList);

}

@Override

public void process(Object[]args) throws HiveException {

if(ArrayUtils.isEmpty(args) ||args.length != 1) {

return;

}

Stringname = args[0].toString();

Stringemail = name +"@163.com";

super.forward(new String[] {name,email});

}

@Override

public void close()throws HiveException {

super.forward(new String[] {"complete","finish"});

}

}

五UDF UDAF UDTF 区别

UDF:一进一出

UDAF:多进一出,一般聚合用

UDTF:一进多出

Hive 之 用户自定义函数 UDF UDAF UDTF相关推荐

  1. 【Flink】Flink Table SQL 用户自定义函数: UDF、UDAF、UDTF

    本文总结Flink Table & SQL中的用户自定义函数: UDF.UDAF.UDTF. UDF: 自定义标量函数(User Defined Scalar Function).一行输入一行 ...

  2. udf,udaf,udtf之间的区别

    1.UDF:用户定义(普通)函数,只对单行数值产生作用: 继承UDF类,添加方法 evaluate() /*** @function 自定义UDF统计最小值* @author John**/publi ...

  3. UDF UDAF UDTF 区别

    UDF UDAF UDTF 区别 UDF 概念: User-Defined-Function 自定义函数 .一进一出:只对单行数据产生作用: 实际使用时,UDF函数以匿名函数的形式进行操作使用 背景: ...

  4. Hive 自定义函数编写(UDF,UDAF,UDTF)

    Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义 UDF来方便的扩展. 当 Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. 1. ...

  5. Hive自定义UDF UDAF UDTF

    Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法 ...

  6. hive的udf,udaf,udtf各自依賴兩種class(转载+分析整理)

    Hive自定义函数包括三种UDF.UDAF.UDTF 名稱縮寫 特點 依賴 UDF(User-Defined-Function) 一进一出 org.apache.hadoop.hive.ql.exec ...

  7. Hive中的用户自定义函数UDF

    Hive中的自定义函数允许用户扩展HiveQL,是一个非常强大的功能.Hive中具有多种类型的用户自定义函数.show functions命令可以列举出当前Hive会话中的所加载进来的函数,包括内置的 ...

  8. udf函数(udf udaf udtf)

    UDF的定义 UDF(User-Defined Functions)即是用户定义的hive函数.hive自带的函数并不能完全满足业务需求,这时就需要我们自定义函数了 UDF的分类 UDF:one to ...

  9. 4.2.11 Flink-流处理框架-Table API 与 SQL-函数(Functions)之用户自定义函数 UDF

    目录 1.写在前面 2.标量函数(Scalar Functions):一对一 3.表函数(Table Functions):一对多 4.聚合函数(Aggregate Functions) 5.表聚合函 ...

最新文章

  1. Windows安装配置tidevice
  2. win2012每次启动显示服务器管理器,win2012r2服务器管理器打开角色.功能出错
  3. 中国桑叶市发展态势分析与前景动态预测报告场2022-2028年版
  4. ubuntu自定义安装里怎么选_超市里的五香粉怎么选?看懂配料表,两个小技巧,不怕选不好。...
  5. 【MPI学习3】MPI并行程序设计模式:不同通信模式MPI并行程序的设计
  6. 数值分析:插值与拟合
  7. 继续研究 SO_KEEPALIVE 问题
  8. mysql sql能力_MySQL SQL优化
  9. 美团遭遇反垄断调查;微信利用社交垄断封杀西瓜视频;Qt 6 for Python发布|极客头条...
  10. CCF认证201712-2游戏
  11. [校园网]绕过校园网使用自己服务器流量教程
  12. matlab zf预编码,多用户MIMO系统中各种波束成型预编码性能比较(ZF,BD,MMSE,SLNR,MF,SVD)...
  13. 神经网络学习小记录45——Keras常用学习率下降方式汇总
  14. python 列表去重拼题a_python list 合并连接字符串的方法 -电脑资料
  15. 网易云音乐无法正常运行
  16. 选择正确的云服务,初创企业也能服务上亿用户
  17. DelphiXE10.3 FMX(FireMonkey) 画图指南 非常全面
  18. cnn 部分初级代码
  19. ASP.NET的Application
  20. Unity-ProjectSetting-Quality-SkinWeights设置导致穿模

热门文章

  1. 关于容量设计、规划、治理 你知多少?
  2. mysql 数组变量_如何在MySQL中模拟数组变量?
  3. 城市代码表_从零开始做一个SLG游戏(六)游戏系统以及配置表
  4. static在内存层面的作用_C++内存管理笔记
  5. 算法提高 陶陶摘苹果2(java)
  6. 中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-RRPC通讯)
  7. 解决JupyterLab/JupyterNotebook安装pycherts后依旧报错报错 ModuleNotFoundError: No module named ‘pyecharts‘
  8. Auto ARIMA 逐个时间点预测
  9. Django 模板中 变量 过滤器 标签 的使用方法
  10. 第四单元作业——UML分析总结学期总结