Spark SQL使用window进行统计
在上一篇文章中,首先按照netType进行了统计,接下来添加一个条件,按照城市进行统计:
def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("TopNStatJob").config("spark.sql.sources.partitionColumnTypeInference.enabled", "false").master("local[2]").getOrCreate()val accessDF = spark.read.format("parquet").load("file:///E:/test/clean")// accessDF.printSchema()accessDF.show(false)// 最受欢迎的TopN netType// netTypeAccessTopNStat(spark, accessDF)// 按照地市进行统计TopN课程cityTypeAccessTopNStat(spark, accessDF)spark.stop}
/*** 按照地市进行统计Top3课程** @param spark* @param accessDF*/def cityTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20190702" && accessDF.col("netType") === "wifi").groupBy("day", "uid", "city").agg(count("uid").as("times")).orderBy(desc("times"))cityAccessTopNDF.show(false)// window 函数在Spark SQL的使用cityAccessTopNDF.select(cityAccessTopNDF("day"), cityAccessTopNDF("uid"), cityAccessTopNDF("city"), cityAccessTopNDF("times"), row_number().over(Window.partitionBy("city").orderBy(cityAccessTopNDF("times").desc)).as("times_rank")).filter("times_rank <= 3").show(false)}
运行结果如下:
将结果写入mysql
创建数据表:
create table day_netType_city_access_topn_stat (
day varchar(8) not null,
uid bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank bigint(10) not null,
primary key (day, uid)
)
创建一个Entity
package cn.ac.iie.logcase class DayCityNetTypeAccessStat(day:String, uid: Long, city:String, times: Long, times_rank: Long)
创建Dao
*** 批量保存DayCityNetTypeAccessStat到数据库** @param list*/def insertDayNetTypeCityAccessTopN(list: ListBuffer[DayCityNetTypeAccessStat]): Unit = {var connection: Connection = nullvar pstmt: PreparedStatement = nulltry {connection = MysqlUtils.getConnection()// 设置手动提交connection.setAutoCommit(false)val sql = "insert into day_netType_city_access_topn_stat (day, uid, city, times, times_rank) values (?,?,?,?,?)"pstmt = connection.prepareStatement(sql)for (ele <- list) {pstmt.setString(1, ele.day)pstmt.setLong(2, ele.uid)pstmt.setString(3, ele.city)pstmt.setLong(4, ele.times)pstmt.setLong(5, ele.times_rank)pstmt.addBatch()}pstmt.executeBatch() // 执行批量处理// 手动提交connection.commit()} catch {case e: Exception => e.printStackTrace()} finally {MysqlUtils.release(connection, pstmt)}}
将结果写入到Mysql中
// 将统计结果写入到Mysql中try {top3DF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayCityNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval city = info.getAs[String]("city")val times = info.getAs[Long]("times")val timesRank = info.getAs[Int]("times_rank")list.append(DayCityNetTypeAccessStat(day, uid, city, times, timesRank))})StatDao.insertDayNetTypeCityAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}
Spark SQL使用window进行统计相关推荐
- Spark Sql窗口函数Window的使用(1)
窗口函数的使用(1) 窗口是非常重要的统计工具,很多数据库都支持窗口函数.Spark从1.4开始支持窗口(window)函数.它主要有以下一些特点: 先对在一组数据行上进行操作,这组数据被称为Fram ...
- 第71课:Spark SQL窗口函数解密与实战学习笔记
第71课:Spark SQL窗口函数解密与实战学习笔记 本期内容: 1 SparkSQL窗口函数解析 2 SparkSQL窗口函数实战 窗口函数是Spark内置函数中最有价值的函数,因为很多关于分组的 ...
- Spark SQL玩起来
标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...
- spark sql基本使用方法介绍(转载)
spark sql基本使用方法介绍 Spark中可以通过spark sql 直接查询Hive或impala中的数据, 一.启动方法 /data/spark-1.4.0-bin-cdh4/bin/spa ...
- dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化
Spark大数据分析中涉及到RDD.Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用. 1.在IPython Notebook运行Python Spark程序 ...
- 《Spark SQL大数据实例开发》9.2 综合案例实战——电商网站搜索排名统计
<Spark SQL大数据实例开发>9.2 综合案例实战--电商网站搜索排名统计 9.2.1 案例概述 本节演示一个网站搜索综合案例:以京东为例,用户登录京东网站,在搜索栏中输入搜 ...
- 73、Spark SQL之开窗函数以及top3销售额统计案例实战
开窗函数以及top3销售额统计案例实战 Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取to ...
- 初识Spark2.0之Spark SQL
内存计算平台Spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都做出了较大的改变,同时更加注重基于DataFrame数据组织 ...
- Spark SQL原理及常用方法详解(二)
Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...
最新文章
- 实战分享:淘宝Web 3D应用与游戏开发
- 简单介绍实体类或对象序列化时,忽略为空属性的操作
- 今天,向 6 女性程序员先驱致敬
- 关于printf()与自增自减运算符结和问题
- 向量值函数在计算机工程与应用,拟Newton法在高阶矩阵中的应用-计算机工程与应用.PDF...
- apollo 参数传递_使用Apollo通过WebSocket通过STOMP轻松进行消息传递
- Bootstrap3 如何防止插件冲突
- Mysql复习(基础概念+基础操作)
- LIRe 源代码分析 6:检索(ImageSearcher)[以颜色布局为例]
- Photoshop 入门教程「3」如何缩放和平移图像?
- Oracle Exadata 技术详解 - 李亚
- diy计算机组装注意事项,电脑DIY常见误区有哪些 电脑组装新手注意事项
- 打印机登录无密码计算机,无密码,引发共享打印机拒绝访问故障
- 开弓没有回头箭,遭遇跳槽四大后悔事件,怎么办?
- [Unity插件]着色器关键字分析工具ShaderControl
- @ApiOperation
- surface屏幕自动调节亮度无法关闭
- 小米MIUI备份/小米助手数据通过BAK进行恢复 | 生成MIUI的descript.xml文件进行数据恢复 | 手动恢复MIUI备份/小米助手数据
- 【DASCTF2023】Misc mp3
- Java/Android 进程与线程之 多线程开发(二)
热门文章
- OO开发思想:面向对象的开发方法(Object oriented,OO)
- Redis 一个key-value存储系统 简介
- 磁盘阵列,双机热备,负载均衡
- cron表达式详解 Elastic-Job名次解释
- 日志文件切割服务logrotate配置及crontab定时任务的使用
- PHP 数组遍历 foreach 语法结构
- Linux之查看ubuntu版本
- mysql备份命令和还原命令_mysql数据备份和还原命令
- ndtmapping建图_自动驾驶系列:激光雷达建图和定位(NDT)
- 快速计算文件的MD5/SHA1/SHA256等校验值(Windows/Linux)