Hive 之 用户自定义函数 UDF UDAF UDTF
一 什么是UDF
UDF是UserDefined Function 用户自定义函数的缩写。Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数。
除了UDF 之外,我们还可以定义聚合函数UDAF 和 Table-Generating函数
二 如何创建UDF函数
2.1编写JAVA类,需要继承UDF类或者GenericUDF
一般需要返回简单数据类型的,继承UDF就可以,然后实现evaluate方法;如果类型稍微复杂的可以使用GenericUDF,然后实现initializegetDisplayString evaluate方法。
publicclass UDFStripDoubleQuotes extends UDF{
private staticfinal String DOUBLE_QUOTES ="\"";
private staticfinal String BLANK_SYMBOL ="";
public Text evaluate (Text text) throws UDFArgumentException{
if (null ==text || BLANK_SYMBOL.equals(text)) {
throw new UDFArgumentException("The function STRIP_DOUBLE_QUOTES(s) takes exactly 1arguments.");
}
Stringtemp = text.toString().trim();
if (temp.startsWith(DOUBLE_QUOTES) ||temp.endsWith(DOUBLE_QUOTES)) {
temp = temp.replace(DOUBLE_QUOTES,BLANK_SYMBOL);
}
return new Text(temp);
}
}
2.2编译这个java类并打成jar包
2.3在hive中添加jar包
hive(hadoop)> add jar /opt/data/UDFStripDoubleQuotes.jar;
Added/opt/data/UDFStripDoubleQuotes.jar to class path
Addedresource: /opt/data/UDFStripDoubleQuotes.jar
2.4创建临时函数和永久函数
2.4.1创建临时函数
语法:CREATETEMPORARY FUCNTION strip_double_quotes
AS' com.hive.udf.UDFStripDoubleQuotes';
2.4.2创建永久函数
CREATEFUNCTION [db_name.]function_name AS class_name
[USINGJAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
说白了其实就是把jar放到HDFS上,然后指定这个函数是哪一个数据库的,然后跟一个URL,这一个url就是你jar包所放的那个HDFS目录
举个例子:
CREATEFUNCTION hadoop09.lu_str AS 'com.hive.udf.LowerAnd
UpperUDF'USING JAR 'hdfs:/var/hive/udf/lowerOrUpper.jar';
2.5测试
准备测试数据:/opt/data/quotes.txt
"10" "ACCOUNTING" "NEW YORK"
"20" "RESEARCH" "DALLAS"
"30" "SALES" 'CHICAGO'
"40" "OPERATIONS" 'BOSTON'
Hive这边:
CREATETABLE t_dept LIKE dept;
LOADDATA LOCAL INPATH '/opt/data/quotes.txt' INTO TABLE t_dept;
SELECTstrip_double_quotes(dname) name, strip_double_quotes(loc) loc FROM t_dept;
运行结果:
name loc
ACCOUNTING NEW YORK
RESEARCH DALLAS
SALES 'CHICAGO'
OPERATIONS 'BOSTON'
三 如何创建UDAF函数
继承AbstractGenericUDAFAverageEvaluator,并且继承Generic
UDAFEvaluator。GenericUDAFEvaluator就是根据job不同的阶段执行不同的方法。Hive通过GenericUDAFEvaluator.Model来确定job的执行阶段。
那有哪些阶段呢?
PARTIAL1:从原始数据到部分聚合,会调用iterate,terminatePartial方法 -->map的输入 到 输出
PARTIAL2:从部分数据聚合和部分数据聚合,会调用merge和terminatePartial--> map的输出 到reduce输入
FINAL: 从部分数据聚合到全部数据聚合,调用merge和 terminate方法 -->Reduce输入到输出
COMPLETE: 从原始数据到全部数据聚合,会调用iterate和 terminate方法;没有reduce阶段,只有map阶段
有几个注意点:
如果要聚合的数据量比较大,我们需要注意内存是否够,很容易出现内存溢出的问题;
尽可能重用对象,尽量避免new对象,尽量减轻JVM垃圾回收的过程。
publicclass UDAFAdd extendsAbstractGenericUDAFResolver{
static final LogLOG = LogFactory.getLog(UDAFAdd.class.getName());
@Override
publicGenericUDAFEvaluator getEvaluator(TypeInfo[]arguments) throws SemanticException {
//check arguments length
if (arguments.length != 1) {
throw new UDFArgumentException("Exactly one argument is expected.");
}
//check if arguments data type is primitive
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("Argument is not expected.");
}
switch(((PrimitiveTypeInfo)arguments[0]).getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
return new UDAFAddLong();
case FLOAT:
case DOUBLE:
return new UDAFAddDouble();
default:
throw new UDFArgumentException("Only numeric or string type argumentsare accepted but "
+arguments[0].getTypeName() + " is passed.");
}
}
public staticclass UDAFAddDouble extendsGenericUDAFEvaluator{
privatePrimitiveObjectInspectorinputOI;
privateDoubleWritable result;
//invoke the INIT method on the each stage
@Override
publicObjectInspector init(Modemode, ObjectInspector[] arguments) throws HiveException {
super.init(mode,arguments);
//INIT double value
result = new DoubleWritable(0);
inputOI =(DoubleObjectInspector)arguments[0];
returnPrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
/**
* it is used for storing aggregation resultduring the process of aggregation
*@authornickyzhang
*
*/
static class AddDoubleAggextendsAbstractAggregationBuffer{
boolean empty;
double sum;
@Override
public int estimate() {
return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2;
}
}
/**Get a new aggregation object*/
@Override
public AggregationBuffergetNewAggregationBuffer()throws HiveException {
AddDoubleAggaddDoubleAgg = new AddDoubleAgg();
reset(addDoubleAgg);
return addDoubleAgg;
}
/** Reset the aggregation. This is useful if we want toreuse the same aggregation. */
@Override
public void reset(AggregationBufferagg) throws HiveException {
AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;
addDoubleAgg.empty = Boolean.TRUE;
addDoubleAgg.sum = 0;
}
/** Iterate through original data.*/
@Override
public void iterate(AggregationBufferagg, Object[] arguments)throwsHiveException {
if (arguments.length != 1) {
throw new UDFArgumentException("Just one argument expected!");
}
this.merge(agg,arguments);
}
/** Get partial aggregation result.*/
@Override
public ObjectterminatePartial(AggregationBufferagg) throws HiveException {
returnterminate(agg);
}
/**Combiner or Reduce merge the mapper*/
@Override
public void merge(AggregationBufferagg, Object partial)throwsHiveException {
if (partial ==null) {
return;
}
AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;
addDoubleAgg.empty =false;
addDoubleAgg.sum += PrimitiveObjectInspectorUtils.getDouble(partial,inputOI);
}
/** Get final aggregation result */
@Override
public Objectterminate(AggregationBufferagg) throws HiveException {
AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;
if (addDoubleAgg.empty) {
return null;
}
result.set(addDoubleAgg.sum);
return result;
}
}
public staticclass UDAFAddLong extendsGenericUDAFEvaluator{
privatePrimitiveObjectInspectorinputOI;
private LongWritableresult;
//invoke the INIT method on the each stage
@Override
publicObjectInspector init(Modemode, ObjectInspector[] arguments) throws HiveException {
super.init(mode,arguments);
//INIT double value
result = new LongWritable(0);
inputOI =(LongObjectInspector)arguments[0];
returnPrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
/**
* it is used for storing aggregation resultduring the process of aggregation
*@authornickyzhang
*
*/
static class AddLongAggextends AbstractAggregationBuffer{
boolean empty;
long sum;
@Override
public int estimate() {
return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2;
}
}
/**Get a new aggregation object*/
@Override
public AggregationBuffergetNewAggregationBuffer()throws HiveException {
AddLongAggaddLongAgg = new AddLongAgg();
reset(addLongAgg);
return addLongAgg;
}
/** Reset the aggregation. This is useful if we want toreuse the same aggregation. */
@Override
public void reset(AggregationBufferagg) throws HiveException {
AddLongAggaddLongAgg = (AddLongAgg)agg;
addLongAgg.empty = Boolean.TRUE;
addLongAgg.sum = 0;
}
/** Iterate through original data.*/
@Override
public void iterate(AggregationBufferagg, Object[] arguments)throwsHiveException {
if (arguments.length != 1) {
throw new UDFArgumentException("Just one argument expected!");
}
this.merge(agg,arguments);
}
/** Get partial aggregation result.*/
@Override
public ObjectterminatePartial(AggregationBufferagg) throws HiveException {
returnterminate(agg);
}
/**Combiner or Reduce merge the mapper*/
@Override
public void merge(AggregationBufferagg, Object partial)throws HiveException{
if (partial ==null) {
return;
}
AddLongAggaddLongAgg = (AddLongAgg)agg;
addLongAgg.empty =false;
addLongAgg.sum += PrimitiveObjectInspectorUtils.getDouble(partial,inputOI);
}
/** Get final aggregation result */
@Override
public Objectterminate(AggregationBufferagg) throws HiveException {
AddLongAggaddLongAgg = (AddLongAgg)agg;
if (addLongAgg.empty) {
return null;
}
result.set(addLongAgg.sum);
return result;
}
}
}
四 如何创建UDTF函数
一般用于解析工作,比如说解析url,然后获取url信息,需要继承GenericUDTF.
publicclass UDTFEmail extendsGenericUDTF{
@Override
publicStructObjectInspector initialize(StructObjectInspectorinspector) throws UDFArgumentException {
if (inspector ==null) {
throw new UDFArgumentException("arguments is null");
}
List args = inspector.getAllStructFieldRefs();
if(CollectionUtils.isEmpty(args) ||args.size()!= 1){
throw new UDFArgumentException("UDF tables only one argument");
}
List<String>fields = new ArrayList<String>();
fields.add("name");
fields.add("email");
List<ObjectInspector>fieldIOList = new ArrayList<ObjectInspector>();
fieldIOList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
returnObjectInspectorFactory.getStandardStructObjectInspector(fields,fieldIOList);
}
@Override
public void process(Object[]args) throws HiveException {
if(ArrayUtils.isEmpty(args) ||args.length != 1) {
return;
}
Stringname = args[0].toString();
Stringemail = name +"@163.com";
super.forward(new String[] {name,email});
}
@Override
public void close()throws HiveException {
super.forward(new String[] {"complete","finish"});
}
}
五UDF UDAF UDTF 区别
UDF:一进一出
UDAF:多进一出,一般聚合用
UDTF:一进多出
Hive 之 用户自定义函数 UDF UDAF UDTF相关推荐
- 【Flink】Flink Table SQL 用户自定义函数: UDF、UDAF、UDTF
本文总结Flink Table & SQL中的用户自定义函数: UDF.UDAF.UDTF. UDF: 自定义标量函数(User Defined Scalar Function).一行输入一行 ...
- udf,udaf,udtf之间的区别
1.UDF:用户定义(普通)函数,只对单行数值产生作用: 继承UDF类,添加方法 evaluate() /*** @function 自定义UDF统计最小值* @author John**/publi ...
- UDF UDAF UDTF 区别
UDF UDAF UDTF 区别 UDF 概念: User-Defined-Function 自定义函数 .一进一出:只对单行数据产生作用: 实际使用时,UDF函数以匿名函数的形式进行操作使用 背景: ...
- Hive 自定义函数编写(UDF,UDAF,UDTF)
Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义 UDF来方便的扩展. 当 Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. 1. ...
- Hive自定义UDF UDAF UDTF
Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法 ...
- hive的udf,udaf,udtf各自依賴兩種class(转载+分析整理)
Hive自定义函数包括三种UDF.UDAF.UDTF 名稱縮寫 特點 依賴 UDF(User-Defined-Function) 一进一出 org.apache.hadoop.hive.ql.exec ...
- Hive中的用户自定义函数UDF
Hive中的自定义函数允许用户扩展HiveQL,是一个非常强大的功能.Hive中具有多种类型的用户自定义函数.show functions命令可以列举出当前Hive会话中的所加载进来的函数,包括内置的 ...
- udf函数(udf udaf udtf)
UDF的定义 UDF(User-Defined Functions)即是用户定义的hive函数.hive自带的函数并不能完全满足业务需求,这时就需要我们自定义函数了 UDF的分类 UDF:one to ...
- 4.2.11 Flink-流处理框架-Table API 与 SQL-函数(Functions)之用户自定义函数 UDF
目录 1.写在前面 2.标量函数(Scalar Functions):一对一 3.表函数(Table Functions):一对多 4.聚合函数(Aggregate Functions) 5.表聚合函 ...
最新文章
- Windows安装配置tidevice
- win2012每次启动显示服务器管理器,win2012r2服务器管理器打开角色.功能出错
- 中国桑叶市发展态势分析与前景动态预测报告场2022-2028年版
- ubuntu自定义安装里怎么选_超市里的五香粉怎么选?看懂配料表,两个小技巧,不怕选不好。...
- 【MPI学习3】MPI并行程序设计模式:不同通信模式MPI并行程序的设计
- 数值分析:插值与拟合
- 继续研究 SO_KEEPALIVE 问题
- mysql sql能力_MySQL SQL优化
- 美团遭遇反垄断调查;微信利用社交垄断封杀西瓜视频;Qt 6 for Python发布|极客头条...
- CCF认证201712-2游戏
- [校园网]绕过校园网使用自己服务器流量教程
- matlab zf预编码,多用户MIMO系统中各种波束成型预编码性能比较(ZF,BD,MMSE,SLNR,MF,SVD)...
- 神经网络学习小记录45——Keras常用学习率下降方式汇总
- python 列表去重拼题a_python list 合并连接字符串的方法 -电脑资料
- 网易云音乐无法正常运行
- 选择正确的云服务,初创企业也能服务上亿用户
- DelphiXE10.3 FMX(FireMonkey) 画图指南 非常全面
- cnn 部分初级代码
- ASP.NET的Application
- Unity-ProjectSetting-Quality-SkinWeights设置导致穿模
热门文章
- 关于容量设计、规划、治理 你知多少?
- mysql 数组变量_如何在MySQL中模拟数组变量?
- 城市代码表_从零开始做一个SLG游戏(六)游戏系统以及配置表
- static在内存层面的作用_C++内存管理笔记
- 算法提高 陶陶摘苹果2(java)
- 中移4G模块-ML302-OpenCpu开发-(MQTT连接阿里云-RRPC通讯)
- 解决JupyterLab/JupyterNotebook安装pycherts后依旧报错报错 ModuleNotFoundError: No module named ‘pyecharts‘
- Auto ARIMA 逐个时间点预测
- Django 模板中 变量 过滤器 标签 的使用方法
- 第四单元作业——UML分析总结学期总结