目录

案例三:电影评分数据分析

代码实现

Shuffle分区数


案例三:电影评分数据分析

使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于200)

数据格式如下,每行数据各个字段之间使用双冒号分开:

数据处理分析步骤如下:

  1. 第一步、读取电影评分数据,从本地文件系统读取
  2. 第二步、转换数据,指定Schema信息,封装到DataFrame
  3. 第三步、基于SQL方式分析
  4. 第四步、基于DSL方式分析

代码实现

电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:

package cn.itcast.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)*/
object SparkTop10Movie {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// TODO: 设置shuffle时分区数目.config("spark.sql.shuffle.partitions", "4").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// 1. 读取电影评分数据,从本地文件系统读取val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")// 2. 转换数据val ratingsDF: DataFrame = rawRatingsDS// 过滤数据.filter(line => null != line && line.trim.split("\t").length == 4)// 提取转换数据.mapPartitions{iter =>iter.map{line =>// 按照分割符分割,拆箱到变量中val Array(userId, movieId, rating, timestamp) = line.trim.split("\t")// 返回四元组(userId, movieId, rating.toDouble, timestamp.toLong)}}// 指定列名添加Schema.toDF("userId", "movieId", "rating", "timestamp")/*root|-- userId: string (nullable = true)|-- movieId: string (nullable = true)|-- rating: double (nullable = false)|-- timestamp: long (nullable = false)*/ratingsDF.printSchema()/*+------+-------+------+---------+|userId|movieId|rating|timestamp|+------+-------+------+---------+|     1|   1193|   5.0|978300760||     1|    661|   3.0|978302109||     1|    594|   4.0|978302268||     1|    919|   4.0|978301368|+------+-------+------+---------+*/ratingsDF.show(4)// TODO: 基于SQL方式分析// 第一步、注册DataFrame为临时视图ratingsDF.createOrReplaceTempView("view_temp_ratings")// 第二步、编写SQLval top10MovieDF: DataFrame = spark.sql("""|SELECT|  movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating|FROM|  view_temp_ratings|GROUP BY|  movieId|HAVING|  cnt_rating > 200|ORDER BY|  avg_rating DESC, cnt_rating DESC|LIMIT|  10""".stripMargin)//top10MovieDF.printSchema()top10MovieDF.show(10, truncate = false)println("===============================================================")// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析import org.apache.spark.sql.functions._val resultDF: DataFrame = ratingsDF// 选取字段.select($"movieId", $"rating")// 分组:按照电影ID,获取平均评分和评分次数.groupBy($"movieId").agg(round(avg($"rating"), 2).as("avg_rating"),count($"movieId").as("cnt_rating"))// 过滤:评分次数大于200.filter($"cnt_rating" > 200)// 排序:先按照评分降序,再按照次数降序.orderBy($"avg_rating".desc, $"cnt_rating".desc)// 获取前10.limit(10)//resultDF.printSchema()resultDF.show(10)/*// TODO: 将分析的结果数据保存MySQL数据库和CSV文件// 结果DataFrame被使用多次,缓存resultDF.persist(StorageLevel.MEMORY_AND_DISK)// 1. 保存MySQL数据库表汇总resultDF.coalesce(1).write.mode("overwrite").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "root").jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","top10_movies",new Properties())// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开resultDF.coalesce(1).write.mode("overwrite").csv("data/output/top10-movies")// 释放缓存数据resultDF.unpersist()*/spark.stop()}
}

​​​​​​​Shuffle分区数

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置

val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// TODO: 设置shuffle时分区数目.config("spark.sql.shuffle.partitions", "4").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._

2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析相关推荐

  1. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  2. 大数据Spark(二十八):SparkSQL案例三电影评分数据分析

    文章目录 案例三:电影评分数据分析 代码实现 Shuffle分区数 案例三:电影评分数据分析 使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程

  3. 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...

  4. 客快物流大数据项目(二十八):大数据服务器环境准备

    目录 大数据服务器环境准备 一.服务器规划 二.Linux虚拟机环境搭建

  5. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  6. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  7. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  8. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

  9. 2021年大数据Spark(十六):Spark Core的RDD算子练习

    目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 ​​​​​​​​​​​​​​first.take.top 算子 ​​​ ...

最新文章

  1. CocoaPods导入的库其头文件导入的方法
  2. UVA 10515 - Powers Et Al.(数论)
  3. js rsa解密中文乱码_建议收藏 | 最全的 JS 逆向入门教程合集
  4. hust1342(流量有上下界的最小流)
  5. 面试:MySQL 架构
  6. 用gSOAP开发Web Service程序
  7. MyBatis-面试题
  8. 69. 二叉树的层次遍历Python实现
  9. Kong API Gateway 配置文件详解
  10. linux下备份msyql数据库
  11. pywin32下载python3.6_pywin32 py3.7下载
  12. Java旅游管理系统本科生毕业设计任务书
  13. Apowersoft ApowerMirror v1.4.5 终身商业授权破解版 安卓/iPhone投屏控制软件
  14. python网络爬虫网易云音乐guihub_GitHub - GreatV/CloudMusic-Crawler: 网易云音乐爬虫,数据可视化。...
  15. pdf怎么转图片,可得到高清图
  16. 快速幂算法(qwe)
  17. python将一句话重复n次输出_在Python中创建单项重复n次的列表
  18. gradient设置上下渐变_CSS3中渐变gradient详解
  19. ESP32学习笔记(2)——GPIO接口使用
  20. 测向交叉定位matlab,一种三维多站测向交叉定位算法

热门文章

  1. 2022-2028年中国PPS树脂产业研究及前瞻分析报告
  2. Python 多线程总结(1)- thread 模块
  3. 在装有Ubuntu16.04的VMware虚拟机下安装OpenCV3.2.0
  4. 手撸一个JdbcTemplate,带你了解其原理
  5. 微信支付回调重复通知,正确的响应
  6. Yolov3网络架构分析
  7. 华为+长安研发芯片?长安蔚来更名“阿维塔科技”
  8. 端到端TVM编译器(下)
  9. Tengine Framework基础
  10. TensorRT 基于Yolov3的开发