在上一篇文章中,首先按照netType进行了统计,接下来添加一个条件,按照城市进行统计:

  def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("TopNStatJob").config("spark.sql.sources.partitionColumnTypeInference.enabled", "false").master("local[2]").getOrCreate()val accessDF = spark.read.format("parquet").load("file:///E:/test/clean")//    accessDF.printSchema()accessDF.show(false)// 最受欢迎的TopN netType// netTypeAccessTopNStat(spark, accessDF)// 按照地市进行统计TopN课程cityTypeAccessTopNStat(spark, accessDF)spark.stop}
/*** 按照地市进行统计Top3课程** @param spark* @param accessDF*/def cityTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20190702" && accessDF.col("netType") === "wifi").groupBy("day", "uid", "city").agg(count("uid").as("times")).orderBy(desc("times"))cityAccessTopNDF.show(false)// window 函数在Spark SQL的使用cityAccessTopNDF.select(cityAccessTopNDF("day"), cityAccessTopNDF("uid"), cityAccessTopNDF("city"), cityAccessTopNDF("times"), row_number().over(Window.partitionBy("city").orderBy(cityAccessTopNDF("times").desc)).as("times_rank")).filter("times_rank <= 3").show(false)}

运行结果如下:

将结果写入mysql

创建数据表:

create table day_netType_city_access_topn_stat (
day varchar(8) not null,
uid bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank  bigint(10) not null,
primary key (day, uid)
)

创建一个Entity

package cn.ac.iie.logcase class DayCityNetTypeAccessStat(day:String, uid: Long, city:String, times: Long, times_rank: Long)

创建Dao

*** 批量保存DayCityNetTypeAccessStat到数据库** @param list*/def insertDayNetTypeCityAccessTopN(list: ListBuffer[DayCityNetTypeAccessStat]): Unit = {var connection: Connection = nullvar pstmt: PreparedStatement = nulltry {connection = MysqlUtils.getConnection()// 设置手动提交connection.setAutoCommit(false)val sql = "insert into day_netType_city_access_topn_stat (day, uid, city, times, times_rank) values (?,?,?,?,?)"pstmt = connection.prepareStatement(sql)for (ele <- list) {pstmt.setString(1, ele.day)pstmt.setLong(2, ele.uid)pstmt.setString(3, ele.city)pstmt.setLong(4, ele.times)pstmt.setLong(5, ele.times_rank)pstmt.addBatch()}pstmt.executeBatch() // 执行批量处理// 手动提交connection.commit()} catch {case e: Exception => e.printStackTrace()} finally {MysqlUtils.release(connection, pstmt)}}

将结果写入到Mysql中

// 将统计结果写入到Mysql中try {top3DF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayCityNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval city = info.getAs[String]("city")val times = info.getAs[Long]("times")val timesRank = info.getAs[Int]("times_rank")list.append(DayCityNetTypeAccessStat(day, uid, city, times, timesRank))})StatDao.insertDayNetTypeCityAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}

Spark SQL使用window进行统计相关推荐

  1. Spark Sql窗口函数Window的使用(1)

    窗口函数的使用(1) 窗口是非常重要的统计工具,很多数据库都支持窗口函数.Spark从1.4开始支持窗口(window)函数.它主要有以下一些特点: 先对在一组数据行上进行操作,这组数据被称为Fram ...

  2. 第71课:Spark SQL窗口函数解密与实战学习笔记

    第71课:Spark SQL窗口函数解密与实战学习笔记 本期内容: 1 SparkSQL窗口函数解析 2 SparkSQL窗口函数实战 窗口函数是Spark内置函数中最有价值的函数,因为很多关于分组的 ...

  3. Spark SQL玩起来

    标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...

  4. spark sql基本使用方法介绍(转载)

    spark sql基本使用方法介绍 Spark中可以通过spark sql 直接查询Hive或impala中的数据, 一.启动方法 /data/spark-1.4.0-bin-cdh4/bin/spa ...

  5. dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化

    Spark大数据分析中涉及到RDD.Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用. 1.在IPython Notebook运行Python Spark程序 ...

  6. 《Spark SQL大数据实例开发》9.2 综合案例实战——电商网站搜索排名统计

    <Spark SQL大数据实例开发>9.2 综合案例实战--电商网站搜索排名统计 9.2.1 案例概述     本节演示一个网站搜索综合案例:以京东为例,用户登录京东网站,在搜索栏中输入搜 ...

  7. 73、Spark SQL之开窗函数以及top3销售额统计案例实战

    开窗函数以及top3销售额统计案例实战 Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取to ...

  8. 初识Spark2.0之Spark SQL

    内存计算平台Spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都做出了较大的改变,同时更加注重基于DataFrame数据组织 ...

  9. Spark SQL原理及常用方法详解(二)

    Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...

最新文章

  1. 实战分享:淘宝Web 3D应用与游戏开发
  2. 简单介绍实体类或对象序列化时,忽略为空属性的操作
  3. 今天,向 6 女性程序员先驱致敬
  4. 关于printf()与自增自减运算符结和问题
  5. 向量值函数在计算机工程与应用,拟Newton法在高阶矩阵中的应用-计算机工程与应用.PDF...
  6. apollo 参数传递_使用Apollo通过WebSocket通过STOMP轻松进行消息传递
  7. Bootstrap3 如何防止插件冲突
  8. Mysql复习(基础概念+基础操作)
  9. LIRe 源代码分析 6:检索(ImageSearcher)[以颜色布局为例]
  10. Photoshop 入门教程「3」如何缩放和平移图像?
  11. Oracle Exadata 技术详解 - 李亚
  12. diy计算机组装注意事项,电脑DIY常见误区有哪些 电脑组装新手注意事项
  13. 打印机登录无密码计算机,无密码,引发共享打印机拒绝访问故障
  14. 开弓没有回头箭,遭遇跳槽四大后悔事件,怎么办?
  15. [Unity插件]着色器关键字分析工具ShaderControl
  16. @ApiOperation
  17. surface屏幕自动调节亮度无法关闭
  18. 小米MIUI备份/小米助手数据通过BAK进行恢复 | 生成MIUI的descript.xml文件进行数据恢复 | 手动恢复MIUI备份/小米助手数据
  19. 【DASCTF2023】Misc mp3
  20. Java/Android 进程与线程之 多线程开发(二)

热门文章

  1. OO开发思想:面向对象的开发方法(Object oriented,OO)
  2. Redis 一个key-value存储系统 简介
  3. 磁盘阵列,双机热备,负载均衡
  4. cron表达式详解 Elastic-Job名次解释
  5. 日志文件切割服务logrotate配置及crontab定时任务的使用
  6. PHP 数组遍历 foreach 语法结构
  7. Linux之查看ubuntu版本
  8. mysql备份命令和还原命令_mysql数据备份和还原命令
  9. ndtmapping建图_自动驾驶系列:激光雷达建图和定位(NDT)
  10. 快速计算文件的MD5/SHA1/SHA256等校验值(Windows/Linux)