spark业务开发-聚合(agg)

  • 项目地址:https://gitee.com/cch-bigdata/spark-process.git

输入数据

name,profession,enroll,score
曾凰妹,金融学,北京电子科技学院,637
谢德炜,金融学,北京电子科技学院,542
林逸翔,金融学,北京电子科技学院,543
王丽云,金融学,北京电子科技学院,626
吴鸿毅,金融学,北京电子科技学院,591
施珊珊,经济学类,北京理工大学,581
柯祥坤,经济学类,北京理工大学,650
庄劲聪,经济学类,北京理工大学,551
吴雅思,经济学类,北京理工大学,529
周育传,经济学类,北京理工大学,682
丁俊伟,通信工程,北京电子科技学院,708
庄逸琳,通信工程,北京电子科技学院,708
吴志发,通信工程,北京电子科技学院,578
肖妮娜,通信工程,北京电子科技学院,557
蔡建明,通信工程,北京电子科技学院,583
林逸翔,通信工程,北京电子科技学院,543

数据随便构造的,无任何现实意义

输出数据

+----------------+----------+----+--------+------+------+-----------------+----+-----------------+
|          enroll|profession|总数|去重总数|最小值|最大值|           平均值|总和|         标准方差|
+----------------+----------+----+--------+------+------+-----------------+----+-----------------+
|北京电子科技学院|  通信工程|   6|       6|   543|   708|612.8333333333334|3677| 75.1143572605575|
|北京电子科技学院|    金融学|   5|       5|   542|   637|            587.8|2939|44.70682274552733|
|    北京理工大学|  经济学类|   5|       5|   529|   682|            598.6|2993|65.22499520889212|
+----------------+----------+----+--------+------+------+-----------------+----+-----------------+

程序代码

package com.cch.bigdata.spark.process.aggimport com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, functions}import scala.collection.mutable.ListBuffer//聚合操作
class Aggregater extends AbstractTransform {//需要分组的列数组private val group_columns = Array("enroll","profession")//聚合操作的字段private val agg_columns = Array("name","name","score","score","score","score","score")//聚合模式//Count:数量//DistinctCount:去重数量//min:最小值//max:最大值//avg:平均值//sum:求和//Stddev:标准方差private val agg_mode = Array("Count","DistinctCount","min","max","avg","sum","Stddev")//列对应的中文名称private val new_column_cnames = Array("总数","去重总数","最小值","最大值","平均值","总和","标准方差")override def process(): Unit = {if (agg_columns.isEmpty) {throw new RuntimeException("聚合列不能为空")}if (agg_mode.isEmpty) {throw new RuntimeException("聚合模式不能为空")}if (agg_columns.length != agg_mode.length) {throw new RuntimeException("聚合列和聚合模式不匹配")}//获取输入流val df: DataFrame = loadCsv("src/main/resources/csv/admission.csv",spark)//构造结果列list,用来存储最后需要聚合的列val columnList: ListBuffer[Column] = new ListBuffer()var index = 0;agg_columns.foreach(c => {//聚合模式val aggMode: String = agg_mode(index)if (aggMode.isEmpty) {throw new RuntimeException("聚合模式为空,无法为字段" + c + "做聚合操作")}var columnNewName: String = new_column_cnames(index)if (columnNewName.isEmpty) {columnNewName = c}aggMode match {case "Count" => {columnList.append(count(c).as(columnNewName))}case "DistinctCount" => {columnList.append(countDistinct(c).as(columnNewName))}case "min" => {columnList.append(min(c).as(columnNewName))}case "max" => {columnList.append(functions.max(c).as(columnNewName))}case "sum" =>{columnList.append(functions.sum(c).as(columnNewName))}case "avg" => {columnList.append(avg(c).as(columnNewName))}case "Stddev" => {columnList.append(stddev(c).as(columnNewName))}case _ => {throw new RuntimeException("聚合策略不正确,仅支持[Count/DistinctCount/min/max/avg/Stddev]")}}index += 1})//方法1:提起第一个字段,用于后面的agg参数//使用这种方式多了一个处理第一个参数的步骤//但是可以处理字段别名的问题val firstColumn: Column = columnList(0)//删除第一个字段columnList.remove(0)//方法2:构造map数据结构// 使用方式简单,但是无法使用as别名//    df.groupBy("department").agg(Map(//      "age" -> "max",//      "expense" -> "sum"//    ))//如果分组列不为空,先进行分组if (!group_columns.isEmpty) {//分组聚合df.groupBy(group_columns.map(c => {col(c)}): _*).agg(firstColumn, columnList.map(c => {c}): _*).show()}else{//不分组聚合df.agg(firstColumn, columnList.map(c => {c}): _*).show()}}override def getAppName(): String = "聚合操作"
}
package com.cch.bigdata.spark.processimport com.cch.bigdata.spark.process.agg.Aggregaterobject AggTest {def main(args: Array[String]): Unit = {new Aggregater().process()}
}

参数解释

  • group_columns:需要进行分组聚合时的列名

  • agg_columns:需要聚合的字段数组

  • agg_mode:聚合方式

  • new_column_cnames:聚合后新列名

spark业务开发-聚合(agg)相关推荐

  1. spark业务开发-空值处理

    spark业务开发-空值处理 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集 "id","name& ...

  2. spark业务开发-列选择

    spark业务开发-列选择 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 "id","name&qu ...

  3. spark业务开发-添加索引列

    spark业务开发-添加索引列 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 name,profession,enroll,sco ...

  4. spark业务开发-union合并(union)

    spark业务开发-union合并(union) 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集1 id,name,profess ...

  5. Day651.NoSQL与RDBMS合理搭配问题 -Java业务开发常见错误

    NoSQL与RDBMS合理搭配问题 Hello,阿昌来也!今天学习分享记录的关于Nosql数据库和Mysql数据库的一系列对比,架构安排,发挥出每种数据库的优势案例. 近几年,各种非关系型数据库,也就 ...

  6. Day623.并发编程工具类库使用错误问题 -Java业务开发常见错误

    并发编程工具类库使用错误问题 多线程想必大家都知道,且JDK也为我们提供了很多并发编程的工具类库,接下来就是记录对应在业务开发中,可能会出现的并发编程工具类库使用错误的问题 一.线程复用导致信息错乱 ...

  7. 业务开发时,接口不能对外暴露该如何实现?

    来源:blog.csdn.net/m0_71777195/article/details/127243452 在业务开发的时候,经常会遇到某一个接口不能对外暴露,只能内网服务间调用的实际需求.面对这样 ...

  8. Mybatis Plus版本升级及项目业务开发规范简要说明

    Mybatis Plus版本升级及项目业务开发规范简要说明 码农: Alias > 版本升级说明:此次将Mybatis Plus的版本由2.x升级到3.1.2版本,最新版本是3.2,由于3.2版 ...

  9. 从业务开发中学习和理解架构设计

    作者:张东爱(当爱)  阿里自主出行研发团队 一.前言 在软件开发领域经常会接触到架构这个词汇,在我最初的印象中,架构是一个很高级的词汇.它似乎代表了复杂的工程结构.高层次的抽象设计.最新的开发语言特 ...

最新文章

  1. 西安交通大学17年9月课程考试计算机应用基础,西安交通大学17年9月课程考试《计算机应用基础》作业考核试题 (含主观题)...
  2. nginx+php+mysql+erlang+mongo环境安装
  3. 算法导论-概率发生器
  4. java将HTML文件转化为pdf文件,如何使用Java将HTML网页转换为PDF文件
  5. java学习之—链表(3)
  6. 蓝桥杯 ALGO-120 算法训练 学做菜
  7. Linux平台下裸设备的绑定:
  8. Pandas速查手册中文
  9. 基于Opencv实现车牌图片识别系统
  10. 计算机在输电线路设计中的应用研究,精选:计算机在输电线路基础设计中的应用原稿...
  11. 迪普交换机恢复出厂设置_【迪普科技官网介绍】迪普科技交换机、防火墙_迪普科技(中国)公司简介-ZOL中关村在线厂商频道...
  12. 2019 “钱”途光明的 8 大前端开发技术,你还差几个?
  13. 【Love2d从青铜到王者】第十一篇:Love2d之图像(Images)
  14. 阿尔法狗怎么用机器学习做决策:马尔科夫链减少搜索空间说起(附PDF公号发“马链搜索”下载)
  15. andriod自带模拟器使用代理
  16. Excel-DATEDIF函数计算两日期天数差
  17. 移动前端自适应适配布局解决方案和比较
  18. python函数中的变量取出来_在Python中从函数调用中提取变量
  19. Jmeter登录163邮箱
  20. Python多线程、多进程最全整理

热门文章

  1. Java设计模式之适配器模式(属于结构型模式)
  2. html5指纹api,头条:如何获取浏览器指纹信息
  3. 小度音箱dueros授权
  4. 魅族升级过程中出现android,魅族官宣魅族17升级Android11底层?结果魅友却怒了
  5. 利用机器学习进行手写数字识别
  6. nucle LED 调光PWM
  7. Java实现新闻门户网站
  8. FORTRAN之父逝世——一位回头浪子的传奇人生
  9. 2021年计算机职业高中分数线,2021中职学校录取分数线
  10. 调用短信接口发送验证码,并对验证码进行验证