1.基于UserDefinedAggregateFunction实现平均数的计算

package com.bigdata.wb.sparkimport org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._object SparkUDAF extends UserDefinedAggregateFunction{//输入数据类型override def inputSchema: StructType = StructType(StructField("input", LongType)::Nil)//缓冲区数据类型override def bufferSchema: StructType = StructType(StructField("sum", LongType)::StructField("count", LongType)::Nil)//聚合之后输出数据类型override def dataType: DataType = DoubleType//相同输入是否总能得到相同输出override def deterministic: Boolean = true//初始化缓冲区override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}//给聚合函数传入一条数据进行处理override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1buffer.update(0, buffer(0))buffer.update(1, buffer(1))}//合并聚合函数缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}

2.基于Aggregator实现平均数的计算

package com.bigdata.wb.sparkimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoders, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}/*** @ author spencer* @ date 2020/7/14 13:46* * spark中UDAF中统计平均数*/case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)object SparkUDAFDemo02{def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local[*]")val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()val employeeRDD = spark.read.json("file:///D:\\spark-2.3.0-bin-hadoop2.7\\examples\\src\\main\\resources\\employees.json")import spark.implicits._val employeeDF = employeeRDD.as[Employee]//第一种方式调用udaf,spark3.0.0官网使用的这种方式val averageSalary = MyUDAF.toColumn.name("average_salary")employeeDF.select(averageSalary).show()//第二种方式调用udafemployeeDF.createOrReplaceTempView("employee")spark.udf.register("averageSalary", SparkUDAF)spark.sql("select averageSalary(salary) avg_salary from employee").show()spark.stop()}
}/*** spark3.0.0中自定义UDAF继承Aggregator实现*/
object MyUDAF extends Aggregator[Employee, Average, Double]{override def zero = Average(0L, 0L)override def reduce(buffer: Average, employee: Employee) = {buffer.sum += employee.salarybuffer.count += 1buffer}override def merge(buffer1: Average, buffer2: Average) = {buffer1.sum += buffer2.sumbuffer1.count += buffer2.countbuffer1}override def finish(reduction: Average) = reduction.sum / reduction.countoverride def bufferEncoder = Encoders.productoverride def outputEncoder = Encoders.scalaDouble
}

结果如下:

+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------++----------+
|avg_salary|
+----------+
|    3750.0|
+----------+

spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator相关推荐

  1. Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)

    一:准备数据源       在项目下新建一个student.txt文件,里面的内容为:         [plain] view plain copy print? <code class=&q ...

  2. python循环展示大写字母_python调用大写函数python中字典的循环遍历的两种方式

    开发中经常会用到对于字典.列表等数据的循环遍历,但是python中对于字典的遍历对于很多初学者来讲非常陌生,今天就来讲一下python中字典的循环遍历的两种方式. 注意: python2和python ...

  3. C++类中成员变量的初始化有两种方式

    C++类中成员变量的初始化有两种方式: 构造函数初始化列表和构造函数体内赋值.下面看看两种方式有何不同. 成员变量初始化的顺序是按照在那种定义的顺序. 1.内部数据类型(char,int--指针等) ...

  4. 【❗划重点!C语言函数参数传递只有两种方式(值传递,地址传递),不支持“引用传递”!❗】

    引子 上篇文章<C语言函数传参の结构体数组篇>提到了C语言的函数参数传递方式,百度了一一一大圈,有说两种的,也有说三种的,简直把我搞晕了,"值传递和地址传递"是毫无疑问 ...

  5. shell 不等于_关于shell编程中的整数值比较的两种方式的简单操作实例

    谈一谈关于shell编程中的整数值比较的两种方式 Shell编程有时处理一个对象时,需要我们对对象进行测试. 只有符合要求的才采取下一步操作,这样做的好处可以避免程序出错. 这个测试的对象可以是文件. ...

  6. c语言返回二叉树的大小,C语言中计算二叉树的宽度的两种方式

    C语言中计算二叉树的宽度的两种方式 二叉树作为一种很特殊的数据结构,功能上有很大的作用!今天就来看看怎么计算一个二叉树的最大的宽度吧. 采用递归方式 下面是代码内容: int GetMaxWidth( ...

  7. python中字典的循环遍历的两种方式

    开发中经常会用到对于字典.列表等数据的循环遍历,但是python中对于字典的遍历对于很多初学者来讲非常陌生,今天就来讲一下python中字典的循环遍历的两种方式. 注意: python2和python ...

  8. C语言中存储多个字符串的两种方式

    C语言中存储多个字符串的两种方式 方式一 二维字符串数组 声明: char name[4][10] = { "Justinian", "Momo", " ...

  9. android数据库侵入,Android中实现侵入式状态栏的两种方式

    最近对"爸比讲故事"Android版本进行代码重构的时候,对之前版本的大部分界面的头部侵入式效果,作了一个总结和梳理,在期间查阅了thinkcool的博客和结合亲身实践,总结了2种 ...

最新文章

  1. 大厂项目是如何死掉的?太过真实!
  2. Block介绍(二)内存管理与其他特性
  3. Alphabet股价周五跌5.32%:三年最大单日跌幅
  4. 算法训练 最大的算式
  5. 《研磨设计模式》读后感一
  6. 实时音视频助力在线教育风口
  7. git fatal:HttpRequestException encountered
  8. android集成测试工具,android – 集成测试和Cucumber测试
  9. 令人窒息的数学动态图
  10. Dubbo源码分析:ProxyFactory
  11. 7-6 统计素数并求和 (20分)_托福、SAT最新考试报告:中国学生托福平均分81分;...
  12. AccessibilityService的具体应用场景
  13. AI大牛发起神秘字母接龙,起因竟然是……
  14. 关于Linux消息队列的简单说明、使用、编码
  15. linux卸载分区命令,CentOS删除磁盘分区命令
  16. python出现unexpected indent_Python、unexpected indent错误解决方法
  17. MSSQL 负载均衡(Moebius)
  18. git commit时的几种指令
  19. 操作既简单、实用性强的文件批量改名高手
  20. 小米手机v3.exo 合并_eXo发布Web Content Management 2.0作为开源

热门文章

  1. 实现了个类似blood brothers中的转轴特效
  2. go proxy 设置
  3. Angular2+/Angular7 + Swiper
  4. 字典序 java_java实现对map的字典序排序操作示例
  5. idea官网下载(版本自选)
  6. 我为什么不提倡过1024节呢?
  7. POJ - 1611 The Suspects
  8. ios马甲包上架(本人经历)
  9. 程序员如果到了30岁还一事无成,就别只想着琢磨技术了!
  10. 88.【员工管理系统-springBoot】