注意事项:

1.udf、udaf函数的使用都需要使用sqlContext来创建function,如果是scala里需要引用Java的方法或者函数的话,需要包装一下,再写个scala的方法,将Java的返回值输出。

2.scala中的udf函数注册park.sqlContext.udf.register("date_splits",date_splits _)

3.UDTF函数使用的时候,需要创建SparkSession对象,由SparkSession执行sql语句CREATE TEMPORARY FUNCTION myUDTF as '自己实现的UDTF位置’来创建
测试数据

1-174,"121.31583075,30.67559298","121.31583075,30.67784745","121.31848407,30.67784745","121.31848407,30.67559298"
1-175,"121.31848407,30.67559298","121.31848407,30.67784745","121.32113740000001,30.67784745","121.32113740000001,30.67559298"
1-176,"121.32113740000001,30.67559298","121.32113740000001,30.67784745","121.32379073,30.67784745","121.32379073,30.67559298"
1-177,"121.32379073,30.67559298","121.32379073,30.67784745","121.32644406,30.67784745","121.32644406,30.67559298"
1-178,"121.32644406,30.67559298","121.32644406,30.67784745","121.32909739,30.67784745","121.32909739,30.67559298"
1-179,"121.32909739,30.67559298","121.32909739,30.67784745","121.33175072,30.67784745","121.33175072,30.67559298"
1-180,"121.33175072,30.67559298","121.33175072,30.67784745","121.33440404,30.67784745","121.33440404,30.67559298"
1-181,"121.33440404,30.67559298","121.33440404,30.67784745","121.33705737,30.67784745","121.33705737,30.67559298"

本地测试代码

class BaseJob { }
case class LngLatSH1(id:String,lng1:Double,lat1:Double,lng2:Double,lat2:Double,lng3:Double,lat3:Double,lng4:Double,lat4:Double)
object BaseJob{def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("base_job").enableHiveSupport().master("local[2]").config("spark.sql.warehouse.dir","/user/hive/warehouse").config("spark.sql.shuffle.partitions",100).getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//设置日志输出级别spark.sql("CREATE TEMPORARY FUNCTION myFloatMap as 'hanghai.UDTF_TEST'")
//    spark.sqlContext.udf.register("myFloatMap",myFloatMap _)//这里读取的是hdfs上的文件val dataRDD: RDD[String] = sc.textFile("E:\\idea-workSpace\\wc\\src\\main\\scala\\hanghai\\test_v1.txt")val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))val LngLatSH1RDD: RDD[LngLatSH1] = lineArrayRDD.map(x=>LngLatSH1(x(0),x(1).split("\"")(1).toDouble,x(2).split("\"")(0).toDouble,x(3).split("\"")(1).toDouble,x(4).split("\"")(0).toDouble,x(5).split("\"")(1).toDouble,x(6).split("\"")(0).toDouble,x(7).split("\"")(1).toDouble,x(8).split("\"")(0).toDouble))import spark.implicits._val LngLatSH1DF: DataFrame = LngLatSH1RDD.toDF()LngLatSH1DF.createOrReplaceTempView("test")val data = spark.sql("select myFloatMap(lng1) from test").show(3)sc.stop()spark.stop()}
}

1.UDF函数 

import com.alibaba.fastjson.JSONObject;
import org.apache.spark.sql.api.java.UDF2;public class MyUDF implements UDF2<String,String,String> {public String call(String o1, String o2) throws Exception {JSONObject jsonObject = JSONObject.parseObject(o1);return jsonObject.getString(o2);}
}

UDF测试类

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{DataTypes, StringType, StructField}
import org.apache.spark.sql.{Row, types}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Test_UDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Test_UDF").setMaster("local[2]")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc, Seconds(5))val ds = ssc.socketTextStream("**.***.***.***", 8888)ds.foreachRDD {rdd =>val sqlContext = new HiveContext(sc);sqlContext.udf.register("udf", new MyUDF(), DataTypes.StringType);import sqlContext.implicits._val schema= types.StructType(Seq(StructField("id", StringType, true),StructField("info", StringType, true)))val rowRDD = rdd.map(_.split(" ")).map(p=> Row(p(0),p(1)))val dataFrame = sqlContext.createDataFrame(rowRDD,schema)dataFrame.registerTempTable("test_udf")val dataFrame1 = sqlContext.sql("select id,udf(info,'username') from test_udf")dataFrame1.show();}ssc.start()ssc.awaitTermination()}
}

2.UDAF函数

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._class MyUDAF extends UserDefinedAggregateFunction{override def inputSchema: StructType ={StructType(Array(StructField("str", StringType, true)))}override def bufferSchema: StructType = {StructType(Array(StructField("count", IntegerType, true)))}override def dataType: DataType = {IntegerType}override def deterministic: Boolean = trueoverride def initialize(buffer: MutableAggregationBuffer): Unit ={buffer(0) = 0}override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getAs[Int](0) + 1}override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)}override def evaluate(buffer: Row): Any = {buffer.getAs[Int](0)}
}

UDAF测试类

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{Row, types}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField}
import org.apache.spark.streaming.{Seconds, StreamingContext}object Test_UDAF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Test_UDAF").setMaster("local[2]")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc, Seconds(5))val ds = ssc.socketTextStream("**,***,***,***", 8888)ds.foreachRDD {rdd =>val sqlContext = new HiveContext(sc);sqlContext.udf.register("udaf", new MyUDAF);import sqlContext.implicits._val schema = types.StructType(Seq(StructField("name", StringType, true)))val rowRDD = rdd.map(name => Row(name))val dataFrame = sqlContext.createDataFrame(rowRDD, schema)dataFrame.registerTempTable("test_udaf")dataFrame.show()val dataFrame1 = sqlContext.sql("select name,udaf(name) count from test_udaf group by name")dataFrame1.show();}ssc.start()ssc.awaitTermination()}}

UDTF
UDTF需要注意以下几点

不能再使用 sqlContext.udf.register方式来注册自定义函数了,需要创建SparkSession对象,由SparkSession执行sql语句CREATE TEMPORARY FUNCTION myUDTF as '自己实现的UDTF位置’来创建
其次不能把注册函数的语句写在foreachRDD 里面

import java.utilimport org.apache.hadoop.hive.ql.exec.{UDFArgumentException, UDFArgumentLengthException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}class MyUDTF extends GenericUDTF {override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {if (args.length != 1) {throw new UDFArgumentLengthException("UserDefinedUDTF takes only one argument")}if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter")}val fieldNames = new util.ArrayList[String]()val fieldOIs = new util.ArrayList[ObjectInspector]fieldNames.add("name")fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)fieldNames.add("age")fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)}override def close(): Unit = {}override def process(args: Array[AnyRef]): Unit = {val strings = args(0).toString.split(";")for (string <- strings) {val strings1 = string.split(":")forward(strings1)}}
}

UDTF测试类

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Test_UDTF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Test_UDTF").setMaster("local[2]")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc, Seconds(5))val ds = ssc.socketTextStream("10.**,**,**", 8888)val session = SparkSession.builder().master("local[2]").appName("Test_UDTF").enableHiveSupport().getOrCreate()//该方式不能使用//session.sql("CREATE TEMPORARY FUNCTION udtf AS 'MyUDTF'")val sqlContext = new HiveContext(sc);sqlContext.sql("CREATE TEMPORARY FUNCTION udtf AS 'MyUDTF'")ds.foreachRDD {rdd =>val schema = types.StructType(Seq(StructField("info", StringType, true)))val rowRDD = rdd.map(p => Row(p))//val dataFrame = session.createDataFrame(rowRDD, schema)val dataFrame = sqlContext.createDataFrame(rowRDD, schema)dataFrame.registerTempTable("test_udtf")val dataFrame1 = session.sql("select udtf(info)as (name,age) from test_udtf")dataFrame1.show();}ssc.start()ssc.awaitTermination()}
}

自定义UDF、UDAF、UDTF函数相关推荐

  1. Hive自定义UDF UDAF UDTF

    Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法 ...

  2. 自定义UDF、UDTF函数

    自定义步骤 自定义UDF:继承UDF,重写evaluate方法 自定义UDTF:继承GenericUDTF,重写3个方法:initialize(自定义输出数据的列名和类型),process(将结果返回 ...

  3. Hive 之 用户自定义函数 UDF UDAF UDTF

    一 什么是UDF UDF是UserDefined Function 用户自定义函数的缩写.Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数. 除了UDF 之外,我们 ...

  4. udf,udaf,udtf之间的区别

    1.UDF:用户定义(普通)函数,只对单行数值产生作用: 继承UDF类,添加方法 evaluate() /*** @function 自定义UDF统计最小值* @author John**/publi ...

  5. UDF UDAF UDTF 区别

    UDF UDAF UDTF 区别 UDF 概念: User-Defined-Function 自定义函数 .一进一出:只对单行数据产生作用: 实际使用时,UDF函数以匿名函数的形式进行操作使用 背景: ...

  6. Hive 自定义函数编写(UDF,UDAF,UDTF)

    Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义 UDF来方便的扩展. 当 Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. 1. ...

  7. Hive自定义UDF和聚合函数UDAF

    2019独角兽企业重金招聘Python工程师标准>>> 转自:http://computerdragon.blog.51cto.com/6235984/1288567 Hive是一种 ...

  8. hive的udf,udaf,udtf各自依賴兩種class(转载+分析整理)

    Hive自定义函数包括三种UDF.UDAF.UDTF 名稱縮寫 特點 依賴 UDF(User-Defined-Function) 一进一出 org.apache.hadoop.hive.ql.exec ...

  9. udf函数(udf udaf udtf)

    UDF的定义 UDF(User-Defined Functions)即是用户定义的hive函数.hive自带的函数并不能完全满足业务需求,这时就需要我们自定义函数了 UDF的分类 UDF:one to ...

  10. 【Flink】Flink自定义UDF以及为函数启用别名

    Spring Boot 面试精讲 1.概述 1.1 Scalar Functions 标量函数 标量函数,是指返回一个值的函数.标量函数是实现将0,1,或者多个标量值转化为一个新值. 实现一个标量函数 ...

最新文章

  1. 【Python】Label组件 Button组件 Checkbutton组件
  2. java 庖丁解牛api_Java Restful API Best Practices
  3. 定义一个dto对象_正确理解DTO、值对象和POCO
  4. 如何在SAP社区上查找其他SAP从业者遇到的问题或者提新问题
  5. html 数据钩子,22.钩子函数.html
  6. 6张信用卡欠款10000到47000不等,会被起诉坐牢吗?
  7. hashtable,dictionary 从原理上说说有什么异同,哪个性能高一些
  8. ssh-keygen实现免密码登陆
  9. 中国PaaS/SaaS市场研究(2021)
  10. 巡风代码架构简介以及Flask的项目文件结构简介
  11. 西安互联网公司和生活成本
  12. 2022年最全快手市场研报合集(共61份)
  13. excel文档加密破解,简单操作亲测有效
  14. Debian7.5--双屏显示问题
  15. 第一序列任小粟的能力_第一序列:上进青年任小粟得知世界真相后,却加倍强迫六元学习?...
  16. WhyNotWin11(win11升级检测工具)绿色便携版V2.1.0.0下载 | 比微软PC Health Check好用
  17. 算法作业 (三)——— 装箱问题
  18. here is no getter for property named 解决方案及问题分析
  19. 【模拟面试-10年工作】项目多一定是优势吗?
  20. plant simulation物流系统仿真案例

热门文章

  1. CToolBar工具条控制方法
  2. Telnet 服务 开启 步骤
  3. Java 学习笔记 - AutoCloseable、Closeable
  4. linux 中nohup命令的作用
  5. modprobe命令介绍
  6. 数据清洗是清洗什么?
  7. 计算机wonder基础知识,WonderSkills教材来了!一套更适合中国孩子的美国语文教材...
  8. html+css——做一个简单的底部导航栏
  9. 百趣代谢组学文献分享 | 建立基于代谢组学的ICU脓毒症患者预后预测模型
  10. dst发育筛查有意义吗_Dst发育筛查是什么?