Spark_UDAF
2024-05-13 12:04:39
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}/*** 自定义函数:* UDF:User- Defined Funcation;用户定义(普通)函数,只对单行数值产生作用; 一进一出* UDAF:User- Defined Aggregation Funcation;用户定义聚合函数,对多行数据产生作用(sum()、avg()...),多进一出* UDTF:User- Defined Table-Generating Functions;用户定义表生成函数,输入一行 输出多行,一进多出**/// 多进一出
object UDAF {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("UDAF").master("local[*]").getOrCreate()val sc: SparkContext = sparkSession.sparkContextimport sparkSession.implicits._val students: Seq[Student] = Seq(Student(1, "zhangsan", "F", 22),Student(2, "lisi", "M", 38),Student(3, "wangwu", "M", 13),Student(4, "zhaoliu", "F", 17),Student(5, "songba", "M", 32),Student(6, "sunjiu", "M", 16),Student(7, "qianshiyi", "F", 17),Student(8, "yinshier", "F", 15),Student(9, "fangshisan", "M", 12),Student(10, "yeshisan", "F", 11),Student(11, "ruishiyi", "F", 26),Student(12, "chenshier", "M", 28))// seq to df : 1. roDF 2.spark.createDataFrameval frame: DataFrame = sparkSession.createDataFrame(students)frame.printSchema()import org.apache.spark.sql.functions._sparkSession.udf.register("myAvg",new MyAgeAvgFunction)frame.createOrReplaceTempView("students")val resultDF: DataFrame = sparkSession.sql("select gender,myAvg(age) avgage from students group by gender")resultDF.printSchema()resultDF.show(false)}
}//自定义聚合函数 UDAF 继承UserDefinedAggregateFunction
class MyAgeAvgFunction extends UserDefinedAggregateFunction{//聚合函数的输出数据的数据结构override def inputSchema: StructType = {new StructType().add("age",LongType)}//在缓冲区内的数据结构//sum 用来记录 所有年龄值相加的总和 43 + 52 + 61 + 78 = 234 => sum//count 用来记录 相加各个的总和 1 + 1 + 1 + 1 = 4 => countoverride def bufferSchema: StructType = {new StructType().add("sum",LongType).add("count",LongType)}//定义当前函数返回值的类型 sum/count 得到 Double类型override def dataType: DataType = DoubleType//聚合函数幂等override def deterministic: Boolean = true//初始值override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0L //记录 传入所有用户年龄相加的总和buffer(1)=0L //记录 传入所有用户年龄的个数}// 传入一条新数据后需要进入处理// 将Row() 对象中的值与buffer(0) 数据相加// buffer(1)数据个数加一override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}//合并 各分区内的数据//例如 p1(321,6) p2(128,2) p3(219,3)override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {//计算年龄相加的总和buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)//总人数buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Any = {buffer.getLong(0)/buffer.getLong(1).toDouble}
}
Spark_UDAF相关推荐
- SparkSQL中UDAF案例分析
SparkSQL中UDAF案例分析 1.统计单词的个数 package com.bynear.spark_sql; import org.apache.spark.sql.Row; import or ...
最新文章
- 解析深度神经网络背后的数学原理!
- 《我也能做CTO之.程序员职业规划》新书出版推举
- 解决Github速度太慢的几种方案
- wxWidgets:wxTopLevelWindow类用法
- python在工厂中的应用_Python工厂方法
- Eigen 简明教程之如何从Python转到Eigen
- 51单片机课程设计—温度报警系统(包含实验报告,仿真图)
- ccccccccccccccccccccc
- 九橡项目工时管理系统部署笔记
- 总结命令行05:Kafka
- 基于Leaflet 的Web地图客户端应用程序开发框架
- 世界杯为战斗民族的历史再添荣耀与光辉_数字体验_新浪博客
- Pomelo MMORPG
- 常见 SQL 面试题:经典 50 例
- Spring Boot Post接口数据加解密
- Google GSON GsonBuilder().setDateFormat(yyyy-MM-dd HH:mm:ss)不能格式化Data
- BRAT的安装、配置、标注操作
- 【牛客网】 小白月赛16 D小阳买水果
- (批处理)使用bat如何取出文件夹里子文件夹下的文件?
- 分享我的公众号留言功能迁移成功的故事
热门文章
- 4.UiCollection API 详细介绍
- 解题报告: LeetCode Max Points on a Line
- vim打开出现的文档^M什么
- java 基本类型共享_Java基础数据类型
- 7-5 考试座位号 (15 分)
- VS2019-C++警告-C6385读取数据无效
- PAT乙级 1013 数素数
- sklearn查看数据
- 教师计算机应用研讨交流,计算机应用技术专业技能比赛研讨交流活动在济南信息工程学校举行...
- oracle rlw,Oracle数据库终于支持R语言 发力数据挖掘