2019独角兽企业重金招聘Python工程师标准>>>

前言

随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力。利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值。这篇文章给大家分享一个spark MLlib 的推荐实战例子。我将会分享怎样用spark MLlib做一个电影评分的推荐系统。使用到的算法是user-based协同过滤。如果对Spark MLlib不太了解的,请阅读我的上一篇博客。

推荐系统的对比

应该说,自从Amazone公布了协同过滤算法后,在推荐系统领域,它就占据了很重要的地位。不像传统的内容推荐,协同过滤不需要考虑物品的属性问题,用户的行为,行业问题等,只需要建立用户与物品的关联关系即可,可以物品之间更多的内在关系,类似于经典的啤酒与尿不湿的营销案例。所以,讲到推荐必须要首先分享协同过滤。

Spark MLlib中的协同过滤

协同过滤常被应用于推荐系统。这些技术旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。为此,我们实现了交替最小二乘法(ALS) 来学习这些隐性语义因子。在 MLlib 中的实现有如下的参数:

numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准

隐性反馈 vs 显性反馈

基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。

在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。

目前可用的协同过滤的算法:

  • ALS

数据准备

数据准备,MoiveLens的数据集,有100k到10m的数据都有。我们这里选择100k的数据。

对下载的数据解压之后,会出现很多文件,我们需要使用u.data和u.user文件。详细的数据说明可以参见README。

u.data是用户对电影评分的数据,也是训练集。数据分别表示userId,moiveId,评分rate,时间戳。如下图所示

u.user是用户的个人信息数据,用以推荐使用,分别表示userId,age,sex,job,zip code。我们只使用userId即可。如下图所示

实现的功能

这里有10w条用户对电影的评分,从1-5分,1分表示差劲,5分表示非常好看。根据用户对电影的喜好,给用户推荐可能感兴趣的电影。

实现思路

代码实现如下:

1、加载u.data数据到rating RDD中

2、对rating RDD的数据进行分解,只需要userId,moiveId,rating

3、使用rating RDD训练ALS模型

4、使用ALS模型为u.user中的用户进行电影推荐,数据保存到HBase中

5、评估模型的均方差

代码

 
  1. package com.ml.recommender

  2. import org.apache.spark.SparkContext._

  3. import org.apache.spark.SparkConf

  4. import org.apache.spark.mllib.recommendation._

  5. import org.apache.spark.rdd.{ PairRDDFunctions, RDD }

  6. import org.apache.spark.SparkContext

  7. import scala.collection.mutable.HashMap

  8. import java.util.List

  9. import java.util.ArrayList

  10. import scopt.OptionParser

  11. import com.ml.util.HbaseUtil

  12. /**

  13. * moivelens 电影推荐

  14. *

  15. */

  16. object MoiveRecommender {

  17. val numRecommender = 10

  18. case class Params(

  19. input: String = null,

  20. numIterations: Int = 20,

  21. lambda: Double = 1.0,

  22. rank: Int = 10,

  23. numUserBlocks: Int = -1,

  24. numProductBlocks: Int = -1,

  25. implicitPrefs: Boolean = false,

  26. userDataInput: String = null)

  27. def main(args: Array[String]) {

  28. val defaultParams = Params()

  29. val parser = new OptionParser[Params]("MoiveRecommender") {

  30. head("MoiveRecommender: an example app for ALS on MovieLens data.")

  31. opt[Int]("rank")

  32. .text(s"rank, default: ${defaultParams.rank}}")

  33. .action((x, c) => c.copy(rank = x))

  34. opt[Int]("numIterations")

  35. .text(s"number of iterations, default: ${defaultParams.numIterations}")

  36. .action((x, c) => c.copy(numIterations = x))

  37. opt[Double]("lambda")

  38. .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")

  39. .action((x, c) => c.copy(lambda = x))

  40. opt[Int]("numUserBlocks")

  41. .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")

  42. .action((x, c) => c.copy(numUserBlocks = x))

  43. opt[Int]("numProductBlocks")

  44. .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")

  45. .action((x, c) => c.copy(numProductBlocks = x))

  46. opt[Unit]("implicitPrefs")

  47. .text("use implicit preference")

  48. .action((_, c) => c.copy(implicitPrefs = true))

  49. opt[String]("userDataInput")

  50. .required()

  51. .text("use data input path")

  52. .action((x, c) => c.copy(userDataInput = x))

  53. arg[String]("<input>")

  54. .required()

  55. .text("input paths to a MovieLens dataset of ratings")

  56. .action((x, c) => c.copy(input = x))

  57. note(

  58. """

  59. |For example, the following command runs this app on a synthetic dataset:

  60. |

  61. | bin/spark-submit --class com.zachary.ml.MoiveRecommender \

  62. | examples/target/scala-*/spark-examples-*.jar \

  63. | --rank 5 --numIterations 20 --lambda 1.0 \

  64. | data/mllib/u.data

  65. """.stripMargin)

  66. }

  67. parser.parse(args, defaultParams).map { params =>

  68. run(params)

  69. } getOrElse {

  70. System.exit(1)

  71. }

  72. }

  73. def run(params: Params) {

  74. //本地运行模式,读取本地的spark主目录

  75. var conf = new SparkConf().setAppName("Moive Recommendation")

  76. .setSparkHome("D:\\work\\hadoop_lib\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")

  77. conf.setMaster("local[*]")

  78. //集群运行模式,读取spark集群的环境变量

  79. //var conf = new SparkConf().setAppName("Moive Recommendation")

  80. val context = new SparkContext(conf)

  81. //加载数据

  82. val data = context.textFile(params.input)

  83. /**

  84. * *MovieLens ratings are on a scale of 1-5:

  85. * 5: Must see

  86. * 4: Will enjoy

  87. * 3: It's okay

  88. * 2: Fairly bad

  89. * 1: Awful

  90. */

  91. val ratings = data.map(_.split("\t") match {

  92. case Array(user, item, rate, time) => Rating(user.toInt, item.toInt, rate.toDouble)

  93. })

  94. //使用ALS建立推荐模型

  95. //也可以使用简单模式 val model = ALS.train(ratings, ranking, numIterations)

  96. val model = new ALS()

  97. .setRank(params.rank)

  98. .setIterations(params.numIterations)

  99. .setLambda(params.lambda)

  100. .setImplicitPrefs(params.implicitPrefs)

  101. .setUserBlocks(params.numUserBlocks)

  102. .setProductBlocks(params.numProductBlocks)

  103. .run(ratings)

  104. predictMoive(params, context, model)

  105. evaluateMode(ratings, model)

  106. //clean up

  107. context.stop()

  108. }

  109. /**

  110. * 模型评估

  111. */

  112. private def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel) {

  113. //使用训练数据训练模型

  114. val usersProducets = ratings.map(r => r match {

  115. case Rating(user, product, rate) => (user, product)

  116. })

  117. //预测数据

  118. val predictions = model.predict(usersProducets).map(u => u match {

  119. case Rating(user, product, rate) => ((user, product), rate)

  120. })

  121. //将真实分数与预测分数进行合并

  122. val ratesAndPreds = ratings.map(r => r match {

  123. case Rating(user, product, rate) =>

  124. ((user, product), rate)

  125. }).join(predictions)

  126. //计算均方差

  127. val MSE = ratesAndPreds.map(r => r match {

  128. case ((user, product), (r1, r2)) =>

  129. var err = (r1 - r2)

  130. err * err

  131. }).mean()

  132. //打印出均方差值

  133. println("Mean Squared Error = " + MSE)

  134. }

  135. /**

  136. * 预测数据并保存到HBase中

  137. */

  138. private def predictMoive(params: Params, context: SparkContext, model: MatrixFactorizationModel) {

  139. var recommenders = new ArrayList[java.util.Map[String, String]]();

  140. //读取需要进行电影推荐的用户数据

  141. val userData = context.textFile(params.userDataInput)

  142. userData.map(_.split("\\|") match {

  143. case Array(id, age, sex, job, x) => (id)

  144. }).collect().foreach(id => {

  145. //为用户推荐电影

  146. var rs = model.recommendProducts(id.toInt, numRecommender)

  147. var value = ""

  148. var key = 0

  149. //保存推荐数据到hbase中

  150. rs.foreach(r => {

  151. key = r.user

  152. value = value + r.product + ":" + r.rating + ","

  153. })

  154. //成功,则封装put对象,等待插入到Hbase中

  155. if (!value.equals("")) {

  156. var put = new java.util.HashMap[String, String]()

  157. put.put("rowKey", key.toString)

  158. put.put("t:info", value)

  159. recommenders.add(put)

  160. }

  161. })

  162. //保存到到HBase的[recommender]表中

  163. //recommenders是返回的java的ArrayList,可以自己用Java或者Scala写HBase的操作工具类,这里我就不给出具体的代码了,应该可以很快的写出

  164. HbaseUtil.saveListMap("recommender", recommenders)

  165. }

  166. }

运行

1、在scala IDE(或者eclipse安装scala插件)运行:

设置工程名,main类等

设置运行参数

--rank 10 --numIterations 40 --lambda 0.01 --userDataInput D:\\ml_data\\data_col\\ml-100k\\ml-100k\\u.user D:\\ml_data\\data_col\\ml-100k\\ml-100k\\u.data

2、在集群中运行如下:

/bin/spark-submit --jars hbase-client-0.98.0.2.1.5.0-695-hadoop2.jar,hbase-common-0.98.0.2.1.5.0-695-hadoop2.jar,hbase-protocol-0.98.0.2.1.5.0-695-hadoop2.jar,htrace-core-2.04.jar,protobuf-java-2.5.0.jar --master yarn-cluster --class com.ml.recommender.MoiveRecommender moive.jar

--rank 10 --numIterations 40 --lambda 0.01 --userDataInput hdfs:/spark_test/u.user hdfs:/spark_test/u.data

注意:

--jars表示项目需要的依赖包

moive.jar表示项目打包的名称

运行结果

均方差如下所示

HBase中推荐数据如下所示

比如 939 用户的推荐电影(格式 moivedID:rating):516:7.574462241760971,1056:6.979575106203245,1278:6.918614235693566,1268:6.914693317049802,1169:6.881813878580957,1316:6.681612000425281,564:6.622223206958775,909:6.597412586878512,51:6.539969654136097,1385:6.503960660826889

优化

1、可以调整这些参数,不断优化结果,使均方差变小。比如iterations越多,lambda较小,均方差会较小,推荐结果较优

numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。

2、可以写一个程序去读取HBase的推荐数据,对外暴露一个rest接口,这样可以更方便展示。

转载于:https://my.oschina.net/xiaominmin/blog/1837117

Spark MLlib系列(二):基于协同过滤的电影推荐系统相关推荐

  1. python协同过滤电影推荐_基于协同过滤的电影推荐系统的设计与实现

    龙源期刊网 http://www.qikan.com.cn 基于协同过滤的电影推荐系统的设计与实现 作者:张玉叶

  2. Python机器学习实战教学——基于协同过滤的电影推荐系统(超详细教学,算法分析)

    注重版权,转载请注明原作者和原文链接 作者:Yuan-Programmer 结尾处有效果展示 文章目录 引言 一.技术原理 (一)推荐算法介绍 (二)主流距离计算法 (三)余弦距离计算法 二.数据介绍 ...

  3. 基于协同过滤的电影推荐系统

    推荐系统 推荐系统是一种信息过滤系统,可以提高搜索结果的质量,并提供搜索项或者与用户的搜索历史相对应的内容.通常运用于预测用户对某项商品的评价或者偏好,国内很多的公司都有使用到,淘宝,京东使用它来向用 ...

  4. 基于Spark平台的协同过滤实时电影推荐系统

    文章目录 摘要 0 引言 1 协同过滤算法 2 实时推荐服务 3 电影推荐系统设计部署 3.1 架构设计 3.2 系统功能设计 3.3 数据库设计 4 系统运行效果 5 结语 参考文献 摘要 摘要:随 ...

  5. 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  6. 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  7. 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  8. 实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  9. 【推荐系统案例】基于协同过滤的电影推荐

    案例--基于协同过滤的电影推荐 1. 数据集下载 2. 数据集加载 3. 相似度计算 4. User-Based CF 预测评分算法实现 5. Item-Based CF 预测评分算法实现 前面我们已 ...

  10. 1.3 基于协同过滤的电影推荐案例

    1.3 案例–基于协同过滤的电影推荐 学习目标 应用基于用户的协同过滤实现电影评分预测 应用基于物品的协同过滤实现电影评分预测 1 User-Based CF 预测电影评分 数据集下载 下载地址:Mo ...

最新文章

  1. 数学大神攻克猜字游戏Wordle,求解算法成绩逼近理论极限,连信息论都用上了...
  2. 搭建网络及服务器系统,网络服务器搭建与管理
  3. 巧妙的 排序+去重——C语言
  4. Three.js 入门示例
  5. WSL:WSL(Windows Subsystem for Linux)的简介、安装、使用方法之详细攻略
  6. MySQL中述职类型的长度问题
  7. led内部结构_科普PCB,DPC,陶瓷PCB对于LED封装有哪些差异? - led显示屏_高清led显示屏价格_led显示屏生产厂家...
  8. DWRUtil未定义的问题
  9. 标题: ZZ- Linux 系统裁减指南(LiPS)
  10. Dynamics CRM 2013 installation
  11. Delphi通过ADOQuery控件实现Sqlserver数据库多结果集的数据打印
  12. 全球首发!计算机视觉Polygon Mesh Processing总结9——Triangle-Based Remeshing
  13. Eclipse安装应知应会
  14. socks代理转http代理
  15. 鸿蒙三千法则排名,三千法则名称大全_十大最强法则
  16. 【学习笔记】Kruskal 重构树(BZOJ3551【ONTAK2010】Peaks加强版)
  17. 《如何有效阅读一本书-超实用笔记读书法》
  18. 当女程序员遇到了问题......太真实了
  19. 网络空间拟态防御CMD(Cyber Mimic Defense)
  20. PoW 、PoS , DPoS 算法

热门文章

  1. 如何从官网直接下载iTunes?
  2. [SSL_CHX][2022-02-23]进制转换
  3. 虚拟机之Jvm、dalvik、art联系和区别
  4. 项目npm install报错gyp info it worked if it ends with ok
  5. 屏库是个很好的网站,囊括了几乎所有型号的显示屏
  6. 新形势下安全风险评估实践
  7. 数组unshift方法及重构
  8. AS3动画效果公式,常用处理公式代码,基本运动公式,三角公式
  9. 表示整数x的绝对值大于5时值为真的c语言表达式是——.,1表示'整数x的绝对值大于5'时值为'真'的C语言表达式是_____...
  10. 数组之concat注意事项-不更改原数组