聚合

DataFrames可以提供共同聚合,例如count(),countDistinct(),avg(),max(),min()等。虽然这些功能是专为DataFrames,星火SQL还拥有类型安全的版本,在其中的一些 斯卡拉和 Java的使用强类型数据集的工作。此外,用户不限于预定义的聚合函数,并且可以创建自己的聚合函数。

无用户定义的聚合函数

扩展UserDefinedAggregateFunction 抽象类以实现自定义无类型聚合函数。
例如,用户定义的平均值:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._object MyAverage extends UserDefinedAggregateFunction {// 集合函数的输入类型def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)// 集合缓冲区的数据类型def bufferSchema: StructType = {StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)}//返回值的数据类型def  dataType : DataType  =  DoubleType
//确定是否返回相同的输出def deterministic: Boolean = true
//初始化给定的聚合缓冲区。缓冲区本身是一个`Row`,除了标准方法之外,比如在索引处检索值(例如,get(),getBoolean()),提供了更新其值的机会。请注意,缓冲区内的数组和映射仍然是不可变的。def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// 更新数据到指定的聚合缓冲区`bufferdef update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}}// 合并两个聚合缓冲剂和存储更新的缓冲器值回`buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算最终结果def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}// 注册访问的函数
spark.udf.register("myAverage", MyAverage)val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

用户定义聚合函数

强类型数据集的用户定义聚合围绕Aggregator抽象类。
例如,类型安全的用户定义平均值所示:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregatorcase class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double] {// 此聚合的零值。应该满足任何b + zero = b def zero: Average = Average(0L, 0L)// 组合两个值以产生新值。为了提高性能,该函数可以修改`buffer` //并返回它而不是构造一个新的对象def reduce(buffer: Average, employee: Employee): Average = {buffer.sum += employee.salarybuffer.count += 1buffer}// 合并两个中间值def merge(b1: Average, b2: Average): Average = {b1.sum += b2.sumb1.count += b2.countb1}// 转换减少def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count// 指定中间件类型def bufferEncoder: Encoder[Average] = Encoders.product// 为最终输出值类型指定编码器def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+// 将函数转换为`TypedColumn`并给它命名
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Spark Sql 聚合相关推荐

  1. spark sql 查看分区_Spark 3.0 中七个必须知道的 SQL 性能优化

    本文来自 IBM 东京研究院的高级技术人员 Kazuaki Ishizaki 博士在 Spark Summit North America 2020 的 <SQL Performance Imp ...

  2. Spark基础:(六)Spark SQL

    1.相关介绍 Datasets:一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优点(强类型化, 能够使用强大的 lam ...

  3. Spark SQL玩起来

    标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...

  4. Spark SQL Catalyst源代码分析Optimizer

    /** Spark SQL源代码分析系列*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将具体解说S ...

  5. Spark SQL 函数全集

    org.apache.spark.sql.functions是一个Object,提供了约两百多个函数. 大部分函数与Hive的差不多. 除UDF函数,均可在spark-sql中直接使用. 经过impo ...

  6. Spark性能优化 -- Spark SQL、DataFrame、Dataset

    本文将详细分析和总结Spark SQL及其DataFrame.Dataset的相关原理和优化过程. Spark SQL简介 Spark SQL是Spark中 具有 大规模关系查询的结构化数据处理 模块 ...

  7. 如何查询spark版本_掌握Spark SQL中的查询执行

    了解您的查询计划 自从Spark 2.x以来,由于SQL和声明性DataFrame API,在Spark中查询数据已成为一种奢侈. 仅使用几行高级代码就可以表达非常复杂的逻辑并执行复杂的转换. API ...

  8. Spark SQL之queryExecution运行流程解析Logical Plan(三)

    1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkCont ...

  9. spark sql中的窗口函数

    2019独角兽企业重金招聘Python工程师标准>>> databricks博客给出的窗口函数概述 Spark SQL supports three kinds of window ...

最新文章

  1. 几何视角看线性方程组解的情况
  2. php更新so需要重启吗,不重新编译PHP为php增加openssl.so模块
  3. 大数据平台网站日志分析系统
  4. python多进程间通信
  5. 从苹果店员到机器学习工程师:学习AI,我是这样起步的
  6. 十四、Oracle学习笔记:集合操作
  7. MiPony– 杀手级免费网盘下载工具 可挂机下载支持YunFile
  8. C语言段错误的有用总结
  9. aforge java_java(一些java API)或C#(emgucv,dshownet,Aforge.NET)中的实时对象跟踪
  10. nod32 下载几账户翻译
  11. IAST 技术进阶系列(二):全场景多核驱动
  12. win10开机就卡死在桌面上怎么解决
  13. Jetty插件运行报500错误
  14. API 接口监控产品全新改版,免费开放全部功能
  15. github史上最全教程
  16. 如何用快启动pe修复win10系统引导? 神器
  17. 什么是rmi?为什么要使用rmi框架?
  18. C++ 实现 Matrix (矩阵)类
  19. 【财务管理论文】大数据背景下企业财务管理的挑战与机遇(节选)
  20. sea.js简单配置

热门文章

  1. 机器人视觉分析算法_机器视觉处理:目标检测和跟踪
  2. 织梦DedeCMS搜索页面搜索结果总数调用方法
  3. 工薪族巧理财之定期存款中整存整取、零存整取、存本取息之间的微妙区别
  4. Ubuntu 18.04 LTS 安装JDK1.8-Linux-64
  5. 安装最新版本zlib
  6. 宋第一状元宰相吕蒙正三赋
  7. java aes 工具类_Java中的AES加解密工具类:AESUtils
  8. 安卓/Android 点击按钮/返回键跳转返回到 手机系统桌面
  9. 服务器与客户端的TCP连接
  10. LNK2019 unresolved external symbol __iob_func referenced in function _OPENSSL_stderr