目录

  • 1 数据 ETL
  • 2 使用 SQL 分析
  • 3 使用 DSL 分析
  • 4 保存结果数据
  • 5 案例完整代码
  • 6 Shuffle 分区数目问题

1 数据 ETL

使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:对电影评分数据进行统分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。数据集ratings.dat总共100万条数据,数据格式如下每行数据各个字段之间使用双冒号分开:

数据集地址:https://grouplens.org/datasets/movielens/


数据处理分析步骤如下:

1. 第一步、读取电影评分数据,从本地文件系统读取
2. 第二步、转换数据,指定Schema信息,封装到DataFrame
3. 第三步、基于SQL方式分析
4. 第四步、基于DSL方式分析

读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:

// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder().master("local[4]").appName(this.getClass.getSimpleName.stripSuffix("$")).getOrCreate()
// 导入隐式转换import spark.implicits._// 1. 读取电影评分数据,从本地文件系统读取
val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS// 过滤数据..filter(line => null != line && line.trim.split("::").length == 4)// 提取转换数据.mapPartitions { iter =>iter.map { line =>// 按照分割符分割,拆箱到变量中val Array(userId, movieId, rating, timestamp) = line.trim.split("::")// 返回四元组(userId, movieId, rating.toDouble, timestamp.toLong)}}// 指定列名添加Schema.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)

2 使用 SQL 分析

首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:

// TODO: 基于SQL方式分析
// 第一步、注册DataFrame为临时视图
ratingsDF.createOrReplaceTempView("view_temp_ratings")
// 第二步、编写SQL
val top10MovieDF: DataFrame = spark.sql("""|SELECT| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating|FROM| view_temp_ratings|GROUP BY| movieId|HAVING| cnt_rating > 2000|ORDER BY| avg_rating DESC, cnt_rating DESC|LIMIT| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)

应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符,在多行换行的行头前面加一个“|”符号即可。

代码实例:

val speech = “”"abc

|def"“”.stripMargin

运行的结果为:

abc

ldef

运行程序结果如下:

3 使用 DSL 分析

调用Dataset中函数,采用链式编程分析数据,核心代码如下:

// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析import org.apache.spark.sql.functions._val resultDF: DataFrame = ratingsDF// 选取字段.select($"movieId", $"rating")// 分组:按照电影ID,获取平均评分和评分次数.groupBy($"movieId").agg( //round(avg($"rating"), 2).as("avg_rating"), //count($"movieId").as("cnt_rating") //)// 过滤:评分次数大于2000.filter($"cnt_rating" > 2000)// 排序:先按照评分降序,再按照次数降序.orderBy($"avg_rating".desc, $"cnt_rating".desc)// 获取前10.limit(10)
//resultDF.printSchema()
resultDF.show(10)

Round函数返回一个数值,该数值是按照指定的小数位数进行四舍五入运算的结果。除数值外,也可对日期进行舍入运算。
round(3.19, 1) 将 3.19 四舍五入到一个小数位 (3.2)
round(2.649, 1) 将 2.649 四舍五入到一个小数位 (2.6)
round(-5.574, 2) 将 -5.574 四舍五入到两小数位 (-5.57)

其中使用SparkSQL中自带函数库functions,在org.apache.spark.sql.functions中,包含常用函
数,有些与Hive中函数库类似,但是名称不一样。

使用需要导入函数库:import org.apache.spark.sql.functions._

4 保存结果数据

将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。

// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF.coalesce(1) // 考虑降低分区数目.write.mode("overwrite").option("driver", "com.mysql.cj.jdbc.Driver").option("user", "root").option("password", "123456").jdbc("jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode = true","db_test.tb_top10_movies",new Properties ())// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开resultDF.coalesce (1).write.mode ("overwrite").csv ("datas/top10-movies")// 释放缓存数据resultDF.unpersist ()

查看数据库中结果表的数据:

5 案例完整代码

电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套
数据处理分析流程,其中涉及到很多数据细节,完整代码如下

import java.util.Properties
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)*/
object SparkTop10Movie {def main(args: Array[String]): Unit = {// 构建SparkSession实例对象val spark: SparkSession = SparkSession.builder().master("local[4]").appName(this.getClass.getSimpleName.stripSuffix("$"))// TODO: 设置shuffle时分区数目.config("spark.sql.shuffle.partitions", "4").getOrCreate()// 导入隐式转换import spark.implicits._// 1. 读取电影评分数据,从本地文件系统读取val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")// 2. 转换数据val ratingsDF: DataFrame = rawRatingsDS// 过滤数据.filter(line => null != line && line.trim.split("::").length == 4)// 提取转换数据.mapPartitions { iter =>iter.map { line =>// 按照分割符分割,拆箱到变量中val Array(userId, movieId, rating, timestamp) = line.trim.split("::")// 返回四元组(userId, movieId, rating.toDouble, timestamp.toLong)}}// 指定列名添加Schema.toDF("userId", "movieId", "rating", "timestamp")/*root|-- userId: string (nullable = true)|-- movieId: string (nullable = true)|-- rating: double (nullable = false)|-- timestamp: long (nullable = false)*///ratingsDF.printSchema()/*+------+-------+------+---------+|userId|movieId|rating|timestamp|+------+-------+------+---------+| 1| 1193| 5.0|978300760|| 1| 661| 3.0|978302109|| 1| 594| 4.0|978302268|| 1| 919| 4.0|978301368|+------+-------+------+---------+*///ratingsDF.show(4)// TODO: 基于SQL方式分析// 第一步、注册DataFrame为临时视图ratingsDF.createOrReplaceTempView("view_temp_ratings")// 第二步、编写SQLval top10MovieDF: DataFrame = spark.sql("""|SELECT| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating|FROM| view_temp_ratings|GROUP BY| movieId|HAVING| cnt_rating > 2000|ORDER BY| avg_rating DESC, cnt_rating DESC|LIMIT| 10
""".stripMargin)//top10MovieDF.printSchema()top10MovieDF.show(10, truncate = false)println("===============================================================")// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析import org.apache.spark.sql.functions._val resultDF: DataFrame = ratingsDF// 选取字段.select($"movieId", $"rating")// 分组:按照电影ID,获取平均评分和评分次数.groupBy($"movieId").agg( //round(avg($"rating"), 2).as("avg_rating"), //count($"movieId").as("cnt_rating") //)// 过滤:评分次数大于2000.filter($"cnt_rating" > 2000)// 排序:先按照评分降序,再按照次数降序.orderBy($"avg_rating".desc, $"cnt_rating".desc)// 获取前10.limit(10)//resultDF.printSchema()resultDF.show(10)// TODO: 将分析的结果数据保存MySQL数据库和CSV文件// 结果DataFrame被使用多次,缓存resultDF.persist(StorageLevel.MEMORY_AND_DISK)// 1. 保存MySQL数据库表汇总resultDF.coalesce(1) // 考虑降低分区数目.write.mode("overwrite").option("driver", "com.mysql.cj.jdbc.Driver").option("user", "root").option("password", "123456").jdbc("jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode = true","db_test.tb_top10_movies",new Properties ())// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开resultDF.coalesce (1).write.mode ("overwrite").csv ("datas/top10-movies")// 释放缓存数据resultDF.unpersist ()// 应用结束,关闭资源Thread.sleep (10000000)spark.stop ()}}

6 Shuffle 分区数目问题

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为
200,在实际项目中要合理的设置。在构建SparkSession实例对象时,设置参数的值:

// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导入隐式转换
import spark.implicits._

大数据Spark电影评分数据分析相关推荐

  1. Spark综合练习——电影评分数据分析

    我正在参加年度博客之星评选,请大家帮我投票打分,您的每一分都是对我的支持与鼓励. 2021年「博客之星」参赛博主:Maynor大数据 (感谢礼品.红包免费送!) https://bbs.csdn.ne ...

  2. 2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析

    目录 案例三:电影评分数据分析 代码实现 Shuffle分区数 案例三:电影评分数据分析 使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明: 对 ...

  3. 大数据Spark(二十八):SparkSQL案例三电影评分数据分析

    文章目录 案例三:电影评分数据分析 代码实现 Shuffle分区数 案例三:电影评分数据分析 使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程

  4. 大数据Spark(python版)

    大数据 大数据,Spark,Hadoop,python,pyspark 大数据Spark(python版) 前言(环境说明): 1.下载和安装 1)安装java JDK 2)安装Hadoop(伪分布式 ...

  5. 2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析

    2016年大数据Spark"蘑菇云"行动代码学习之AdClickedStreamingStats模块分析     系统背景:用户使用终端设备(IPAD.手机.浏览器)等登录系统,系 ...

  6. 光环大数据spark文档_推荐大数据Spark必读书目

    我有一个非常要好的同事,无数次帮我解决了业务上的痛.技术能力很强,业务方面也精通.而且更耐得住加班,并且是自愿加班,毫无怨言.不像我,6点到准时走人了.但就是这么一位兢兢业业的技术人,却一直没有升职加 ...

  7. 大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一)

    大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一) 2017-03-27 11:58  浏览次数:148 1. 背景 前段时间京东公开了面向第二个十二年的战略规划,表示京东将全面走向技 ...

  8. 推荐大数据Spark必读书目

    点击蓝色"有关SQL"关注我哟 加个"星标",天天与10000人一起快乐成长 我有一个非常要好的同事,无数次帮我解决了业务上的痛.技术能力很强,业务方面也精通. ...

  9. 大数据Spark超经典视频链接全集

    论坛贴吧等信息发布参考模板 Scala.Spark史上最全面.最详细.最彻底的一整套视频全集(特别是机器学习.Spark Core解密.Spark性能优化.Spark面试宝典.Spark项目案例等). ...

最新文章

  1. 和12岁小同志搞创客开发:如何驱动各类型传感器?
  2. 全国计算机等级考试第3套,全国计算机等级考试四级计算机网络第3套试题
  3. 简明 Git 命令速查表
  4. 在Windows 2003中集成RAID卡驱动
  5. 7-4 求链式线性表的倒数第K项(最佳解法)(List容器)
  6. Win11系统如何刷新按钮
  7. Android学习之自定义标题栏
  8. 标贝科技推出「留声机」TTS方案,高还原、个性化声效提升交互意愿
  9. 使用Websocket框架之GatewayWorker开发电商平台买家与卖家实时通讯
  10. Puppet 笔记 package file services
  11. Fortran 95简单教程(二)
  12. 看图识物_‎App Store 上的“认识动物-看图识物大全、学英语”
  13. html表格突出显示,java-如何突出显示jtable单元格的文本?
  14. 200行Html5+CSS3+JS代码实现动态圣诞树
  15. 【SLAM】ORB-SLAM3解析——帧Frame()的构建(2)
  16. Android Studio 3.0~3.x正式版填坑之路
  17. python做股票系统_GitHub - jiuweng/stock: stock,股票系统。使用python进行开发。
  18. cam350 不能打开光绘文件_CAM350导入Gerber文件出错的原因
  19. 虚拟机VM16版本提示开启虚拟化,进入Bios发现是开启的
  20. 【Qt】Qt MQTT文档

热门文章

  1. 微信小程序 —— 倒计时(展示时钟)
  2. 又一起“删库”:链家程序员怒删公司 9TB 数据,被判 7 年!
  3. 更好的确保企业数据安全,华为云数据灾备解决方案更便捷
  4. 杨辉三角金字塔c语言编程,scratch编程绘制数字金字塔(杨辉三角)
  5. argo workflow 部署
  6. 淘宝/天猫API:img2text-图片识别商品接口
  7. 苏州优步车主之家司机端下载
  8. 使用 FFmpeg 开发播放器基础--使用 ffmpeg 解码视频文件
  9. MATLAB中安装YALMIP及CPLEX详细步骤
  10. 美国泛达网络:新一代通用型数据中心机柜