UDF:用户自定义函数。

可以自定义类实现UDFX接口。

javaAPI:

package com.udf;import javafx.scene.chart.PieChart;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;/*** @author George* @description**/
public class Udf {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setAppName("udf");conf.setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("error");SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> rdd = sc.parallelize(Arrays.asList("George","GeorgeDage","kangkang"));JavaRDD<Row> map = rdd.map(new Function<String, Row>() {public Row call(String v1) throws Exception {return RowFactory.create(v1);}});List<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);Dataset<Row> df = sqlContext.createDataFrame(map, schema);df.show();/*** +----------+* |      name|* +----------+* |    George|* |GeorgeDage|* |  kangkang|* +----------+*/df.registerTempTable("user");/*** 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx*/sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {public Integer call(String s) throws Exception {return s.length();}},DataTypes.IntegerType);sqlContext.sql("select name,StrLen(name) as length from user").show();/*** +----------+------+* |      name|length|* +----------+------+* |    George|     6|* |GeorgeDage|    10|* |  kangkang|     8|* +----------+------+*/sqlContext.udf().register("StrLen", new UDF2<String, Integer, Integer>() {public Integer call(String s, Integer integer) throws Exception {return s.length()+integer;}}, DataTypes.IntegerType);sqlContext.sql("select name,StrLen(name,10) as length from user").show();/*** +----------+------+* |      name|length|* +----------+------+* |    George|    16|* |GeorgeDage|    20|* |  kangkang|    18|* +----------+------+*/sc.stop();}
}

scalaAPI:

package com.udfimport org.apache.spark.sql.SparkSession/*** UDF用户自定义函数*/
object UdfScalaDemo {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local").appName("udf").getOrCreate()val list = List[String]("George","lucy","kk","lmdhk")import sparkSession.implicits._val frame = list.toDF("name")frame.createOrReplaceTempView("students")frame.show()/*** +------+* |  name|* +------+* |George|* |  lucy|* |    kk|* | lmdhk|* +------+*/sparkSession.udf.register("STRLEN",(n:String)=>{n.length})sparkSession.sql("select name,STRLEN(name) as length from students sort by length desc").show(100)/*** +------+------+* |  name|length|* +------+------+* |George|     6|* | lmdhk|     5|* |  lucy|     4|* |    kk|     2|* +------+------+*/sparkSession.stop()}
}


UDAF:用户自定义聚合函数。

  • 实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类

javaAPI:

package com.udf;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.util.ArrayList;
import java.util.Arrays;/*** @author George* @description*用户自定义聚合函数。*实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类**/
public class Udaf {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");conf.setAppName("udaf");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("error");SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("George", "kangkang", "GeorgeDage", "limu","George","GeorgeDage"));JavaRDD<Row> map = parallelize.map(new Function<String, Row>() {public Row call(String v1) throws Exception {return RowFactory.create(v1);}});ArrayList<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);Dataset<Row> frame = sqlContext.createDataFrame(map, schema);frame.show();/*** +----------+* |      name|* +----------+* |    George|* |  kangkang|* |GeorgeDage|* |      limu|* +----------+*/frame.registerTempTable("user");/*** 注册一个UDAF函数,实现统计相同值得个数* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的*/sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {/*** 指定输入字段的字段及类型*/@Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name",DataTypes.StringType, true)));}@Overridepublic DataType dataType() {return DataTypes.IntegerType;}@Overridepublic boolean deterministic() {return true;}/*** 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑* buffer.getInt(0)获取的是上一次聚合后的值* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合* 大聚和发生在reduce端.* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {buffer.update(0,buffer.getInt(0)+1);}@Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));}/*** 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值* buffer2.getInt(0) : 这次计算传入进来的update的结果* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0,buffer1.getInt(0) + buffer2.getInt(0));}/*** 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果*/@Overridepublic Object evaluate(Row buffer) {return buffer.getInt(0);}});sqlContext.sql("select name ,StringCount(name) from user group by name").show();/*** +----------+------+* |      name|(name)|* +----------+------+* |      limu|     1|* |    George|     2|* |GeorgeDage|     2|* |  kangkang|     1|* +----------+------+*/sc.stop();}
}

scalaAPI:

package com.udfimport org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._class MyUDAF extends UserDefinedAggregateFunction{// 聚合操作时,所处理的数据的类型def bufferSchema: StructType = {DataTypes.createStructType(Array(DataTypes.createStructField("aaa",IntegerType, true)))}// 最终函数返回值的类型def dataType: DataType = {DataTypes.IntegerType}def deterministic: Boolean = {true}// 最后返回一个最终的聚合值     要和dataType的类型一一对应def evaluate(buffer: Row): Any = {buffer.getAs[Int](0)}// 为每个分组的数据执行初始化值def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0}//输入数据的类型def inputSchema: StructType = {DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))}// 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0)}// 每个组,有新的值进来的时候,进行分组对应的聚合值的计算def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getAs[Int](0)+1}
}
package com.udfimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedAggregateFunctionobject UdafScalaDemo {def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local").appName("udaf").getOrCreate()val list = List[String]("George","lucy","kk","lmdhk","kk")import session.implicits._val frame = list.toDF("name")frame.createOrReplaceTempView("students")/*** 注册UDAF函数*/session.udf.register("NAMECOUNT",new MyUDAF())session.sql("select name,NAMECOUNT(name) as count from students group by name").show(100)/*** +------+-----+* |  name|count|* +------+-----+* |  lucy|    1|* |    kk|    2|* |George|    1|* | lmdhk|    1|* +------+-----+*/session.stop()}
}

图解UDAF:

Spark _27_自定义函数UDF和UDAF相关推荐

  1. Hive自定义函数UDF、UDAF、UDTF

    0.依赖 <dependencies><!--添加hive依赖--><dependency><groupId>org.apache.hive</g ...

  2. spark SQL自定义函数:

    spark SQL 自定义函数: 自定义函数: 第一种:  U D F  (用户自定义函数)函数 特点:  一对一的关系,输入一个值以后输出一个值  (一进一出) 大部分的内置函数都是U D F函数 ...

  3. 【Flink】Flink Table SQL 用户自定义函数: UDF、UDAF、UDTF

    本文总结Flink Table & SQL中的用户自定义函数: UDF.UDAF.UDTF. UDF: 自定义标量函数(User Defined Scalar Function).一行输入一行 ...

  4. T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响...

    CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ... ... 到这里重建家园 /* T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@D ...

  5. hive 元数据 自定义_如何在Hive中创建自定义函数UDF及如何直接通过Impala的同步元数据重用UDF的jar文件-阿里云开发者社区...

    如何在Hive中创建自定义函数UDF及使用 如何在Impala中使用Hive的自定义函数 UDF函数开发 使用Intellij工具开发Hive的UDF函数,进行编译: 1.使用Intellij工具通过 ...

  6. T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响!...

    原文:T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响! CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ...

  7. 案例解析丨Spark Hive自定义函数应用

    摘要:Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数. 1. 简介 Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数.UDF使用场景:输入一行,返回一个结果,一对一, ...

  8. Spark SQL自定义函数_第五章

    1.自定义函数分类 类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能. spark中的自定义函数有如下3类 1.UDF(User-Defined-Function) 输 ...

  9. Spark SQL自定义函数

    文章目录 自定义函数分类 自定义UDF 自定义UDAF[了解] 自定义函数分类 类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能. spark中的自定义函数有如下3类 ...

最新文章

  1. SAP WM初阶之LX09查询TR List
  2. Linux查看文件夹大小的相关命令
  3. 1112 Stucked Keyboard (20 分)【难度: 一般 / 知识点: 模拟】
  4. C#中的变量、常量、数据类型
  5. Java中的ArrayList的初始容量和容量分配
  6. 长春工业大学计算机科学与技术录取分数,2021年长春工业大学各省各专业最低投档录取分数线统计(文科 理科)...
  7. 百度小程序全套源码下载、免费分享,一键生成百度小程序
  8. java中文件处理之图片_Java中的文件处理
  9. 什么是Prettier?
  10. 可爱女人,等你下课——CDN美人串烧
  11. 【记录】qt.qpa.screen: Could not connect to any X display 解决方案
  12. 富文本编辑器:editor.md
  13. 【翻译】200行代码讲透RUST FUTURES (7)
  14. 对你快速了解恶意软件以及病毒和反病毒
  15. 选型笔记之二极管选型
  16. CMNET和CMWAP简单区别
  17. 信息学奥赛一本通——1004:字符三角形
  18. Sun工作站Solaris系统设置双屏显示
  19. Python+Opencv实现图像匹配——模板匹配
  20. 如何实现JSP网页模板 JSP网页母版

热门文章

  1. 2020ICPC沈阳 - United in Stormwind(推公式+FWT+SOSdp)
  2. CodeForces - 1076D Edge Deletion(最短路+贪心/最短路树+bfs)
  3. CodeForces - 444C DZY Loves Colors(线段树+剪枝)
  4. 云计算具有什么平台_究竟什么是云计算?
  5. 机器学习-Stacking方法的原理及实现
  6. bootstrap php zend,Zend Framework教程之Bootstrap类用法概述
  7. PAT ---- 1029. 旧键盘(20)
  8. HDU4357(数学思维题)
  9. 给Source Insight做个外挂系列之一--发现Source Insight
  10. 秒杀多线程第八篇 经典线程同步 信号量Semaphore