
An introduction to ALS

ALS is short for alternating least squares; ALS-WR is short for alternating least squares with weighted-λ-regularization. The method is commonly used in recommender systems based on matrix factorization. For example, the user-item rating matrix is factored into two matrices: one holding each user's preference for a set of latent features, the other holding how strongly each item exhibits those latent features. The factorization fills in the missing entries of the rating matrix, and those filled-in ratings are what we use to recommend items to users.

In short, ALS-WR works as follows
(record format: userId, itemId, rating, timestamp):
1. For each userId, randomly initialize N (say 10) factor values; these values weight the user.
2. For each itemId, likewise randomly initialize N (say 10) factor values.
3. Holding the user factors fixed, solve for the item factors from the userFactors and rating matrices. Conceptually, [Item Factors Matrix] = pinv([User Factors Matrix]) * [Rating Matrix]; in practice each item vector is the solution of a regularized least-squares problem rather than an actual matrix inverse.
4. Holding the item factors fixed, solve for the user factors the same way: [User Factors Matrix] = pinv([Item Factors Matrix]) * [Rating Matrix].
5. Repeat steps 3 and 4 until userFactors and itemFactors converge to stable values.
6. To predict, take dot products: userFactor(u) · itemFactor(i) gives the predicted rating for that (user, item) pair.
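The alternating updates in steps 3–5 can be sketched in plain Java for the simplest possible case: a single latent factor (rank 1) and a fully observed rating matrix. The class name and toy data below are illustrative only; with one factor per user/item, each least-squares solve reduces to a ratio of sums.

```java
public class AlsRankOneSketch {

    // Step 4: fix item factors, solve user factors. With rank 1, the
    // least-squares solution for user i is sum_j(r[i][j]*v[j]) / sum_j(v[j]^2).
    static void solveUsers(double[][] r, double[] u, double[] v) {
        for (int i = 0; i < u.length; i++) {
            double num = 0, den = 0;
            for (int j = 0; j < v.length; j++) {
                num += r[i][j] * v[j];
                den += v[j] * v[j];
            }
            u[i] = num / den;
        }
    }

    // Step 3: fix user factors, solve item factors (the symmetric update).
    static void solveItems(double[][] r, double[] u, double[] v) {
        for (int j = 0; j < v.length; j++) {
            double num = 0, den = 0;
            for (int i = 0; i < u.length; i++) {
                num += r[i][j] * u[i];
                den += u[i] * u[i];
            }
            v[j] = num / den;
        }
    }

    public static void main(String[] args) {
        double[][] r = {{4, 2}, {2, 1}}; // toy rating matrix (exactly rank 1)
        double[] u = {1, 1};             // user factors, constant init
        double[] v = {1, 1};             // item factors
        for (int iter = 0; iter < 10; iter++) { // step 5: alternate until stable
            solveItems(r, u, v);
            solveUsers(r, u, v);
        }
        // Step 6: predicted rating is the dot product; u[0]*v[0] recovers ~4.0
        System.out.println(u[0] * v[0]);
    }
}
```

Real ALS-WR solves an N-dimensional regularized normal equation per user/item and skips unobserved entries; this sketch only shows why alternating the two solves converges.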

Spark ships two machine-learning libraries, ML and MLlib. ML is the officially recommended one: it is more complete and more flexible, and future development will focus on it.

ALS recommendation with the ML API:

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author huangyueran
 * @category ALS-WR
 */
public class JavaALSExampleByMl {

    private static final Logger log = LoggerFactory.getLogger(JavaALSExampleByMl.class);

    public static class Rating implements Serializable {
        // sample line: 0::2::3::1424380312
        private int userId;      // 0
        private int movieId;     // 2
        private float rating;    // 3
        private long timestamp;  // 1424380312

        public Rating() {
        }

        public Rating(int userId, int movieId, float rating, long timestamp) {
            this.userId = userId;
            this.movieId = movieId;
            this.rating = rating;
            this.timestamp = timestamp;
        }

        public int getUserId() { return userId; }
        public int getMovieId() { return movieId; }
        public float getRating() { return rating; }
        public long getTimestamp() { return timestamp; }

        public static Rating parseRating(String str) {
            String[] fields = str.split("::");
            if (fields.length != 4) {
                throw new IllegalArgumentException("Each line must contain 4 fields");
            }
            int userId = Integer.parseInt(fields[0]);
            int movieId = Integer.parseInt(fields[1]);
            float rating = Float.parseFloat(fields[2]);
            long timestamp = Long.parseLong(fields[3]);
            return new Rating(userId, movieId, rating, timestamp);
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaALSExample").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        JavaRDD<Rating> ratingsRDD = jsc.textFile("data/sample_movielens_ratings.txt")
                .map(new Function<String, Rating>() {
                    public Rating call(String str) {
                        return Rating.parseRating(str);
                    }
                });
        Dataset<Row> ratings = sqlContext.createDataFrame(ratingsRDD, Rating.class);

        // Split the data: 80% for training, the remainder for testing
        Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
        Dataset<Row> training = splits[0];
        Dataset<Row> test = splits[1];

        // Build the recommendation model using ALS on the training data
        ALS als = new ALS()
                .setMaxIter(5)      // number of iterations
                .setRegParam(0.01)  // regularization parameter; 0.1 seems to give a lower error on this data set
                .setUserCol("userId")
                .setItemCol("movieId")
                .setRatingCol("rating");
        ALSModel model = als.fit(training); // train the model

        Dataset<Row> itemFactors = model.itemFactors();
        itemFactors.show(1500);
        Dataset<Row> userFactors = model.userFactors();
        userFactors.show();

        // Evaluate the model by computing the RMSE on the test data
        Dataset<Row> rawPredictions = model.transform(test); // predict on the test set
        Dataset<Row> predictions = rawPredictions
                .withColumn("rating", rawPredictions.col("rating").cast(DataTypes.DoubleType))
                .withColumn("prediction", rawPredictions.col("prediction").cast(DataTypes.DoubleType));

        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setMetricName("rmse")
                .setLabelCol("rating")
                .setPredictionCol("prediction");
        Double rmse = evaluator.evaluate(predictions);
        log.info("Root-mean-square error = {} ", rmse);

        jsc.stop();
    }
}
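The RMSE that the RegressionEvaluator reports is simply the square root of the mean squared difference between label and prediction. As a stand-alone sketch (class name and sample values are mine, not from the project):

```java
public class RmseSketch {

    // Root-mean-square error over parallel arrays of labels and predictions.
    static double rmse(double[] labels, double[] preds) {
        double sum = 0;
        for (int i = 0; i < labels.length; i++) {
            double err = labels[i] - preds[i];
            sum += err * err;
        }
        return Math.sqrt(sum / labels.length);
    }

    public static void main(String[] args) {
        double[] ratings = {3.0, 4.0, 5.0};
        double[] predictions = {3.0, 2.0, 5.0};
        // squared errors are 0, 4, 0, so this is sqrt(4/3)
        System.out.println(rmse(ratings, predictions));
    }
}
```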

ALS recommendation with MLlib:

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

/**
 * @category ALS
 * @author huangyueran
 */
public class JavaALSExampleByMlLib {

    private static final Logger log = LoggerFactory.getLogger(JavaALSExampleByMlLib.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaALSExample").setMaster("local[4]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> data = jsc.textFile("data/sample_movielens_ratings.txt");
        JavaRDD<Rating> ratings = data.map(new Function<String, Rating>() {
            public Rating call(String s) {
                String[] sarray = StringUtils.split(StringUtils.trim(s), "::");
                return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                        Double.parseDouble(sarray[2]));
            }
        });

        // Build the recommendation model using ALS
        int rank = 10;
        int numIterations = 6;
        MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

        // Evaluate the model on the rating data
        JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(new Function<Rating, Tuple2<Object, Object>>() {
            public Tuple2<Object, Object> call(Rating r) {
                return new Tuple2<Object, Object>(r.user(), r.product());
            }
        });

        // Predicted ratings, keyed by (user, product)
        JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD
                .fromJavaRDD(model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
                        .map(new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                            public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                                return new Tuple2<Tuple2<Integer, Integer>, Double>(
                                        new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
                            }
                        }));

        // Join the actual ratings with the predictions on (user, product)
        JavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD
                .fromJavaRDD(ratings.map(new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                        return new Tuple2<Tuple2<Integer, Integer>, Double>(
                                new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
                    }
                })).join(predictions);

        // Rating list sorted by user id (key: user id)
        JavaPairRDD<Integer, Tuple2<Integer, Double>> fromJavaRDD = JavaPairRDD.fromJavaRDD(ratesAndPreds
                .map(new Function<Tuple2<Tuple2<Integer, Integer>, Tuple2<Double, Double>>, Tuple2<Integer, Tuple2<Integer, Double>>>() {
                    public Tuple2<Integer, Tuple2<Integer, Double>> call(
                            Tuple2<Tuple2<Integer, Integer>, Tuple2<Double, Double>> t) throws Exception {
                        return new Tuple2<Integer, Tuple2<Integer, Double>>(t._1._1,
                                new Tuple2<Integer, Double>(t._1._2, t._2._2));
                    }
                })).sortByKey(false);

        // List<Tuple2<Integer, Tuple2<Integer, Double>>> list = fromJavaRDD.collect();
        // for (Tuple2<Integer, Tuple2<Integer, Double>> t : list) {
        //     System.out.println(t._1 + ":" + t._2._1 + "====" + t._2._2);
        // }

        JavaRDD<Tuple2<Double, Double>> ratesAndPredsValues = ratesAndPreds.values();
        double MSE = JavaDoubleRDD.fromRDD(ratesAndPredsValues.map(new Function<Tuple2<Double, Double>, Object>() {
            public Object call(Tuple2<Double, Double> pair) {
                Double err = pair._1() - pair._2();
                return err * err;
            }
        }).rdd()).mean();

        try {
            FileUtils.deleteDirectory(new File("result"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        ratesAndPreds.repartition(1).saveAsTextFile("result/ratesAndPreds");

        // Recommend 10 items (movies) to a given user
        Rating[] recommendProducts = model.recommendProducts(2, 10);
        log.info("get recommend result:{}", Arrays.toString(recommendProducts));

        // Recommend the top N users for every item
        // model.recommendUsersForProducts(10);
        // Recommend the top N items for every user
        // model.recommendProductsForUsers(10);

        model.userFeatures().saveAsTextFile("result/userFea");
        model.productFeatures().saveAsTextFile("result/productFea");

        log.info("Mean Squared Error = {}", MSE);
    }
}

Both of the above run ALS as offline Spark jobs. A third approach uses Spark Streaming to compute recommendations online (in real time) over data buffered in a message queue such as Kafka.

Real-time ALS recommendation with Spark Streaming:

The Spark Streaming ALS job is only one link in the chain; a real project involves a good deal of other machinery.

For example, user-behavior logs are instrumented in the application, monitored and pulled by Flume, and stored in HDFS, while Kafka consumes and buffers the high-volume behavior data.

There is also the offline storage of the model produced by Spark training, which the web tier pulls and caches to serve recommendations to users, and so on.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * @author huangyueran
 * @category Real-time recommendation template DEMO based on Spark Streaming and Kafka.
 *           The original system also involves the mall project, logback, flume and Hadoop.
 */
public final class SparkALSByStreaming {

    private static final Logger log = LoggerFactory.getLogger(SparkALSByStreaming.class);

    private static final String KAFKA_ADDR = "middleware:9092";
    private static final String TOPIC = "RECOMMEND_TOPIC";
    private static final String HDFS_ADDR = "hdfs://middleware:9000";
    private static final String MODEL_PATH = "/spark-als/model";

    // Real-time recommendation DEMO based on Hadoop, Flume, Kafka, Spark Streaming,
    // logback and a mall system.
    // Data format collected from the mall system:
    // UserID,ItemId,Rating,TimeStamp
    // e.g. 53,1286513,9,1508221762

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root"); // user for HDFS permissions

        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaDirectWordCount").setMaster("local[1]");
        final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(6));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", KAFKA_ADDR); // where the brokers are
        HashSet<String> topicsSet = new HashSet<String>();
        topicsSet.add(TOPIC); // the topic to consume

        // Create a direct Kafka stream with the brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<Rating> ratingsStream = lines.map(new Function<String, Rating>() {
            public Rating call(String s) {
                String[] sarray = StringUtils.split(StringUtils.trim(s), ",");
                return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                        Double.parseDouble(sarray[2]));
            }
        });

        // Recommendation over the stream
        ratingsStream.foreachRDD(new VoidFunction<JavaRDD<Rating>>() {
            public void call(JavaRDD<Rating> ratings) throws Exception {
                // Load the historical data set from HDFS
                SparkContext sc = ratings.context();
                RDD<String> textFileRDD = sc.textFile(HDFS_ADDR + "/flume/logs", 3);
                JavaRDD<String> originalTextFile = textFileRDD.toJavaRDD();
                final JavaRDD<Rating> originaldatas = originalTextFile.map(new Function<String, Rating>() {
                    public Rating call(String s) {
                        String[] sarray = StringUtils.split(StringUtils.trim(s), ",");
                        return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                                Double.parseDouble(sarray[2]));
                    }
                });

                log.info("========================================");
                log.info("Original TextFile Count:{}", originalTextFile.count()); // behavior logs already stored in HDFS
                log.info("========================================");

                // Merge the historical data with the new behavior data
                JavaRDD<Rating> calculations = originaldatas.union(ratings);
                log.info("Calc Count:{}", calculations.count());

                // Build the recommendation model using ALS
                int rank = 10;          // number of latent factors in the model
                int numIterations = 6;  // number of training iterations

                if (!ratings.isEmpty()) { // only train when new behavior data arrived
                    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(calculations), rank, numIterations, 0.01);

                    // Delete the model directory if it already exists
                    Configuration hadoopConfiguration = sc.hadoopConfiguration();
                    hadoopConfiguration.set("fs.defaultFS", HDFS_ADDR);
                    FileSystem fs = FileSystem.get(hadoopConfiguration);
                    Path outpath = new Path(MODEL_PATH);
                    if (fs.exists(outpath)) {
                        log.info("########### deleting " + outpath.getName() + " ###########");
                        fs.delete(outpath, true);
                    }

                    // Save the model
                    model.save(sc, HDFS_ADDR + MODEL_PATH);

                    // Load the model back
                    MatrixFactorizationModel modelLoad = MatrixFactorizationModel.load(sc, HDFS_ADDR + MODEL_PATH);

                    // Recommend 10 items (movies) to each of the first 30 users
                    for (int userId = 0; userId < 30; userId++) { // streaming_sample_movielens_ratings.txt
                        Rating[] recommendProducts = modelLoad.recommendProducts(userId, 10);
                        log.info("get recommend result:{}", Arrays.toString(recommendProducts));
                    }
                }
            }
        });

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Local mode: keep the JVM alive
        try {
            Thread.sleep(10000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // jssc.stop();
        // jssc.close();
    }
}

The user behavior data set

Data format collected from the mall system:
UserID,ItemId,Rating,TimeStamp
53,1286513,9,1508221762
53,1172348420,9,1508221762
53,1179495514,12,1508221762
53,1184890730,3,1508221762
53,1210793742,159,1508221762
53,1215837445,9,1508221762
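A record in this format can be parsed the same way the streaming job does it. The small stand-alone sketch below (the class name and field layout are mine) splits one line into its four fields; note that the third field is a behavior score, not a 1–5 star rating, so values like 159 are expected.

```java
public class BehaviorRecord {
    final int userId;
    final int itemId;
    final double rating;     // behavior score, not a bounded star rating
    final long timestamp;

    BehaviorRecord(int userId, int itemId, double rating, long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.rating = rating;
        this.timestamp = timestamp;
    }

    // Parses "UserID,ItemId,Rating,TimeStamp", e.g. "53,1286513,9,1508221762"
    static BehaviorRecord parse(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 comma-separated fields: " + line);
        }
        return new BehaviorRecord(Integer.parseInt(f[0]), Integer.parseInt(f[1]),
                Double.parseDouble(f[2]), Long.parseLong(f[3]));
    }

    public static void main(String[] args) {
        BehaviorRecord r = parse("53,1286513,9,1508221762");
        System.out.println(r.userId + " -> " + r.itemId + " score " + r.rating);
    }
}
```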

Maven dependencies

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.12</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.12</version>
</dependency>

The code and data set above can be found in the project on GitHub:

https://github.com/huangyueranbbc/Spark_ALS

Reposted from: https://my.oschina.net/u/4074730/blog/3011783
