推荐系统(一)基于协同过滤算法开发离线推荐
什么是离线推荐
所谓的离线推荐其实就是根据用户产生的行为日志,后台设定一个离线统计算法和离线推荐算法的任务来对这些行为日志进行周期性的统计,统计过后的结果数据为前台或者实时分析提供数据的支撑。离线推荐要求实时性不高。
离线推荐算法之协同过滤
协同过滤其实就是借助大量已有的用户偏好来估计用户对其未接触过的物品的偏好程度。其基本原理就是相似度。
基于用户协同过滤推荐
所谓的基于用户协同过滤推荐就是利用用户的偏好,来计算当前用户与其他各个用户偏好的相似度,然后抽取相似度靠前的商品,向当前用户推荐他没接触过的商品。这种商品的获得规则就是根据其他用户与该用户进行在行为或者洗好上进行相似度统计,取出相似度靠前的商品来进行推荐。
基于商品协同过滤推荐
所谓的基于商品协同过滤推荐其实就是根据商品来向用户推荐。例如A商品有一群用户喜欢,那么当再出现跟A商品相似度比较高的商品,那么就会进行商品的推荐。
相似度度量
欧几里得度量
欧几里得度量(也成欧氏距离)是一种常用的距离定义,特指在N维空间内,计算两个点之间的实际距离或者是向量自然长度。在二维空间和三维空间中的欧氏距离就是两个点之间的真是距离。
余弦相似度
余弦相似度又称余弦相似性,用来计算二维空间中两个向量的夹角的余弦值来评估它们的相似度。
协同过滤之ALS离线推荐算法
所谓的ALS优化其实就是对协同过滤的优化,涉及到矩阵预算
ALS离线推荐算法用法
训练模型
- 创建训练数据。利用用户偏好数据封装成Spark Mllib内部的Rating对象trainRDD。
- 基于Spark Mllib的ALS创建训练模型。利用
ALS.train(trainRDD, rank, iterations, lambda)
方法来创建训练模型。trainRDD一定是根据用户偏好数据封装的Spark Mllib内部的Rating对象,rank是商品特征数,iterations是迭代计算次数,lambda是正则化参数。
计算用户推荐矩阵
所谓的用户推荐矩阵的原理,其实就是根据当前用户的偏好于其他各个用户的偏好进行相似度上的计算,然后得到一个相似度的值,值越大说明两个用户的喜好相似度越大,我们把该用户对应的相似度比较大的用户叫做他的"邻居",我们就会把这个邻居喜好的,并且当前用户没有见过的商品推荐给当前用户。这就是基于用户的推荐矩阵
package com.lyzimport breeze.linalg.rank
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.jblas.DoubleMatrix
object OfflineRecommend {/*** Movie数据集,数据集字段通过分割** 151^ 电影的ID* Rob Roy (1995)^ 电影的名称* In the highlands ....^ 电影的描述* 139 minutes^ 电影的时长* August 26, 1997^ 电影的发行日期* 1995^ 电影的拍摄日期* English ^ 电影的语言* Action|Drama|Romance|War ^ 电影的类型* Liam Neeson|Jessica Lange... 电影的演员* Michael Caton-Jones 电影的导演** tag1|tag2|tag3|.... 电影的Tag**/case class Movie(val mid: Int, val name: String, val descri: String, val timelong: String, val issue: String,val shoot: String, val language: String, val genres: String, val actors: String, val directors: String)/*** Rating数据集,用户对于电影的评分数据集,用,分割** 1, 用户的ID* 31, 电影的ID* 2.5, 用户对于电影的评分* 1260759144 用户对于电影评分的时间*/case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)case class MongoConfig(val uri: String, val db: String)//推荐case class Recommendation(rid: Int, r: Double)// 用户的推荐case class UserRecs(uid: Int, recs: Seq[Recommendation])//电影的相似度case class MovieRecs(uid: Int, recs: Seq[Recommendation])val MONGODB_RATING_COLLECTION = "Rating"val MONGODB_MOVIE_COLLECTION = "Movie"val USER_MAX_RECOMMENDATION = 10val USER_RECS = "UserRecs"def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://hadoop001:27017/recommend","mongo.db" -> "reommend")//创建一个SparkConf配置val sparkConf = new SparkConf().setAppName("OfflineRecommender").setMaster(config("spark.cores")).set("spark.executor.memory", "6G").set("spark.driver.memory", "2G")//基于SparkConf创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()//创建一个MongoDBConfigval mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))import spark.implicits._/*** 计算用户推荐矩阵,利用协同过滤里的ALS算法计算用户推荐矩阵*///抽取评分数据集val movieRatingRDD: RDD[(Int, Int, Double, Int)] = spark.read.option("uri", mongoConfig.uri) //指定mongodb集群地址.option("collection", MONGODB_RATING_COLLECTION) //指定mongodb数据库.format("com.mongodb.spark.sql").load() //加载数据.as[MovieRating] //将DataFrame转换成DataSet.rdd //将DataSet转换成RDD.map(rating => (rating.uid, rating.mid, rating.score, rating.timestamp)).cache() //将RDD的Rating转换成元组val userRDD: RDD[Int] = movieRatingRDD.map(_._1).distinct() //获取去重后的uid的RDD//抽取uid数据集val movieRDD: RDD[Int] = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_MOVIE_COLLECTION).format("com.mongodb.spark.sql").load().as[Movie] //将DataFrame转换成DataSet.rdd //将Dataset转换成RDD.map(_.mid).cache() //将RDD里的mid转换成新的RDD//创建模型训练所需的数据集val trainRDD: RDD[Rating] = movieRatingRDD.map(rating => Rating(rating._1, rating._2, rating._3))//这三个参数是怎么确定的呢?其实是我们自己测出来的,//找出三个最优的参数的原理其实用到的数 "均方跟误差",如果均方跟误差越小,那么就说名参数越优。下边会介绍怎么计算均方根误差val (rank, iterations, lambda) = (50, 10, 0.01)//创建训练模型 trainRDD:需要训练的数据集,rank:计算时候使用Movie的特征数量,iterations:迭代计算的次数,val model: MatrixFactorizationModel = ALS.train(trainRDD, rank, iterations, lambda)//创建一个用户与产品的矩阵val userProducts: RDD[(Int, Int)] = userRDD.cartesian(movieRDD)//利用模型开始计算val ratingRDD: RDD[Rating] = model.predict(userProducts)val userRecsDF: DataFrame = ratingRDD.filter(_.rating > 0).map(rating => (rating.user, (rating.product, rating.rating))).groupByKey().map {case (user, recs) => UserRecs(user, recs.toList.sortWith(_._2 > _._2).take(10).map(x => Recommendation(x._1, x._2)))}.toDF()spark.stop()}
}
计算商品相似度矩阵
基于商品的协同过滤算法类似于基于用户协同过滤算法。基于商品协同过滤算法的原理其实就是利用用户的偏好,来计算商品与商品的相似度,如果用户A偏好P1商品,如果出现了相似度接近与P1的商品P2,那么这种算法就会将P2推荐给用户。
package com.lyzimport breeze.linalg.rank
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.jblas.DoubleMatrixobject OfflineRecommend {/*** Movie数据集,数据集字段通过分割** 151^ 电影的ID* Rob Roy (1995)^ 电影的名称* In the highlands ....^ 电影的描述* 139 minutes^ 电影的时长* August 26, 1997^ 电影的发行日期* 1995^ 电影的拍摄日期* English ^ 电影的语言* Action|Drama|Romance|War ^ 电影的类型* Liam Neeson|Jessica Lange... 电影的演员* Michael Caton-Jones 电影的导演** tag1|tag2|tag3|.... 电影的Tag**/case class Movie(val mid: Int, val name: String, val descri: String, val timelong: String, val issue: String,val shoot: String, val language: String, val genres: String, val actors: String, val directors: String)/*** Rating数据集,用户对于电影的评分数据集,用,分割** 1, 用户的ID* 31, 电影的ID* 2.5, 用户对于电影的评分* 1260759144 用户对于电影评分的时间*/case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)/*** MongoDB的连接配置** @param uri MongoDB的连接* @param db MongoDB要操作数据库*/case class MongoConfig(val uri: String, val db: String)//推荐case class Recommendation(rid: Int, r: Double)// 用户的推荐case class UserRecs(uid: Int, recs: Seq[Recommendation])//电影的相似度case class MovieRecs(uid: Int, recs: Seq[Recommendation])val MONGODB_RATING_COLLECTION = "Rating"val MONGODB_MOVIE_COLLECTION = "Movie"val USER_MAX_RECOMMENDATION = 10val USER_RECS = "UserRecs"def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://hadoop001:27017/recommend","mongo.db" -> "reommend")//创建一个SparkConf配置val sparkConf = new SparkConf().setAppName("OfflineRecommender").setMaster(config("spark.cores")).set("spark.executor.memory", "6G").set("spark.driver.memory", "2G")//基于SparkConf创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()//创建一个MongoDBConfigval mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))import spark.implicits._/*** 计算用户推荐矩阵,利用协同过滤里的ALS算法计算用户推荐矩阵*///抽取评分数据集val movieRatingRDD: RDD[(Int, Int, Double, Int)] = spark.read.option("uri", mongoConfig.uri) //指定mongodb集群地址.option("collection", MONGODB_RATING_COLLECTION) //指定mongodb数据库.format("com.mongodb.spark.sql").load() //加载数据.as[MovieRating] //将DataFrame转换成DataSet.rdd //将DataSet转换成RDD.map(rating => (rating.uid, rating.mid, rating.score, rating.timestamp)).cache() //将RDD的Rating转换成元组//创建模型训练所需的数据集val trainRDD: RDD[Rating] = movieRatingRDD.map(rating => Rating(rating._1, rating._2, rating._3))val (rank, iterations, lambda) = (50, 10, 0.01)//创建训练模型 trainRDD:需要训练的数据集,rank:计算时候使用Movie的特征数量,iterations:迭代计算的次数,val model: MatrixFactorizationModel = ALS.train(trainRDD, rank, iterations, lambda)/*** 计算电影相似度矩阵,利用余弦相似度*///获取电影特征矩阵val features: RDD[(Int, Array[Double])] = model.productFeatures//转换电影特征矩阵,为计算余弦相似度提供参数val midDobleMatrix: RDD[(Int, DoubleMatrix)] = features.map {//DoubleMatrix:就是计算余弦相似度所需要的参数,这个参数以评分数组作为参数。case (mid, feature) => (mid, new DoubleMatrix(feature))}//将两个电影特征矩阵进行笛卡尔积转换,因为一个数据矩阵里的每一行都是每个电影对应的特征,//由于我们要计算每个电影之间的相似度,为了方便 我们就需要进行笛卡尔积操作,然后过滤点重复的项,//这样每个电影都会与其他电影一一对应起来了,然后就可以计他们的相似度矩阵val movieRecs: RDD[(Int, (Int, Double))] = midDobleMatrix.cartesian(midDobleMatrix).filter {case (a, b) => a._1 != b._1}.map {//计算两个电影特征的相似度//a._2.dot(b._2):多个特征向量的累加和//a._2.norm2() * b._2.norm1():多个向量平方的累加和的平方根的乘机//电影a与电影b的余弦相似度case (a, b) => (a._1, (b._1, a._2.dot(b._2) / a._2.norm2() * b._2.norm1()))}.filter(_._2._2 > 0.6) //抽取出余弦相似度大于0.6的电影spark.stop()}
}
利用均方根误差来计算ALS最优参数
所谓的均方根误差就是通过真实值和预测值进行数学上的逻辑来算出一个值。这个值越小说明误差越小,参数最优公式如下
package com.atguigu.offline
import breeze.numerics.sqrt
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionobject ALSTriner {def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://linux:27017/recommender","mongo.db" -> "recommender")//创建SparkConfval sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))//创建SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))import spark.implicits._//加载评分数据val ratingRDD = spark.read.option("uri",mongoConfig.uri).option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[MovieRating].rdd.map(rating => Rating(rating.uid,rating.mid,rating.score)).cache()//输出最优参数adjustALSParams(ratingRDD)//关闭Sparkspark.close()}// 输出最终的最优参数def adjustALSParams(trainData:RDD[Rating]): Unit ={val result = for(rank <- Array(30,40,50,60,70); lambda <- Array(1, 0.1, 0.001))yield {val model = ALS.train(trainData,rank,5,lambda)val rmse = getRmse(model,trainData)(rank,lambda,rmse)}println(result.sortBy(_._3).head)}def getRmse(model:MatrixFactorizationModel, trainData:RDD[Rating]):Double={//需要构造一个usersProducts RDD[(Int,Int)]val userMovies = trainData.map(item => (item.user,item.product))val predictRating = model.predict(userMovies)val real = trainData.map(item => ((item.user,item.product),item.rating))val predict = predictRating.map(item => ((item.user,item.product),item.rating))sqrt(real.join(predict).map{case ((uid,mid),(real,pre))=>// 真实值和预测值之间的两个差值val err = real - preerr * err}.mean())}}
基于类别来统计各个电影评分的Top
在电影评分数据中根据电影的id进行分组,然后算出每个电影的平均分
利用电影信息数据与统计后的数据进行Join操作,让每个电影与对应的分数对应起来
将电影的所有类别的RDD与join以后的数据信息RDD进行笛卡尔积的操作,主要是为了过滤出与电影类型能够匹配上的电影信息数据。
去除冗余数据,也就是业务逻辑不需要的数据信息,也就是数据中不需要的字段。
根据去除冗余数据的数据信息进行groupByKey的操作,拿到以电影类型为Key,电影信息集合为Value的数据集
将groupByKey操作以后的数据进行转换,将电影类型对应的电影信息集合中的评分利用sortWith进行排序,利用take(10)取出前十条数据。
将数据封装成对象保存到数据库中
import java.text.SimpleDateFormatimport org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} //Mongo配置 case class MongoConf(uri: String, db: String)case class Movie()case class MovieTypeRecommend(movieType: String, movieRecommend: Seq[MovieRecommend])case class MovieRecommend(mid: Int, score: Double)object StatisticsRecommend {val MONGODB_MOVIE_COLLECTION = "Movie"val MONGODB_RATING_COLLECTION = "Rating"def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("StatisticsRecommend").setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val mongoConf = MongoConf("mongodb://hadoop001:27017/recommend", "recommend")//导入隐式转换import spark.implicits._//加载Movie电影信息数据val movieDF: DataFrame = spark.read.option("uri", mongoConf.uri).option("collection", MONGODB_MOVIE_COLLECTION).format("com.mongodb.spark.sql").load().as[Movie].toDF()//加载Rating电影评分数据val ratingDF: DataFrame = spark.read.option("uri", mongoConf.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[Movie].toDF()//统计评分最多的电影ratingDF.createOrReplaceTempView("rating")// spark.sql("select mid,count(mid) as ratingCount from rating group by mid ").show(10)val dateFormat = new SimpleDateFormat("yyyyMM")//spark.udf.register("timeStampFormat", (time: Int) => dateFormat.format(new Date(time.toLong)))//按月份统计评分数,mid count,time//spark.sql("select mid ,count(mid) as ratingCount,timeStampFormat(timestamp) as yearmonth from rating group by mid,yearmonth").show(100)//val movieWithRatingDF: DataFrame = movieDF.join(ratingDF,Seq("mid","mid"),"left")//movieWithRatingDF.distinct().select("mid","name","uid").groupBy($"mid",$"name").sum("mid").sort("sum(mid)").show(10)//按照电影类别来统计每种类别电影对应的评分TOP10//所有的电影类别val movieTypeList = List("Action", "Adventure", "Animation", "Comedy", "Ccrime", "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science", "Tv", "Thriller", "War", "Western")//将电影类别转换成RDD,已供后边进行笛卡尔积操作val movieTypeRDD: RDD[String] = spark.sparkContext.makeRDD(movieTypeList)//先算出每个电影的平均评分val newRatingDF: DataFrame = spark.sql("select mid,avg(score) as avg from rating group by mid")//将Movie与转换后的新的RatingDF进行Join查询,这个Join查询的策略是查询两个DF都能够匹配到的数据,匹配不到的数据会被过滤掉val movieWithNewRatingDF: DataFrame = movieDF.join(newRatingDF, Seq("mid", "mid"))//将电影类型和电影信息进行笛卡尔积操作,生成N*M条数据,每个类型都会与每个电影进行聚集,这样所有电影都会分配到不同到电影类别,val movieTypeRecommendDF: DataFrame = movieTypeRDD.cartesian(movieWithNewRatingDF.rdd)//过滤掉类型不匹配的电影,因为笛卡尔积就是数据的冗余,每种电影类型都会和每一个电影进行聚集,聚集以后的数据集,如果电影//类型与电影信息中的电影类型不配的话就说名这电影不是该类型下的电影,也就过滤掉了,因为这个部分数据是错误数据,这个操作就是为了//起到匹配每个电影对应的电影类型,把对应不上的也就是错误的数据过滤掉.filter {case (movieType, row) => row.getAs[String]("genres").toLowerCase().contains(movieType.toLowerCase())}//这Map功能就是为了减少不必要的字段.map {case (movieType, row) => (movieType, (row.getAs[Int]("mid"), row.getAs[Double]("avg")))}//这几个group by功能是将相同类型的电影聚集在一起,生成的数据结构是(moveType,Iterable(mid,avg)).groupByKey()//这个Map对象是为了封装成case class对象,并且对groupByKey以后的数据进行排序,排序规则就是对avg评分进行排序,用到了sortWith方法//并且用到了take方法取出前十条数据.map(iter => MovieTypeRecommend(iter._1, iter._2.toList.sortWith(_._2 > _._2).take(10).map(iter => MovieRecommend(iter._1, iter._2)))).toDF()//将每种类型电影的评分Top10保存到MongoDB中movieTypeRecommendDF.write.option("uri", mongoConf.uri).option("collection", "Movie_Type_Recommend").format("com.mongodb.spark.sql").mode(SaveMode.Overwrite).save()spark.stop()} }
推荐系统(一)基于协同过滤算法开发离线推荐相关推荐
- 基于协同过滤算法的电影推荐系统设计(二) - ALS算法详解
0 系列文章目录 0.1 基于协同过滤算法的电影推荐系统设计(一) - 项目简介 0.2 基于协同过滤算法的电影推荐系统设计(二) - 推荐系统介绍 ALS是alternating least squ ...
- 毕业设计之基于协同过滤算法的电影推荐系统设计(一) - 项目简介
由于本人今年毕业,为完成毕设特地想着实现一个简单的推荐系统设计,思来想去,小电影不就是很好的切入点嘛! 于是诞生该项目,将会一步步带着大家实现一个自己的电影推荐系统. 1 研究目标 从应用场景来看,基 ...
- 基于协同过滤算法的商品推荐购物电商系统
一.介绍 商品推荐是针对用户面对海量的商品信息而不知从何下手的一种解决方案,它可以根据用户的喜好,年龄,点击量,购买量以及各种购买行为来为用户推荐合适的商品.在本项目中采用的是基于用户的协同过滤的推荐 ...
- 基于协同过滤算法的书籍推荐 毕业设计-附源码101555
摘 要 21世纪的今天,随着社会的不断发展与进步,人们对于信息科学化的认识,已由低层次向高层次发展,由原来的感性认识向理性认识提高,管理工作的重要性已逐渐被人们所认识,科学化的管理,使信息存储达到准 ...
- (附源码)python+mysql+基于协同过滤算法的书籍推荐 毕业设计101555
摘 要 21世纪的今天,随着社会的不断发展与进步,人们对于信息科学化的认识,已由低层次向高层次发展,由原来的感性认识向理性认识提高,管理工作的重要性已逐渐被人们所认识,科学化的管理,使信息存储达到准确 ...
- 基于协同过滤算法SSM个性化推荐购物商城设计
程序开发软件:Eclipse或Idea 数据库:mysql 此网站是一个全品类的购物商城系统,然后在传统的购物商城的基础上面加入了协同过滤算法,包括了基于用户的协同过滤算法和基于商品的协同过滤算法 ...
- 【毕业设计_课程设计】基于协同过滤算法的个性化推荐系统(源码+论文)
文章目录 0 项目说明 1 研究目的 2 研究方法 3 系统设计 3.1 前台模块 3.1.1 首页 3.1.2 个人中心 3.1.3 发布者中心 3.2 后台模块 3.2.1 首页 3.2.2 新闻 ...
- 计算机毕业设计之java+ssm基于协同过滤算法的图书推荐系统
计算机毕业设计之java+ssm基于协同过滤算法的图书推荐系统 项目介绍 "互联网:"的战略实施后,很多行业的信息化水平都有了很大的提升.但是目前很多行业的管理仍是通过人工管理的方 ...
- 基于协同过滤算法的个性化新闻推荐系统
基于协同过滤算法的个性化新闻推荐系统能够根据对用户在网站内的操作记录的分析,为用户推荐可能喜欢的新闻内容.另外,该系统还实现了新闻的新增.改.查.删操作,以及新闻的评论和回复.新闻评论管理等. 二.研 ...
最新文章
- 使用SqlBulkCopy进行数据大批量的迁移
- [置顶] PHP如何扩展和如何在linux底层对php扩展?
- PAT甲级1004 Counting Leaves (30分):[C++题解]树、邻接表存储树、dfs遍历树
- 查找数组中未出现的和出现2次的数值 Set Mismatch
- 在windows中手动安装第三方模块
- HDU 1231 最大连续子序列 (动态规划)
- 【温故知新】——原生js中常用的四种循环方式
- mysql mac 中文乱码_Mac mysql 解决中文乱码
- code vs 集成tfs_10大Python集成开发环境和代码编辑器
- 手淘移动适配方案flexible.js兼容bug处理
- 0.3-87 GHz频段手持频谱分析仪 —— SAF Spectrum Compact
- centos7.5安装mysql数据库_CentOS7.5安装Mysql5.7.22
- 双曲余弦函数实现(C语言)
- 手机开热点但是电脑一直连接不上_电脑连接不上手机热点解决办法
- 网络的层级及各层级的作用
- 常用软件安装及破解——IntelliJ IDEA
- Addon SuperEdge 让原生 K8s 集群可管理边缘应用和节点
- php Spreadsheet Csv,使用 PhpSpreadsheet 实现读取写入 Execl
- Unity3D 2018 3.0新手入门
- 利用会声会影在局部,打马赛克