目录

  • 用户商品推荐列表
  • 商品相似度矩阵
  • 扩展写到Mysql里

用户商品推荐列表

通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下:
1.userId和productId做笛卡尔积,产生(userId,productId)的元组
2.通过模型预测(userId,productId)对应的评分。
3.将预测结果通过预测分值进行排序。
4.返回分值最大的K个商品,作为当前用户的推荐列表。
最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中

新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:

<dependencies><dependency><groupId>org.scalanlp</groupId><artifactId>jblas</artifactId><version>${jblas.version}</version></dependency><!-- Spark的依赖引入 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId></dependency><!-- 引入Scala --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></dependency><!-- 加入MongoDB的驱动 --><!-- 用于代码方式连接MongoDB --><dependency><groupId>org.mongodb</groupId><artifactId>casbah-core_2.11</artifactId><version>${casbah.version}</version></dependency><!-- 用于Spark和MongoDB的对接 --><dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.11</artifactId><version>${mongodb-spark.version}</version></dependency>
</dependencies>

同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。
核心代码如下:
src/main/scala/com.donglin.offline/OfflineRecommender.scala

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrixcase class ProductRating(userId:Int,productId:Int,score:Double,timestamp:Int)
case class MongoConfig(uri:String,db:String)//定义标准推荐对象
case class Recommendation(productId: Int, score:Double)
// 定义用户的推荐列表
case class UserRecs(userId :Int,recs:Seq[Recommendation])
//定义商品相似度列表
case class ProductRecs(productId:Int,recs:Seq[Recommendation])object OfflineRecommender {//定义mongodb中存储表名val MONGODB_RATING_COLLECTION = "Rating"val USER_RECS = "UserRecs"val PRODUCT_RECS = "ProductRecs"val USER_MAX_RECOMMENDATION = 20def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://hadoop12:27017/recommender","mongo.db" -> "recommender")//创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")//创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._implicit val  mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))//加载数据val ratingRDD = spark.read.option("uri",mongoConfig.uri).option("collection",MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRating].rdd.map(rating => (rating.userId,rating.productId,rating.score)).cache()//提取出所有用户和商品的数据集val userRDD = ratingRDD.map(_._1).distinct()val productRDD = ratingRDD.map(_._2).distinct()//TODO :核心计算过程//1.训练隐语义模型val trainData = ratingRDD.map(x=>Rating(x._1,x._2,x._3))//定义茉寻训练的参数  rank隐特征个数,iterations迭代次数,lambda正则化系数val (rank,iterations,lambda) = (5 , 10 , 0.01)val model = ALS.train(trainData,rank,iterations,lambda)//2.获得预测评分矩阵,得到用户的推荐列表// 用userRDD 和 productRDD做一个笛卡尔积,得到空的userProductsRDDval userProducts = userRDD.cartesian(productRDD)val preRationg = model.predict(userProducts)//从预测评分矩阵中提取得到用户推荐列表val userRecs = preRationg.filter(_.rating>0).map(raing => (raing.user,(raing.product,raing.rating))).groupByKey().map{case (userId,recs) =>UserRecs(userId,recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1,x._2)))}.toDF()userRecs.write.option("uri",mongoConfig.uri).option("collection",USER_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()//3.利用商品的特征向量,计算商品相似度列表spark.stop()}}

商品相似度矩阵

通过ALS计算商品相似度矩阵,该矩阵用于查询当前商品的相似商品并为实时推荐系统服务。
离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(m x k)矩阵,每个用户由 k个特征描述;表示物品特征矩阵的V(n x k)矩阵,每个物品也由 k 个特征描述。
V(n x k)表示物品特征矩阵,每一行是一个 k 维向量,虽然我们并不知道每一个维度的特征意义是什么,但是k 个维度的数学向量表示了该行对应商品的特征。
所以,每个商品用V(n x k)每一行的向量表示其特征,于是任意两个商品 p:特征向量为,商品q:特征向量为之间的相似度sim(p,q)可以使用和的余弦值来表示:

数据集中任意两个商品间相似度都可以由公式计算得到,商品与商品之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的ProductRecs表中。

   val productFeatures = model.productFeatures.map{case (productId,features) => (productId,new DoubleMatrix(features))}//两两配对商品,计算余弦相似度val productRecs = productFeatures.cartesian(productFeatures).filter{case (a,b) => a._1 != b._1}//计算余弦相似度.map{case(a,b) =>val simScore = consinSim(a._2,b._2)(a._1,(b._1,simScore))}.filter(_._2._2 > 0.4).groupByKey().map{case (productId,recs) =>ProductRecs(productId,recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)))}.toDF()productRecs.write.option("uri",mongoConfig.uri).option("collection",PRODUCT_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()

其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:

//计算两个商品之间的余弦相似度
def consinSim(product1: DoubleMatrix, product2:DoubleMatrix) : Double ={product1.dot(product2) / ( product1.norm2()  * product2.norm2() )
}

最终代码实现

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
case class ProductRating(userId:Int,productId:Int,score:Double,timestamp:Int)
case class MongoConfig(uri:String,db:String)//定义标准推荐对象
case class Recommendation(productId: Int, score:Double)
// 定义用户的推荐列表
case class UserRecs(userId :Int,recs:Seq[Recommendation])
//定义商品相似度列表
case class ProductRecs(productId:Int,recs:Seq[Recommendation])object OfflineRecommender {//定义mongodb中存储表名val MONGODB_RATING_COLLECTION = "Rating"val USER_RECS = "UserRecs"val PRODUCT_RECS = "ProductRecs"val USER_MAX_RECOMMENDATION = 3def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://hadoop12:27017/recommender","mongo.db" -> "recommender")//创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")//创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))//加载数据val ratingRDD = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRating].rdd.map(rating => (rating.userId, rating.productId, rating.score)).cache()//提取出所有用户和商品的数据集val userRDD = ratingRDD.map(_._1).distinct()val productRDD = ratingRDD.map(_._2).distinct()//TODO :核心计算过程//1.训练隐语义模型val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))//定义茉寻训练的参数  rank隐特征个数,iterations迭代次数,lambda正则化系数val (rank, iterations, lambda) = (10,10,0.01)val model = ALS.train(trainData, rank, iterations, lambda)//2.获得预测评分矩阵,得到用户的推荐列表// 用userRDD 和 productRDD做一个笛卡尔积,得到空的userProductsRDDval userProducts = userRDD.cartesian(productRDD)//preRating输出是这样的  Rating(3455,286997,4.401263661039027)val preRating = model.predict(userProducts)//从预测评分矩阵中提取得到用户列表val userRecs = preRating.filter(_.rating > 0).map(rating => (rating.user, (rating.product, rating.rating))).groupByKey().map {case (userId, recs) =>UserRecs(userId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1,x._2)))}.toDF()userRecs.write.option("uri",mongoConfig.uri).option("collection",USER_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()//3.利用商品的特征向量,计算商品相似度列表val productFeatures = model.productFeatures.map {case (productId, feature) => (productId, new DoubleMatrix(feature))}/*productFeatures输出(470839,[-0.405416876077652; 0.007109188940376043; -0.13823118805885315;0.6249048113822937; -0.35293328762054443; -1.0173797607421875;-0.28891870379447937; 0.01371031068265438; 0.7766180038452148; 0.3023674488067627])*///两两配对商品,计算余弦相似度val productRecs = productFeatures.cartesian(productFeatures).filter {case (a, b) => a._1 != b._1}//计算余弦相似度.map {case (a, b) =>val simScore = consinSim(a._2, b._2)(a._1, (b._1, simScore))}.filter(_._2._2 > 0.4).groupByKey().map {case (productId, recs) =>ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))}.toDF()productRecs.write.option("uri",mongoConfig.uri).option("collection",PRODUCT_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()spark.close()}def consinSim(product1: DoubleMatrix, product2: DoubleMatrix):Double={//模长是norm2product1.dot(product2)/(product1.norm2() * product2.norm2())}}

查看结果

扩展写到Mysql里

遇到了报错

Exception in thread "main" java.lang.IllegalArgumentException: Can't get JDBC type for array<struct<productId:int,score:double>>

原因是mysql里面不能存储array,尽量存在redis mongodb里

大数据技术之电商推荐系统(6) | 基于LFM的离线推荐模块相关推荐

  1. 大数据技术之电商推荐系统(4) | 初始化业务数据

    目录 一.数据加载准备 Products数据集 Ratings数据集 日志管理配置文件 二.数据初始化到MongoDB 启动MongoDB数据库(略) 数据加载程序主体实现 三.扩展写到Mysql里 ...

  2. 《Storm技术内幕与大数据实践》作者陈敏敏谈大数据技术在电商领域的应用

    在10月15~17日的QCon上海2015上,1号店资深架构师.<Storm技术内幕与大数据实践>一书作者陈敏敏将分享<1号店通用精准化平台架构以及大数据营销实践>.在大会开始 ...

  3. 大数据技术在电商的应用

    1.大数据技术与跨境电子商务综述 (1)大数据技术.大数据量,是指数据量极大,不能使用传统的数据采集方法.传统的数据库.传统的研究方法对数据集进行分析.传统的数据分析往往采用样本,采用推理的方法,用常 ...

  4. Flink_大数据技术之电商用户行为分析

    大数据技术之电商用户行为分析 第1章 项目整体介绍 1.1 电商的用户行为 电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析 ...

  5. 尚硅谷大数据技术之电商用户行为数据分析

    尚硅谷大数据技术之电商用户行为分析 第1章 项目整体介绍 1.1 电商的用户行为 电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘 ...

  6. 大数据下的电商风控体系——李学庆

    由51CTO举办的WOT"互联网+"时代大数据技术峰会上,来自京东商城安全管理部经理李学庆做了以<大数据下的电商风控体系>为主题的演讲.本文章是把本次分享的干货亮点整理 ...

  7. 大数据项目实战——电商推荐系统设计

    摘要 1 项目体系架构设计 1.1系统架构设计 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合实践项目的一体化的电商推荐系统,包含 ...

  8. 除了啤酒与尿布 大数据又助电商玩口碑营销

    在快消行业,啤酒与尿布成了大数据应用的经典案例:全球零售业巨头沃尔玛通过对消费者购物行为进行分析时发现,男性顾客在购买婴儿尿片时,常常会顺便搭配几瓶啤酒来犒劳自己,于是沃尔玛将啤酒和尿布陈列在一起,大 ...

  9. 电商大数据项目(二)-推荐系统实战之实时分析以及离线分析

    电商大数据项目-推荐系统实战(一)环境搭建以及日志,人口,商品分析 https://blog.51cto.com/6989066/2325073 电商大数据项目-推荐系统实战之推荐算法 https:/ ...

最新文章

  1. (2006, 'MySQL server has gone away') 错误解决 - dba007的空间 - 51CTO技术博客
  2. 使用Image类提示内存不足问题
  3. json lib java_Json-lib 进行java与json字符串转换之二
  4. java连接cdh集群_有一台电脑需要通过beeline的方式连接到CDHclouderahadoop集群,提示报错。...
  5. dedecms联动筛选_DEDECMS分类信息按联动类别筛选的实现方法
  6. [转]解决微信及360浏览器无法读取本地图片问题
  7. C#學習基礎------事件和索引指示器
  8. VB 删除数组中的重复元素
  9. 程序员有多少读过《人性的弱点》?项目经理呢?
  10. AD9371+ZC706 NO-OS初始工程搭建-Vivado2019.2
  11. Java实现黑客帝国代码雨(待机屏保)
  12. 使用 Flash 描述复杂的社交网络
  13. 「学习笔记」CSS基础(2)
  14. win7计算机系统还原,如何使用Win7系统自带还原修复电脑
  15. 智能暖风机——FAQ
  16. filezilla提示 local: unable to open
  17. google s2 java开发文档
  18. 完成静态服务器——Node.js摸石头系列之四
  19. 〖Python 数据库开发实战 - MySQL篇㉙〗- MySQL 字符函数
  20. 百度地图 key_百度地址智能解析amp;百度地图更新

热门文章

  1. C语言 提取软件文字,使用OCR文字识别软件如何将图中文字识别提取出来
  2. Nodejs xlsx导出导出
  3. 熟悉又陌生的 k8s 字段:finalizers
  4. bilstm-crf_序列标注问题
  5. 闭关之 C++ Template 笔记(一):PartⅠ基本概念(一)
  6. 如何更改计算机名称及查看自己计算机的型号
  7. Android蓝牙开发系列文章-蓝牙mesh(一)
  8. 蓝桥杯 时间管理大师
  9. 梅科尔工作室-孙溢博-鸿蒙笔记1
  10. 解决phpstorm运行很卡问题