本文转自http://www.tuicool.com/articles/fANvieZ,所有权力归原作者所有。

本文主要通过Spark官方的例子,理解ALS协同过滤算法的原理和编码过程。

协同过滤

协同过滤 常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。Spark MLlib实现了 交替最小二乘法 (ALS) 来学习这些隐性语义因子。在 MLlib 中的实现有如下的参数:

  • numBlocks 是用于并行化计算的分块个数 (设置为-1,为自动配置)。
  • rank 是模型中隐语义因子的个数。
  • iterations 是迭代的次数。
  • lambda 是ALS的正则化参数。
  • implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
  • alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。

可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优。

隐性反馈 vs 显性反馈

基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。

在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets 。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。

示例

下面代码的均在 Spark 的home目录下运行,并且示例中加载的文件路径为data/mllib/als/test.data ,文件中每一行包括一个用户id、商品id和评分。我们使用默认的 ALS.train() 方法来构建推荐模型并评估模型的均方差。

查看文件内容:

$ more data/mllib/als/test.data
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0

下面例子来自 http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html ,并做了稍许修改。

Scala 示例

下面代码可以在spark-shell中运行:

import org.apache.log4j.Logger
import org.apache.log4j.Levelimport org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating//设置日志级别
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)// 加载并解析数据
val data = sc.textFile("data/mllib/als/test.data")/**
 * Product ratings are on a scale of 1-5:
 * 5: Must see
 * 4: Will enjoy
 * 3: It's okay
 * 2: Fairly bad
 * 1: Awful
 */
val ratings = data.map(_.split(',') match { case Array(user, product, rate) =>Rating(user.toInt, product.toInt, rate.toDouble)})//使用ALS训练数据建立推荐模型
val rank = 10
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, 0.01)//从 ratings 中获得只包含用户和商品的数据集
val usersProducts = ratings.map { case Rating(user, product, rate) =>(user, product)
}//使用推荐模型对用户商品进行预测评分,得到预测评分的数据集
val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate)}//将真实评分数据集与预测评分数据集进行合并
val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions).sortByKey()  //ascending or descending //然后计算均方差,注意这里没有调用 math.sqrt方法
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => val err = (r1 - r2)err * err
}.mean()//打印出均方差值
println("Mean Squared Error = " + MSE)
//Mean Squared Error = 1.37797097094789E-5

上面的例子中调用了 ALS.train(ratings, rank, numIterations, 0.01) ,我们还可以设置其他参数,调用方式如下:

val model = new ALS().setRank(params.rank).setIterations(params.numIterations).setLambda(params.lambda).setImplicitPrefs(params.implicitPrefs).setUserBlocks(params.numUserBlocks).setProductBlocks(params.numProductBlocks).run(training)

Java示例

import scala.Tuple2;import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;public class CollaborativeFiltering {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");JavaSparkContext sc = new JavaSparkContext(conf);// Load and parse the dataString path = "data/mllib/als/test.data";JavaRDD<String> data = sc.textFile(path);JavaRDD<Rating> ratings = data.map(new Function<String, Rating>() {public Rating call(String s) {String[] sarray = s.split(",");return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2]));}});// Build the recommendation model using ALSint rank = 10;int numIterations = 20;MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); // Evaluate the model on rating dataJavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(new Function<Rating, Tuple2<Object, Object>>() {public Tuple2<Object, Object> call(Rating r) {return new Tuple2<Object, Object>(r.user(), r.product());}});JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){return new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());}}));JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map(new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){return new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());}})).join(predictions).values();double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(new Function<Tuple2<Double, Double>, Object>() {public Object call(Tuple2<Double, Double> pair) {Double err = pair._1() - pair._2();return err * err;}}).rdd()).mean();System.out.println("Mean Squared Error = " + MSE);}
}

Python示例

下面代码可以在 pyspark 中运行:

from pyspark.mllib.recommendation import ALS
from numpy import array# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 20
model = ALS.train(ratings, rank, numIterations)# Evaluate the model on training data
testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

总结

协同过滤ALS算法推荐过程

协同过滤ALS算法推荐过程如下:

  • 加载数据到 ratings RDD,每行记录包括:user, product, rate
  • 从 ratings 得到用户商品的数据集:(user, product)
  • 使用ALS对 ratings 进行训练
  • 通过 model 对用户商品进行预测评分:((user, product), rate)
  • 从 ratings 得到用户商品的实际评分:((user, product), rate)
  • 合并预测评分和实际评分的两个数据集,并求均方差

保存推荐结果

上面的例子只是对训练集并进行了评分,我们还可以进一步的给用户推荐商品。以 Scala 程序为例,在原来代码基础上继续执行下面代码:

//为每个用户进行推荐,推荐的结果可以以用户id为key,结果为value存入redis或者hbase中
val users=data.map(_.split(",") match {  case Array(user, product, rate) => (user)
}).distinct().collect()
//users: Array[String] = Array(4, 2, 3, 1)users.foreach(user => {  //依次为用户推荐商品   var rs = model.recommendProducts(user.toInt, numIterations)  var value = ""  var key = 0  //拼接推荐结果rs.foreach(r => {  key = r.user  value = value + r.product + ":" + r.rating + ","  })  println(key.toString+"   " + value)  }
)
//4   4:4.9948551991729,2:4.9948551991729,3:1.0007160894300133,1:1.0007160894300133,
//2   1:4.994747095003154,3:4.994747095003154,2:1.0007376098628127,4:1.0007376098628127,
//3   2:4.9948551991729,4:4.9948551991729,3:1.0007160894300133,1:1.0007160894300133,
//1   3:4.994747095003154,1:4.994747095003154,2:1.0007376098628127,4:1.0007376098628127,

上面的代码调用 model.recommendProducts 方法分别对用户进行推荐,其实在之前的代码中已经计算出了预测的评分,我们可以通过 predictions 或者 ratesAndPreds 来得到最后的推荐结果:

//对预测结果按预测的评分排序
predictions.collect.sortBy(_._2)
//Array[((Int, Int), Double)] = Array(((4,1),1.0007160894300133), ((3,1),1.0007160894300133), ((4,3),1.0007160894300133), ((3,3),1.0007160894300133), ((1,4),1.0007376098628127), ((2,4),1.0007376098628127), ((1,2),1.0007376098628127), ((2,2),1.0007376098628127), ((1,1),4.994747095003154), ((2,1),4.994747095003154), ((1,3),4.994747095003154), ((2,3),4.994747095003154), ((4,4),4.9948551991729), ((3,4),4.9948551991729), ((4,2),4.9948551991729), ((3,2),4.9948551991729))//对预测结果按用户进行分组,然后合并推荐结果,这部分代码待修正
predictions.map{ case ((user, product), rate) => (user, (product,rate) )}.groupByKey.collect//格式化测试评分和实际评分的结果
val formatedRatesAndPreds = ratesAndPreds.map {case ((user, product), (rate, pred)) => user + "," + product + "," + rate + "," + pred
}
//Array(2,1,5.0,4.994747095003154, 4,4,5.0,4.9948551991729, 4,2,5.0,4.9948551991729, 4,1,1.0,1.0007160894300133, 3,4,5.0,4.9948551991729, 1,4,1.0,1.0007376098628127, 3,1,1.0,1.0007160894300133, 2,3,5.0,4.994747095003154, 1,2,1.0,1.0007376098628127, 1,1,5.0,4.994747095003154, 2,2,1.0,1.0007376098628127, 2,4,1.0,1.0007376098628127, 3,2,5.0,4.9948551991729, 3,3,1.0,1.0007160894300133, 4,3,1.0,1.0007160894300133, 1,3,5.0,4.994747095003154)

提示:

因为我对Scala和Spark的语法还不是很熟悉,所以上面的代码中 通过 predictions 或者 ratesAndPreds 来得到最后的推荐结果 的代码尚未给出,待我后续再补充,请见谅!

上面的代码是依次遍历用户然后分别对用户进行推荐,推荐的结果可以以csv、json格式存入到hdfs或者以key/value方式存入到Cassandra、HBase、Redis、Mongodb等分布式数据库。

  • 使用Cassandra保存推荐结果
  • Simple example on how to use recommenders in Spark / MLlib using the Play framework and Mongodb

将数据集分为训练数据和测试数据

参考 ALSBenchmark.scala ,一个 完整的scala代码 如下:

import scala.collection.mutableimport org.apache.log4j.{Level, Logger}
import scopt.OptionParserimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD/**
 * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/).
 * Run with
 *
 * bin/run-example org.apache.spark.examples.mllib.MovieLensALS
 *
 * A synthetic dataset in MovieLens format can be found at `data/mllib/sample_movielens_data.txt`.
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object MovieLensALS {case class Params(input: String = null,kryo: Boolean = false,numIterations: Int = 20,lambda: Double = 1.0,rank: Int = 10,numUserBlocks: Int = -1,numProductBlocks: Int = -1,implicitPrefs: Boolean = false)def main(args: Array[String]) {val defaultParams = Params()val parser = new OptionParser[Params]("MovieLensALS") {head("MovieLensALS: an example app for ALS on MovieLens data.")opt[Int]("rank").text(s"rank, default: ${defaultParams.rank}}").action((x, c) => c.copy(rank = x))opt[Int]("numIterations").text(s"number of iterations, default: ${defaultParams.numIterations}").action((x, c) => c.copy(numIterations = x))opt[Double]("lambda").text(s"lambda (smoothing constant), default: ${defaultParams.lambda}").action((x, c) => c.copy(lambda = x))opt[Unit]("kryo").text("use Kryo serialization").action((_, c) => c.copy(kryo = true))opt[Int]("numUserBlocks").text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)").action((x, c) => c.copy(numUserBlocks = x))opt[Int]("numProductBlocks").text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)").action((x, c) => c.copy(numProductBlocks = x))opt[Unit]("implicitPrefs").text("use implicit preference").action((_, c) => c.copy(implicitPrefs = true))arg[String]("<input>").required().text("input paths to a MovieLens dataset of ratings").action((x, c) => c.copy(input = x))}parser.parse(args, defaultParams).map { params =>run(params)} getOrElse {System.exit(1)}}def run(params: Params) {val conf = new SparkConf().setAppName(s"MovieLensALS with $params")if (params.kryo) {conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])).set("spark.kryoserializer.buffer.mb", "8")}val sc = new SparkContext(conf)Logger.getRootLogger.setLevel(Level.WARN)//是否显性反馈val implicitPrefs = params.implicitPrefs//数据集val ratings = sc.textFile(params.input).map { line =>val fields = line.split("::")if (implicitPrefs) {/*
         * MovieLens ratings are on a scale of 1-5:
         * 5: Must see
         * 4: Will enjoy
         * 3: It's okay
         * 2: Fairly bad
         * 1: Awful
         * So we should not recommend a movie if the predicted rating is less than 3.
         * To map ratings to confidence scores, we use
         * 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mappings means unobserved
         * entries are generally between It's okay and Fairly bad.
         * The semantics of 0 in this expanded world of non-positive weights
         * are "the same as never having interacted at all".
         */Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)} else {Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)}}.cache()val numRatings = ratings.count()val numUsers = ratings.map(_.user).distinct().count()val numMovies = ratings.map(_.product).distinct().count()println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")//拆分数据,80%为训练集,20%为测试集val splits = ratings.randomSplit(Array(0.8, 0.2))val training = splits(0).cache()val test = if (params.implicitPrefs) {/*
       * 0 means "don't know" and positive values mean "confident that the prediction should be 1".
       * Negative values means "confident that the prediction should be 0".
       * We have in this case used some kind of weighted RMSE. The weight is the absolute value of
       * the confidence. The error is the difference between prediction and either 1 or 0,
       * depending on whether r is positive or negative.
       */splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))} else {splits(1)}.cache()val numTraining = training.count()val numTest = test.count()println(s"Training: $numTraining, test: $numTest.")ratings.unpersist(blocking = false)val start = System.currentTimeMillis()val model = new ALS().setRank(params.rank).setIterations(params.numIterations).setLambda(params.lambda).setImplicitPrefs(params.implicitPrefs).setUserBlocks(params.numUserBlocks).setProductBlocks(params.numProductBlocks).run(training)val end = System.currentTimeMillis()println("Train Time = " + (end-start)*1.0/1000)val rmse = computeRmse(model, test, params.implicitPrefs)println(s"Train RMSE = " + computeRmse(model, training,params.implicitPrefs))println(s"Test RMSE = $rmse.")sc.stop()}/** Compute RMSE (Root Mean Squared Error). */def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean) = {def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else rval predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))val predictionsAndRatings = predictions.map{ x =>((x.user, x.product), mapPredictedRating(x.rating))}.join(data.map(x => ((x.user, x.product), x.rating))).valuesmath.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())}
}

总结

通过本文主要熟悉了如何对输入数据进行推荐和评分,要想完全掌握本文中的示例代码并做到融会贯通,还需要花些时间熟悉和掌握Scala和Spark DataFrame的语法。希望这篇文章能够对你理解Spark的协同过滤算法有所帮助。

后续还需要掌握以下内容:

  • ALS中各个参数对推荐结果和评分的影响
  • 如何保存推荐结果

Spark MLlib中的协同过滤相关推荐

  1. 基于Spark MLlib平台的协同过滤算法---电影推荐系统

    协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,协同过滤算法按照数据使用 ...

  2. spark MLlib平台的协同过滤算法---电影推荐系统

    又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实 ...

  3. 使用spark mllib中协同过滤推荐算法ALS建立推荐模型

    使用spark mllib中协同过滤推荐算法ALS建立推荐模型 package com.yyds.tags.ml.rs.rddimport org.apache.spark.mllib.evaluat ...

  4. Spark MLlib中支持二次训练的模型算法

    在Spark MLlib中可以做二次训练的模型 大家好,我是心情有点低落的一拳超人 今天给大家带来我整理的Spark 3.0.1 MLlib库中可以做二次训练的模型总结,首先给大家介绍一下什么是二次训 ...

  5. 离线轻量级大数据平台Spark之MLib机器学习协同过滤ALS实例

    1.协同过滤 协同过滤(Collaborative Filtering,简称CF,WIKI上的定义是:简单来说是利用某个兴趣相投.拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制 ...

  6. 基于MLlib的机器学习--协同过滤与推荐

    <Spark快速大数据分析> 11.5.4 协同过滤与推荐 协同过滤是一种根据用户对各种产品的交互与评分来推荐新产品的推荐系统技术. 协同过滤引入的地方就在于它只需要输入一系列用户/产品的 ...

  7. 协同过滤算法_机器学习 | 简介推荐场景中的协同过滤算法,以及SVD的使用

    本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是机器学习专题的第29篇文章,我们来聊聊SVD在上古时期的推荐场景当中的应用. 推荐的背后逻辑 有没有思考过一个问题,当我们在淘宝或者是 ...

  8. TensorFlow2实现协同过滤算法中的矩阵分解(首家基于TS2版本)

    目标: 用TensorFlow2,实现协同过滤算法中的矩阵分解.网上找的绝大部分是基于一个模板复制出来的,且基于TensorFlow1,因此本人亲自动手,用TensorFlow2实现. 好奇为什么Te ...

  9. 推荐系统中协同过滤算法实现分析

    原创博客,欢迎转载,转载请注明:http://my.oschina.net/BreathL/blog/62519 最近研究Mahout比较多,特别是里面协同过滤算法:于是把协同过滤算法的这个实现思路与 ...

最新文章

  1. 华为存储iscsi配置_网络+存储+虚拟化:三大要素构建新网络
  2. Leetcode 刷题笔记
  3. Java线程总结(转)
  4. 【Redis】redis 哨兵模式
  5. 进大学时高考成绩是班里第一,同样也是努力学习,为什么大学时做不到第一了呢?
  6. MFC使用简单总结(便于以后查阅)
  7. 向视图中插入的数据能进入到基本表中去吗?_数据库调优,调的是什么及常见手法...
  8. 初入门-游戏设计思路拆解
  9. 碰到 oracle 10g ORA-00257
  10. 计算机及信息系统管理规范,系统运维管理 计算机信息系统运营和维护管理规范方案.doc...
  11. 山海经鸿蒙手游iOS 版本,山海经鸿蒙异兽手游下载,山海经鸿蒙异兽手游最新官方版 v1.0-手游汇...
  12. FaceBook 扎克伯格的创业史
  13. html 防网页假死,html5 WebWorkers 防止浏览器假死
  14. translate函数的用法
  15. myisam和innodb区别
  16. 海子-JVM的内存区域划分
  17. Avaya CEO:愿意与思科、微软共享客户
  18. 南开大学统计与数据科学院夏令营
  19. TinkerCAD使用笔记000--制作4080铝型材模型
  20. Linux网络协议栈3--neighbor子系统

热门文章

  1. spring事务源码解析
  2. Codeforces Beta Round #8 C. Looking for Order 状压dp
  3. CSS彻底研究(1)
  4. java列表框_Java图形用户界面之列表框
  5. RTT时钟管理篇——RTT定时器超时判断理解
  6. html 内嵌xml数据库,是否可以在SQLite数据库中存储XML/HTML文件?
  7. gp数据库迁移数据到mysql_greenplum数据迁移
  8. mac 10.13 配置 php,MacOS10.13.6 升级后 PHP7.3配置
  9. python统计汉字字数_Python 统计字数的思路详解
  10. 如何优雅地添加MGR节点?