spark业务开发-聚合(agg)
spark业务开发-聚合(agg)
- 项目地址:https://gitee.com/cch-bigdata/spark-process.git
输入数据
name,profession,enroll,score
曾凰妹,金融学,北京电子科技学院,637
谢德炜,金融学,北京电子科技学院,542
林逸翔,金融学,北京电子科技学院,543
王丽云,金融学,北京电子科技学院,626
吴鸿毅,金融学,北京电子科技学院,591
施珊珊,经济学类,北京理工大学,581
柯祥坤,经济学类,北京理工大学,650
庄劲聪,经济学类,北京理工大学,551
吴雅思,经济学类,北京理工大学,529
周育传,经济学类,北京理工大学,682
丁俊伟,通信工程,北京电子科技学院,708
庄逸琳,通信工程,北京电子科技学院,708
吴志发,通信工程,北京电子科技学院,578
肖妮娜,通信工程,北京电子科技学院,557
蔡建明,通信工程,北京电子科技学院,583
林逸翔,通信工程,北京电子科技学院,543
数据随便构造的,无任何现实意义
输出数据
+----------------+----------+----+--------+------+------+-----------------+----+-----------------+
| enroll|profession|总数|去重总数|最小值|最大值| 平均值|总和| 标准方差|
+----------------+----------+----+--------+------+------+-----------------+----+-----------------+
|北京电子科技学院| 通信工程| 6| 6| 543| 708|612.8333333333334|3677| 75.1143572605575|
|北京电子科技学院| 金融学| 5| 5| 542| 637| 587.8|2939|44.70682274552733|
| 北京理工大学| 经济学类| 5| 5| 529| 682| 598.6|2993|65.22499520889212|
+----------------+----------+----+--------+------+------+-----------------+----+-----------------+
程序代码
package com.cch.bigdata.spark.process.aggimport com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, functions}import scala.collection.mutable.ListBuffer//聚合操作
class Aggregater extends AbstractTransform {//需要分组的列数组private val group_columns = Array("enroll","profession")//聚合操作的字段private val agg_columns = Array("name","name","score","score","score","score","score")//聚合模式//Count:数量//DistinctCount:去重数量//min:最小值//max:最大值//avg:平均值//sum:求和//Stddev:标准方差private val agg_mode = Array("Count","DistinctCount","min","max","avg","sum","Stddev")//列对应的中文名称private val new_column_cnames = Array("总数","去重总数","最小值","最大值","平均值","总和","标准方差")override def process(): Unit = {if (agg_columns.isEmpty) {throw new RuntimeException("聚合列不能为空")}if (agg_mode.isEmpty) {throw new RuntimeException("聚合模式不能为空")}if (agg_columns.length != agg_mode.length) {throw new RuntimeException("聚合列和聚合模式不匹配")}//获取输入流val df: DataFrame = loadCsv("src/main/resources/csv/admission.csv",spark)//构造结果列list,用来存储最后需要聚合的列val columnList: ListBuffer[Column] = new ListBuffer()var index = 0;agg_columns.foreach(c => {//聚合模式val aggMode: String = agg_mode(index)if (aggMode.isEmpty) {throw new RuntimeException("聚合模式为空,无法为字段" + c + "做聚合操作")}var columnNewName: String = new_column_cnames(index)if (columnNewName.isEmpty) {columnNewName = c}aggMode match {case "Count" => {columnList.append(count(c).as(columnNewName))}case "DistinctCount" => {columnList.append(countDistinct(c).as(columnNewName))}case "min" => {columnList.append(min(c).as(columnNewName))}case "max" => {columnList.append(functions.max(c).as(columnNewName))}case "sum" =>{columnList.append(functions.sum(c).as(columnNewName))}case "avg" => {columnList.append(avg(c).as(columnNewName))}case "Stddev" => {columnList.append(stddev(c).as(columnNewName))}case _ => {throw new RuntimeException("聚合策略不正确,仅支持[Count/DistinctCount/min/max/avg/Stddev]")}}index += 1})//方法1:提起第一个字段,用于后面的agg参数//使用这种方式多了一个处理第一个参数的步骤//但是可以处理字段别名的问题val firstColumn: Column = columnList(0)//删除第一个字段columnList.remove(0)//方法2:构造map数据结构// 使用方式简单,但是无法使用as别名// df.groupBy("department").agg(Map(// "age" -> "max",// "expense" -> "sum"// ))//如果分组列不为空,先进行分组if (!group_columns.isEmpty) {//分组聚合df.groupBy(group_columns.map(c => {col(c)}): _*).agg(firstColumn, columnList.map(c => {c}): _*).show()}else{//不分组聚合df.agg(firstColumn, columnList.map(c => {c}): _*).show()}}override def getAppName(): String = "聚合操作"
}
package com.cch.bigdata.spark.processimport com.cch.bigdata.spark.process.agg.Aggregaterobject AggTest {def main(args: Array[String]): Unit = {new Aggregater().process()}
}
参数解释
group_columns:需要进行分组聚合时的列名
agg_columns:需要聚合的字段数组
agg_mode:聚合方式
new_column_cnames:聚合后新列名
spark业务开发-聚合(agg)相关推荐
- spark业务开发-空值处理
spark业务开发-空值处理 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集 "id","name& ...
- spark业务开发-列选择
spark业务开发-列选择 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 "id","name&qu ...
- spark业务开发-添加索引列
spark业务开发-添加索引列 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 name,profession,enroll,sco ...
- spark业务开发-union合并(union)
spark业务开发-union合并(union) 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集1 id,name,profess ...
- Day651.NoSQL与RDBMS合理搭配问题 -Java业务开发常见错误
NoSQL与RDBMS合理搭配问题 Hello,阿昌来也!今天学习分享记录的关于Nosql数据库和Mysql数据库的一系列对比,架构安排,发挥出每种数据库的优势案例. 近几年,各种非关系型数据库,也就 ...
- Day623.并发编程工具类库使用错误问题 -Java业务开发常见错误
并发编程工具类库使用错误问题 多线程想必大家都知道,且JDK也为我们提供了很多并发编程的工具类库,接下来就是记录对应在业务开发中,可能会出现的并发编程工具类库使用错误的问题 一.线程复用导致信息错乱 ...
- 业务开发时,接口不能对外暴露该如何实现?
来源:blog.csdn.net/m0_71777195/article/details/127243452 在业务开发的时候,经常会遇到某一个接口不能对外暴露,只能内网服务间调用的实际需求.面对这样 ...
- Mybatis Plus版本升级及项目业务开发规范简要说明
Mybatis Plus版本升级及项目业务开发规范简要说明 码农: Alias > 版本升级说明:此次将Mybatis Plus的版本由2.x升级到3.1.2版本,最新版本是3.2,由于3.2版 ...
- 从业务开发中学习和理解架构设计
作者:张东爱(当爱) 阿里自主出行研发团队 一.前言 在软件开发领域经常会接触到架构这个词汇,在我最初的印象中,架构是一个很高级的词汇.它似乎代表了复杂的工程结构.高层次的抽象设计.最新的开发语言特 ...
最新文章
- 西安交通大学17年9月课程考试计算机应用基础,西安交通大学17年9月课程考试《计算机应用基础》作业考核试题 (含主观题)...
- nginx+php+mysql+erlang+mongo环境安装
- 算法导论-概率发生器
- java将HTML文件转化为pdf文件,如何使用Java将HTML网页转换为PDF文件
- java学习之—链表(3)
- 蓝桥杯 ALGO-120 算法训练 学做菜
- Linux平台下裸设备的绑定:
- Pandas速查手册中文
- 基于Opencv实现车牌图片识别系统
- 计算机在输电线路设计中的应用研究,精选:计算机在输电线路基础设计中的应用原稿...
- 迪普交换机恢复出厂设置_【迪普科技官网介绍】迪普科技交换机、防火墙_迪普科技(中国)公司简介-ZOL中关村在线厂商频道...
- 2019 “钱”途光明的 8 大前端开发技术,你还差几个?
- 【Love2d从青铜到王者】第十一篇:Love2d之图像(Images)
- 阿尔法狗怎么用机器学习做决策:马尔科夫链减少搜索空间说起(附PDF公号发“马链搜索”下载)
- andriod自带模拟器使用代理
- Excel-DATEDIF函数计算两日期天数差
- 移动前端自适应适配布局解决方案和比较
- python函数中的变量取出来_在Python中从函数调用中提取变量
- Jmeter登录163邮箱
- Python多线程、多进程最全整理