spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator
1.基于UserDefinedAggregateFunction实现平均数的计算
package com.bigdata.wb.sparkimport org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._object SparkUDAF extends UserDefinedAggregateFunction{//输入数据类型override def inputSchema: StructType = StructType(StructField("input", LongType)::Nil)//缓冲区数据类型override def bufferSchema: StructType = StructType(StructField("sum", LongType)::StructField("count", LongType)::Nil)//聚合之后输出数据类型override def dataType: DataType = DoubleType//相同输入是否总能得到相同输出override def deterministic: Boolean = true//初始化缓冲区override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}//给聚合函数传入一条数据进行处理override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1buffer.update(0, buffer(0))buffer.update(1, buffer(1))}//合并聚合函数缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}
2.基于Aggregator实现平均数的计算
package com.bigdata.wb.sparkimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoders, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}/*** @ author spencer* @ date 2020/7/14 13:46* * spark中UDAF中统计平均数*/case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)object SparkUDAFDemo02{def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local[*]")val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()val employeeRDD = spark.read.json("file:///D:\\spark-2.3.0-bin-hadoop2.7\\examples\\src\\main\\resources\\employees.json")import spark.implicits._val employeeDF = employeeRDD.as[Employee]//第一种方式调用udaf,spark3.0.0官网使用的这种方式val averageSalary = MyUDAF.toColumn.name("average_salary")employeeDF.select(averageSalary).show()//第二种方式调用udafemployeeDF.createOrReplaceTempView("employee")spark.udf.register("averageSalary", SparkUDAF)spark.sql("select averageSalary(salary) avg_salary from employee").show()spark.stop()}
}/*** spark3.0.0中自定义UDAF继承Aggregator实现*/
object MyUDAF extends Aggregator[Employee, Average, Double]{override def zero = Average(0L, 0L)override def reduce(buffer: Average, employee: Employee) = {buffer.sum += employee.salarybuffer.count += 1buffer}override def merge(buffer1: Average, buffer2: Average) = {buffer1.sum += buffer2.sumbuffer1.count += buffer2.countbuffer1}override def finish(reduction: Average) = reduction.sum / reduction.countoverride def bufferEncoder = Encoders.productoverride def outputEncoder = Encoders.scalaDouble
}
结果如下:
+--------------+
|average_salary|
+--------------+
| 3750.0|
+--------------++----------+
|avg_salary|
+----------+
| 3750.0|
+----------+
spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator相关推荐
- Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
一:准备数据源 在项目下新建一个student.txt文件,里面的内容为: [plain] view plain copy print? <code class=&q ...
- python循环展示大写字母_python调用大写函数python中字典的循环遍历的两种方式
开发中经常会用到对于字典.列表等数据的循环遍历,但是python中对于字典的遍历对于很多初学者来讲非常陌生,今天就来讲一下python中字典的循环遍历的两种方式. 注意: python2和python ...
- C++类中成员变量的初始化有两种方式
C++类中成员变量的初始化有两种方式: 构造函数初始化列表和构造函数体内赋值.下面看看两种方式有何不同. 成员变量初始化的顺序是按照在那种定义的顺序. 1.内部数据类型(char,int--指针等) ...
- 【❗划重点!C语言函数参数传递只有两种方式(值传递,地址传递),不支持“引用传递”!❗】
引子 上篇文章<C语言函数传参の结构体数组篇>提到了C语言的函数参数传递方式,百度了一一一大圈,有说两种的,也有说三种的,简直把我搞晕了,"值传递和地址传递"是毫无疑问 ...
- shell 不等于_关于shell编程中的整数值比较的两种方式的简单操作实例
谈一谈关于shell编程中的整数值比较的两种方式 Shell编程有时处理一个对象时,需要我们对对象进行测试. 只有符合要求的才采取下一步操作,这样做的好处可以避免程序出错. 这个测试的对象可以是文件. ...
- c语言返回二叉树的大小,C语言中计算二叉树的宽度的两种方式
C语言中计算二叉树的宽度的两种方式 二叉树作为一种很特殊的数据结构,功能上有很大的作用!今天就来看看怎么计算一个二叉树的最大的宽度吧. 采用递归方式 下面是代码内容: int GetMaxWidth( ...
- python中字典的循环遍历的两种方式
开发中经常会用到对于字典.列表等数据的循环遍历,但是python中对于字典的遍历对于很多初学者来讲非常陌生,今天就来讲一下python中字典的循环遍历的两种方式. 注意: python2和python ...
- C语言中存储多个字符串的两种方式
C语言中存储多个字符串的两种方式 方式一 二维字符串数组 声明: char name[4][10] = { "Justinian", "Momo", " ...
- android数据库侵入,Android中实现侵入式状态栏的两种方式
最近对"爸比讲故事"Android版本进行代码重构的时候,对之前版本的大部分界面的头部侵入式效果,作了一个总结和梳理,在期间查阅了thinkcool的博客和结合亲身实践,总结了2种 ...
最新文章
- 大厂项目是如何死掉的?太过真实!
- Block介绍(二)内存管理与其他特性
- Alphabet股价周五跌5.32%:三年最大单日跌幅
- 算法训练 最大的算式
- 《研磨设计模式》读后感一
- 实时音视频助力在线教育风口
- git fatal:HttpRequestException encountered
- android集成测试工具,android – 集成测试和Cucumber测试
- 令人窒息的数学动态图
- Dubbo源码分析:ProxyFactory
- 7-6 统计素数并求和 (20分)_托福、SAT最新考试报告:中国学生托福平均分81分;...
- AccessibilityService的具体应用场景
- AI大牛发起神秘字母接龙,起因竟然是……
- 关于Linux消息队列的简单说明、使用、编码
- linux卸载分区命令,CentOS删除磁盘分区命令
- python出现unexpected indent_Python、unexpected indent错误解决方法
- MSSQL 负载均衡(Moebius)
- git commit时的几种指令
- 操作既简单、实用性强的文件批量改名高手
- 小米手机v3.exo 合并_eXo发布Web Content Management 2.0作为开源