第四部分-推荐系统-实时推荐

  • 本模块基于第4节得到的模型,开始为用户做实时推荐,推荐用户最有可能喜爱的5部电影。

说明几点

1.数据来源是 testData 测试集的数据。这里面的用户,可能存在于训练集中,也可能是新用户。因此,这里要做处理。
2. SparkStreaming + kakfa

开始Coding

步骤一:在streaming 包下,新建PopularMovies2


package com.csylh.recommend.streamingimport com.csylh.recommend.config.AppConf
import org.apache.spark.sql.SaveMode/*** Description: 个性化推荐** @Author: 留歌36* @Date: 2019/10/18 17:42*/
object PopularMovies2 extends AppConf{def main(args: Array[String]): Unit = {val movieRatingCount = spark.sql("select count(*) c, movieid from trainingdata group by movieid order by c")// 前5部进行推荐val Top5Movies = movieRatingCount.limit(5)Top5Movies.registerTempTable("top5")val top5DF = spark.sql("select a.title from movies a join top5 b on a.movieid=b.movieid")// 把数据写入到HDFS上top5DF.write.mode(SaveMode.Overwrite).parquet("/tmp/top5DF")// 将数据从HDFS加载到Hive数据仓库中去spark.sql("drop table if exists top5DF")spark.sql("create table if not exists top5DF(title string) stored as parquet")spark.sql("load data inpath '/tmp/top5DF' overwrite into table top5DF")// 最终表里应该是5部推荐电影的名称}
}

步骤二:在streaming 包下,新建SparkDirectStreamApp

package com.csylh.recommend.streamingimport com.csylh.recommend.config.AppConf
import kafka.serializer.StringDecoder
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Description:** @Author: 留歌36* @Date: 2019/10/18 16:33*/
object SparkDirectStreamApp extends AppConf{def main(args:Array[String]): Unit ={val ssc = new StreamingContext(sc, Seconds(5))val topics = "movie_topic".split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list"->"hadoop001:9093,hadoop001:9094,hadoop001:9095","auto.offset.reset" -> "largest" //smallest :从头开始 largest:最新)// Direct 模式:SparkStreaming 主动去Kafka中pull拉数据val modelPath = "/tmp/BestModel/0.8521581387523667"val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)def exist(u: Int): Boolean = {val trainingdataUserIdList = spark.sql("select distinct(userid) from trainingdata").rdd.map(x => x.getInt(0)).collect()  // RDD[row] ==> RDD[Int]trainingdataUserIdList.contains(u)}// 为没有登录的用户推荐电影的策略:// 1.推荐观看人数较多的电影,采用这种策略// 2.推荐最新的电影val defaultrecresult = spark.sql("select * from top5DF").rdd.toLocalIterator// 创建SparkStreaming接收kafka消息队列数据的2种方式// 一种是Direct approache,通过SparkStreaming自己主动去Kafka消息队// 列中查询还没有接收进来的数据,并把他们拉pull到sparkstreaming中。val model = MatrixFactorizationModel.load(ssc.sparkContext, modelPath)val messages = stream.foreachRDD(rdd=> {val userIdStreamRdd = rdd.map(_._2.split("|")).map(x=>x(1)).map(_.toInt)val validusers = userIdStreamRdd.filter(userId => exist(userId))val newusers = userIdStreamRdd.filter(userId => !exist(userId))// 采用迭代器的方式来避开对象不能序列化的问题。// 通过对RDD中的每个元素实时产生推荐结果,将结果写入到redis,或者其他高速缓存中,来达到一定的实时性。// 2个流的处理分成2个sparkstreaming的应用来处理。val validusersIter = validusers.toLocalIteratorval newusersIter = newusers.toLocalIteratorwhile (validusersIter.hasNext) {val u= validusersIter.nextprintln("userId"+u)val recresult = model.recommendProducts(u, 5)val recmoviesid = recresult.map(_.product)println("我为用户" + u + "【实时】推荐了以下5部电影:")for (i <- recmoviesid) {val moviename = spark.sql(s"select title from movies where movieId=$i").first().getString(0)println(moviename)}}while (newusersIter.hasNext) {println("*新用户你好*以下电影为您推荐below movies are recommended for you :")for (i <- defaultrecresult) {println(i.getString(0))}}})ssc.start()ssc.awaitTermination()}
}

步骤三:将创建的项目进行打包上传到服务器
mvn clean package -Dmaven.test.skip=true

步骤四:先编写个性化推荐代码 shell 执行脚本

[root@hadoop001 ml]# vim PopularMovies2.sh
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop$SPARK_HOME/bin/spark-submit \
--class com.csylh.recommend.streaming.PopularMovies2 \
--master spark://hadoop001:7077 \
--name PopularMovies2 \
--driver-memory 10g \
--executor-memory 5g \
/root/data/ml/movie-recommend-1.0.jar

步骤五:执行sh PopularMovies2.sh

确保:

[root@hadoop001 ml]# spark-sql
19/10/20 22:59:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark master: local[*], Application Id: local-1571583574311
spark-sql> show tables;
default links   false
default movies  false
default ratings false
default tags    false
default testdata    false
default top5df  false
default trainingdata    false
default trainingdataasc false
default trainingdatadesc    false
Time taken: 2.232 seconds, Fetched 9 row(s)
spark-sql> select * from top5df;
Follow the Bitch (1996)
Radio Inside (1994)
Faces of Schlock (2005)
Mág (1988)
"Son of Monte Cristo
Time taken: 1.8 seconds, Fetched 5 row(s)
spark-sql> 

步骤六:再编写model实时推荐代码 shell 执行脚本

export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop$SPARK_HOME/bin/spark-submit \
--class com.csylh.recommend.streaming.SparkDirectStreamApp \
--master spark://hadoop001:7077 \
--name SparkDirectStreamApp \
--driver-memory 10g \
--executor-memory 5g \
--total-executor-cores 10 \
--jars /root/app/kafka_2.11-1.1.1/libs/kafka-clients-1.1.1.jar \
--packages "mysql:mysql-connector-java:5.1.38,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.2" \
/root/data/ml/movie-recommend-1.0.jar

步骤七:sh SparkDirectStreamApp.sh

// TODO…

有任何问题,欢迎留言一起交流~~
更多文章:基于Spark的电影推荐系统:https://blog.csdn.net/liuge36/column/info/29285

基于Spark的电影推荐系统(推荐系统~7)相关推荐

  1. 基于Spark的电影推荐系统(电影网站)

    第一部分-电影网站: 软件架构: SpringBoot+Mybatis+JSP 项目描述:主要实现电影网站的展现 和 用户的所有动作的地方 技术选型: 技术 名称 官网 Spring Boot 容器 ...

  2. 基于Spark的电影推荐系统(推荐系统~1)

    第四部分-推荐系统-项目介绍 行业背景: 快速:Apache Spark以内存计算为核心 通用 :一站式解决各个问题,ADHOC SQL查询,流计算,数据挖掘,图计算 完整的生态圈 只要掌握Spark ...

  3. 基于Spark的电影推荐系统(推荐系统~4)

    第四部分-推荐系统-模型训练 本模块基于第3节 数据加工得到的训练集和测试集数据 做模型训练,最后得到一系列的模型,进而做 预测. 训练多个模型,取其中最好,即取RMSE(均方根误差)值最小的模型 说 ...

  4. 基于Spark的电影推荐系统(推荐系统~5)

    第四部分-推荐系统-离线推荐 本模块基于第4节得到的模型,开始为用户做离线推荐,推荐用户最有可能喜爱的5部电影. 说明几点 1.主要分为两个模块.其一是为 单个随机用户 做推荐,其二是为 所有用户做推 ...

  5. 电影推荐系统 python简书_基于Spark的电影推荐系统(实战简介)

    ## 写在前面 一直不知道这个专栏该如何开始写,思来想去,还是暂时把自己对这个项目的一些想法 和大家分享 的形式来展现.有什么问题,欢迎大家一起留言讨论. 这个项目的源代码是在https://gith ...

  6. 基于Spark实现电影点评系统用户行为分析—RDD篇(一)

    文章目录 1.项目背景 2.数据描述 3.代码实现 1.项目背景 电影推荐系统(MovieLens)是美国明尼苏达大学(Minnesota)计算机科学与工程学院的GroupLens项目组创办的,是一个 ...

  7. 基于Spark实现电影点评系统用户行为分析—DataFrame篇(二)

    文章目录 1.介绍 2.业务统计 3.代码实现 1.介绍 Spark SQL有三种不同实现方式:(1)使用DataFrame与RDD结合的方式.(2)纯粹使用DataFrame的方式.(3)使用Dat ...

  8. 基于spark的电影数据分析

    目 录 摘 要 I Abstract II 1 绪论 1 1.1 选题背景及意义 1 1.2 研究现状 2 1.3 研究内容及论文组织结构 2 2 关键技术和工具环境 4 2.1 IDEA简介 4 2 ...

  9. 基于Spark MLlib平台的协同过滤算法---电影推荐系统

    协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,协同过滤算法按照数据使用 ...

最新文章

  1. Ubuntu 20.04 LTS安装搜狗输入法,只需三条命令,还能自动更新
  2. MySQL高可用方案-PXC(Percona XtraDB Cluster)环境部署详解
  3. Linux - alias 定义的变量
  4. Smalidea+IntelliJ IDEA/Android Studio动态调试安卓app教程
  5. apache如何支持asp.net
  6. 企业云存储:为什么中大型企业偏爱自建私有云?
  7. 前端学习(2400):关于aixos的create方法
  8. 数学作图工具_科研论文作图系列-从PPT到AI (一)
  9. java实现定时任务 Schedule
  10. 华为云副总裁薛浩:云原生视频服务,重塑体验,助力产业升级
  11. matlab 排课代码,matlab遗传算法排课问题,程序一直有错,求解答
  12. pip 离线安装_安装不上python的模块怎么办?别怕,我这有妙招!
  13. iOS YYText的使用笔记二(YYLabel聊天表情+文字并排)
  14. web网页实现扫描条形码(安卓+ios适配)
  15. 深夜读萧红《呼兰河传》
  16. java实现中国象棋3:走棋规则的实现
  17. XXL-JOB任务调度
  18. channel is not opened.
  19. 达人评测 r7 7730u和i5 12500h差距 锐龙r77730u和酷睿i512500h对比
  20. html5 video播放按钮放在中间,在html5视频控制区跟踪点击播放按钮(Track clicks to play button in html5 video control area)...

热门文章

  1. 临沂市的企业为什么要申请2023年的专精特新企业
  2. nc65 linux环境搭建,NC65搜索服务器搭建
  3. 【小程序】如何获取微信小程序代码上传密钥?
  4. 火山中文编程 -- 类、方法、参数
  5. 可能是一份最适合你的后端面试指南(部分内容前端同样适用)| 掘金技术征文...
  6. 安装matlab2014b软件时,在安装进度条完成后,没有弹出产品配置说明
  7. [CISCN 2021初赛]隔空传话
  8. Redis安装教程超详细
  9. vue项目中省市区3级联动
  10. 用C语言编写一个小游戏