目录

  • 一.数据加载准备
    • Products数据集
    • Ratings数据集
    • 日志管理配置文件
  • 二.数据初始化到MongoDB
    • 启动MongoDB数据库(略)
    • 数据加载程序主体实现
  • 三.扩展写到Mysql里

一.数据加载准备

在src/main/目录下,可以看到已有的默认源文件目录是java,我们可以将其改名为scala。将数据文products.csv,ratings.csv复制到资源文件目录src/main/resources下,我们将从这里读取数据并加载到mongodb中。
数据

链接:https://pan.baidu.com/s/1eYfyuzhb0MbTtcGX2okcxg
提取码:5vd5

Products数据集

数据格式:
productId,name,categoryIds, amazonId, imageUrl, categories, tags

  • Product数据集有7个字段,每个字段之间通过“^”符号进行分割。其中的categoryIds、amazonId对于内容特征没有实质帮助,我们只需要其它5个字段:

Ratings数据集

数据格式:
userId,prudcutId,rating,timestamp
例如:
4867,457976,5.0,1395676800

日志管理配置文件

log4j对日志的管理,需要通过配置文件来生效。在src/main/resources下新建配置文件log4j.properties,写入以下内容:

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

二.数据初始化到MongoDB

启动MongoDB数据库(略)

数据加载程序主体实现

  • 我们会为原始数据定义几个样例类,通过SparkContext的textFile方法从文件中读取数据,并转换成DataFrame,再利用Spark SQL提供的write方法进行数据的分布式插入。
  • 在DataLoader/src/main/scala下新建package,命名为com.donglin.recommender,新建名为DataLoader的scala class文件。
    程序主体代码如下:
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 3982                         商品ID* Fuhlen富勒 M                 商品名称* 1057,439,736                 商品分类ID   x* B009EJN4T2                   亚马逊ID     x* https://images-cn-4          商品的图片URL* 外设产品|鼠标|电脑/办公        商品分类* 富勒|鼠标|电子产品|           商品UGC标签*/
case class Product(productId:Int,name :String,imageUrl:String,categories:String,tags:String)/*** 4867            用户ID* 457976          商品ID* 5.0             评分* 1395676800      时间戳*/
case class Rating(userId:Int,productId:Int,socre:Double,timestamp:Int)//MongoDB的连接配置
case class MongoConfig(uri:String,db:String)object DataLoader {//定义数据文件路径val RRODUCT_DATA_PATH = "D:\\idea\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\products.csv"val RATING_DATA_PATH = "D:\\idea\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"//定义mongodb存储的表名val MONGODB_PRODUCT_COLLECTION = "Product"val MONGODB_RATING_COLLECTION = "Rating"def main(args: Array[String]): Unit = {val conf = Map("spark.cores" -> "local[*]","mongo.url" -> "mongodb://hadoop12:27017/recommender","mongo.db" -> "recommender")//创建一个spark configval sparkConf = new SparkConf().setMaster(conf("spark.cores")).setAppName("DataLoader")//创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._//加载数据val productRDD = spark.sparkContext.textFile(RRODUCT_DATA_PATH)val productDF = productRDD.map(item =>{//product通过^分隔,切分出来val dataArray = item.split("\\^")//转换成ProductProduct(dataArray(0).trim.toInt,dataArray(1).trim,dataArray(4).trim,dataArray(5).trim,dataArray(6).trim)}).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)val ratingDF = ratingRDD.map(item =>{val dataArray = item.split(",")Rating(dataArray(0).trim.toInt,dataArray(1).trim.toInt,dataArray(2).trim.toDouble,dataArray(3).trim.toInt)}).toDF()implicit val mongoConfig = MongoConfig( conf("mongo.url"), conf("mongo.db"))storeDataInMogoDB(productDF,ratingDF)spark.stop()}def storeDataInMogoDB(productDF:DataFrame,ratingDF:DataFrame)(implicit mongoConfig:MongoConfig): Unit ={//新建一个mongodb的连接,客户端val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))//定义要操作的mongodb表,可以理解为 db.Productval productCollection = mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION)val ratingCollection = mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)//如果表已经存在,则删掉productCollection.dropCollection()ratingCollection.dropCollection()//将当前数据存入对应的表中productDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_PRODUCT_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()ratingDF.write.option("uri",mongoConfig.uri).option("collection",MONGODB_RATING_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()//对表创建索引productCollection.createIndex(MongoDBObject("productId" -> 1))ratingCollection.createIndex(MongoDBObject("productId" -> 1))ratingCollection.createIndex(MongoDBObject("userId" -> 1))mongoClient.close()}}

使用mongodb查看结果

三.扩展写到Mysql里

依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>

代码实现

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 3982                         商品ID* Fuhlen富勒 M                 商品名称* 1057,439,736                 商品分类ID   x* B009EJN4T2                   亚马逊ID     x* https://images-cn-4          商品的图片URL* 外设产品|鼠标|电脑/办公        商品分类* 富勒|鼠标|电子产品|           商品UGC标签*/
case class Product(productId:Int,name :String,imageUrl:String,categories:String,tags:String)/*** 4867            用户ID* 457976          商品ID* 5.0             评分* 1395676800      时间戳*/
case class Rating(userId:Int,productId:Int,score:Double,timestamp:Int)//Mysql的连接配置
case class MysqlConfig(url:String,user:String,pw:String)object DataLoader {//定义数据文件路径val RRODUCT_DATA_PATH = "D:\\idea\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\products.csv"val RATING_DATA_PATH = "D:\\idea\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"//定义mongodb存储的表名val MYSQL_PRODUCT_COLLECTION = "Product"val MYSQL_RATING_COLLECTION = "Rating"def main(args: Array[String]): Unit = {val conf = Map("spark.cores" -> "local[*]","mysql.url" -> "jdbc:mysql://hadoop12:3306/recommend?useUnicode=true&characterEncoding=UTF-8","mysql.user" -> "root","mysql.pw" -> "yy8266603")//创建一个spark configval sparkConf = new SparkConf().setMaster(conf("spark.cores")).setAppName("DataLoader")//创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._//加载数据val productRDD = spark.sparkContext.textFile(RRODUCT_DATA_PATH)val productDF = productRDD.map(item =>{//product通过^分隔,切分出来val dataArray = item.split("\\^")//转换成ProductProduct(dataArray(0).trim.toInt,dataArray(1).trim,dataArray(4).trim,dataArray(5).trim,dataArray(6).trim)}).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)val ratingDF = ratingRDD.map(item =>{val dataArray = item.split(",")Rating(dataArray(0).trim.toInt,dataArray(1).trim.toInt,dataArray(2).trim.toDouble,dataArray(3).trim.toInt)}).toDF()implicit val mysqlConfig = MysqlConfig(conf("mysql.url"),conf("mysql.user"),conf("mysql.pw"))storeDataInMysql(productDF,ratingDF)spark.stop()}def storeDataInMysql(productDF:DataFrame,ratingDF:DataFrame)(implicit mysqlConfig: MysqlConfig): Unit ={//将当前数据存入对应的表中productDF.write.option("url",mysqlConfig.url).option("user",mysqlConfig.user).option("password",mysqlConfig.pw).option("dbtable",MYSQL_PRODUCT_COLLECTION).mode(SaveMode.Overwrite).format("jdbc").save()ratingDF.write.option("url",mysqlConfig.url).option("user",mysqlConfig.user).option("password",mysqlConfig.pw).option("dbtable",MYSQL_RATING_COLLECTION).mode(SaveMode.Overwrite).format("jdbc").save()}}

大数据技术之电商推荐系统(4) | 初始化业务数据相关推荐

  1. 大数据技术之电商推荐系统(6) | 基于LFM的离线推荐模块

    目录 用户商品推荐列表 商品相似度矩阵 扩展写到Mysql里 用户商品推荐列表 通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下: 1.userId和productId做笛 ...

  2. 《Storm技术内幕与大数据实践》作者陈敏敏谈大数据技术在电商领域的应用

    在10月15~17日的QCon上海2015上,1号店资深架构师.<Storm技术内幕与大数据实践>一书作者陈敏敏将分享<1号店通用精准化平台架构以及大数据营销实践>.在大会开始 ...

  3. 大数据技术在电商的应用

    1.大数据技术与跨境电子商务综述 (1)大数据技术.大数据量,是指数据量极大,不能使用传统的数据采集方法.传统的数据库.传统的研究方法对数据集进行分析.传统的数据分析往往采用样本,采用推理的方法,用常 ...

  4. 大数据实战电商推荐系统(1)-数据加载和存储

    文章目录 1. 创建文件+配置文件 2.代码 3.结果展示 数据集: 链接:https://pan.baidu.com/s/1PbHV-pq_fF-ltQhj6yh5Hw 提取码:q0bv 1. 创建 ...

  5. 大数据之电商推荐系统

    #大数据之电商推荐系统# 项目系统架构 数据整理 商品数据 商品ID 商品名称 商品种类 商品图片URL 商品标签 productId name categories imageUrl tags 评分 ...

  6. 基于Hadoop开发的大数据实战项目——电商日志分享系统

    项目介绍 大数据电商日志平台项目以某电商网站真实的业务数据架构为基础,将数据从收集到使用通过前端应用程序,后端程序,数据分析,平台部署等多方位的闭环的业务实现.形成了一套符合教学体系的电商日志分析项目 ...

  7. 电商推荐系统三:创建项目并初始化业务数据

    三.创建项目并初始化业务数据 目录 三.创建项目并初始化业务数据 3.1 在IDEA中创建maven项目 3.1.1 项目框架搭建 3.1.2 声明项目中工具的版本信息 3.1.3 添加项目依赖 3. ...

  8. 【大数据实战电商推荐系统】

    文章目录 第1章 项目体系框架设计 第2章 工具环境搭建 第3章 项目创建并初始化业务数据 3.1 IDEA创建Maven项目(略) 3.2 数据加载准备(说明书) 3.3 数据初始化到MongoDB ...

  9. 大数据项目实战——电商推荐系统设计

    摘要 1 项目体系架构设计 1.1系统架构设计 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合实践项目的一体化的电商推荐系统,包含 ...

最新文章

  1. R语言data.table导入数据实战:data.table使用字符向量创建新的数据列
  2. Sql Server之旅——第一站 那些给我们带来福利的系统视图
  3. 搭建DNS主、从服务实验
  4. 阿里巴巴大数据实践—阿里巴巴的数据模型实践综述
  5. windows环境 安装python的虚拟环境,安装第三方包的总结
  6. Python Base64模块的使用
  7. 爬虫常用Xpath和CSS3选择器对比
  8. 视频会议之BigBlueButton
  9. Android中常用适配器理解及使用
  10. 敏捷软件质量保证的方法与实践
  11. 点亮led灯的个数_点亮一个led灯程序
  12. 【51单片机】DS1302时钟芯片
  13. 竹间智能以AI能力打通医疗链路全流程,让说明书变成“虚拟健康顾问”
  14. 搭建Jpress博客系统,超详细(保姆及教学)
  15. 2010年Ei收录的中国期刊
  16. LeetCode1219 黄金矿工
  17. 爬虫速成(二):数据获取
  18. 第三章-集合论 3.2-Russell 悖论(选读)
  19. [Practical.Vim(2012.9)].Drew.Neil.Tip94 学习摘要
  20. 有无可能在非IOS系统上实现苹果为网易/腾讯邮箱做的实时推送

热门文章

  1. 首先的亚当和末后的亚当_亚当·史密斯(Adam Smith)是一名敏捷主义者吗?
  2. Recsys21 | 浅谈推荐系统如何在NLP的肩膀上前进
  3. 5000字详解数据安全治理
  4. 我的世界服务器在线数量变量,我的世界支持上百变量 最好的计分板插件
  5. openSUSE当服务器稳定吗,OpenSUSE 服务器系统部署
  6. UDS之0x22、0x2E服务
  7. 中国AI军团称霸全球口语翻译大赛!搜狗夺冠,讯飞阿里二三
  8. js把html转换成word,js实现word转换为html
  9. oldboy的视频和江民防火墙有冲突吗?
  10. 沉浸式 3D 场景下的多视点视频 增强算法研究