2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
目录
案例三:电影评分数据分析
代码实现
Shuffle分区数
案例三:电影评分数据分析
使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于200)。
数据格式如下,每行数据各个字段之间使用双冒号分开:
数据处理分析步骤如下:
- 第一步、读取电影评分数据,从本地文件系统读取
- 第二步、转换数据,指定Schema信息,封装到DataFrame
- 第三步、基于SQL方式分析
- 第四步、基于DSL方式分析
代码实现
电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:
package cn.itcast.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)*/
object SparkTop10Movie {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// TODO: 设置shuffle时分区数目.config("spark.sql.shuffle.partitions", "4").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// 1. 读取电影评分数据,从本地文件系统读取val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")// 2. 转换数据val ratingsDF: DataFrame = rawRatingsDS// 过滤数据.filter(line => null != line && line.trim.split("\t").length == 4)// 提取转换数据.mapPartitions{iter =>iter.map{line =>// 按照分割符分割,拆箱到变量中val Array(userId, movieId, rating, timestamp) = line.trim.split("\t")// 返回四元组(userId, movieId, rating.toDouble, timestamp.toLong)}}// 指定列名添加Schema.toDF("userId", "movieId", "rating", "timestamp")/*root|-- userId: string (nullable = true)|-- movieId: string (nullable = true)|-- rating: double (nullable = false)|-- timestamp: long (nullable = false)*/ratingsDF.printSchema()/*+------+-------+------+---------+|userId|movieId|rating|timestamp|+------+-------+------+---------+| 1| 1193| 5.0|978300760|| 1| 661| 3.0|978302109|| 1| 594| 4.0|978302268|| 1| 919| 4.0|978301368|+------+-------+------+---------+*/ratingsDF.show(4)// TODO: 基于SQL方式分析// 第一步、注册DataFrame为临时视图ratingsDF.createOrReplaceTempView("view_temp_ratings")// 第二步、编写SQLval top10MovieDF: DataFrame = spark.sql("""|SELECT| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating|FROM| view_temp_ratings|GROUP BY| movieId|HAVING| cnt_rating > 200|ORDER BY| avg_rating DESC, cnt_rating DESC|LIMIT| 10""".stripMargin)//top10MovieDF.printSchema()top10MovieDF.show(10, truncate = false)println("===============================================================")// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析import org.apache.spark.sql.functions._val resultDF: DataFrame = ratingsDF// 选取字段.select($"movieId", $"rating")// 分组:按照电影ID,获取平均评分和评分次数.groupBy($"movieId").agg(round(avg($"rating"), 2).as("avg_rating"),count($"movieId").as("cnt_rating"))// 过滤:评分次数大于200.filter($"cnt_rating" > 200)// 排序:先按照评分降序,再按照次数降序.orderBy($"avg_rating".desc, $"cnt_rating".desc)// 获取前10.limit(10)//resultDF.printSchema()resultDF.show(10)/*// TODO: 将分析的结果数据保存MySQL数据库和CSV文件// 结果DataFrame被使用多次,缓存resultDF.persist(StorageLevel.MEMORY_AND_DISK)// 1. 保存MySQL数据库表汇总resultDF.coalesce(1).write.mode("overwrite").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "root").jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","top10_movies",new Properties())// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开resultDF.coalesce(1).write.mode("overwrite").csv("data/output/top10-movies")// 释放缓存数据resultDF.unpersist()*/spark.stop()}
}
Shuffle分区数
运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置
val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// TODO: 设置shuffle时分区数目.config("spark.sql.shuffle.partitions", "4").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析相关推荐
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- 大数据Spark(二十八):SparkSQL案例三电影评分数据分析
文章目录 案例三:电影评分数据分析 代码实现 Shuffle分区数 案例三:电影评分数据分析 使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程
- 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...
- 客快物流大数据项目(二十八):大数据服务器环境准备
目录 大数据服务器环境准备 一.服务器规划 二.Linux虚拟机环境搭建
- 2021年大数据Spark(十二):Spark Core的RDD详解
目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...
- 2021年大数据Spark(十九):Spark Core的共享变量
目录 共享变量 广播变量 累加器 案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- 2021年大数据Spark(十四):Spark Core的RDD操作
目录 RDD的操作 函数(算子)分类 Transformation函数 Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...
- 2021年大数据Spark(十六):Spark Core的RDD算子练习
目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 first.take.top 算子 ...
最新文章
- CocoaPods导入的库其头文件导入的方法
- UVA 10515 - Powers Et Al.(数论)
- js rsa解密中文乱码_建议收藏 | 最全的 JS 逆向入门教程合集
- hust1342(流量有上下界的最小流)
- 面试:MySQL 架构
- 用gSOAP开发Web Service程序
- MyBatis-面试题
- 69. 二叉树的层次遍历Python实现
- Kong API Gateway 配置文件详解
- linux下备份msyql数据库
- pywin32下载python3.6_pywin32 py3.7下载
- Java旅游管理系统本科生毕业设计任务书
- Apowersoft ApowerMirror v1.4.5 终身商业授权破解版 安卓/iPhone投屏控制软件
- python网络爬虫网易云音乐guihub_GitHub - GreatV/CloudMusic-Crawler: 网易云音乐爬虫,数据可视化。...
- pdf怎么转图片,可得到高清图
- 快速幂算法(qwe)
- python将一句话重复n次输出_在Python中创建单项重复n次的列表
- gradient设置上下渐变_CSS3中渐变gradient详解
- ESP32学习笔记(2)——GPIO接口使用
- 测向交叉定位matlab,一种三维多站测向交叉定位算法