四、离线推荐服务建设(基于隐语义模型的协同过滤推荐)

目录

  • 四、离线推荐服务建设(基于隐语义模型的协同过滤推荐)
    • 4.1 离线推荐服务
    • 4.2 离线统计服务
    • 4.3 基于隐语义模型的协同过滤推荐
      • 4.3.1 用户商品推荐列表
      • 4.3.2 商品相似度矩阵
      • 4.3.3 模型评估和参数选取
    • 4.4 附件:完整代码

4.1 离线推荐服务

离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。

离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。

离线推荐服务主要分为统计推荐、基于隐语义模型的协同过滤推荐以及基于内容和基于Item-CF的相似推荐。我们这一篇主要介绍基于隐语义模型的协同过滤推荐。

4.2 离线统计服务

详见:电商推荐系统四: 离线推荐服务建设(基于统计的推荐)

4.3 基于隐语义模型的协同过滤推荐

项目采用ALS作为协同过滤算法,根据MongoDB中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵。

4.3.1 用户商品推荐列表

通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下:

  1. userId和productId做笛卡尔积,产生(userId,productId)的元组
  2. 通过模型预测(userId,productId)对应的评分。
  3. 将预测结果通过预测分值进行排序。
  4. 返回分值最大的K个商品,作为当前用户的推荐列表。

最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中

新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:

<dependencies><dependency><groupId>org.scalanlp</groupId><artifactId>jblas</artifactId><version>${jblas.version}</version></dependency><!-- Spark的依赖引入 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId></dependency><!-- 引入Scala --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></dependency><!-- 加入MongoDB的驱动 --><!-- 用于代码方式连接MongoDB --><dependency><groupId>org.mongodb</groupId><artifactId>casbah-core_2.11</artifactId><version>${casbah.version}</version></dependency><!-- 用于Spark和MongoDB的对接 --><dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.11</artifactId><version>${mongodb-spark.version}</version></dependency>
</dependencies>

同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。

核心代码如下:
src/main/scala/com.recom.offline/OfflineRecommender.scala

case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)case class MongoConfig(uri:String, db:String)// 标准推荐对象,productId,score
case class Recommendation(productId: Int, score:Double)// 用户推荐列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])// 商品相似度(商品推荐)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])object OfflineRecommmeder {// 定义常量val MONGODB_RATING_COLLECTION = "Rating"// 推荐表的名称val USER_RECS = "UserRecs"val PRODUCT_RECS = "ProductRecs"val USER_MAX_RECOMMENDATION = 20def main(args: Array[String]): Unit = {// 定义配置val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://localhost:27017/recommender","mongo.db" -> "recommender")// 创建spark sessionval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")val spark = SparkSession.builder().config(sparkConf).getOrCreate()implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))import spark.implicits._
//读取mongoDB中的业务数据
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating=> (rating.userId, rating.productId, rating.score)).cache()
//用户的数据集 RDD[Int]
val userRDD = ratingRDD.map(_._1).distinct()
val prodcutRDD = ratingRDD.map(_._2).distinct()//创建训练数据集
val trainData = ratingRDD.map(x => Rating(x._1,x._2,x._3))
// rank 是模型中隐语义因子的个数, iterations 是迭代的次数, lambda 是ALS的正则化参
val (rank,iterations,lambda) = (50, 5, 0.01)
// 调用ALS算法训练隐语义模型
val model = ALS.train(trainData,rank,iterations,lambda)//计算用户推荐矩阵
val userProducts = userRDD.cartesian(productRDD)
// model已训练好,把id传进去就可以得到预测评分列表RDD[Rating] (userId,productId,rating)
val preRatings = model.predict(userProducts)val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => (rating.user,(rating.product, rating.rating)))
.groupByKey()
.map{case (userId,recs) => UserRecs(userId,recs.toList.sortWith(_._2 >
_._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1,x._2)))
}.toDF()userRecs.write
.option("uri",mongoConfig.uri)
.option("collection",USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()//TODO:计算商品相似度矩阵// 关闭spark
spark.stop()
}
}

4.3.2 商品相似度矩阵

通过ALS计算商品相似度矩阵,该矩阵用于查询当前商品的相似商品并为实时推荐系统服务。

离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(m x k)矩阵,每个用户由 k个特征描述;表示物品特征矩阵的V(n x k)矩阵,每个物品也由 k 个特征描述。

V(n x k)表示物品特征矩阵,每一行是一个 k 维向量,虽然我们并不知道每一个维度的特征意义是什么,但是k 个维度的数学向量表示了该行对应商品的特征。

数据集中任意两个商品间相似度都可以由公式计算得到,商品与商品之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的ProductRecs表中。

核心代码如下:

//计算商品相似度矩阵
//获取商品的特征矩阵,数据格式 RDD[(scala.Int, scala.Array[scala.Double])]
val productFeatures = model.productFeatures.map{case (productId,features) =>(productId, new DoubleMatrix(features))
}// 计算笛卡尔积并过滤合并
val productRecs = productFeatures.cartesian(productFeatures).filter{case (a,b) => a._1 != b._1}  .map{case (a,b) =>val simScore = this.consinSim(a._2,b._2) // 求余弦相似度(a._1,(b._1,simScore))}.filter(_._2._2 > 0.6)    .groupByKey()             .map{case (productId,items) =>ProductRecs(productId,items.toList.map(x => Recommendation(x._1,x._2)))}.toDF()productRecs.write.option("uri", mongoConfig.uri).option("collection",PRODUCT_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()
其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:
//计算两个商品之间的余弦相似度
def consinSim(product1: DoubleMatrix, product2:DoubleMatrix) : Double ={product1.dot(product2) / ( product1.norm2()  * product2.norm2() )
}

4.3.3 模型评估和参数选取

在上述模型训练的过程中,我们直接给定了隐语义模型的rank,iterations,lambda三个参数。对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型进行评估。通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之间的误差。

有了RMSE,我们可以就可以通过多次调整参数值,来选取RMSE最小的一组作为我们模型的优化选择。

在scala/com.recom.offline/下新建单例对象ALSTrainer,代码主体架构如下:

def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://localhost:27017/recommender","mongo.db" -> "recommender")//创建SparkConfval sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))//创建SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))import spark.implicits._//加载评分数据val ratingRDD = spark.read.option("uri",mongoConfig.uri).option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRating].rdd.map(rating => Rating(rating.userId,rating.productId,rating.score)).cache()// 将一个RDD随机切分成两个RDD,用以划分训练集和测试集val splits = ratingRDD.randomSplit(Array(0.8, 0.2))val trainingRDD = splits(0)val testingRDD = splits(1)//输出最优参数adjustALSParams(trainingRDD, testingRDD)//关闭Sparkspark.close()
}

其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:
// 输出最终的最优参数

def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={// 这里指定迭代次数为5,rank和lambda在几个值中选取调整val result = for(rank <- Array(100,200,250); lambda <- Array(1, 0.1, 0.01, 0.001))yield {val model = ALS.train(trainData,rank,5,lambda)val rmse = getRMSE(model, testData)(rank,lambda,rmse)}// 按照rmse排序println(result.sortBy(_._3).head)
}

计算RMSE的函数getRMSE代码实现如下:

def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={val userProducts = data.map(item => (item.user,item.product))val predictRating = model.predict(userProducts)
val real = data.map(item => ((item.user,item.product),item.rating))val predict = predictRating.map(item => ((item.user,item.product),item.rating))// 计算RMSEsqrt(real.join(predict).map{case ((userId,productId),(real,pre))=>// 真实值和预测值之间的差val err = real - preerr * err}.mean())
}

运行代码,我们就可以得到目前数据的最优模型参数。

4.4 附件:完整代码

package com.recom.offlineimport org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.jblas.DoubleMatrixcase class ProductRating(userId: Int,productId:Int,score:Double,timestamp:Int)
case class MongoConfig(uri:String,db:String)//定义标准推荐对象
case class Recommendation(productId: Int,score:Double)
//定义用户推荐列表
case class UserRecs(userId:Int,recs:Seq[Recommendation])
//定义商品相似度列表
case class ProductRecs(productId:Int,recs:Seq[Recommendation])object OfflineRecommender {//定义monogobd中存储的表名val MONGODB_RATING_COLLECTION = "Rating"val USER_RECS="UserRecs"val PRODUCT_RECS="ProductRecs"val USER_MAX_RECOMMENDATION=20def main(args: Array[String]): Unit = {//定义基础配置的集合(可以放入配置文件,通过方法获取属性的值)val config = Map("spark.cores"->"local[*]","mongo.uri"->"mongodb://hadoop102:27017/recommender","mongo.db"->"recommender")//创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")//创建一个spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()//导入隐式转换类,在DF和DS转换的过程中会使用到import spark.implicits._//通过隐式类的方法创建mongodb连接对象implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))//加载数据val ratingRDD = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRating].rdd.map(rating=>(rating.userId,rating.productId,rating.score)).cache()//取出所有的用户和商品的数据集,为隐语义模型训练做准备val userRDD: RDD[Int] = ratingRDD.map(_._1).distinct()val productRDD: RDD[Int] = ratingRDD.map(_._2).distinct()//核心计算过程//1.训练隐语义模型val trainData = ratingRDD.map(x=>Rating(x._1,x._2,x._3))//定义隐语义模型的训练参数,rank隐特征个数,iterations迭代次数,lambda:正则化系数val (rank,iterations,lambda)=(5,10,0.01)val model = ALS.train(trainData,rank,iterations,lambda)//2.获取预测评分矩阵,得到用户推荐列表//用userRDD和productRDD做一个笛卡尔积,得到空的userProductsRDD表示的评分矩阵val userProducts: RDD[(Int, Int)] = userRDD.cartesian(productRDD)val preRating: RDD[Rating] = model.predict(userProducts)//从预测评分矩阵中提取得到用户的推荐列表val userRecs: DataFrame = preRating.filter(_.rating > 0).map(rating => (rating.user, (rating.product, rating.rating))).groupByKey().map {case (userId, recs) =>val recomTopN: List[(Int, Double)] = recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION)val recommendations: List[Recommendation] = recomTopN.map(x => Recommendation(x._1, x._2))UserRecs(userId, recommendations)}.toDF()userRecs.write.option("uri",mongoConfig.uri).option("collection",USER_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()//3.利用商品特征向量,计算商品的相似度列表val productFeatures: RDD[(Int, DoubleMatrix)] = model.productFeatures.map { case (productId, features) => (productId, new DoubleMatrix(features)) }//两两配对商品,计算余弦相似度val productRecs: DataFrame = productFeatures.cartesian(productFeatures).filter { case (x, y) => x._1 != y._1 }//计算余弦相似度.map { case (a, b) => {val simScore = consinSim(a._2, b._2)(a._1, (b._1, simScore))}}.filter(_._2._2 > 0.4).groupByKey().map {case (productId, recs) =>ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))}.toDF()productRecs.write.option("uri",mongoConfig.uri).option("collection",PRODUCT_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()//4.利用用户特征向量,计算用户的相似度列表/*val userFeatures: RDD[(Int, DoubleMatrix)] = model.userFeatures.map { case (userId, features) => (userId, new DoubleMatrix(features)) }//两两配对商品,计算余弦相似度val userRecs: DataFrame = userFeatures.cartesian(userFeatures).filter { case (x, y) => x._1 != y._1 }//计算余弦相似度.map { case (a, b) => {val simScore = consinSim(a._2, b._2)(a._1, (b._1, simScore))}}.filter(_._2._2 > 0.4).groupByKey().map {case (userId, recs) =>ProductRecs(userId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))}.toDF()*/spark.stop()}def consinSim(product1: DoubleMatrix, product2: DoubleMatrix) ={product1.dot(product2) / (product1.norm2() * product2.norm2())}}

模型调优:

package com.recom.offlineimport breeze.numerics.sqrt
import com.recom.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionobject ALSTrainer {def main(args: Array[String]): Unit = {//定义基础配置的集合(可以放入配置文件,通过方法获取属性的值)val config = Map("spark.cores"->"local[*]","mongo.uri"->"mongodb://hadoop102:27017/recommender","mongo.db"->"recommender")//创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")//创建一个spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()//导入隐式转换类,在DF和DS转换的过程中会使用到import spark.implicits._//通过隐式类的方法创建mongodb连接对象implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))//加载数据val ratingRDD = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRating].rdd.map(rating=>Rating(rating.userId,rating.productId,rating.score)).cache()//将数据切分为训练集和测试集val splits= ratingRDD.randomSplit(Array(0.8,0.2))val trainingRDD=splits(0)val testingRDD=splits(1)//核心实现:输出最优参数adjustALSParams(trainingRDD,testingRDD)spark.stop()}def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating])={//定义训练参数val ranks=Array(5,10,20,50)val intertions=Array(10,25,50)val lambdas=Array(1,0.1,0.01)//遍历数组中定义的参数取值(加上迭代次数后会非常吃机器的性能)
//    val result= for (rank<-ranks;intertion<-intertions;lambda<-lambdas) yield{//        val model: MatrixFactorizationModel = ALS.train(trainData,rank,intertion,lambda)
//        val rmse = getRMSE(model,testData)
//        (rank,intertion,lambda,rmse)
//      }val result= for (rank<-ranks;lambda<-lambdas) yield{val model: MatrixFactorizationModel = ALS.train(trainData,rank,10,lambda)val rmse = getRMSE(model,testData)(rank,lambda,rmse)}for (elem <- result) {println(elem)}}def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating])={//构建userProducts,得到预测评分矩阵val userProducts: RDD[(Int, Int)] = data.map(item=>(item.user,item.product))val predictRating = model.predict(userProducts)//按照公式计算rmse,首先把预测评分和实际评分按照(userId,productId)做一个连接val observed: RDD[((Int, Int), Double)] = data.map(item=>((item.user,item.product),item.rating))val predict: RDD[((Int, Int), Double)] = predictRating.map(item=>((item.user,item.product),item.rating))sqrt(observed.join(predict).map{case ((userId,productId),(actual,pre))=>val err = actual - preerr*err}.mean())}}

电商推荐系统四: 基于隐语义模型的协同过滤推荐相关推荐

  1. 电商推荐系统(上):推荐系统架构、数据模型、离线统计与机器学习推荐、历史热门商品、最近热门商品、商品平均得分统计推荐、基于隐语义模型的协同过滤推荐、用户商品推荐列表、商品相似度矩阵、模型评估和参数选取

    文章目录 第1章 项目体系架构设计 1.1 项目系统架构 1.2 项目数据流程 1.3 数据模型 第2章 工具环境搭建 2.1 MongoDB(单节点)环境配置 2.2 Redis(单节点)环境配置 ...

  2. 推荐算法!基于隐语义模型的协同过滤推荐之用户商品推荐列表

    项目采用ALS作为协同过滤算法,根据MongoDB中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵. 通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下: 1. u ...

  3. 隐语义模型 VS 协同过滤

    隐语义模型 从数据出发,进行个性化推荐 用户和数据之间有着隐含的联系 隐含因子让计算机能理解就好 将用户和物品通过中介隐含因子联系起来 分解-组合 F隐藏因子 隐语义模型求解 梯度下降方向 迭代求解 ...

  4. python协同过滤电影推荐_推荐系统:基于用户和模型的协同过滤电影推荐

    2018-04-26 1.协同过滤 协同过滤(Collaborative Filtering)字面上的解释就是在别人的帮助下来过滤筛选,协同过滤一般是在海量的用户中发现一小部分和你品味比较相近的,在协 ...

  5. 基于SVD++隐语义模型的信任网络推荐算法

    点击上方蓝字关注我们 基于SVD++隐语义模型的信任网络推荐算法 陈佩武1, 束方兴2 1 平安科技(深圳)有限公司,广东 深圳 518031 2 北京大学互联网研究院(深圳),广东 深圳 51805 ...

  6. Python+Django+Mysql实现购物商城推荐系统 基于用户、项目的协同过滤推荐购物商城系统 网络购物推荐系统 代码实现 源代码下载

    Python+Django+Mysql实现购物商城推荐系统(基于用户.项目的协同过滤推荐算法) 一.项目简介 1.开发工具和实现技术 pycharm2020professional版本,python3 ...

  7. Python+Django+Mysql实现在线电影推荐系统 基于用户、项目的协同过滤推荐在线电影系统 代码实现 源代码下载

    Python+Django+Mysql实现在线电影推荐系统(基于用户.项目的协同过滤推荐算法) 一.项目简介 1.开发工具和实现技术 pycharm2020professional版本,python3 ...

  8. 基于隐语义模型的个性化推荐算法综述-笔记整理

    1. 前期知识 均方根值(RMS)+ 均方根误差(RMSE)+标准差(Standard Deviation) 1.均方根值(RMS)也称作为效值,它的计算方法是先平方.再平均.然后开方. 2.均方根误 ...

  9. 基于云模型的协同过滤推荐算法代码实现(附源代码)

    基于云模型的协同过滤推荐算法代码实现(附源代码) 一.云模型介绍 针对传统推荐系统数据稀疏.相似性计算方法导致共同评分用户少的问题,提出利用云模型概念与定量数值转换的优势,研究云模型(百度百科查看概念 ...

最新文章

  1. 利用脚本生成GUID
  2. 又见灵异事件,li中的span右浮动遇到的问题
  3. Spring+Velocity中模板路径的问题
  4. 研发手Q推广遇到的一系列问题
  5. 【计算机网络复习 数据链路层】3.5.2 ALOHA协议
  6. html怎么设置椭圆文本框,html – 避免CSS border-radius中的椭圆形状
  7. redhat修改mysql编码格式_RedHatLinux下修改MySQL编码方式
  8. vue 赋值不改变_【报Bug】超大BUG,Nvue页面引入组件以后,该页面data里的值不能再赋值改变...
  9. 为什么董明珠愿意让她“孙子”喝飞鹤奶粉?
  10. 升级EXCHANGE2010到2013(C)
  11. 交换排序—冒泡排序(Bubble Sort)
  12. rj45接口引脚定义_RJ45接口针脚定义(各种接口针脚定义)
  13. Matlab绘图方法整理(超完整版)
  14. 论文查重:利用Python查找两个Word文件的相同内容
  15. 获取Audio音乐的总时长
  16. C我语言编程老鼠寻路,数据结构课设-走迷宫游戏.doc
  17. Ubuntu 分卷压缩
  18. html转换下一页,如何转到下一页与HTML和/或JS锚?
  19. Godaddy上的域名如何取消自动续费?
  20. SSH java 面试题

热门文章

  1. 5G只是小儿科?任正非对话人工智能专家2万字实录
  2. Java并发编程:线程池的使用
  3. Smartbi:“最强大脑”智慧检务如何落地?智慧检务系统建设案例
  4. 蚂蚁vp下载_青团社VP青团兼职CEO莫凡荣登2019福布斯中国“30Under30”精英榜
  5. 利用Jenkins自动化部署springboot项目到阿里云服务器(centos8)
  6. Echart的图例居中显示在下方
  7. 【CSS】圣杯、双飞翼布局
  8. PCI和PCIE插槽有什么区别?
  9. 面向 AWS 专家的 Azure 云服务介绍
  10. 网红考研老师张雪峰被人泼鲱鱼罐头,回应:竞争对手所为!