2021年大数据Spark(二十九):SparkSQL案例四开窗函数
目录
案例四:开窗函数
概述
介绍
聚合函数和开窗函数
开窗函数分类
聚合开窗函数
排序开窗函数
ROW_NUMBER顺序排序
RANK跳跃排序
DENSE_RANK连续排序
NTILE分组排名[了解]
代码演示
案例四:开窗函数
概述
https://www.cnblogs.com/qiuting/p/7880500.html
介绍
开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。
聚合函数和开窗函数
聚合函数是将多行变成一行,count,avg....
开窗函数是将一行变成多行;
聚合函数如果要显示其他的列必须将列加入到group by中
开窗函数可以不使用group by,直接将所有信息显示出来
开窗函数分类
1.聚合开窗函数
聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。
2.排序开窗函数
排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。
聚合开窗函数
示例1
OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。
SQL标准允许将所有聚合函数用做聚合开窗函数。
spark.sql("select count(name) from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show
查询结果如下所示:
+----+-----+-----+----------+
|name|class|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 11|
| a2| 1| 78| 11|
| a3| 1| 95| 11|
| a4| 2| 74| 11|
| a5| 2| 92| 11|
| a6| 3| 99| 11|
| a7| 3| 99| 11|
| a8| 3| 45| 11|
| a9| 3| 55| 11|
| a10| 3| 78| 11|
| a11| 3| 100| 11|
+----+-----+-----+----------+
示例2
OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。
如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。
开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。
与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。
下面的 SQL 语句用于显示按照班级分组后每组的人数:
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。
spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show
查询结果如下所示:
+----+-----+-----+----------+
|name|class|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 3|
| a2| 1| 78| 3|
| a3| 1| 95| 3|
| a6| 3| 99| 6|
| a7| 3| 99| 6|
| a8| 3| 45| 6|
| a9| 3| 55| 6|
| a10| 3| 78| 6|
| a11| 3| 100| 6|
| a4| 2| 74| 2|
| a5| 2| 92| 2|
+----+-----+-----+----------+
排序开窗函数
ROW_NUMBER顺序排序
row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:
在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。
●示例1
spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 5|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 10|
| a11| 3| 100| 11|
+----+-----+-----+----+
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 5|
| a11| 3| 100| 6|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
RANK跳跃排序
rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果可以并列,并列排名之后的排名将是并列的排名加上并列数
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名
●示例2
spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a10| 3| 78| 4|
| a2| 1| 78| 4|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 9|
| a11| 3| 100| 11|
+----+-----+-----+----+
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 4|
| a11| 3| 100| 6|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
DENSE_RANK连续排序
dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名之后的排名只是并列排名加1
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名
●示例3
spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 4|
| a1| 1| 80| 5|
| a5| 2| 92| 6|
| a3| 1| 95| 7|
| a6| 3| 99| 8|
| a7| 3| 99| 8|
| a11| 3| 100| 9|
+----+-----+-----+----+
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 4|
| a11| 3| 100| 5|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
NTILE分组排名[了解]
ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。
示例4
spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 1|
| a4| 2| 74| 2|
| a2| 1| 78| 2|
| a10| 3| 78| 3|
| a1| 1| 80| 3|
| a5| 2| 92| 4|
| a3| 1| 95| 4|
| a6| 3| 99| 5|
| a7| 3| 99| 5|
| a11| 3| 100| 6|
+----+-----+-----+----+
spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 5|
| a11| 3| 100| 6|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
代码演示
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** Author itcast* Date 2020/9/21 9:33* Desc 使用SparkSQL支持的开窗函数/窗口函数完成对各个班级的学生成绩的排名*/
object RowNumberDemo {case class Score(name: String, clazz: Int, score: Int)def main(args: Array[String]): Unit = {//1.准备环境val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._//2.加载数据val scoreDF: DataFrame = sc.makeRDD(Array(Score("a1", 1, 80),Score("a2", 1, 78),Score("a3", 1, 95),Score("a4", 2, 74),Score("a5", 2, 92),Score("a6", 3, 99),Score("a7", 3, 99),Score("a8", 3, 45),Score("a9", 3, 55),Score("a10", 3, 78),Score("a11", 3, 100))).toDF("name", "class", "score")scoreDF.createOrReplaceTempView("t_scores")scoreDF.show()/*+----+-----+-----+|name|class|score|num+----+-----+-----+| a1| 1| 80|| a2| 1| 78|| a3| 1| 95|| a4| 2| 74|| a5| 2| 92|| a6| 3| 99|| a7| 3| 99|| a8| 3| 45|| a9| 3| 55|| a10| 3| 78|| a11| 3| 100|+----+-----+-----+*///使用ROW_NUMBER顺序排序spark.sql("select name, class, score, row_number() over(partition by class order by score) num from t_scores").show()//使用RANK跳跃排序spark.sql("select name, class, score, rank() over(partition by class order by score) num from t_scores").show()//使用DENSE_RANK连续排序spark.sql("select name, class, score, dense_rank() over(partition by class order by score) num from t_scores").show()/*
ROW_NUMBER顺序排序--1234
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 5|
| a11| 3| 100| 6|| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+---+使用RANK跳跃排序--1224
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 4|
| a11| 3| 100| 6|| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+---+DENSE_RANK连续排序--1223
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 4|
| a11| 3| 100| 5|| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+---+*//*val sql ="""|select 字段1,字段2,字段n,|row_number() over(partition by 字段1 order by 字段2 desc) num|from 表名|having num <= 3|""".stripMarginimport org.apache.spark.sql.functions._df.withColumn("num",row_number().over(Window.partitionBy('字段1).orderBy('字段2.desc))).filter('num <= 3).show(false)*/}
}
2021年大数据Spark(二十九):SparkSQL案例四开窗函数相关推荐
- 2021年大数据Spark(十九):Spark Core的共享变量
目录 共享变量 广播变量 累加器 案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...
- 2021年大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 使用FileBeat采集Kafka日志到Elasticsearch 一.需求分 ...
- 2021年大数据Spark(十二):Spark Core的RDD详解
目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- 2021年大数据Spark(十四):Spark Core的RDD操作
目录 RDD的操作 函数(算子)分类 Transformation函数 Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- 2021年大数据Spark(十六):Spark Core的RDD算子练习
目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 first.take.top 算子 ...
- 2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口
目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4 ...
- 2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation
全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 Hadoop的联邦机制 Federation 背景概述 F ...
- 2021年大数据Hadoop(十四):HDFS的高可用机制
全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...
最新文章
- java中execution的作用_一文初步了解Java虚拟机
- html用vue传递数据,Vue组件及数据传递详解
- Linux iterm 快捷键
- poj 2392 Space Elevator
- hihoCoder 1367 等式填空
- clob类型用java怎么存,Java 储存和读取 oracle CLOB 类型字段的实用方法
- 利用Python实现定时发送邮件,实现一款营销工具
- 谁都会做:简单易行的祛斑法 - 健康程序员,至尚生活!
- 《深入浅出MFC》学习笔记
- 只有它才能让云计算、大数据、人工智能大放异彩?它究竟有什么魔力?
- python 读取pdf 两栏_python 读取pdf
- 手把手教你0基础C语言速通
- 单片机学习都时候需要注意的步骤-依葫芦画瓢
- 【微服务】什么是SOA服务架构?
- 迈成专转本三毛计算机,念念不忘,必有回响【迈成专转本学员分享】
- 关于大数据,你应该知道的50个专业术语
- 基于 HTML5 Canvas 的简易 2D 3D 编辑器
- java项目开发实践 pdf_Java项目开发实践 覃遵跃.pdf
- 反向代理haproxy用法详解
- 浅析“高内聚,低耦合”
热门文章
- 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告
- 2022-2028年中国企业核心路由交换机行业市场前瞻与投资分析报告
- 数据结构(02)— 时间复杂度与空间复杂度转换
- 正向最大匹配 和逆向最大匹配对比比较
- 用gensim学习word2vec
- ZooKeeper简单使用
- PyTorch 自动微分示例
- 人脸识别数据集精粹(上)
- 2021年大数据Flink(三十一):​​​​​​​Table与SQL案例准备 依赖和​​​​​​​程序结构
- Thrift协议与传输选择