为了学习spark,在实验楼上找到的一个spark入门课程,在此记录一下学习过程。

我使用的Spark版本为Spark 2.2.0, 实验楼教程使用的是Spark 1.6.1

流程和算法介绍

这个简单的电影推荐系统是根据已有用户对电影的评价系统,针对特定用户输出其可能会感兴趣的电影,构成一个简单的电影推荐系统。

主要步骤

加载数据集,解析成特定格式

划分数据集,分为训练集和测试集

利用交替最小二乘法(ALS)算法,训练用户与电影之间的矩阵模型

基于训练集进行预测,利用测试集来验证预测结果是否有效。

实际上,上述步骤的第三四步是使用了协同过滤算法来推荐电影。

引用知乎上的回答解释协同过滤

举个简单的小例子

我们已知道用户u1喜欢的电影是A,B,C

用户u2喜欢的电影是A, C, E, F

用户u3喜欢的电影是B,D

我们需要解决的问题是:决定对u1是不是应该推荐F这部电影。

基于内容的做法:要分析F的特征和u1所喜欢的A、B、C的特征,需要知道的信息是A(战争片),B(战争片),C(剧情片),如果F(战争片),那么F很大程度上可以推荐给u1,这是基于内容的做法,你需要对item进行特征建立和建模。

协同过滤的办法:那么你完全可以忽略item的建模,因为这种办法的决策是依赖user和item之间的关系,也就是这里的用户和电影之间的关系。我们不再需要知道ABCF哪些是战争片,哪些是剧情片,我们只需要知道用户u1和u2按照item向量表示,他们的相似度比较高,那么我们可以把u2所喜欢的F这部影片推荐给u1。

在Spark MLlib中,协同过滤算法是通过交替最小二乘法(ALS)实现的,具体算法实现在此并不关注。

数据集

数据集来自GroupLens,是一个名为MovieLens的数据集的数据,在此处选择数据量为一百万条的数据集,下载地址

具体代码和分析

1.导入包

我们需要导入以下包

import org.apache.spark.rdd._

import org.apache.spark.sql._

import org.apache.spark.mllib.recommendation.Rating

import org.apache.spark.mllib.recommendation.ALS

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

mllib包是Spark中的机器学习包,我们这次导入的有ALS,MatrixFactorizationModel,Rating。ALS即为上文提到的交替最小二乘算法,在Spark中ALS算法的返回结果为MatrixFactorizationModel类,最后的Rating是Spark定义的评价Model,对应于我们数据中的Rating.dat中的内容,不用用户再自行定义

然后,我们还需要导入implicits包,这个是Spark中的隐式转换包,可以自动地对一些数据类型进行转换,但是这个包需要在代码中动态导入

val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()

import spark.implicits._

其中spark为SparkSession类,在Spark 2.2.0中用来代替SparkContext,作为整个程序的入口点

2.数据处理

定义电影、用户数据实体类,用来映射对应的数据

case class Movie(movieId: Int, title: String)

case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)

定义解析函数,将数据从文件中解析出来

def parseMovieData(data: String): Movie = {

val dataField = data.split("::")

assert(dataField.size == 3)

Movie(dataField(0).toInt, dataField(1))

}

def parseUserData(data: String): User = {

val dataField = data.split("::")

assert(dataField.size == 5)

User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)

}

def parseRatingData(data: String): Rating = {

val dataField = data.split("::")

Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)

}

导入数据

var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData).cache()

var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData).cache()

var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData).cache()

3. 训练模型

// convert to DataFrame

val moviesDF = moviesData.toDF()

val usersDF = usersData.toDF()

val ratingsDF = ratingsData.toDF()

// split to data set and test set

val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)

val trainingSetOfRatingsData = tempPartitions(0).cache().rdd

val testSetOfRatingData = tempPartitions(1).cache().rdd

// training model

val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

按7:3的比例将数据集分为训练集和验证集,由于划分出来的数据集为DataSet类型,而ALS算法的run函数接收的参数为RDD类型,所以需要将DataSet转换为RDD,方法很简单,就加上”.rdd"就可以了,如果不转换会报错

spark_error_5.PNG

训练完之后可以调用模型进行推荐,比如要给用户ID为1000的用户推荐适合TA看的10部电影,就可以执行

val recomResult = recomModel.recommendProducts(1000, 10)

结果如下

运行结果

返回的结果包括用户ID,电影ID,和对应的相关性

如果我们要显示电影名,可以执行以下代码

val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()

val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))

println(recommendMoviesWithTitle.mkString("\n"))

在Spark老版本中,可以直接使用

val movieTitles = moviesDF.map(array => (array(0), array(1))).collectAsMap()

将moviesDF转换为key为电影ID,value为电影名的map,但是在2.2.0中,如果这样写会提示DataSet没有collectAsMap()方法,错误截图如下

错误截图

经过一番搜索后,在StackOverflow上有人提到RDD有collectAsMap()方法,于是就要将moviesDF转换为RDD类型,即上文用到的方法

打印出来的结果如图

转换结果

4.验证模型

如何知道模型是否正确呢?可以用之前从数据集里面划分出来的验证集,通过调用模型得出预测结果,与验证集中的原数据进行对比,可以判断模型的效果如何

val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{

case Rating(user, product, rating) => (user, product)

})

val formatResultOfTestSet = testSetOfRatingData.map{

case Rating(user, product, rating) => ((user, product), rating)

}

val formatResultOfPredictionResult = predictResultOfTestSet.map {

case Rating(user, product, rating) => ((user, product), rating)

}

val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

val MAE = finalResultForComparison.map {

case ((user, product), (ratingOfTest, ratingOfPrediction)) =>

val error = (ratingOfTest - ratingOfPrediction)

Math.abs(error)

}.mean()

在得到测试集的预测评分结果之后,我们用 map 操作和 join 操作将它与测试集的原始数据组合成为 ((用户ID, 电影ID), (测试集原有评分, 预测评分))的格式。这个格式是 Key-Value 形式的,Key 为 (user, product)。我们是要把这里的测试集原有评分与预测时得到的评分相比较,二者的联系就是 user 和 product 相同。

上述代码中首先调用模型进行预测,然后将在测试集上的预测结果和测试集本身的数据都转换为 ((user,product), rating) 的格式,之后将两个数据组合在一起,计算两者之间的评价的差值的绝对值,然后求平均值,这种方法叫做计算平均绝对误差

平均绝对误差( Mean Absolute Error )是所有单个观测值与算术平均值偏差的绝对值的平均。

与平均误差相比,平均绝对误差由于离差被绝对值化,不会出现正负相抵消的情况,所以平均绝对误差能更好地反映预测值误差的实际情况。

最终算出的结果为

image.png

效果还算可以,如果想继续优化可以通过增加ALS的迭代次数和特征矩阵的秩来提高准确率

完整代码

import org.apache.spark.rdd._

import org.apache.spark.sql._

import org.apache.spark.mllib.recommendation.Rating

import org.apache.spark.mllib.recommendation.ALS

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

object PredictMovie {

case class Movie(movieId: Int, title: String)

case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)

def parseMovieData(data: String): Movie = {

val dataField = data.split("::")

assert(dataField.size == 3)

Movie(dataField(0).toInt, dataField(1))

}

def parseUserData(data: String): User = {

val dataField = data.split("::")

assert(dataField.size == 5)

User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)

}

def parseRatingData(data: String): Rating = {

val dataField = data.split("::")

Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)

}

def main(args: Array[String]){

val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()

import spark.implicits._

var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData _).cache()

var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData _).cache()

var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData _).cache()

// convert to DataFrame

val moviesDF = moviesData.toDF()

val usersDF = usersData.toDF()

val ratingsDF = ratingsData.toDF()

// split to data set and test set

val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)

val trainingSetOfRatingsData = tempPartitions(0).cache().rdd

val testSetOfRatingData = tempPartitions(1).cache().rdd

// training model

val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

val recomResult = recomModel.recommendProducts(1000, 10)

println(s"Recommend Movie to User ID 1000")

println(recomResult.mkString("\n"))

val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()

val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))

println(recommendMoviesWithTitle.mkString("\n"))

val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{

case Rating(user, product, rating) => (user, product)

})

val formatResultOfTestSet = testSetOfRatingData.map{

case Rating(user, product, rating) => ((user, product), rating)

}

val formatResultOfPredictionResult = predictResultOfTestSet.map {

case Rating(user, product, rating) => ((user, product), rating)

}

val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

val MAE = finalResultForComparison.map {

case ((user, product), (ratingOfTest, ratingOfPrediction)) =>

val error = (ratingOfTest - ratingOfPrediction)

Math.abs(error)

}.mean()

println(s"mean error: $MAE")

spark.stop()

}

}

电影推荐系统 python简书_【记录|Spark】简单的电影推荐系统相关推荐

  1. 电影推荐系统 python简书_基于Spark的电影推荐系统(实战简介)

    ## 写在前面 一直不知道这个专栏该如何开始写,思来想去,还是暂时把自己对这个项目的一些想法 和大家分享 的形式来展现.有什么问题,欢迎大家一起留言讨论. 这个项目的源代码是在https://gith ...

  2. 电影推荐系统 python简书_分析9000部电影|一个简单的电影推荐系统

    不知道大家平时喜不喜欢看电影来消遣时光,我是比较喜欢看电影的.对我而言,当我看完一部电影,觉得很好看的时候,我就会寻找类似这部电影的其他电影.刚好有这么一个数据集,包含了很多部的电影,于是打算对其进行 ...

  3. 电影推荐系统 python简书_基于django和协同过滤/cnn的电影推荐系统

    技术前端: bootstrap3 + vue + jquery 后端: django 2.2.1 +djangorestframework (MVC框架) 数据库: mysql 数据集: 1. 豆瓣数 ...

  4. 希尔排序python 简书_数据结构_排序_直接插入+希尔排序

    数据结构_排序_直接插入排序+希尔排序 其实主要是为了讲述希尔排序,不过插入排序是希尔排序的基础,因此先来讲直接插入排序. 一.直接插入排序 1.原理 下标 0 1 2 3 4 5 6 7 8 -- ...

  5. 遗传算法 python 简书_基于DEAP库的Python进化算法从入门到入土—(二)简单遗传算法实现...

    前言 在上一篇中,我们已经介绍了如何在DEAP中实现进化算法的基本操作,在这一篇中我们试图将各个操作组装起来,用进化算法解决一个简单的一元函数寻优问题. 进化算法实例 - 一元函数寻优 问题描述与分析 ...

  6. 遗传算法 python 简书_遗传算法入门

    遗传算法简介: 遗传算法(Genetic algorithm)属于演化计算( evolutionary computing),是随着人工智能领域发展而来的一种智能算法.正如它的名字所示,遗传算法是受达 ...

  7. 文科生学python简书_文科生Python教程(一)

    往期传送门:文均:文科生Python教程(零)​zhuanlan.zhihu.com (一)变量是什么?变量是个礼物盒! 在讲礼物盒之前,先帮张三解决他的问题. 在教程(零)中,张三想要统计<兰 ...

  8. selenium python 简书_通过python+selenium3实现浏览器刷简书文章阅读量

    准备工作 下载python,本文以python3.6为例.python3.6下载地址:python3下载地址,选择合适的版本安装.安装成功后,打开命令提示符,在其中输入python,显示如下信息,则说 ...

  9. 希尔排序python 简书_排序:希尔排序(算法)

    文 | 莫若吻 (注:如果想更好的理解希尔排序,请先看看我的上一篇博客插入排序,希望会对你有帮助.) 一.简介 希尔排序(Shell Sort)是插入排序的一种算法,是对直接插入排序的一个优化,也称缩 ...

最新文章

  1. 大型运输行业实战_day14_1_webserivce简单入门
  2. php项目webpack打包,Vue项目webpack打包部署时Tomcat刷新报404错误问题如何处理
  3. 离线轻量级大数据平台Spark之单机部署及Java开发
  4. Linux tar将分割的小文件进行合并
  5. 7系统软raid_使用图形界面来配置RAID
  6. WPF之X名称空间学习
  7. 前端js日期时间格式转换
  8. 彻底搞懂使用MyBatis时为什么Dao层不需要@Repository
  9. java商城系统设计-----积分商城系统
  10. 阿里云MVP精选2018年终盘点:大咖专访+最佳实践,丰富干货等你来!...
  11. 坦然面对:应对前端疲劳
  12. c语言 函数拟合,曲线拟合成Y=a*(X^b)+c*(X^d)函数 - 数学 - 小木虫 - 学术 科研 互动社区...
  13. python自动排课表_LeetCode 207. 课程表 | Python
  14. resnet152训练_resnet152 网络结构
  15. android教你打造独一无二的图片加载框架
  16. 获取当天年月日,及开始结束时间
  17. python turtle库输出文字_Python 海龟 turtle 画图讲解 (五):输入/输出文字及鼠标与键盘交互设计...
  18. 文成小盆友python-num14 - web 前端基础 html ,css, JavaScript
  19. JavaScript-设计模式(四) 原型模式
  20. matlab有一座小山,那里,有一座哭泣的小山作文1000字

热门文章

  1. 安卓AsyncHttpClient使用时出现的java.net.SocketTimeoutException: Read timed out报错解决方法之一
  2. android wifi软开关,rfkill 无线设备软开关
  3. mysql case when用法
  4. @Param注解的使用和解析
  5. hive 正则表达式-regexp
  6. telnet远程登录服务器端口,telnet端口号-TELNET服务的端口号是多少?
  7. 微软鼠标测试软件,微软Precision鼠标评测:Surface生产力工具最佳搭配
  8. RichEdit的用法总结
  9. 2021年伊宁三中高考成绩查询,伊宁三中2015高考榜.doc
  10. android高级UI之Paint Xfermode