使用Spark DataFrame实现基于物品的协同过滤算法(ItemCF)
简书不支持Markdown Math语法,请移步https://glassywing.github.io/2018/04/10/spark-itemcf/
简介
当前spark支持的协同过滤算法只有ALS(基于模型的协同过滤算法),但ALS算法对于某些特定的问题,效果并不理想,不像mahout提供了各种推荐算法。为了享受到spark在速度上带来的提升同时为满足一些业务需求,遂使用spark构建ItemCF算法。同时spark提供了新的DataFrame数据类型,使算法开发更加清晰和易于实现,
前提
本文需要你对基于物品的协同过滤算法(ItemCF)的基本计算过程有一定了解,若未了解过ItemCF,请参阅《基于领域的协同过滤算法 : UserCF and ItemCF》。
本文使用spark的DataFrame数据类型进行开发,而非RDD。若不了解DataFrame,请参阅《Spark SQL, DataFrames and Datasets Guide》。
常用相似度计算公式
协同过滤算法中最重要的部分是要计算物品间的相似度,对于不同的场景,可以应用不同的相似度计算公式来计算相似度,常用的相似度计算公式如下所示:
同现相似度(Co Occurrence)
同现相似度公式
$$ w(x,y)=\frac{|N(x)\cap{N(y)}|}{|N(x)|} $$
公式中分母是喜欢物品x的用户数,而分子则是同时对物品x和物品y感兴趣的用户数。因此,上述公式可用理解为对物品x感兴趣的用户有多大概率也对y感兴趣 (和关联规则类似)
但上述的公式存在一个问题,如果物品y是热门物品,有很多人都喜欢,则会导致W(x, y)很大,接近于1。因此会造成任何物品都和热门物品交有很大的相似度。为此我们用如下公式进行修正:
改进的同现相似度公式
$$ w(x,y)=\frac{|N(x)\cap{N(y)}|}{\sqrt{|N(x)||N(y)|}} $$
这个格式惩罚了物品y的权重,因此减轻了热门物品和很多物品相似的可能性。(也归一化了)
欧几里得相似度(Eucledian Similarity)
欧几里得相似度根据欧几里得距离计算而来,距离越近相似度越高,反之相反。
欧几里得距离定义
在数学中,欧几里得距离或欧几里得度量是欧几里得空间中两点间“普通”(即直线)距离。使用这个距离,欧氏空间成为度量空间。相关联的范数称为欧几里得范数。较早的文献称之为毕达哥拉斯度量。
欧几里得距离公式
$$ \ d_{X,Y}=\sqrt{ \sum_{i=1}n(x_i-y_i)2} $$
皮尔逊相似度
皮尔逊相关系数,即概率论中的相关系数,取值范围[-1,+1]。当大于零时,两个变量正相关,当小于零时表示两个向量负相关。
皮尔逊积矩相关系数定义
两个变量之间的皮尔逊相关系数定义为两个变量之间的协方差和标准差的商:
皮尔逊积矩相关系数公式
$$ \rho_{X,Y}=\frac{cov(X,Y)}{\sigma_{x}\sigma_{y}}=\frac{E((X-\mu_x)(Y-\mu_y))}{\sigma_{x}\sigma_{y}}=\frac{E(XY)-E(X)E(Y)}{\sqrt{E(X2)-E2(X)}\sqrt{E(Y2)-E2(Y)}} $$
余弦相似度(Cosine Similarity)
利用多维空间两点与所设定的点形成夹角的余弦值范围为[-1,1],值越大,说明夹角越大,两点相距就越远,相似度就越小。
向量间余弦定义
多维空间两点与所设定的点形成夹角的余弦值
余弦计算公式
$$ sim_{X,Y}=\frac{XY}{||X||||Y||}=\frac{ \sum_{i=1}n(x_iy_i)}{\sqrt{\sum_{i=1}n(x_i)2}*\sqrt{\sum_{i=1}n(y_i)^2}} $$
公式中$ x_i $表示第i个用户对物品x的评分,$ y_i $同理。
该公式只考虑到了用户的评分,很可能评分较高的物品会排在前面而不管物品的其它信息,改进版的余弦相似度计算公式如下:
改进的余弦相似度计算公式
$$ sim_{X,Y}=\frac{XYnum_{X\cap{Y}}}{||X||||Y||num_{X}log10(10+num_{Y})} $$
改进公式考虑到了两个向量相同个体个数、X向量大小、Y向量大小,注意:
$$ \ sim_{X,Y}\neq sim_{Y,X} $$
Tanimoto 相似度(Jaccard 系数)
Tanimoto相似度也称为Jaccard系数,是Cosine相似度扩展,多用于文档相似度就算。此相似度不考虑评价值,只考虑两个集合共同个体数量。
Jaccard 系数公式
$$ sim(x,y)=\frac{X\cap{Y}}{||X||+||Y||-||X\cap{Y}||} $$
预测用户评分公式
$$ pred_{u,p}=\frac{\sum_{i\in{ratedItems(u)}}{sim(i,p)r_{u,i}}}{\sum_{i\in{ratedItems(u)}}{sim(i,p)}} $$
公式中u指用户,p值物品,ratedItems(u)指用户u评价过的物品,sim指相似度(item之间的),r指用户对物品评分。
构建ItemCFModel
类定义
// 物品信息case class Item(itemId: Int, itemName: String)// 用户-物品-评分case class Rating(userId: Int, itemId: Int, rating: Float)// 用户信息case class User(userId: Int, userName: String)
相似度度量
/*** SIMILARITY MEASURES*/object SimilarityMeasures { /*** The Co-occurrence similarity between two vectors A, B is* |N(i) ∩ N(j)| / sqrt(|N(i)||N(j)|)*/def cooccurrence(numOfRatersForAAndB: Long, numOfRatersForA: Long, numOfRatersForB: Long): Double = {numOfRatersForAAndB / math.sqrt(numOfRatersForA * numOfRatersForB)} /*** The correlation between two vectors A, B is* cov(A, B) / (stdDev(A) * stdDev(B))** This is equivalent to* [n * dotProduct(A, B) - sum(A) * sum(B)] /* sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }*/def correlation(size: Double, dotProduct: Double, ratingSum: Double,rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double): Double = { val numerator = size * dotProduct - ratingSum * rating2Sum val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)numerator / denominator} /*** Regularize correlation by adding virtual pseudocounts over a prior:* RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation* where w = # actualPairs / (# actualPairs + # virtualPairs).*/def regularizedCorrelation(size: Double, dotProduct: Double, ratingSum: Double,rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double,virtualCount: Double, priorCorrelation: Double): Double = { val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq) val w = size / (size + virtualCount)w * unregularizedCorrelation + (1 - w) * priorCorrelation} /*** The cosine similarity between two vectors A, B is* dotProduct(A, B) / (norm(A) * norm(B))*/def cosineSimilarity(dotProduct: Double, ratingNorm: Double, rating2Norm: Double): Double = {dotProduct / (ratingNorm * rating2Norm)} /*** The improved cosine similarity between two vectors A, B is* dotProduct(A, B) * num(A ∩ B) / (norm(A) * norm(B) * num(A) * log10(10 + num(B)))*/def improvedCosineSimilarity(dotProduct: Double, ratingNorm: Double, rating2Norm: Double, numAjoinB: Long, numA: Long, numB: Long): Double = {dotProduct * numAjoinB / (ratingNorm * rating2Norm * numA * math.log10(10 + numB))} /*** The Jaccard Similarity between two sets A, B is* |Intersection(A, B)| / |Union(A, B)|*/def jaccardSimilarity(usersInCommon: Double, totalUsers1: Double, totalUsers2: Double): Double = { val union = totalUsers1 + totalUsers2 - usersInCommonusersInCommon / union} }
计算物品相似度
def fit(ratings: Dataset[Rating]): ItemCFModel = { this.ratings = Option(ratings) val numRatersPerItem = ratings.groupBy("itemId").count().alias("nor").coalesce(defaultParallelism) // 在原记录基础上加上item的打分者的数量val ratingsWithSize = ratings.join(numRatersPerItem, "itemId").coalesce(defaultParallelism) // 执行内联操作ratingsWithSize.join(ratingsWithSize, "userId").toDF("userId", "item1", "rating1", "nor1", "item2", "rating2", "nor2").selectExpr("userId", "item1", "rating1", "nor1", "item2", "rating2", "nor2", "rating1 * rating2 as product", "pow(rating1, 2) as rating1Pow", "pow(rating2, 2) as rating2Pow").coalesce(defaultParallelism).createOrReplaceTempView("joined") // 计算必要的中间数据,注意此处有WHERE限定,只计算了一半的数据量val sparseMatrix = spark.sql( """|SELECT item1|, item2|, count(userId) as size|, sum(product) as dotProduct|, sum(rating1) as ratingSum1|, sum(rating2) as ratingSum2|, sum(rating1Pow) as ratingSumOfSq1|, sum(rating2Pow) as ratingSumOfSq2|, first(nor1) as nor1|, first(nor2) as nor2|FROM joined|WHERE item1 < item2|GROUP BY item1, item2""".stripMargin).coalesce(defaultParallelism).cache() // 计算物品相似度var sim = sparseMatrix.map(row => { val size = row.getAs[Long](2) val dotProduct = row.getAs[Double](3) val ratingSum1 = row.getAs[Double](4) val ratingSum2 = row.getAs[Double](5) val ratingSumOfSq1 = row.getAs[Double](6) val ratingSumOfSq2 = row.getAs[Double](7) val numRaters1 = row.getAs[Long](8) val numRaters2 = row.getAs[Long](9) val cooc = cooccurrence(size, numRaters1, numRaters2) val corr = correlation(size, dotProduct, ratingSum1, ratingSum2, ratingSumOfSq1, ratingSumOfSq2) val regCorr = regularizedCorrelation(size, dotProduct, ratingSum1, ratingSum2,ratingSumOfSq1, ratingSumOfSq2, PRIOR_COUNT, PRIOR_CORRELATION) val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingSumOfSq1), scala.math.sqrt(ratingSumOfSq2)) val impCosSim = improvedCosineSimilarity(dotProduct, math.sqrt(ratingSumOfSq1), math.sqrt(ratingSum2), size, numRaters1, numRaters2) val jaccard = jaccardSimilarity(size, numRaters1, numRaters2)(row.getInt(0), row.getInt(1), cooc, corr, regCorr, cosSim, impCosSim, jaccard)}).toDF("itemId_01", "itemId_02", "cooc", "corr", "regCorr", "cosSim", "impCosSim", "jaccard") // 最终的物品相似度sim.withColumnRenamed("itemId_01", "itemId_02").withColumnRenamed("itemId_02", "itemId_01").union(sim).repartition(defaultParallelism) // 重新分区,以便数据均匀分布,方便下游用户使用.cache()similarities = Option(sim) this}
用户推荐
/*** 为指定的用户推荐num个物品** @param users 用户集* @param num 为每个用户推荐的物品数量* @return 推荐表*/def recommendForUsers(users: Dataset[User], num: Int): DataFrame = { // similarityMeasure为相似度算法名var sim = similarities.get.select("itemId_01", "itemId_02", similarityMeasure) // 获得评分表val rits = ratings.get val project: DataFrame = users.selectExpr("userId as user", "userName") // 进行子投影,此处左表数量远小于右表,执行左连接.join(rits, $"user" <=> rits("userId"), "left").drop($"user") // 选择与用户相关的物品以及评分.select("userId", "itemId", "rating") // 获得用户感兴趣的物品与其它物品的相似度project.join(sim, $"itemId" <=> sim("itemId_01")).selectExpr("userId", "itemId_01 as relatedItem", "itemId_02 as otherItem", similarityMeasure, s"$similarityMeasure * rating as simProduct").coalesce(defaultParallelism).createOrReplaceTempView("tempTable")spark.sql( s"""|SELECT userId|, otherItem|, sum(simProduct) / sum($similarityMeasure) as rating|FROM tempTable|GROUP BY userId, otherItem|ORDER BY userId asc, rating desc""".stripMargin) // 过滤结果.rdd.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2)))).groupByKey().mapValues(xs => { var sequence = Seq[(Int, Double)]() val iter = xs.iterator var count = 0while (iter.hasNext && count < num) { val rat = iter.next() if (rat._2 != Double.NaN)sequence :+= (rat._1, rat._2)count += 1}sequence}).toDF("userId", "recommended")}
相似度计算结果展示
数据来源
数据来自MovieLens,MovieLens数据集是一个关于电影评分的数据集,里面包含了从IMDB, The Movie DataBase上面得到的用户对电影的评分信息。
计算出的物品间相似度
以下展示了使用同现相似度,余弦相似度以及改进版进行相似度计算后(其它相似度请自行测试)的电影间的相似度,并以《星球大战(1977)》进行测试的结果(只显示了前20个结果)。
令人惊讶的是余弦相似度的结果似乎不太令人满意,这似乎是因为余弦相似度只和用户评分有关(更适用于推荐5星电影,不关心电影的类型等),也可能是我的算法出现了差错,欢迎指正。
同现相似度结果展示
movie1 | movie2 | coocurrence |
---|---|---|
星球大战(1977) | 绝地归来(1983) | 0.8828826458931883 |
星球大战(1977) | 迷失方舟攻略(1981) | 0.7679353753201742 |
星球大战(1977) | 帝国反击,(1980) | 0.7458505006229118 |
星球大战(1977) | 教父,The(1972) | 0.7275434127191666 |
星球大战(1977) | 法戈(1996) | 0.7239858668831711 |
星球大战(1977) | 独立日(ID4)(1996) | 0.723845113716724 |
星球大战(1977) | 沉默的羔羊,The(1991) | 0.7025515983155468 |
星球大战(1977) | 印第安纳琼斯和最后的十字军东征(1989) | 0.6920306174608959 |
星球大战(1977) | 低俗小说(1994) | 0.6885437675802282 |
星球大战(1977) | 星际迷航:第一次接触(1996) | 0.6850249237265413 |
星球大战(1977) | 回到未来(1985) | 0.6840536741086217 |
星球大战(1977) | 逃亡者,The(1993) | 0.6710463728397225 |
星球大战(1977) | 摇滚,The(1996) | 0.6646215466055597 |
星球大战(1977) | 终结者,The(1984) | 0.6636319257721421 |
星球大战(1977) | 阿甘正传(1994) | 0.6564951869930893 |
星球大战(1977) | 终结者2:审判日(1991) | 0.653467518885383 |
星球大战(1977) | Princess Bride,The(1987) | 0.6534487891771482 |
星球大战(1977) | 异形(1979) | 0.648232034779792 |
星球大战(1977) | E.T。外星(1982) | 0.6479990753086882 |
星球大战(1977) | 巨蟒和圣杯(1974) | 0.6476896799641126 |
余弦相似度结果展示
余弦相似度
movie1 | movie2 | cosSim |
---|---|---|
星球大战(1977) | Infinity(1996) | 1.0 |
星球大战(1977) | Mostro,Il(1994) | 1.0 |
星球大战(1977) | Boys,Les(1997) | 1.0 |
星球大战(1977) | 陌生人,(1994) | 1.0 |
星球大战(1977) | 爱是一切(1996) | 1.0 |
星球大战(1977) | 巴黎是女人(1995) | 1.0 |
星球大战(1977) | 遇难者,A(1937) | 1.0 |
星球大战(1977) | 馅饼在天空(1995) | 1.0 |
星球大战(1977) | 世纪(1993) | 1.0 |
星球大战(1977) | 天使在我的肩膀(1946) | 1.0 |
星球大战(1977) | 这里来曲奇(1935) | 1.0 |
星球大战(1977) | 力量98(1995) | 1.0 |
星球大战(1977) | 滑稽女郎(1943) | 1.0 |
星球大战(1977) | 火山(1996) | 1.0 |
星球大战(1977) | 难忘的夏天(1994) | 1.0 |
星球大战(1977) | Innocents,The(1961) | 1.0 |
星球大战(1977) | Sleepover(1995) | 1.0 |
星球大战(1977) | 木星的妻子(1994) | 1.0 |
星球大战(1977) | 我的生活与时代与安东宁·阿托(En compagnie d'Antonin Artaud)(1993) | 1.0 |
星球大战(1977) | Bent(1997) | 1.0 |
改进余弦相似度结果展示
改进余弦相似度
movie1 | movie2 | impCosSim |
---|---|---|
星球大战(1977) | 绝地归来(1983) | 0.6151374130038775 |
星球大战(1977) | 失落方舟攻略(1981) | 0.5139215764696529 |
星球大战(1977) | 法戈(1996) | 0.4978221397190352 |
星球大战(1977) | 帝国反击,The(1980) | 0.47719131109655355 |
星球大战(1977) | 教父,The(1972) | 0.4769568086870377 |
星球大战(1977) | 沉默的羔羊,The(1991) | 0.449096021012343 |
星球大战(1977) | 独立日(ID4)(1996) | 0.4334888029282058 |
星球大战(1977) | 低俗小说(1994) | 0.43054394420596026 |
星球大战(1977) | 联系(1997) | 0.4093441266211224 |
星球大战(1977) | 印第安纳琼斯和最后的十字军东征(1989) | 0.4080635382244593 |
星球大战(1977) | 回到未来(1985) | 0.4045977014813726 |
星球大战(1977) | 星际迷航:第一次接触(1996) | 0.40036290288050874 |
星球大战(1977) | 逃亡者,The(1993) | 0.3987919640908379 |
星球大战(1977) | Princess Bride,The(1987) | 0.39490206690864144 |
星球大战(1977) | 摇滚,The(1996) | 0.39100622194841916 |
星球大战(1977) | 巨蟒与圣杯(1974) | 0.3799595474408077 |
星球大战(1977) | 终结者,The(1984) | 0.37881311350029406 |
星球大战(1977) | 阿甘正传(1994) | 0.3755685058241706 |
星球大战(1977) | 终结者2:审判日(1991) | 0.37184317281514295 |
星球大战(1977) | 杰瑞马奎尔(1996) | 0.370478212770262 |
作者:manlier
链接:https://www.jianshu.com/p/169aad3cfddd
使用Spark DataFrame实现基于物品的协同过滤算法(ItemCF)相关推荐
- 推荐系统实践(二)----基于物品的协同过滤算法(ItemCF)
上一篇博客我简单讲了一下基于用户的协同过滤算法,这里我们一起来学习一下另一种:基于物品的协同过滤算法.基于物品的协同过滤算法是目前业界应用最多的算法,亚马逊.Netflix.Hulu.YouTub ...
- 基于物品的协同过滤算法ItemCF
基于物品的协同过滤算法ItemCF 基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐.简单来讲就是:给用户推荐和他之前喜欢的物品相似的物 ...
- Hadoop案例之基于物品的协同过滤算法ItemCF
Hadoop案例之基于物品的协同过滤算法ItemCF 转载自:http://blog.csdn.net/qq1010885678/article/details/50751607?locationNu ...
- itemCF matlab算法,基于物品的协同过滤算法(ItemCF)
物品相似度计算 余弦相似度公式: 其中 , 分别表示对物品 , 喜欢的用户数, 为同时喜欢 和 的人数.我们这里还是使用漫威英雄举例:假设目前共有5个用户: A.B.C.D.E:共有5个漫威英雄人物: ...
- ItemCF,基于物品的协同过滤算法
转载自 ItemCF,基于物品的协同过滤算法 ItemCF:Item Collaboration Filter,基于物品的协同过滤 算法核心思想:给用户推荐那些和他们之前喜欢的物品相似的物品. 比 ...
- 推荐算法 itemcf java_推荐系统之基于物品的协同过滤算法(ItemCF)
推荐系统之基于物品的协同过滤算法(ItemCF) 发布时间:2018-03-04 16:55, 浏览次数:1778 , 标签: ItemCF 推荐系统之基于物品的协同过滤算法(ItemCF) 前端时间 ...
- 大数据Hadoop学习之————基于物品的协同过滤算法实现物品推荐
一.基础概念 协同过滤算法一般分为两种实现: 基于用户的协同过滤算法(userCF):通过寻找相似兴趣的其他用户,为指定用户推荐物品.比如用户A喜欢商品A.B,用户B也喜欢商品A和B,则可以认为用户A ...
- 推荐系统实战(2)——基于物品的协同过滤算法(代码实现),U-CF和I-CF的比较
这里加点东西:有利于理解 1基于CF的推荐算法 1.1算法简介 CF(协同过滤)简单来形容就是利用兴趣相投的原理进行推荐,协同过滤主要分两类,一类是基于物品的协同过滤算法,另一种是基于用户的协同过滤算 ...
- [推荐算法]ItemCF,基于物品的协同过滤算法
[推荐算法]ItemCF,基于物品的协同过滤算法 标签: ItemCF基于用户的协同过滤算法 2015-03-09 15:11 4144人阅读 评论(1) 收藏 举报 本文章已收录于: 分类: ...
最新文章
- C++ 三五法则,看看你能不能理解
- 算法-链表-给定一个数小于该值的在左边等于在中间大于的在右边
- 贝叶斯网络结构学习之K2算法(基于FullBNT-1.0.4的MATLAB实现)
- JDK1.7安装配置环境变量+图文说明Jmeter安装
- OS_CORE.C(1)
- asp.net常用函数 选择自 UAM_Richard 的 Blog
- 电池供电的电容麦_太阳能航空障碍灯供电机制设计
- leetcode95. 不同的二叉搜索树 II(递归)
- yii2中的rules 自定义验证规则详解
- Python 代理类实现和控制访问与修改属性的权限
- 怎么查询AI论文的源代码?
- 7-14 排座位 (25 分)
- Swift 的变化:从 2.2 到 3.0 会带来什么
- Astah Pro 快捷键
- 计算机考研 专业课 数据结构
- java中国象棋棋子走法,JS 中国象棋(1):校验棋子走法
- 阿里云网盘开启公测!不限速、2T永久免费空间!!
- 我参加NVIDIA Sky Hackathon 训练文件的路径设置
- Sentiment Classification towards Question-Answering with Hierarchical Matching Network 论文阅读笔记
- 数学建模实战9(聚类分析)