Spark UDAF用户自定义聚合函数
文章目录
- 处理流程
- 弱类型
- 强类型
UDAF的特点就是:N:1,目的就是为了做聚合(group by)
处理流程
首先准备好数据源:
这里我们人为的将其分为2个分区:
按照group by字段进行分组:
在每一个partition内的每一个分组内,按照目标字段(以age为例)操作,执行update方法
group by字段相同的为一组,拉取数据:
最后分别针对group by分成的几组,执行merge操作!
弱类型
/*** 弱类型自定义UDAF*/class MyAvgUDAF extends UserDefinedAggregateFunction{/*** 输入列的类型:age:Int*/override def inputSchema: StructType = {val fields: Array[StructField] = Array(StructField("input name", IntegerType))StructType(fields)}/*** 临时变量的类型:sum和count*/override def bufferSchema: StructType = {val fields: Array[StructField] = Array(StructField("sum", LongType),StructField("count", LongType))StructType(fields)}/*** 返回值类型*/override def dataType: DataType = DoubleType/*** 一致性:同样的输入,是否返回同样的结果*/override def deterministic: Boolean = true/*** 中间变量的值:sum=0,count=0*/override def initialize(buffer: MutableAggregationBuffer): Unit = {//初始化sum的值为0buffer(0)=0L//初始化count的值为0buffer(1)=0L}/*** 累加,类似Combiner,在每个task内执行*/override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {//sum初始值val sum: Long = buffer.getAs[Long](0)//count初始值val count: Long = buffer.getAs[Long](1)//取出目标:ageval age: Int = input.getAs[Int](0)//更新sumbuffer.update(0,sum+age)//更新countbuffer.update(1,count+1)}/*** 合并所有task中,改分组的所有sum和count,类似Reducer*/override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {//取出之前统计的sumval sum: Long = buffer1.getAs[Long](0)//取出之前统计的countval count: Long = buffer1.getAs[Long](1)//当前task统计的sumval taskSum: Long = buffer2.getAs[Long](0)//当前task统计的countval taskCount: Long = buffer2.getAs[Long](1)//merge合并buffer1.update(0,sum+taskSum)buffer1.update(1,count+taskCount)}/*** 计算得到最终结果*/override def evaluate(buffer: Row): Any = {//最终sumval sum: Long = buffer.getAs[Long](0)//最终countval count: Long = buffer.getAs[Long](1)sum.toDouble/count}
}
调用:
val list = List(("lisi",20,"man"),("wangwu",30,"woman"),("zhaoliu",12,"man"),("lilei",40,"woman"),("hanmeimei11",30,"woman"),("hanmeimei22",80,"woman"),("hanmeimei33",90,"man"),("hanmeimei44",100,"man"))val rdd: RDD[(String,Int,String)] = spark.sparkContext.parallelize(list,2)rdd.mapPartitionsWithIndex((index,iter)=>{println(s"index:${index} data:${iter.toList}")iter}).collect()import spark.implicits._val df: DataFrame = rdd.toDF("name","age","sex")//注册成表df.createOrReplaceTempView("emp")//创建自定义UDAFval myavg = new MyAvgUDAF//注册spark.udf.register("myavg",myavg)//执行spark.sql("""|select sex,myavg(age) avg_age|from emp|group by sex""".stripMargin).show()
强类型
强类型更简洁方便
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator/*** 强类型自定义UDAF*///样例类,作为中间变量:sum、count
case class AvgAgeBuff(var sum:Long,var count:Long)/*** Aggregator[IN,BUFF,OUT]* IN: 函数参数类型* BUFF: 计算的中间变量类型* OUT: 函数的最终结果类型*/
class MyAvgAggregator extends Aggregator[Int,AvgAgeBuff,Double]{override def zero: AvgAgeBuff = AvgAgeBuff(0L,0L)/*** 针对每个partition的每个组,进行合并*/override def reduce(b: AvgAgeBuff, age: Int): AvgAgeBuff = {//原来的sum + 现在的age//原来的count + 1AvgAgeBuff(b.sum+age,b.count+1)}/*** 合并相同组*/override def merge(b1: AvgAgeBuff, b2: AvgAgeBuff): AvgAgeBuff = {AvgAgeBuff(b1.sum+b2.sum,b1.count+b2.count)}/*** 最终结果*/override def finish(reduction: AvgAgeBuff): Double = reduction.sum.toDouble/reduction.count/*** 指定中间变量的编码方式*/override def bufferEncoder: Encoder[AvgAgeBuff] = Encoders.product[AvgAgeBuff]/*** 指定最终结果的编码方式*/override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
调用:
val list = List(("lisi",20,"man"),("wangwu",30,"woman"),("zhaoliu",12,"man"),("lilei",40,"woman"),("hanmeimei11",30,"woman"),("hanmeimei22",80,"woman"),("hanmeimei33",90,"man"),("hanmeimei44",100,"man"))val rdd: RDD[(String,Int,String)] = spark.sparkContext.parallelize(list,2)rdd.mapPartitionsWithIndex((index,iter)=>{println(s"index:${index} data:${iter.toList}")iter}).collect()import spark.implicits._val df: DataFrame = rdd.toDF("name","age","sex")//注册成表df.createOrReplaceTempView("emp")//创建自定义UDAFval aggregator = new MyAvgAggregatorimport org.apache.spark.sql.functions._//转换udaf对象val func: Any = udaf(aggregator)//注册spark.udf.register("myavg",func)spark.sql("""|select sex,myavg(age) avg_age|from emp|group by sex""".stripMargin).show()
Spark UDAF用户自定义聚合函数相关推荐
- Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator
Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator 一.无类型的用户自定于聚合函数(Untyped User-Defined Ag ...
- 【极简spark教程】spark聚合函数
聚合函数分为两类,一种是spark内置的常用聚合函数,一种是用户自定义聚合函数 UDAF 不带类型的UDAF[较常用] 继承UserDefinedAggregateFunction 定义输入数据的sc ...
- Spark SQL操作之-函数汇总篇-下
Spark SQL操作之-自定义函数篇-下 环境说明 自定义函数分类 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 环境说明 1. JDK 1.8 2. Spark 2.1 自定义函数分类 ...
- Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)
1.函数 1.1 函数简介 Hive会将常用的逻辑封装成函数给用户进行使用,类似java中的函数. 好处:避免用户反复写逻辑,可以直接拿来使用 重点:用户需要知道函数叫什么,能做什么 Hive提供了大 ...
- 【大数据】Presto开发自定义聚合函数
Presto 在交互式查询任务中担当着重要的职责.随着越来越多的人开始使用 SQL 在 Presto 上分析数据,我们发现需要将一些业务逻辑开发成类似 Hive 中的 UDF,提高 SQL 使用人员的 ...
- Spark 2.2.1 SQL UDAF用户自定义函数案例
Spark 2.2.1 SQL UDAF用户自定义函数案例 UDAF:User Defined Aggregation Function,用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的 ...
- Spark:group by和聚合函数使用
groupBy分组和使用agg聚合函数demo: df.show +----+-----+---+ |YEAR|MONTH|NUM| +----+-----+---+ |2017| 1| 10| |2 ...
- Spark踩坑填坑-聚合函数-序列化异常
Spark踩坑填坑-聚合函数-序列化异常 一.Spark聚合函数特殊场景 二.spark sql group by 三.Spark Caused by: java.io.NotSerializable ...
- SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总
AVG是求平均值,所以输出类型是Double类型 1)创建弱类型聚合函数类extends UserDefinedAggregateFunction class MyAgeFunction extend ...
最新文章
- 零售业将成人工智能系统支出额最高的行业
- 开发中使用UEditor编辑器的注意事项
- css3动画-animation
- 5G时代 我国在通信技术领域弯道超车指日可待
- zuul默认的路由规则及禁用路由规则
- jzoj4802-[GDOI2017模拟9.24]探险计划【费用流,拆点】
- Qt关于Tcp通信步骤的总结
- 【前端】Github Pages 与域名关联简明教程
- BZOJ2241 [SDOI2011]打地鼠 【模拟】
- UVA1292-----Strategic game-----树形DP解决树上的最小点覆盖问题
- wifi连接过程抓包
- Sketch54 for mac汉化破解版
- Ubuntu系统清理瘦身
- 什么是Express框架
- 365抽奖软件 v6.1.7
- 【coq】函数语言设计 笔记 01 - basics
- linux iio设备
- AXI4协议学习:架构、信号定义、工作时序和握手机制
- 火狐浏览器如何导入和导出书签收藏夹
- SwinUNet2022
热门文章
- mongo explain分析详解
- 网络报错:“The connection is not for this device.”
- 微信内置浏览器的JsAPI(WeixinJSBridge续)[转载]
- Zabbix2.4安装和问题总结(一)
- Mysql找回管理员password
- 【HTML】DAY 4
- 开发js插件之所遇--02[DOM]
- JavaScript 对象中this的指向问题
- 全局变量在主函数调用过程中被中断修改的问题
- EasyNVR摄像机网页直播中,推流组件EasyRTMP推送RTMP扩展支持HEVC(H.265)的方案