文章目录

  • 处理流程
  • 弱类型
  • 强类型

UDAF的特点就是:N:1,目的就是为了做聚合(group by)

处理流程

首先准备好数据源:

这里我们人为的将其分为2个分区:


按照group by字段进行分组:

在每一个partition内的每一个分组内,按照目标字段(以age为例)操作,执行update方法


group by字段相同的为一组,拉取数据:


最后分别针对group by分成的几组,执行merge操作!

弱类型

/*** 弱类型自定义UDAF*/class MyAvgUDAF extends  UserDefinedAggregateFunction{/*** 输入列的类型:age:Int*/override def inputSchema: StructType = {val fields: Array[StructField] = Array(StructField("input name", IntegerType))StructType(fields)}/*** 临时变量的类型:sum和count*/override def bufferSchema: StructType = {val fields: Array[StructField] = Array(StructField("sum", LongType),StructField("count", LongType))StructType(fields)}/*** 返回值类型*/override def dataType: DataType = DoubleType/*** 一致性:同样的输入,是否返回同样的结果*/override def deterministic: Boolean = true/*** 中间变量的值:sum=0,count=0*/override def initialize(buffer: MutableAggregationBuffer): Unit = {//初始化sum的值为0buffer(0)=0L//初始化count的值为0buffer(1)=0L}/*** 累加,类似Combiner,在每个task内执行*/override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {//sum初始值val sum: Long = buffer.getAs[Long](0)//count初始值val count: Long = buffer.getAs[Long](1)//取出目标:ageval age: Int = input.getAs[Int](0)//更新sumbuffer.update(0,sum+age)//更新countbuffer.update(1,count+1)}/*** 合并所有task中,改分组的所有sum和count,类似Reducer*/override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {//取出之前统计的sumval sum: Long = buffer1.getAs[Long](0)//取出之前统计的countval count: Long = buffer1.getAs[Long](1)//当前task统计的sumval taskSum: Long = buffer2.getAs[Long](0)//当前task统计的countval taskCount: Long = buffer2.getAs[Long](1)//merge合并buffer1.update(0,sum+taskSum)buffer1.update(1,count+taskCount)}/*** 计算得到最终结果*/override def evaluate(buffer: Row): Any = {//最终sumval sum: Long = buffer.getAs[Long](0)//最终countval count: Long = buffer.getAs[Long](1)sum.toDouble/count}
}

调用:

    val list = List(("lisi",20,"man"),("wangwu",30,"woman"),("zhaoliu",12,"man"),("lilei",40,"woman"),("hanmeimei11",30,"woman"),("hanmeimei22",80,"woman"),("hanmeimei33",90,"man"),("hanmeimei44",100,"man"))val rdd: RDD[(String,Int,String)] = spark.sparkContext.parallelize(list,2)rdd.mapPartitionsWithIndex((index,iter)=>{println(s"index:${index}    data:${iter.toList}")iter}).collect()import spark.implicits._val df: DataFrame = rdd.toDF("name","age","sex")//注册成表df.createOrReplaceTempView("emp")//创建自定义UDAFval myavg = new MyAvgUDAF//注册spark.udf.register("myavg",myavg)//执行spark.sql("""|select  sex,myavg(age) avg_age|from emp|group by sex""".stripMargin).show()

强类型

强类型更简洁方便

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator/*** 强类型自定义UDAF*///样例类,作为中间变量:sum、count
case class AvgAgeBuff(var sum:Long,var count:Long)/*** Aggregator[IN,BUFF,OUT]*   IN: 函数参数类型*   BUFF: 计算的中间变量类型*   OUT: 函数的最终结果类型*/
class MyAvgAggregator extends Aggregator[Int,AvgAgeBuff,Double]{override def zero: AvgAgeBuff = AvgAgeBuff(0L,0L)/*** 针对每个partition的每个组,进行合并*/override def reduce(b: AvgAgeBuff, age: Int): AvgAgeBuff = {//原来的sum + 现在的age//原来的count + 1AvgAgeBuff(b.sum+age,b.count+1)}/*** 合并相同组*/override def merge(b1: AvgAgeBuff, b2: AvgAgeBuff): AvgAgeBuff = {AvgAgeBuff(b1.sum+b2.sum,b1.count+b2.count)}/*** 最终结果*/override def finish(reduction: AvgAgeBuff): Double = reduction.sum.toDouble/reduction.count/*** 指定中间变量的编码方式*/override def bufferEncoder: Encoder[AvgAgeBuff] = Encoders.product[AvgAgeBuff]/*** 指定最终结果的编码方式*/override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

调用:

  val list = List(("lisi",20,"man"),("wangwu",30,"woman"),("zhaoliu",12,"man"),("lilei",40,"woman"),("hanmeimei11",30,"woman"),("hanmeimei22",80,"woman"),("hanmeimei33",90,"man"),("hanmeimei44",100,"man"))val rdd: RDD[(String,Int,String)] = spark.sparkContext.parallelize(list,2)rdd.mapPartitionsWithIndex((index,iter)=>{println(s"index:${index}    data:${iter.toList}")iter}).collect()import spark.implicits._val df: DataFrame = rdd.toDF("name","age","sex")//注册成表df.createOrReplaceTempView("emp")//创建自定义UDAFval aggregator = new MyAvgAggregatorimport org.apache.spark.sql.functions._//转换udaf对象val func: Any = udaf(aggregator)//注册spark.udf.register("myavg",func)spark.sql("""|select  sex,myavg(age) avg_age|from emp|group by sex""".stripMargin).show()

Spark UDAF用户自定义聚合函数相关推荐

  1. Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator

    Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator 一.无类型的用户自定于聚合函数(Untyped User-Defined Ag ...

  2. 【极简spark教程】spark聚合函数

    聚合函数分为两类,一种是spark内置的常用聚合函数,一种是用户自定义聚合函数 UDAF 不带类型的UDAF[较常用] 继承UserDefinedAggregateFunction 定义输入数据的sc ...

  3. Spark SQL操作之-函数汇总篇-下

    Spark SQL操作之-自定义函数篇-下 环境说明 自定义函数分类 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 环境说明 1. JDK 1.8 2. Spark 2.1 自定义函数分类 ...

  4. Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)

    1.函数 1.1 函数简介 Hive会将常用的逻辑封装成函数给用户进行使用,类似java中的函数. 好处:避免用户反复写逻辑,可以直接拿来使用 重点:用户需要知道函数叫什么,能做什么 Hive提供了大 ...

  5. 【大数据】Presto开发自定义聚合函数

    Presto 在交互式查询任务中担当着重要的职责.随着越来越多的人开始使用 SQL 在 Presto 上分析数据,我们发现需要将一些业务逻辑开发成类似 Hive 中的 UDF,提高 SQL 使用人员的 ...

  6. Spark 2.2.1 SQL UDAF用户自定义函数案例

    Spark 2.2.1 SQL UDAF用户自定义函数案例 UDAF:User Defined Aggregation Function,用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的 ...

  7. Spark:group by和聚合函数使用

    groupBy分组和使用agg聚合函数demo: df.show +----+-----+---+ |YEAR|MONTH|NUM| +----+-----+---+ |2017| 1| 10| |2 ...

  8. Spark踩坑填坑-聚合函数-序列化异常

    Spark踩坑填坑-聚合函数-序列化异常 一.Spark聚合函数特殊场景 二.spark sql group by 三.Spark Caused by: java.io.NotSerializable ...

  9. SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总

    AVG是求平均值,所以输出类型是Double类型 1)创建弱类型聚合函数类extends UserDefinedAggregateFunction class MyAgeFunction extend ...

最新文章

  1. 零售业将成人工智能系统支出额最高的行业
  2. 开发中使用UEditor编辑器的注意事项
  3. css3动画-animation
  4. 5G时代 我国在通信技术领域弯道超车指日可待
  5. zuul默认的路由规则及禁用路由规则
  6. jzoj4802-[GDOI2017模拟9.24]探险计划【费用流,拆点】
  7. Qt关于Tcp通信步骤的总结
  8. 【前端】Github Pages 与域名关联简明教程
  9. BZOJ2241 [SDOI2011]打地鼠 【模拟】
  10. UVA1292-----Strategic game-----树形DP解决树上的最小点覆盖问题
  11. wifi连接过程抓包
  12. Sketch54 for mac汉化破解版
  13. Ubuntu系统清理瘦身
  14. 什么是Express框架
  15. 365抽奖软件 v6.1.7
  16. 【coq】函数语言设计 笔记 01 - basics
  17. linux iio设备
  18. AXI4协议学习:架构、信号定义、工作时序和握手机制
  19. 火狐浏览器如何导入和导出书签收藏夹
  20. SwinUNet2022

热门文章

  1. mongo explain分析详解
  2. 网络报错:“The connection is not for this device.”
  3. 微信内置浏览器的JsAPI(WeixinJSBridge续)[转载]
  4. Zabbix2.4安装和问题总结(一)
  5. Mysql找回管理员password
  6. 【HTML】DAY 4
  7. 开发js插件之所遇--02[DOM]
  8. JavaScript 对象中this的指向问题
  9. 全局变量在主函数调用过程中被中断修改的问题
  10. EasyNVR摄像机网页直播中,推流组件EasyRTMP推送RTMP扩展支持HEVC(H.265)的方案