系列文章目录

  1. 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)
  2. 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
  3. 项目主要效果展示——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(三)
  4. 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
  5. 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)
  6. 创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)
  7. 离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)
  8. 实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)
  9. 综合业务服务与用户可视化建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(九)
  10. 程序部署与运行——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(十)

项目资源下载

  1. 电影推荐系统网站项目源码Github地址(可Fork可Clone)
  2. 电影推荐系统网站项目源码Gitee地址(可Fork可Clone)
  3. 电影推荐系统网站项目源码压缩包下载(直接使用)
  4. 电影推荐系统网站项目源码所需全部工具合集打包下载(spark、kafka、flume、tomcat、azkaban、elasticsearch、zookeeper)
  5. 电影推荐系统网站项目源数据(可直接使用)
  6. 电影推荐系统网站项目个人原创论文
  7. 电影推荐系统网站项目前端代码
  8. 电影推荐系统网站项目前端css代码

文章目录

  • 系列文章目录
  • 项目资源下载
  • 前言
  • 一、实时推荐服务需求
  • 二、实时推荐算法设计
  • 三、实时推荐算法实现
    • 3.1 获取用户的K次最近评分
    • 3.2 获取当前电影最相似的K个电影
    • 3.3 电影推荐优先级计算
    • 3.4 将结果保存到MongoDBMongoDBMongoDB
    • 3.5 更新实时推荐结果
  • 四、实时系统联调
    • 4.1 启动实时系统的基本组件
    • 4.2 启动ZookeeperZookeeperZookeeper
    • 4.3 启动KafkaKafkaKafka
    • 4.4 启动KafkaStreamingKafka StreamingKafkaStreaming程序
    • 4.5 配置并启动FlumeFlumeFlume
    • 4.6 启动业务系统后台
  • 总结

前言

这篇博文算是整个系列的一个重点,是利用离线推荐服务的得到的数据进行实时推荐,并利用KafkaKafkaKafka、FlumeFlumeFlume等进行实时流传输,其中不仅包括代码还包括一些配置,需要读者对Linux系统比较熟悉,因为大部分都是在Linux系统上进行操作。本篇博文分四部分,分别是:实时推荐服务需求、实时推荐算法设计、实时推荐算法实现、实时系统联调。下面就开始今天的学习吧!


一、实时推荐服务需求

实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该是反应最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好
  用户对物品的偏好随着时间的推移总是会改变的。比如一个用户uuu在某时刻对电影ppp给予了极高的评分,那么在近期一段时间内,uuu极有可能很喜欢与电影ppp类似的其他电影;而如果用户uuu在某时刻对电影qqq给予了极低的评分,那么在近期一段时间,uuu极有可能不喜欢与电影qqq类似的其他电影。所以对于实时推荐,当用户对一个电影进行了评价之后,用户会希望推荐结果基于最近这几次评分进行一定的更新,使得推荐结果匹配用户近期的偏好,满足用户近期的口味
  如果实时推荐继续采用离线推荐中的ALSALSALS算法,由于算法运行时间巨大,不具有实时更新得到新的推荐结果的能力。并且由于算法本身使用的是评分表,用户本次评分之后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少变化,从而给用户一种推荐结果一直没变化的感觉,很影响用户体验
  另外,在实时推荐中由于时间性能上要满足实时或者准实时的要求,所以算法的计算量不能太大,避免复杂、过多的计算造成用户体验的下降。鉴于此,推荐精度往往不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐精度的要求可以适当放宽
  所以对于实时推荐算法,主要有两点需求:
  ①:用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果
  ②:计算量不大,满足响应时间上的实时或者准实时要求

二、实时推荐算法设计

当用户uuu对电影ppp进行了评分,将触发一次对uuu的推荐结果的更新。由于用户uuu对电影ppp评分,对于用户uuu来说,他与ppp最相似的电影们之间的推荐强度将发生变化,所以选取与电影ppp最相似的KKK个电影作为侯选电影
  每个侯选电影按照“推荐优先级”这一权重作为衡量这个电影被推荐给用户uuu的优先级
  这些电影将根据用户uuu最近的若干评分计算出各自对用户uuu的推荐优先级,然后与上次对用户uuu的实时推荐结果进行基于推荐优先级的合并、替换得到更新后的推荐结果
  具体来说:首先获取用户uuu按时间顺序最近的KKK个评分,记为RKRKRK;获取电影ppp的最相似的KKK个电影集合,记为SSS;然后对于每个电影q∈Sq \in Sq∈S,计算推荐优先级EuqE_{uq}Euq​,计算公式如下:
Euq=∑r∈RKsim(q,r)∗Rrsim_sum+lgmax{incount,1}−lgmax{recount,1}E_{uq}=\frac{\sum_{r\in{RK}}sim(q,r)*R_{r}}{sim\_sum}+lgmax\{incount,1\}-lgmax\{recount,1\}Euq​=sim_sum∑r∈RK​sim(q,r)∗Rr​​+lgmax{incount,1}−lgmax{recount,1}

其中的参数意义如下:

  • RrR_rRr​表示用户uuu对电影rrr的评分
  • sim(q,r)sim(q,r)sim(q,r)表示电影qqq与电影rrr的相似度,设定最小相似度为0.6,当电影qqq和电影rrr相似度低于0.6的阈值,则视为两者不相关并忽略
  • sim_sumsim\_sumsim_sum表示qqq与RKRKRK中电影相似度大于最高阈值的个数
  • incountincountincount表示RKRKRK中与电影qqq相似的、且本身评分较高(≥3)的电影个数
  • recountrecountrecount表示RKRKRK中与电影qqq相似的、且本身评分较低(<3)的电影个数

此公式的意义为:首先对于每个候选电影qqq,从uuu最近的KKK个评分中,找出与qqq相似度较高(≥0.6≥0.6≥0.6)的uuu已评分电影,对于这些电影中的每个电影rrr,将rrr与qqq的相似度乘以用户uuu对rrr的评分,将这些乘积计算平均数,作为用户uuu对电影qqq的评分预测即:
∑r∈RKsim(q,r)∗Rrsim_sum\frac{\sum_{r\in{RK}}sim(q,r)*R_{r}}{sim\_sum}sim_sum∑r∈RK​sim(q,r)∗Rr​​

然后,将uuu最近的KKK个评分中与电影qqq相似的、且本身评分较高(≥3)的电影个数记为incountincountincount,计算lgmax{incount,1}lgmax \{ incount,1 \}lgmax{incount,1}作为电影qqq的“增强因子”,意义在于电影qqq与uuu的最近KKK个评分中的nnn个高评分(≥3)电影相似,则电影qqq的优先级被增加lgmax{incount,1}lgmax \{ incount,1 \}lgmax{incount,1}。如果电影qqq与uuu的最近KKK个评分中相似的高评分电影越多,也就是说nnn越大,则电影qqq更应该被推荐,所以推荐优先级被增强的幅度较大;如果电影qqq与uuu的最近KKK个评分中相似的高评分电影越少,也就是nnn越小,则推荐优先级被增强的幅度较小
  而后,将uuu最近的KKK个评分中与电影qqq相似的、且本身评分较低(<3)的电影个数记为recountrecountrecount,计算lgmax{recount,1}lgmax \{ recount,1 \}lgmax{recount,1}作为电影qqq的“削弱因子”,意义在于电影qqq与uuu的最近KKK个评分中的nnn个低评分(<3)电影相似,则电影qqq的优先级被削减lgmax{recount,1}lgmax \{ recount,1 \}lgmax{recount,1}。如果电影qqq与uuu的最近KKK个评分中相似的低评分电影越多,也就是说nnn越大,则电影qqq更不应该被推荐,所以推荐优先级被减弱的幅度较大;如果电影qqq与uuu的最近KKK个评分中相似的低评分电影越少,也就是nnn越小,则推荐优先级被减弱的幅度较小
  最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的qqq电影对于uuu的推荐优先级。在计算完每个侯选电影qqq的EuqE_{uq}Euq​后,将生成一组<电影qqq的ID,qqq的推荐优先级>的列表updatedListupdatedListupdatedList:
updatedList=⋃q∈S{qID,Euq}updatedList = \bigcup_{q \in S}^{}\left \{ qID,E_{uq} \right \}updatedList=q∈S⋃​{qID,Euq​}
  而在本次为用户uuu实时推荐之前的上一次实时推荐结果RecRecRec也是一组<电影mmm的ID,mmm的推荐优先级>的列表,其大小也为KKK:
Rec=⋃m∈Rec{mID,Eum},len(Rec)=KRec = \bigcup_{m \in Rec}^{}\left \{ mID,E_{um} \right \},len(Rec)=KRec=m∈Rec⋃​{mID,Eum​},len(Rec)=K
  接下来,将updated_Supdated\_Supdated_S与本次为uuu实时推荐之前的上一次实时推荐结果RecRecRec进行合并、替换形成新的推荐结果NewRecNewRecNewRec:
NewRec=topK(i∈Rec∪updatedList,cmp=Eui)NewRec = topK(i \in Rec \cup updatedList,cmp=E_{ui})NewRec=topK(i∈Rec∪updatedList,cmp=Eui​)
  其中,iii表示updated_Supdated\_Supdated_S与RecRecRec的电影集合中的每个电影,topKtopKtopK是一个函数,表示从Rec∪updated_SRec \cup updated\_SRec∪updated_S中选择出最大的KKK个电影,cmp=Euicmp = E_{ui}cmp=Eui​表示topKtopKtopK函数将推荐优先级EuiE_{ui}Eui​值最大的KKK个电影选出来。最终,NewRecNewRecNewRec即为经过用户uuu对电影ppp评分后触发的实时推荐得到的最新推荐结果
  总之,实时推荐算法流程基本如下:

  1. 用户uuu对电影ppp进行了评分,触发了实时推荐的一次计算
  2. 选出电影ppp最相似的KKK个电影作为集合SSS
  3. 获取用户uuu最近时间内的KKK条评分,包含本次评分,作为集合RKRKRK
  4. 计算电影的推荐优先级,产生集合updated_Supdated\_Supdated_S

将updated_Supdated\_Supdated_S与上次对用户uuu的推荐结果利用上述公式进行合并,产生新的推荐结果NewRecNewRecNewRec,作为最终输出
  在recommender下新建子项目StreamingRecommender,引入Spark、Scalc、MongoDB、Redis和Kafka的依赖:

<dependencies><!-- Spark的依赖引入 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId></dependency><!-- 引入Scala --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></dependency><!-- 加入MongoDB的驱动 --><!-- 用于代码方式连接MongoDB --><dependency><groupId>org.mongodb</groupId><artifactId>casbah-core_2.11</artifactId><version>${casbah.version}</version></dependency><!-- 用于Spark和MongoDB的对接 --><dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.11</artifactId><version>${mongodb-spark.version}</version></dependency><!-- redis --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency><!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
</dependencies>

代码中首先定义样例类和一个连接助手对象(用于建立Redis和MongoDB连接),并在src/main/scala/com.IronmanJay.streaming/StreamingRecommender.scala中定义一些常量:

package com.IronmanJay.streamingimport com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis// 定义连接助手对象,序列化
object ConnHelper extends Serializable {lazy val jedis = new Jedis("linux")lazy val mongoClient = MongoClient(MongoClientURI("mongodb://linux:27017/recommender"))
}case class MongoConfig(uri: String, db: String)// 定义一个基准推荐对象
case class Recommendation(mid: Int, score: Double)// 定义基于预测评分的用户推荐列表
case class UserRecs(uid: Int, recs: Seq[Recommendation])// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs(mid: Int, recs: Seq[Recommendation])object StreamingRecommender {val MAX_USER_RATINGS_NUM = 20val MAX_SIM_MOVIES_NUM = 20val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"val MONGODB_RATING_COLLECTION = "Rating"val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"def main(args: Array[String]): Unit = {}
}

实时推荐主体代码如下:

def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://linux:27017/recommender","mongo.db" -> "recommender","kafka.topic" -> "recommender")val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")// 创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()// 拿到streaming contextval sc = spark.sparkContextval ssc = new StreamingContext(sc, Seconds(2)) // batch durationimport spark.implicits._implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))// 加载电影相似度矩阵数据,把它广播出去val simMovieMatrix = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_MOVIE_RECS_COLLECTION).format("com.mongodb.spark.sql").load().as[MovieRecs].rdd.map { movieRecs => // 为了查询相似度方便,转换成map(movieRecs.mid, movieRecs.recs.map(x => (x.mid, x.score)).toMap)}.collectAsMap()val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)// 定义kafka连接参数val kafkaParam = Map("bootstrap.servers" -> "linux:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")// 通过kafka创建一个DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam))// 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流val ratingStream = kafkaStream.map {msg =>val attr = msg.value().split("\\|")(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)}// 继续做流式处理,核心实时算法部分ratingStream.foreachRDD {rdds =>rdds.foreach {case (uid, mid, score, timestamp) => {println("rating data coming! >>>>>>>>>>>>>>>>")// 1. 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis)// 2. 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid]val candidateMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value)// 3. 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]val streamRecs = computeMovieScores(candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value)// 4. 把推荐数据保存到mongodbsaveDataToMongoDB(uid, streamRecs)}}}// 开始接收和处理数据ssc.start()println(">>>>>>>>>>>>>>> streaming started!")ssc.awaitTermination()}

三、实时推荐算法实现

实时推荐算法的前提:

  1. 在Redis集群中存储了每一个用户最近对电影的KKK次评分,实时算法可以快速获取
  2. 离线推荐算法已经将电影相似度矩阵提前计算到了MongoDB中
  3. Kafka已经获取到了用户实时的评分数据

算法过程如下:
  实时推荐算法输入为一个评分<userId, mid, rate, timestamp>,而执行的核心内容包括:获取userIduserIduserId最近KKK次评分、获取midmidmid最相似的KKK个电影、计算候选电影的推荐优先级、更新对userIduserIduserId的实时推荐结果

3.1 获取用户的K次最近评分

业务服务器在接收用户评分的时候,默认会将该评分情况以userIduserIduserId、midmidmid、rateraterate、timestamptimestamptimestamp的格式插入到Redis中的该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可

// redis操作返回的是java类,为了用map操作需要引入转换类import scala.collection.JavaConversions._def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {// 从redis读取数据,用户评分数据保存在 uid:UID 为key的队列里,value是 MID:SCOREjedis.lrange("uid:" + uid, 0, num - 1).map {item => // 具体每个评分又是以冒号分隔的两个值val attr = item.split("\\:")(attr(0).trim.toInt, attr(1).trim.toDouble)}.toArray
}

3.2 获取当前电影最相似的K个电影

在离线算法中,已经预先将电影的相似度矩阵进行了计算,所以每个电影(midmidmid)的最相似的KKK个电影很容易获取:从MongoDB中读取MOVIERecs数据,从midmidmid在simHashsimHashsimHash对应的子哈希表中获取相似度前KKK大的那些电影。输出是数据类型为Array[Int]的数组,表示与midmidmid最相似的电影集合,并命名为candidateMovies以作为侯选电影集合

/*** 获取跟当前电影做相似的num个电影,作为备选电影** @param num       相似电影的数量* @param mid       当前电影ID* @param uid       当前评分用户ID* @param simMovies 相似度矩阵* @return 过滤之后的备选电影列表*/
def getTopSimMovies(num: Int, mid: Int, uid: Int,simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int, Double]])(implicit mongoConfig: MongoConfig): Array[Int] = {// 1. 从相似度矩阵中拿到所有相似的电影val allSimMovies = simMovies(mid).toArray// 2. 从mongodb中查询用户已看过的电影val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).find(MongoDBObject("uid" -> uid)).toArray.map {item => item.get("mid").toString.toInt}// 3. 把看过的过滤,得到输出列表allSimMovies.filter(x => !ratingExist.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
}

3.3 电影推荐优先级计算

对于侯选电影集合simHashsimHashsimHash和userIduserIduserId的最近KKK个评分recentRatings,算法代码内容如下:

def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {// 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定义一个HashMap,保存每一个备选电影的增强减弱因子val increMap = scala.collection.mutable.HashMap[Int, Int]()val decreMap = scala.collection.mutable.HashMap[Int, Int]()for (candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings) {// 拿到备选电影和最近评分电影的相似度val simScore = getMoviesSimScore(candidateMovie, userRecentlyRating._1, simMovies)if (simScore > 0.7) {// 计算备选电影的基础推荐得分scores += ((candidateMovie, simScore * userRecentlyRating._2))if (userRecentlyRating._2 > 3) {increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1} else {decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1}}}// 根据备选电影的mid做groupby,根据公式去求最后的推荐评分scores.groupBy(_._1).map {// groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] )case (mid, scoreList) =>(mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)))}.toArray.sortWith(_._2 > _._2)
}

其中,getMoviesimScore是取侯选电影和已评分电影的相似度,代码如下:

// 获取两个电影之间的相似度def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int,scala.collection.immutable.Map[Int, Double]]): Double = {simMovies.get(mid1) match {case Some(sims) => sims.get(mid2) match {case Some(score) => scorecase None => 0.0}case None => 0.0}
}

而log是对数运算,这里实现为取10的对数(常用对数):

// 求一个数的对数,利用换底公式,底数默认为10def log(m: Int): Double = {val N = 10math.log(m) / math.log(N)
}

3.4 将结果保存到MongoDBMongoDBMongoDB

saveRecsToMongoDB实现了结果的保存:

def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int,
Double)])(implicit mongoConfig: MongoConfig): Unit = {// 定义到StreamRecs表的连接val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)// 如果表中已有uid对应的数据,则删除streamRecsCollection.findAndRemove(MongoDBObject("uid" -> uid))// 将streamRecs数据存入表中streamRecsCollection.insert(MongoDBObject("uid" -> uid,"recs" -> streamRecs.map(x => MongoDBObject("mid" -> x._1, "score" -> x._2))))
}

3.5 更新实时推荐结果

当计算出候选电影的推荐优先级的数组updatedRecommends<mid, E>后,这个数组将被发送到Web后台服务器,与后台服务器上userIduserIduserId的上次实时推荐结果recentRecommends<mid, E>进行合并、替换并选出优先级EEE前KKK大的电影作为本次新的实时推荐。具体而言:

  1. 合并:将updatedRecommends 与recentRecommends并集合成为一个新的<mid, E>数组
  2. 替换(去重):当updatedRecommends 与recentRecommends有重复的电影midmidmid,recentRecommends中midmidmid的推荐优先级由于是上次实时推荐的结果,于是将其作废,并替换成代表了更新后的updatedRecommends的midmidmid的推荐优先级
  3. 选取TopKTopKTopK:在合并、替换后的<mid, E>数组上,根据每个MOVIE的推荐优先级,选择出前KKK大的电影,作为本次实时推荐的最终结果

四、实时系统联调

系统实时推荐的数据流向是:业务系统->日志->Flume日志采集->Kafka Streaming数据清洗和预处理->Spark Streaming流式计算。在完成实时推荐服务的代码后,应该与其他工具进行连调测试,确保系统正常运行

4.1 启动实时系统的基本组件

启动实时推荐系统StreamingRecommender以及MongoDB、Redis

4.2 启动ZookeeperZookeeperZookeeper

bin/zkServer.sh start

4.3 启动KafkaKafkaKafka

bin/kafka-server-start.sh -daemon ./config/server.properties

4.4 启动KafkaStreamingKafka StreamingKafkaStreaming程序

在recommender下新建module,KafkaStreaming,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml文件需要引入依赖;

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>0.10.2.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.1</version></dependency>
</dependencies><build><finalName>kafkastream</finalName><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass>com.IronmanJay.kafkastream.Application</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

在src/main/java下新建java类com.IronmanJay.kafkastreaming.Application

package com.IronmanJay.kafkastream;import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;import java.util.Properties;public class Application {public static void main(String[] args) {String brokers = "linux:9092";String zookeepers = "linux:2181";// 输入和输出的topicString from = "log";String to = "recommender";// 定义kafka streaming的配置Properties settings = new Properties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);// 创建 kafka stream 配置对象StreamsConfig config = new StreamsConfig(settings);// 创建一个拓扑建构器TopologyBuilder builder = new TopologyBuilder();// 定义流处理的拓扑结构builder.addSource("SOURCE", from).addProcessor("PROCESSOR", () -> new LogProcessor(), "SOURCE").addSink("SINK", to, "PROCESSOR");KafkaStreams streams = new KafkaStreams(builder, config);streams.start();System.out.println("Kafka stream started!>>>>>>>>>>>");}
}

这个程序会将topic为"log"的信息流获取来做处理,并以"recommender"为新的topic转发出去,流处理程序LogProcess.java如下所示:

package com.IronmanJay.kafkastream;import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor<byte[], byte[]> {private ProcessorContext context;@Overridepublic void init(ProcessorContext processorContext) {this.context = processorContext;}@Overridepublic void process(byte[] dummy, byte[] line) {// 把收集到的日志信息用string表示String input = new String(line);// 根据前缀MOVIE_RATING_PREFIX:从日志信息中提取评分数据if (input.contains("MOVIE_RATING_PREFIX:")) {System.out.println("movie rating data coming!>>>>>>>>>>>" + input);input = input.split("MOVIE_RATING_PREFIX:")[1].trim();context.forward("logProcessor".getBytes(), input.getBytes());}}@Overridepublic void punctuate(long l) {}@Overridepublic void close() {}
}

完成代码后,启动Application

4.5 配置并启动FlumeFlumeFlume

在FlumeFlumeFlume的conf下新建log-kafka.properties,对FlumeFlumeFlume连接Kafka做配置:

agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail –f 自己的日志目录路径
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = linux:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel# Each channel's type is defined.
agent.channels.memoryChannel.type = memory# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000

配置好后,启动FlumeFlumeFlume:

./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console

4.6 启动业务系统后台

将业务代码加入系统中。注意在businessServer/src/main/resources/下的log4j.properties中,log4j.appender.file.File的值应该替换为自己的日志目录,与FlumeFlumeFlume中的配置应该相同
  启动业务系统后台,访问linux:8088/index.html,点击某个电影进行评分,查看实时推荐列表是否会发生变化


总结

这篇博文是整个系列博文的重点,又是难点,没想到洋洋洒洒写了大约2万字,希望读者可以认认真真一步一步的操作,当系统做到这里的时候,基本已经成形了,后面我将会给大家讲解系统的后台与前端服务功能的实现,讲解框架与实现原理,那就,下篇博文见啦!

实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)相关推荐

  1. 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  2. 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  3. 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  4. 基于Spark平台的协同过滤实时电影推荐系统

    文章目录 摘要 0 引言 1 协同过滤算法 2 实时推荐服务 3 电影推荐系统设计部署 3.1 架构设计 3.2 系统功能设计 3.3 数据库设计 4 系统运行效果 5 结语 参考文献 摘要 摘要:随 ...

  5. 订单支付和评论——基于Django框架的天天生鲜电商网站项目系列博客(十五)

    系列文章目录 需求分析--基于Django框架的天天生鲜电商网站项目系列博客(一) 网站框架搭建--基于Django框架的天天生鲜电商网站项目系列博客(二) 用户注册模块--基于Django框架的天天 ...

  6. 网站框架搭建——基于Django框架的天天生鲜电商网站项目系列博客(二)

    系列文章目录 需求分析--基于Django框架的天天生鲜电商网站项目系列博客(一) 网站框架搭建--基于Django框架的天天生鲜电商网站项目系列博客(二) 用户注册模块--基于Django框架的天天 ...

  7. 基于Spark MLlib平台的协同过滤算法---电影推荐系统

    协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,协同过滤算法按照数据使用 ...

  8. spark MLlib平台的协同过滤算法---电影推荐系统

    又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实 ...

  9. python基于用户画像和协同过滤实现电影推荐系统

    1.概要 传统电影推荐系统大多使用协同过滤算法实现电影推荐,主要实现机理是通过用户评分及用户观影历史数据抽象为多维向量利用欧式距离或其他向量计算公式实现推荐,本文中将采用常用的机器学习算法Kmeans ...

最新文章

  1. NSUserDefaults的用法
  2. [转载] 晓说——第16期:古代科举那些事——由来
  3. python代码大全表解释-python中的字典用法大全的代码
  4. 程序员面试题精选100题(26)-和为n连续正数序列[算法]
  5. C++ Primer 5th笔记(chap 18 大型程序工具)类型转换与多个基类
  6. oracle9i用expdp导出全库,Linux下Oracle 11g数据库全库自动备份(EXPDP)
  7. 拓展 - WebRTC 多视频网络拓扑之三种架构
  8. GitBash添加tree命令
  9. 今晚7点30,腾讯专家与你共探广告智能创意新可能
  10. kuangbin 基础DP1
  11. xp计算机连接不上网络打印机驱动,XP系统下如何设置连接网络打印机?
  12. 《查拉图斯特拉如是说》读书笔记
  13. 3G爱立信告警信息提取
  14. java中逗号运算符的含义_逗号运算符什么时候有用?
  15. linux下的软件包
  16. 端到端的语音识别模型
  17. C语言--棋盘麦粒问题
  18. java 多线程ppt_多线程ppt|doc|docx|pptx转换pdf 【转】
  19. 振镜可以用计算机控制,振镜扫描式打标头
  20. mysql启动提示少vc90.crt_近乎(Spacebuilder) v4.1.0.1源码

热门文章

  1. 6、Xcode导入第三方库(Alamofire)的方法
  2. 老徐教你学C语言(C语言进门教程)
  3. 简单为蒲公英在线教学系统进行优化-01
  4. TVS二极管-百度百科
  5. TMS320C645x DSP SRIO寄存器(六)——LSU控制寄存器和拥塞控制
  6. 差异表达分析之FDR
  7. 在React中设置一个复选框“ check”属性
  8. react 全家桶安装
  9. 3-多点温度采集模块设计--DS18B20驱动函数模块设计
  10. Dapp开发教程二一 Asch Dapp Asset