[Spark]-结构化数据查询之自定义UDAF
1.自定义弱类型UDAF
1.1 弱类型UDAF定义
弱类型UDAF继承实现 UserDefinedAggregateFunction 抽象类
override def inputSchema: StructType = 输入schema
override def bufferSchema: StructType = 聚合过程schema
override def dataType: DataType = 返回值类型
override def deterministic: Boolean = 是否固定返回值类型
override def initialize(buffer: MutableAggregationBuffer): Unit = 初始化函数,用来初始化基准值
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = 分区内元素如何聚合
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 分区之间如何聚合
override def evaluate(buffer: Row): Any = 聚合结果计算
整个UDAF处理过程,非常类似RDD的aggregate算子
aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
一个自定义求平均数UDAF例子
object UDAFApp extends App{val spark = SparkSession.builder().master("local[2]").appName("UDAP-App").getOrCreate();import spark.implicits._;val df = spark.read.format("json").load("D:\\data\\employees.json")//UDAF函数注册 只有UserDefinedAggregateFunction才能为SQL注册函数spark.udf.register("cusAvg",MyAvgUDAF)//DF转临时视图df.createTempView("employees_view")spark.sql("select cusAvg(salary) as salary from employees_view").show();//df-api形式df.select(MyAvgUDAF.apply($"salary")).show()spark.close()}object MyAvgUDAF extends UserDefinedAggregateFunction{//输入schemaoverride def inputSchema: StructType = StructType(StructField("input",DoubleType)::Nil);//聚合过程schemaoverride def bufferSchema: StructType = StructType(StructField("Sum",DoubleType)::StructField("Count",LongType)::Nil)//返回值类型override def dataType: DataType = DoubleType//是否固定返回值类型override def deterministic: Boolean = true//初始化函数override def initialize(buffer: MutableAggregationBuffer): Unit = {//设定聚合基准初始值 aggregate算子((0,0))的部分buffer(0) = 0D; //总和0buffer(1) = 0L; //个数0 }override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {//行第一列(Row[0])是否为nullif(!input.isNullAt(0)){//aggregate算子....(seqOp: (U, T) => U 部分buffer(0)= buffer.getDouble(0)+ input.getDouble(0);buffer(1) =buffer.getLong(1)+1;}}override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {//aggregate算子....combOp: (U, U) => U 部分buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0);buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1);}override def evaluate(buffer: Row): Any = buffer.getDouble(0) /buffer.getLong(1) ;}
2.自定义强类型UDAF
自定义强类型UDAF 基础实现类 Aggregator
所以这种定义方式不能在UDF中注册,也不能用在SQL中
一个强类型UDAF定义如下:
object UDAFApp extends App{val spark = SparkSession.builder().master("local[2]").appName("UDAP-App").getOrCreate();import spark.implicits._;val ds = spark.read.format("json").load("D:\\data\\employees.json").as[Employee]//ds-api形式ds.select(MyAverage.toColumn.name("salary")).show()spark.close()}//目标类型定义case class Employee(val name: String,val salary: Long)//聚合类型定义case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double] {override def zero: Average = Average(0,0)override def reduce(b: Average, a: Employee): Average = {b.sum += a.salary;b.count += 1b}override def merge(b1: Average, b2: Average): Average = {b1.sum += b2.sum;b1.count += b2.count;b1;}override def finish(reduction: Average): Double = {println(reduction.sum + " "+ reduction.count)reduction.sum.toDouble/reduction.count}override def bufferEncoder: Encoder[Average] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble}
转载于:https://www.cnblogs.com/NightPxy/p/9269171.html
[Spark]-结构化数据查询之自定义UDAF相关推荐
- SQL结构化数据查询语言培训讲义
一.SQL语言 SQL是结构化的查询语言(Structured Query Language),是关系型数据库通讯的标准语言.第一代SQL产品是Oracle,它是当今关系型数据库技术的领导之一. 1. ...
- 独家 | 使用机器学习加速对非结构化数据的查询-第1部分(使用BlazeIt加速聚合和限制查询)...
作者:Daniel Daniel,Peter Bailis和Matei Zaharia 翻译:Kay 校对:王雨桐 本文约2800字,建议阅读13分钟. 本文为大家介绍了针对非结构化数据如何加快聚合和 ...
- 非结构化数据定义、处理方法及重要性
一.非结构化数据定义 不方便用数据库二维逻辑表来表现的数据即称为非结构化数据,包括所有格式的办公文档. 文本.图片. 标准通用标记语言下的子集 XML. HTML.各类报表.图像和音频/视频信息等等. ...
- 结构化数据、半结构化数据、非结构化数据
1 概念 结构化数据 结构化数据可以使用关系型数据库来表示和存储,如MySQL.Oracle.SQL Server等,表现二维形式的数据.可以通过固有键值获取相应信息. 一般特点是:数据以行为单位,一 ...
- 分析非结构化数据和非结构化处理
文章目录 一.非结构化数据的定义 二.非结构化处理的重要性 三.数据类型 四.非结构化处理的方法和手段 1. 采集 2. 查询 3. 存储 4. 前景 一.非结构化数据的定义 非结构化数据是数据结构不 ...
- 浅述非结构化数据与非结构化处理
文章目录 一.非结构化数据的定义 二.非结构化处理的重要性 1. 有大量的非结构化数据需要处理 2. 非结构化数据蕴藏着大量的价值 3. 非结构化处理不需要依靠数据科学家团队 4. 终端用户授权 三. ...
- 【华为云技术分享】Spark如何与深度学习框架协作,处理非结构化数据
随着大数据和AI业务的不断融合,大数据分析和处理过程中,通过深度学习技术对非结构化数据(如图片.音频.文本)进行大数据处理的业务场景越来越多.本文会介绍Spark如何与深度学习框架进行协同工作,在大数 ...
- Spark(六):SparkSQLAndDataFrames对结构化数据集与非结构化数据的处理
为什么80%的码农都做不了架构师?>>> 一:简单了解SparkSQL. Spark SQL 是结构化的数据处理一个Spark模块.与基本的Spark RDD API不同,Sp ...
- 独家 | 使用机器学习对非结构化数据加速查询-第2部分(具有统计保证的近似选择查询)...
作者:Daniel Kang, Edward Gan, Peter Bailis, Tatsunori Hashimoto, and Matei Zaharia 翻译:殷之涵 校对:方星轩 本文约28 ...
- 【数据湖加速篇】 —— 数据湖结构化数据优化与查询加速方案
简介: 近几年,数据湖架构的概念逐渐兴起,很多企业都在尝试构建数据湖.相比较大数据平台,数据湖在数据治理方面提出了更高的要求.对于数据湖场景所提出的新需求,"传统"的大数据工具在很 ...
最新文章
- 修改eclipse启动时eclipse使用的jre
- linux下面实现执行rm命令,显示do not use rm command
- 整数类型及整数类型的显示转换
- 如何让机器像人一样多角度思考?
- 51Nod-1014 X^2 Mod P【暴力】
- javamail 发送、读取邮件
- 设计心理学读书笔记 之一 记忆的结构
- ImDisk 命令行用法
- 大伽「趣」说AI:腾讯云在多个场景中的AI落地实践
- 自我管理-贝尔宾团队角色理论
- 计算机考证一级一般多少钱
- 六轴机器人光机_四轴机器人与六轴机器人有什么区别?
- c++文件保存与读取
- oracle里update+where,Oracle 关联表更新 update ,where exists
- [操作系统精髓与设计原理笔记] Chapter2 操作系统概述
- 你应该知道的10种软件工具
- pip install multiprocessing失败?可以这样解决
- python 爬取https://wall.alphacoders.com上的壁纸(入门级别)
- uniapp的tabBar不显示
- EasyPR--开发详解(2)车牌定位
热门文章
- vs编译之连接器工具警告LNK4099的解决
- 机器学习中的正则化(Regularization)
- mysql java 查寻用户_mysql 查询不同用户 最新的一条记录
- 2017php类库,AMQB官方PHP库
- 基于springboot+vue的房屋租赁系统(前后端分离)
- 微信登录功能的实现直接复制就能使用(封装)
- php基础之常量(系统常量,自定义常量)
- 阶段3 2.Spring_08.面向切面编程 AOP_6 四种常用通知类型
- 对GUID的一点探讨
- USB Host读取U盘成功