1.自定义弱类型UDAF

  1.1 弱类型UDAF定义

    弱类型UDAF继承实现 UserDefinedAggregateFunction 抽象类

    override def inputSchema: StructType = 输入schema

    override def bufferSchema: StructType = 聚合过程schema

    override def dataType: DataType = 返回值类型

    override def deterministic: Boolean = 是否固定返回值类型

    override def initialize(buffer: MutableAggregationBuffer): Unit = 初始化函数,用来初始化基准值

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = 分区内元素如何聚合

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 分区之间如何聚合

    override def evaluate(buffer: Row): Any = 聚合结果计算

    整个UDAF处理过程,非常类似RDD的aggregate算子

      aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

    一个自定义求平均数UDAF例子

            object UDAFApp extends App{val spark = SparkSession.builder().master("local[2]").appName("UDAP-App").getOrCreate();import  spark.implicits._;val df = spark.read.format("json").load("D:\\data\\employees.json")//UDAF函数注册 只有UserDefinedAggregateFunction才能为SQL注册函数spark.udf.register("cusAvg",MyAvgUDAF)//DF转临时视图df.createTempView("employees_view")spark.sql("select cusAvg(salary) as salary from employees_view").show();//df-api形式df.select(MyAvgUDAF.apply($"salary")).show()spark.close()}object MyAvgUDAF extends UserDefinedAggregateFunction{//输入schemaoverride def inputSchema: StructType = StructType(StructField("input",DoubleType)::Nil);//聚合过程schemaoverride def bufferSchema: StructType = StructType(StructField("Sum",DoubleType)::StructField("Count",LongType)::Nil)//返回值类型override def dataType: DataType = DoubleType//是否固定返回值类型override def deterministic: Boolean = true//初始化函数override def initialize(buffer: MutableAggregationBuffer): Unit = {//设定聚合基准初始值 aggregate算子((0,0))的部分buffer(0) = 0D; //总和0buffer(1) = 0L; //个数0
            }override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {//行第一列(Row[0])是否为nullif(!input.isNullAt(0)){//aggregate算子....(seqOp: (U, T) => U 部分buffer(0)= buffer.getDouble(0)+ input.getDouble(0);buffer(1) =buffer.getLong(1)+1;}}override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {//aggregate算子....combOp: (U, U) => U 部分buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0);buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1);}override def evaluate(buffer: Row): Any = buffer.getDouble(0) /buffer.getLong(1) ;}

2.自定义强类型UDAF

  自定义强类型UDAF 基础实现类 Aggregator

  所以这种定义方式不能在UDF中注册,也不能用在SQL中

  一个强类型UDAF定义如下:    

            object UDAFApp extends App{val spark = SparkSession.builder().master("local[2]").appName("UDAP-App").getOrCreate();import  spark.implicits._;val ds = spark.read.format("json").load("D:\\data\\employees.json").as[Employee]//ds-api形式ds.select(MyAverage.toColumn.name("salary")).show()spark.close()}//目标类型定义case class Employee(val name: String,val salary: Long)//聚合类型定义case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double]  {override def zero: Average = Average(0,0)override def reduce(b: Average, a: Employee): Average = {b.sum += a.salary;b.count +=  1b}override def merge(b1: Average, b2: Average): Average = {b1.sum += b2.sum;b1.count += b2.count;b1;}override def finish(reduction: Average): Double = {println(reduction.sum + "  "+ reduction.count)reduction.sum.toDouble/reduction.count}override def bufferEncoder: Encoder[Average] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble}

转载于:https://www.cnblogs.com/NightPxy/p/9269171.html

[Spark]-结构化数据查询之自定义UDAF相关推荐

  1. SQL结构化数据查询语言培训讲义

    一.SQL语言 SQL是结构化的查询语言(Structured Query Language),是关系型数据库通讯的标准语言.第一代SQL产品是Oracle,它是当今关系型数据库技术的领导之一. 1. ...

  2. 独家 | 使用机器学习加速对非结构化数据的查询-第1部分(使用BlazeIt加速聚合和限制查询)...

    作者:Daniel Daniel,Peter Bailis和Matei Zaharia 翻译:Kay 校对:王雨桐 本文约2800字,建议阅读13分钟. 本文为大家介绍了针对非结构化数据如何加快聚合和 ...

  3. 非结构化数据定义、处理方法及重要性

    一.非结构化数据定义 不方便用数据库二维逻辑表来表现的数据即称为非结构化数据,包括所有格式的办公文档. 文本.图片. 标准通用标记语言下的子集 XML. HTML.各类报表.图像和音频/视频信息等等. ...

  4. 结构化数据、半结构化数据、非结构化数据

    1 概念 结构化数据 结构化数据可以使用关系型数据库来表示和存储,如MySQL.Oracle.SQL Server等,表现二维形式的数据.可以通过固有键值获取相应信息. 一般特点是:数据以行为单位,一 ...

  5. 分析非结构化数据和非结构化处理

    文章目录 一.非结构化数据的定义 二.非结构化处理的重要性 三.数据类型 四.非结构化处理的方法和手段 1. 采集 2. 查询 3. 存储 4. 前景 一.非结构化数据的定义 非结构化数据是数据结构不 ...

  6. 浅述非结构化数据与非结构化处理

    文章目录 一.非结构化数据的定义 二.非结构化处理的重要性 1. 有大量的非结构化数据需要处理 2. 非结构化数据蕴藏着大量的价值 3. 非结构化处理不需要依靠数据科学家团队 4. 终端用户授权 三. ...

  7. 【华为云技术分享】Spark如何与深度学习框架协作,处理非结构化数据

    随着大数据和AI业务的不断融合,大数据分析和处理过程中,通过深度学习技术对非结构化数据(如图片.音频.文本)进行大数据处理的业务场景越来越多.本文会介绍Spark如何与深度学习框架进行协同工作,在大数 ...

  8. Spark(六):SparkSQLAndDataFrames对结构化数据集与非结构化数据的处理

    为什么80%的码农都做不了架构师?>>>    一:简单了解SparkSQL. Spark SQL 是结构化的数据处理一个Spark模块.与基本的Spark RDD API不同,Sp ...

  9. 独家 | 使用机器学习对非结构化数据加速查询-第2部分(具有统计保证的近似选择查询)...

    作者:Daniel Kang, Edward Gan, Peter Bailis, Tatsunori Hashimoto, and Matei Zaharia 翻译:殷之涵 校对:方星轩 本文约28 ...

  10. 【数据湖加速篇】 —— 数据湖结构化数据优化与查询加速方案

    简介: 近几年,数据湖架构的概念逐渐兴起,很多企业都在尝试构建数据湖.相比较大数据平台,数据湖在数据治理方面提出了更高的要求.对于数据湖场景所提出的新需求,"传统"的大数据工具在很 ...

最新文章

  1. 修改eclipse启动时eclipse使用的jre
  2. linux下面实现执行rm命令,显示do not use rm command
  3. 整数类型及整数类型的显示转换
  4. 如何让机器像人一样多角度思考?
  5. 51Nod-1014 X^2 Mod P【暴力】
  6. javamail 发送、读取邮件
  7. 设计心理学读书笔记 之一 记忆的结构
  8. ImDisk 命令行用法
  9. 大伽「趣」说AI:腾讯云在多个场景中的AI落地实践
  10. 自我管理-贝尔宾团队角色理论
  11. 计算机考证一级一般多少钱
  12. 六轴机器人光机_四轴机器人与六轴机器人有什么区别?
  13. c++文件保存与读取
  14. oracle里update+where,Oracle 关联表更新 update ,where exists
  15. [操作系统精髓与设计原理笔记] Chapter2 操作系统概述
  16. 你应该知道的10种软件工具
  17. pip install multiprocessing失败?可以这样解决
  18. python 爬取https://wall.alphacoders.com上的壁纸(入门级别)
  19. uniapp的tabBar不显示
  20. EasyPR--开发详解(2)车牌定位

热门文章

  1. vs编译之连接器工具警告LNK4099的解决
  2. 机器学习中的正则化(Regularization)
  3. mysql java 查寻用户_mysql 查询不同用户 最新的一条记录
  4. 2017php类库,AMQB官方PHP库
  5. 基于springboot+vue的房屋租赁系统(前后端分离)
  6. 微信登录功能的实现直接复制就能使用(封装)
  7. php基础之常量(系统常量,自定义常量)
  8. 阶段3 2.Spring_08.面向切面编程 AOP_6 四种常用通知类型
  9. 对GUID的一点探讨
  10. USB Host读取U盘成功