Project 4: A Simple Recommendation System Built with Spark SQL
1. Theory
1.1 The most mainstream recommendation algorithm: collaborative filtering
1.2 A look at other recommendation algorithms
1.3 An outline of recommendation systems
1.4 Environment notes:
This project computes over the vector data produced by the user-profile work. As for the overall series, I will do my best to finish the first two projects. Project 0 and Project 1 may still have some rough edges, and I will polish them when I have time (I really did build everything from scratch and ran into plenty of bugs along the way).
Project 0: big-data development on a single-node virtual machine (full 40,000-word writeup)_林柚晞的博客-CSDN博客
Project 1: real-time data-warehouse data ingestion_林柚晞的博客-CSDN博客
I will clean up and publish the near-real-time warehouse and user-profile projects later.
2. Deployment
2.1 Generating the source table for ItemCF (item-based collaborative filtering)
ItemCF is based on the event table, which holds the user id together with the user's actions on pages (views, clicks, likes, and so on).
The table below stores the users' click-type actions on the site over a recent window (the query uses 2022-03-21 through 2022-03-23).
start-all.sh
hive --service metastore &
hive --service hiveserver2 &
hive
Start Presto:
launcher start
presto-cli --server qianfeng01:8090 --catalog hive
show schemas; -- list the databases
The table is created in Presto:
create table dwb_news.user_acticle_action comment 'user article action data' with(format='ORC')
as
with t1 as (
select
distinct_id as uid,
article_id as aid,
case when(event = 'AppPageView') then '点击'
else action_type end as action,
logday as action_date
from ods_news.event
where event in ('NewsAction', 'AppPageView')
and logday >= format_datetime(cast('2022-03-21' as timestamp), 'yyyyMMdd')
and logday < format_datetime(cast('2022-03-23' as timestamp), 'yyyyMMdd')
and article_id <> ''
)
select uid, aid, action, max(action_date) as action_date from t1 where action <> ''
group by uid, aid, action;
select uid, aid, action, action_date from dwb_news.user_acticle_action limit 10;
I ran the query above in Hive.
We actually only need the first three columns, converted into (uid, aid, score).
Each action type is assigned a weight: click 0.1, like 0.15, collect 0.2, share 0.25, comment 0.3 (these match the UDF's mapping). When a user performs an action on an article, the action is converted into a score; the weights add up to 1.
On top of the action score there is a time-decay function that lowers the weight: the further in the past an action happened, the lower its weight, because the user's interest fades.
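The weighting-plus-decay scheme can be sketched in plain Python. Everything below is illustrative rather than the project's actual code: the names are made up, the weights follow the mapping just described, and the decay curve follows the sigmoid shape used in the RateUDF class later in the post.

```python
import math
from datetime import datetime

# Illustrative constants; the 100-day aging window matches the Constant class below.
ACTION_WEIGHTS = {"click": 0.1, "like": 0.15, "collect": 0.2, "share": 0.25, "comment": 0.3}
AGING_DAYS = 100

def time_weight(action_date: str, today: str) -> float:
    """Sigmoid-shaped decay: the older the action, the smaller the weight."""
    fmt = "%Y%m%d"
    days_ago = (datetime.strptime(today, fmt) - datetime.strptime(action_date, fmt)).days
    interval = AGING_DAYS - days_ago
    if interval < 0:
        interval = 1  # beyond the aging window: clamp to a minimal weight
    x = interval - 7
    return 1.0 / (1.0 + math.exp(1.0 - 0.8 * x))

def rate(action: str, action_date: str, today: str) -> float:
    """Action weight times time weight, the same shape as the action2rate UDF."""
    return ACTION_WEIGHTS.get(action, 0.0) * time_weight(action_date, today)
```

A recent click scores close to the full 0.1, while the same click a year earlier decays to nearly zero.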
We implement this action-to-score conversion as a UDF.
The diagram shows the chain of type conversions from top to bottom; the final DataFrame is joined with the rating table.
2.2 The UDF: building the rating table from the source data
Open IDEA and create a new Maven project.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.qf.bigdata</groupId>
  <artifactId>recommend</artifactId>
  <version>1.0</version>
  <properties>
    <scala.version>2.11.12</scala.version>
    <play-json.version>2.3.9</play-json.version>
    <maven-scala-plugin.version>2.10.1</maven-scala-plugin.version>
    <scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
    <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
    <spark.version>2.4.5</spark.version>
    <scope.type>compile</scope.type>
    <json.version>1.2.3</json.version>
    <hbase.version>1.3.6</hbase.version>
    <hadoop.version>2.8.1</hadoop.version>
    <!-- compile / provided -->
  </properties>
  <dependencies>
    <!-- json -->
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${json.version}</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency>
    <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.6</version></dependency>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>com.github.scopt</groupId><artifactId>scopt_2.11</artifactId><version>4.0.0-RC2</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-avro_2.11</artifactId><version>${spark.version}</version></dependency>
    <dependency>
      <groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>2.3.7</version><scope>${scope.type}</scope>
      <exclusions>
        <exclusion><groupId>javax.mail</groupId><artifactId>mail</artifactId></exclusion>
        <exclusion><groupId>org.eclipse.jetty.aggregate</groupId><artifactId>*</artifactId></exclusion>
      </exclusions>
    </dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-hadoop2-compat</artifactId><version>${hbase.version}</version><scope>${scope.type}</scope></dependency>
    <dependency><groupId>org.jpmml</groupId><artifactId>jpmml-sparkml</artifactId><version>1.5.9</version></dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>alimaven</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <releases><updatePolicy>never</updatePolicy></releases>
      <snapshots><updatePolicy>never</updatePolicy></snapshots>
    </repository>
  </repositories>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>${maven-assembly-plugin.version}</version>
        <configuration>
          <descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs>
        </configuration>
        <executions>
          <execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>${scala-maven-plugin.version}</version>
        <executions>
          <!-- compile Scala first, to avoid "cannot find symbol" -->
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals><goal>add-source</goal><goal>compile</goal></goals>
          </execution>
          <execution>
            <goals><goal>compile</goal><goal>testCompile</goal></goals>
            <configuration>
              <args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-archetype-plugin</artifactId>
        <version>2.2</version>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.8</version>
        <executions>
          <!-- Add src/main/java to the build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals><goal>add-source</goal></goals>
            <configuration><sources><source>src/main/java</source></sources></configuration>
          </execution>
          <!-- Add src/test/java to the build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals><goal>add-test-source</goal></goals>
            <configuration><sources><source>src/test/java</source></sources></configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Then reimport the Maven dependencies.
The project is written with Spark, so the module also needs Scala support.
Right-click the project folder -> Add Framework Support.
(My IDEA already has the Scala plugin installed: File -> Settings -> Plugins -> Scala.)
Add a scala folder under main.
Put hive-site.xml, hdfs-site.xml and core-site.xml (the HDFS and Hive configuration files from the VM) into resources.
The code below goes into the scala folder.
2.2.1 Setting up the project skeleton
The key pieces are the Config and Spark utility classes below.
The configuration class resolves options based on the program's class name; it also holds connection settings such as the ZooKeeper host and port.
2.2.1.1 The configuration class
Responsibilities: environment parameters, matching on the app name, parsing arguments.
package comqf.bigdata.conf

import org.slf4j.LoggerFactory

case class Config(env: String = "",
                  hBaseZK: String = "",
                  hBasePort: String = "2181",
                  hFileTmpPath: String = "",
                  tableName: String = "recommend:news-cf",
                  irisPath: String = "",
                  proxyUser: String = "root",
                  topK: Int = 10)

object Config {
  private val logger = LoggerFactory.getLogger(Config.getClass.getSimpleName)

  /**
   * Parse the command-line arguments.
   * @param obj  object whose class name decides which options to parse
   * @param args raw argument values
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    // 1. program name
    val programName = obj.getClass.getSimpleName.replace("$", "")
    // 2. build the parser (works much like shell getopts)
    val parser = new scopt.OptionParser[Config](s"ItemCF ${programName}") {
      head(programName, "v1.0")
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      opt[String]('x', "proxyUser").optional().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "ItemCF" =>
          logger.info(s"ItemCF is starting ---------------------------->")
          opt[String]('z', "hBaseZK").required().action((x, config) => config.copy(hBaseZK = x)).text("hBaseZK")
          opt[String]('p', "hBasePort").required().action((x, config) => config.copy(hBasePort = x)).text("hBasePort")
          opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")
          opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")
          opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")
        case _ =>
      }
    }
    // 3. parse
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None =>
        logger.error(s"cannot parse args")
        System.exit(-1)
        null
    }
  }
}
2.2.1.2 The Spark utility class
(It decides between dev and prod mode from the env and appName that are passed in; the two modes use different configuration.)
package comqf.bigdata.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {
  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  /**
   * Build a SparkSession, configured for either dev or prod mode.
   */
  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" =>
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case "dev" =>
        conf.setMaster("local[*]").setAppName(appName + "_dev")
          .set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case _ =>
        logger.error("not match env")
        System.exit(-1)
        null
    }
  }
}
2.2.2 Conversion
2.2.2.1 An enumeration holding the user-action constants
(For some reason my mind map came out mirrored, ha.)
package comqf.bigdata.constant

object Action extends Enumeration {
  type Action = Value
  val CLICK = Value("点击")
  val SHARE = Value("分享")
  val COMMENT = Value("评论")
  val COLLECT = Value("收藏")
  val LIKE = Value("点赞")

  // print every constant
  def showAll = this.values.foreach(println)

  // look an enumeration value up by name
  def withNameOpt(name: String): Option[Value] = this.values.find(_.toString == name)
}
2.2.2.2 A constants class
package comqf.bigdata.constant

// Holds project-wide constants
object Constant {
  // aging window used by the time-decay function: 100 days
  val ARTICLE_AGING_TIME = 100
}
2.2.2.3 A date utility class
(It basically computes how long ago an action happened.)
package comqf.bigdata.utils

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

object DateUtils {
  // parse a yyyyMMdd string into a Date
  def string2date(date: String): Date = {
    val fmt = new SimpleDateFormat("yyyyMMdd")
    fmt.parse(date)
  }

  // number of days between the given date and now
  def diffDay2Now(date: String) = {
    val now: Calendar = Calendar.getInstance() // the current time
    val today: Long = now.getTimeInMillis      // current epoch millis
    val current: Long = string2date(date).getTime
    val between: Long = (today - current) / (1000 * 3600 * 24) // days apart
    Integer.parseInt(String.valueOf(between))
  }
}
2.2.2.4 The RateUDF
It also has a function that multiplies the action weight by the time weight.
package comqf.bigdata.udfs

import comqf.bigdata.constant.{Action, Constant}
import comqf.bigdata.utils.DateUtils

object RateUDF {
  // weight of each action type
  def getActionWeight(action: String) = {
    Action.withNameOpt(action) match {
      case Some(Action.CLICK)   => 0.1f
      case Some(Action.LIKE)    => 0.15f
      case Some(Action.COLLECT) => 0.2f
      case Some(Action.SHARE)   => 0.25f
      case Some(Action.COMMENT) => 0.3f
      case _                    => 0.0f
    }
  }

  // maps any input to a value between 0 and 1
  private def sigmoid(d: Double): Double = 1 / (1 + Math.exp(1.0 - d))

  def getDateWeight(date: String) = {
    try {
      // 1. remaining "value" of the data = aging window - days since the action
      var interval: Int = Constant.ARTICLE_AGING_TIME - DateUtils.diffDay2Now(date)
      if (interval < 0) interval = 1 // the action happened beyond the window in which the data is most valuable
      val x: Double = interval.toDouble - 7
      sigmoid(x * 0.8).toFloat
    } catch {
      case e: Exception =>
        e.printStackTrace()
        0.0f
    }
  }

  // action weight * time weight
  def action2rate(action: String, date: String): Float = getActionWeight(action) * getDateWeight(date)
}
2.2.3 The ModelData class
This is the model-training parent class shared by the collaborative filtering implementations.
package comqf.bigdata.transformer

import comqf.bigdata.udfs.RateUDF
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

class ModelData(spark: SparkSession, env: String) {
  private val logger = LoggerFactory.getLogger(ModelData.getClass.getSimpleName)

  // register the udf
  spark.udf.register("action2rate", RateUDF.action2rate _)

  /**
   * Build the per-action rating view.
   */
  def generateEachAction() = {
    // 1. read the source data into a temp view
    spark.sql(
      s"""
         |select uid, aid, action, action_date from dwb_news.user_acticle_action
         |""".stripMargin).createOrReplaceTempView("source_data")
    // 2. score every action
    spark.sql(
      s"""
         |select uid, aid, action, action_date,
         |action2rate(action, action_date) as rate
         |from source_data
         |""".stripMargin).createOrReplaceTempView("source_data_rate")
  }

  /**
   * Convert the source data into (uid, aid, rate), where rate is the sum of
   * the scores of all actions a user performed on one article.
   *
   * Source data:
   *  uid  | aid   | action | action_date
   * ------+-------+--------+-------------
   *  3713 | 21957 | 点赞   | 20211225
   *  3187 | 3976  | 收藏   | 20211225
   *  2554 | 14202 | 分享   | 20211225
   *  1937 | 18172 | 点击   | 20211225
   *  4500 | 23407 | 分享   | 20211225
   *
   * After processing:
   *  uid  | aid   | rate
   * ------+-------+------
   *  3713 | 21957 | 13
   *  3187 | 3976  | 14
   */
  def getUserRatingData(): DataFrame = {
    // 1. build the per-action rating view
    generateEachAction()
    // 2. aggregate the score per (uid, aid)
    spark.sql(
      s"""
         |select
         |cast(uid as bigint) as uid,
         |cast(aid as bigint) as aid,
         |cast(sum(rate) as double) as rate
         |from source_data_rate group by uid, aid order by uid
         |""".stripMargin)
  }

  /**
   * Join the training ratings with the similarity DataFrame to obtain
   * similarity-weighted article scores.
   */
  def joinRateDFAndSimDF(trainning: Dataset[Row], simDF: Dataset[Row]) = {
    // 1. create the temp views
    trainning.createOrReplaceTempView("user_rate")
    simDF.createOrReplaceTempView("sim_item")
    // 2. run the join
    spark.sql(
      s"""
         |select
         |t1.uid, t1.aid, t1.rate,
         |t2.aid as aid2, t2.sim_aid, t2.sim, t1.rate * t2.sim as rsp
         |from user_rate as t1 left join sim_item as t2 on t1.aid = t2.aid
         |where t2.sim is not null
         |""".stripMargin)
  }

  /**
   * Recommend the top-k articles per user, filtering out articles the user
   * has already interacted with.
   */
  def recommendAllUser(joinDF: DataFrame, topK: Int) = {
    joinDF.createOrReplaceTempView("rate_sim")
    spark.sql(
      s"""
         |with t1 as ( -- predicted rating of each similar article per user
         |select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate
         |from rate_sim group by uid, sim_aid
         |),
         |t2 as ( -- drop articles the user has already read (keep rows with no matching rating)
         |select t1.* from t1
         |left join user_rate as ur on t1.uid = ur.uid and t1.sim_aid = ur.aid
         |where ur.rate is null
         |),
         |t3 as ( -- rank
         |select
         |uid, sim_aid, pre_rate,
         |row_number() over(partition by uid order by pre_rate desc) as rank
         |from t2
         |)
         |select
         |cast(uid as int) as uid,
         |cast(sim_aid as int) as sim_aid,
         |cast(pre_rate as double) as pre_rate
         |from t3 where rank <= ${topK}
         |""".stripMargin)
  }
}

object ModelData {
  def apply(spark: SparkSession, env: String): ModelData = new ModelData(spark, env)
}
2.2.4 Item-based collaborative filtering
Its main responsibilities:
(1) read the configuration parameters
(2) load the data and convert its types
(3) join the training data with the similarity data
package comqf.bigdata

import comqf.bigdata.conf.Config
import comqf.bigdata.transformer.ItemCFModelData
import comqf.bigdata.utils.{HBaseUtils, SparkUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Item-based collaborative filtering
 */
object ItemCF {
  private val logger = LoggerFactory.getLogger(ItemCF.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    // 1. preparation
    Logger.getLogger("org").setLevel(Level.WARN)
    val params: Config = Config.parseConfig(ItemCF, args)
    System.setProperty("HADOOP_USER_NAME", params.proxyUser)
    logger.warn("job is running, please wait for a moment")
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, "itemcf app")
    import spark.implicits._

    // 2. base data processing
    // 2.1 ItemCF model-data object
    val modelData = ItemCFModelData(spark, params.env)
    // 2.2 convert the source data into (uid, aid, rate)
    val rateDF: DataFrame = modelData.getUserRatingData()
    logger.warn("rateDF ---------------------------------------->")
    rateDF.show()
    // 2.3 split into training data and test data
    val Array(training, test) = rateDF.randomSplit(Array(0.6, 0.4))
    training.cache()
    // 2.4 DataFrame -> coordinate matrix of the source ratings
    val rateMatrix = modelData.rateDF2Matrix(training)
    // 2.5 similarity matrix (cosine similarity under the hood)
    val simMatrix: CoordinateMatrix = rateMatrix.toRowMatrix().columnSimilarities()
    // 2.6 similarity matrix -> DataFrame
    val simDF = modelData.simMatrix2DF(simMatrix)
    logger.warn("simDF ---------------------------------------->")
    simDF.show()
    // 2.7 join the training ratings with the similarities
    val joinDF = modelData.joinRateDFAndSimDF(training, simDF)
    logger.warn("joinDF ---------------------------------------->")
    joinDF.show()
    training.unpersist()
    joinDF.cache()
    // 2.8 predict article ratings on the test data
    val predictDF = modelData.predictTestData(joinDF, test)
    logger.warn("predictDF ---------------------------------------->")
    predictDF.show()
    joinDF.unpersist()
    // 2.9 evaluate the recommendations
    // 2.9.1 build the evaluator
    val evaluator = new RegressionEvaluator()
      .setLabelCol("rate")          // ground truth
      .setPredictionCol("pre_rate") // prediction
    // 2.9.2 compute the error
    // val rmse: Double = evaluator.setMetricName("rmse").evaluate(predictDF)
    // logger.warn(s"itemcf rmse:${rmse}")
    // 2.10 top-k per user
    val recommendDF = modelData.recommendAllUser(joinDF, params.topK)
    logger.warn("recommendDF ---------------------------------------->")
    recommendDF.show()
    // 2.11 keep a copy on HDFS first, then write to HBase; I had already generated
    //      one copy of the table, so this run uses a new table name
    recommendDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwb_news.itemcf1")
    // 2.12 save to HBase
    // 2.12.1 HBase utility
    val hBaseUtils: HBaseUtils = HBaseUtils(spark, params.hBaseZK, params.hBasePort)
    logger.warn("hBaseUtils ---------------------------------------->")
    // 2.12.2 df --> rdd
    val convertDF = modelData.recommendDataConvert(recommendDF)
    val hfileRDD = modelData.itemcf2RDD(convertDF)
    // 2.12.3 bulk-load into HBase
    hBaseUtils.loadHFile2HBase(hfileRDD, params.tableName, params.hFileTmpPath)
    // release resources
    spark.stop()
    logger.info("job successful")
  }
}
2.2.5 The model-data class for the item-based strategy
ItemCFModelData extends ModelData; it defines the conversion functions used by ItemCF.
package comqf.bigdata.transformer

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

import scala.collection.mutable.ListBuffer

/**
 * Model-data class for the item-based collaborative filtering strategy
 */
class ItemCFModelData(spark: SparkSession, env: String) extends ModelData(spark, env) {

  private val logger = LoggerFactory.getLogger(ItemCFModelData.getClass.getSimpleName)

  /**
   * Convert the recommendation results into an RDD of HBase KeyValues.
   * The HBase table must exist beforehand:
   *   rowkey:        uid
   *   column family: f1
   *   column:        itemcf
   *   value:         the recommended scores
   */
  def itemcf2RDD(convertDF: DataFrame) = {
    convertDF.rdd.sortBy(x => x.get(0).toString).flatMap(row => {
      // 1. original values
      val uid: String = row.get(0).toString
      /*
       * [(sim_aid, pre_rate), (sim_aid, pre_rate), ...]
       *   -> "sim_aid:pre_rate,sim_aid:pre_rate,..."
       */
      val items: String = row.getAs[Seq[Row]](1).map(item => {
        item.getInt(0).toString + ":" + item.getDouble(1).formatted("%.4f")
      }).mkString(",")
      // 2. collection holding the result
      val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
      // 3. build the KeyValue
      val kv = new KeyValue(Bytes.toBytes(uid), Bytes.toBytes("f1"), Bytes.toBytes("itemcf"), Bytes.toBytes(items))
      // 4. append it
      listBuffer.append((new ImmutableBytesWritable(), kv))
      listBuffer
    })
  }

  /**
   * (1, 2, 11)  --> (uid, (sim_aid, pre_rate))
   * (1, 3, 12)
   *   ---> (uid, [(sim_aid, pre_rate), (sim_aid, pre_rate)])
   * e.g. (1, [(2,11), (3,12), ...])
   */
  def recommendDataConvert(recommendDF: DataFrame) = {
    import spark.implicits._
    recommendDF.rdd.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2))))
      .groupByKey().mapValues(sp => {
        var seq: Seq[(Int, Double)] = Seq[(Int, Double)]()
        sp.foreach(tp => { seq :+= (tp._1, tp._2) })
        seq.sortBy(_._2)
      }).toDF("uid", "recommendactions")
  }

  /**
   * Predict ratings for the test data.
   */
  def predictTestData(joinDF: DataFrame, test: Dataset[Row]) = {
    // 1. create the temp views
    joinDF.createOrReplaceTempView("rate_sim")
    test.createOrReplaceTempView("test_data")
    // 2. run the sql
    /*
     * rsp: the user's (weighted) rating of articles similar to the original
     * sim: the user's rating of the original article
     */
    spark.sql(
      s"""
         |with t1 as( -- predicted rating of each similar article per user
         |select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate
         |from rate_sim group by uid, sim_aid
         |),
         |t2 as ( -- the user's true ratings
         |select uid, aid, rate from test_data
         |)
         |select t2.*, t1.pre_rate from t2 inner join t1 on t2.aid = t1.sim_aid and t1.uid = t2.uid
         |where t1.pre_rate is not null
         |""".stripMargin)
  }

  /**
   * Convert the similarity matrix into a DataFrame.
   */
  def simMatrix2DF(simMatrix: CoordinateMatrix) = {
    // 1. the matrix entries as an RDD
    val transformerRDD: RDD[(String, String, Double)] = simMatrix.entries.map {
      case MatrixEntry(row: Long, column: Long, sim: Double) => (row.toString, column.toString, sim)
    }
    // 2. rdd --> dataframe
    val simDF: DataFrame = spark.createDataFrame(transformerRDD).toDF("aid", "sim_aid", "sim")
    // 3. columnSimilarities() only returns the upper triangle, so mirror it
    simDF.union(simDF.select(simDF("sim_aid").as("aid"), simDF("aid").as("sim_aid"), simDF("sim")))
  }

  /**
   * Convert the rating table into a rating matrix:
   *
   * uid aid rate         uid/aid   1     2     3
   * 1   1   0.8             1     0.8   0.1
   * 1   2   0.1      ->     2     0.6
   * 2   1   0.6             3     0.8
   * 3   1   0.8             4                 0.25
   * 4   3   0.25
   */
  def rateDF2Matrix(df: DataFrame) = {
    // 1. Row --> MatrixEntry
    val matrixRDD: RDD[MatrixEntry] = df.rdd.map {
      case Row(uid: Long, aid: Long, rate: Double) => MatrixEntry(uid, aid, rate)
    }
    // 2. return a distributed coordinate matrix
    new CoordinateMatrix(matrixRDD)
  }
}

object ItemCFModelData {
  def apply(spark: SparkSession, env: String): ItemCFModelData = new ItemCFModelData(spark, env)
}
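The heart of this pipeline, building an item-item cosine-similarity matrix from the rating matrix and predicting a user's score for unseen articles, can be sketched in plain Python. The names below are illustrative, and note one difference: this sketch uses the textbook similarity-weighted average (dividing by the sum of similarities), whereas the SQL above divides by sum(rate).

```python
import math
from collections import defaultdict

def cosine_item_sims(ratings):
    """ratings: list of (uid, aid, rate) -> {(a, b): cosine similarity of articles a, b}."""
    cols = defaultdict(dict)  # aid -> {uid: rate}, i.e. one matrix column per article
    for uid, aid, r in ratings:
        cols[aid][uid] = r
    sims = {}
    aids = list(cols)
    for i, a in enumerate(aids):
        for b in aids[i + 1:]:
            common = set(cols[a]) & set(cols[b])  # users who rated both
            dot = sum(cols[a][u] * cols[b][u] for u in common)
            if dot:
                na = math.sqrt(sum(v * v for v in cols[a].values()))
                nb = math.sqrt(sum(v * v for v in cols[b].values()))
                sims[(a, b)] = sims[(b, a)] = dot / (na * nb)
    return sims

def predict_rating(ratings, sims, uid, aid):
    """Similarity-weighted average of the user's existing ratings for other articles."""
    seen = [(a, r) for u, a, r in ratings if u == uid and a != aid]
    num = sum(r * sims.get((a, aid), 0.0) for a, r in seen)
    den = sum(sims.get((a, aid), 0.0) for a, r in seen)
    return num / den if den else 0.0
```

Spark's columnSimilarities() computes the same cosine measure, only distributed and returning just the upper triangle, which is why simMatrix2DF mirrors it.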
2.2.6 HBaseUtils
The original paste here repeated SparkUtils; what ItemCF and AlsCF actually need from HBaseUtils is a constructor taking the ZooKeeper quorum and port, plus a loadHFile2HBase method. The class below is a reconstruction sketched with the standard HBase 1.x bulk-load APIs, not the original code.

package comqf.bigdata.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

class HBaseUtils(spark: SparkSession, zk: String, port: String) {
  private val conf: Configuration = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zk)
  conf.set("hbase.zookeeper.property.clientPort", port)

  /**
   * Write the (rowkey, KeyValue) RDD as HFiles under hFileTmpPath and
   * bulk-load them into the target table.
   */
  def loadHFile2HBase(hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)],
                      tableName: String, hFileTmpPath: String): Unit = {
    conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
    val connection: Connection = ConnectionFactory.createConnection(conf)
    val table: Table = connection.getTable(TableName.valueOf(tableName))
    val regionLocator = connection.getRegionLocator(TableName.valueOf(tableName))
    val job = Job.getInstance(conf)
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
    // HFiles must be written in sorted rowkey order
    hfileRDD.saveAsNewAPIHadoopFile(hFileTmpPath, classOf[ImmutableBytesWritable],
      classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hFileTmpPath), connection.getAdmin, table, regionLocator)
    connection.close()
  }
}

object HBaseUtils {
  def apply(spark: SparkSession, zk: String, port: String): HBaseUtils = new HBaseUtils(spark, zk, port)
}
2.3 Testing
Everything above is enough to read the data end to end; next we create the table in HBase.
(1) Package the project into a jar.
If packaging fails, check that the package names in the source files are correct!
(2) In the target folder, take the jar with the longest name (the jar-with-dependencies one) and upload it to the virtual machine.
(3) On the virtual machine:
start-all.sh
hive --service hiveserver2 &
hive --service metastore &
zkServer.sh start
start-hbase.sh
hbase shell
# in the hbase shell
create_namespace 'recommend'
create 'recommend:itemcf', 'f1'
(4) Run the Spark job
${SPARK_HOME}/bin/spark-submit \
--jars /usr/local/hive/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name itemcf \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 3G \
--num-executors 1 \
--class comqf.bigdata.ItemCF \
/data/jar/recommend-1.0-jar-with-dependencies.jar \
-e prod -x root -z 192.168.10.101 -p 2181 -f /tmp/hfile -t recommend:itemcf -k 3
My VM kept hitting all kinds of bugs, and I had to restart it several times.
After the job finishes there is an itemcf table in Hive (the screenshot above is from a while ago; this time I could not get Hive to cooperate).
3. ALSCF
Load the source data from dwb_news.user_acticle_action and convert it into ratings.
Use Spark MLlib's ALS algorithm to factorize the rating matrix into a user-vector matrix and an item-vector matrix.
Evaluate the model with RMSE.
Generate a recommendation list for every user.
Write the results back to HBase, into recommend:alscf.
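The idea behind ALS can be illustrated without Spark: the algorithm learns a low-rank vector for each user and each item so that their dot product approximates the observed rating, and RMSE measures how close the predictions come. A toy sketch with hand-picked rank-2 vectors (every name and number below is made up for illustration; real ALS would learn the vectors by alternately solving least-squares problems):

```python
import math

# Toy rank-2 latent factors; ALS would learn these by alternately fixing the
# user matrix and solving for the item matrix, then vice versa.
user_vecs = {1: [0.9, 0.1], 2: [0.2, 0.8]}
item_vecs = {"a": [1.0, 0.0], "b": [0.0, 1.0]}

def predict(uid, aid):
    """Predicted rating = dot product of the user vector and the item vector."""
    return sum(u * v for u, v in zip(user_vecs[uid], item_vecs[aid]))

def rmse(pairs):
    """pairs: list of (uid, aid, true_rate); the evaluation metric used by the job."""
    se = [(predict(u, a) - r) ** 2 for u, a, r in pairs]
    return math.sqrt(sum(se) / len(se))
```

With rank 10 and real data, Spark's ALS does exactly this at scale, and RegressionEvaluator computes the RMSE.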
3.1 Changes to the configuration class; it still dispatches on the program name.
package comqf.bigdata.conf

import org.slf4j.LoggerFactory

case class Config(env: String = "",
                  hBaseZK: String = "",
                  hBasePort: String = "2181",
                  hFileTmpPath: String = "",
                  tableName: String = "recommend:news-cf",
                  irisPath: String = "",
                  proxyUser: String = "root",
                  topK: Int = 10)

object Config {
  private val logger = LoggerFactory.getLogger(Config.getClass.getSimpleName)

  /**
   * Parse the command-line arguments.
   * @param obj  object whose class name decides which options to parse
   * @param args raw argument values
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    // 1. program name
    val programName = obj.getClass.getSimpleName.replace("$", "")
    // 2. build the parser (works much like shell getopts)
    val parser = new scopt.OptionParser[Config](s"ItemCF ${programName}") {
      head(programName, "v1.0")
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      opt[String]('x', "proxyUser").optional().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "ItemCF" =>
          logger.info(s"ItemCF is starting ---------------------------->")
          opt[String]('z', "hBaseZK").required().action((x, config) => config.copy(hBaseZK = x)).text("hBaseZK")
          opt[String]('p', "hBasePort").required().action((x, config) => config.copy(hBasePort = x)).text("hBasePort")
          opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")
          opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")
          opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")
        case "AlsCF" =>
          logger.info(s"AlsCF is starting ---------------------------->")
          opt[String]('z', "hBaseZK").required().action((x, config) => config.copy(hBaseZK = x)).text("hBaseZK")
          opt[String]('p', "hBasePort").required().action((x, config) => config.copy(hBasePort = x)).text("hBasePort")
          opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")
          opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")
          opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")
        case _ =>
      }
    }
    // 3. parse
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None =>
        logger.error(s"cannot parse args")
        System.exit(-1)
        null
    }
  }
}
3.2 AlsCF
package comqf.bigdata

import comqf.bigdata.conf.Config
import comqf.bigdata.transformer.AlsCFModelData
import comqf.bigdata.utils.{HBaseUtils, SparkUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Model-based collaborative filtering: matrix factorization
 */
object AlsCF {
  private val logger = LoggerFactory.getLogger(AlsCF.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    // 1. preparation
    // 1.1 basics
    Logger.getLogger("org").setLevel(Level.WARN)
    val params: Config = Config.parseConfig(AlsCF, args)
    System.setProperty("HADOOP_USER_NAME", params.proxyUser)
    logger.warn("job is running, please wait for a moment")
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, "alscf app")
    import spark.implicits._
    // 1.2 checkpoint the computation: ALS iterates deeply, and a long RDD lineage / DAG can cause out-of-memory errors
    spark.sparkContext.setCheckpointDir("/checkpoint/als")

    // 2. base data processing
    // 2.1 ALS model-data object
    val modelData = AlsCFModelData(spark, params.env)
    // 2.2 convert the source data into (uid, aid, rate)
    val rateDF: DataFrame = modelData.getUserRatingData()
    logger.warn("rateDF ---------------------------------------->")
    rateDF.show()
    // 2.3 split into training data and test data
    val Array(training, test) = rateDF.randomSplit(Array(0.6, 0.4))
    training.cache()
    // 2.4 train the ALS model
    // 2.4.1 ALS configuration
    val als = new ALS()
      .setMaxIter(6)                // number of alternating least squares iterations; more gets closer to the truth but costs more resources
      .setRegParam(0.01)            // regularization, guards against overfitting
      .setUserCol("uid")            // user column
      .setItemCol("aid")            // item column
      .setRatingCol("rate")         // rating column
      .setColdStartStrategy("drop") // drop users/items for which no prediction can be made
      .setNonnegative(true)         // non-negative factors
      .setImplicitPrefs(true)       // enable implicit feedback
      .setRank(10)                  // dimension of the latent factor vectors
    // 2.4.2 train
    val model: ALSModel = als.fit(training)
    training.unpersist()
    // 2.5 predict
    val predictDF: DataFrame = model.transform(test)
    logger.warn("predictDF ---------------------------------------->")
    predictDF.show()
    // 2.6 top-k per user: (id, recommendations)
    val recommendDF: DataFrame = model.recommendForAllUsers(params.topK)
    logger.warn("recommendDF ---------------------------------------->")
    recommendDF.show()
    // 2.7 filter out the values we do not want
    val filterDF = modelData.recommendFilterAlsDataForAllUsers(rateDF, recommendDF)
    logger.warn("filterDF ---------------------------------------->")
    filterDF.show()
    // 2.8 save a copy on HDFS first
    filterDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwb_news.als")
    // 2.9 then save to HBase
    val hBaseUtils: HBaseUtils = HBaseUtils(spark, params.hBaseZK, params.hBasePort)
    logger.warn("hBaseUtils ---------------------------------------->")
    val convertDF = modelData.recommendDataConvert(filterDF)
    val hfileRDD = modelData.alscf2RDD(convertDF)
    hBaseUtils.loadHFile2HBase(hfileRDD, params.tableName, params.hFileTmpPath)
    // release resources
    spark.stop()
    logger.info("job successful")
  }
}
3.3 The ALS model-data class
package comqf.bigdata.transformer

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.explode

import scala.collection.mutable.ListBuffer

/**
 * Provides model data for the ALS algorithm
 */
class AlsCFModelData(spark: SparkSession, env: String) extends ModelData(spark, env) {

  /**
   * Convert the recommendation results into an RDD of HBase KeyValues.
   * The HBase table must exist beforehand:
   *   rowkey:        uid
   *   column family: f1
   *   column:        alscf
   *   value:         the recommended scores
   */
  def alscf2RDD(convertDF: DataFrame) = {
    convertDF.rdd.sortBy(x => x.get(0).toString).flatMap(row => {
      // 1. original values
      val uid: String = row.get(0).toString
      /*
       * [(sim_aid, pre_rate), (sim_aid, pre_rate), ...]
       *   -> "sim_aid:pre_rate,sim_aid:pre_rate,..."
       */
      val alses: String = row.getAs[Seq[Row]](1).map(als => {
        als.getInt(0).toString + ":" + als.getDouble(1).formatted("%.4f")
      }).mkString(",")
      // 2. collection holding the result
      val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
      // 3. build the KeyValue
      val kv = new KeyValue(Bytes.toBytes(uid), Bytes.toBytes("f1"), Bytes.toBytes("alscf"), Bytes.toBytes(alses))
      // 4. append it
      listBuffer.append((new ImmutableBytesWritable(), kv))
      listBuffer
    })
  }

  import spark.implicits._

  /**
   * Filter out the recommendations we do not want from the ALS output.
   */
  def recommendFilterAlsDataForAllUsers(rateDF: DataFrame, recommendDF: DataFrame) = {
    // 1. explode the recommendations array into columns: (uid, recommendations, pre_aid, pre_rate)
    val transDF: DataFrame = recommendDF
      .withColumn("recommendations", explode($"recommendations"))
      .withColumn("pre_aid", $"recommendations.aid")
      .withColumn("pre_rate", $"recommendations.rating")
    // 2. create the temp views
    rateDF.createOrReplaceTempView("user_rating")
    transDF.createOrReplaceTempView("als_pred")
    // 3. drop articles the user has already interacted with (keep rows with no matching rating)
    spark.sql(
      s"""
         |select
         |cast(t1.uid as int) as uid,
         |cast(t1.pre_aid as int) as pre_aid,
         |cast(t1.pre_rate as double) as pre_rate
         |from als_pred as t1 left join user_rating as t2 on t1.pre_aid = t2.aid and t1.uid = t2.uid
         |where t2.rate is null
         |""".stripMargin)
  }

  /**
   * (1, 2, 11)  --> (uid, (sim_aid, pre_rate))
   * (1, 3, 12)
   *   ---> (uid, [(sim_aid, pre_rate), (sim_aid, pre_rate)])
   * e.g. (1, [(2,11), (3,12), ...])
   */
  def recommendDataConvert(recommendDF: DataFrame) = {
    import spark.implicits._
    recommendDF.rdd.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2))))
      .groupByKey().mapValues(sp => {
        var seq: Seq[(Int, Double)] = Seq[(Int, Double)]()
        sp.foreach(tp => { seq :+= (tp._1, tp._2) })
        seq.sortBy(_._2)
      }).toDF("uid", "recommendactions")
  }
}

object AlsCFModelData {
  def apply(spark: SparkSession, env: String): AlsCFModelData = new AlsCFModelData(spark, env)
}
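The recommendDataConvert / *cf2RDD pair ultimately serializes each user's recommendation list into one "aid:rate,aid:rate" string stored in a single HBase cell. The string format itself is easy to sketch in plain Python (illustrative helper name; the %.4f formatting mirrors the Scala code):

```python
def recommendations_to_cell(recs):
    """(aid, pre_rate) pairs -> 'aid:rate,aid:rate', the single-cell value format."""
    return ",".join("%d:%.4f" % (aid, rate) for aid, rate in recs)
```

Packing the whole list into one cell keeps reads cheap: serving a user's recommendations is a single GET on the uid rowkey.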
For testing:
start-all.sh
hive --service metastore &
hive --service hiveserver2 &
zkServer.sh start
start-hbase.sh
hbase shell
Then create a table in HBase:
create 'recommend:alscf', 'f1'
Package the program in IDEA into a jar and upload it to the virtual machine.
As before, upload the jar with the longest name (delete the previous jar of the same name first; actually there is no need, since uploading a file of the same name simply overwrites it).
${SPARK_HOME}/bin/spark-submit \
--jars /usr/local/hive/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name als \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 3G \
--num-executors 1 \
--class com.qf.bigdata.AlsCF \
/data/jar/recommend-1.0-jar-with-dependencies.jar \
-e prod -x root -z 192.168.10.101 -p 2181 -f /tmp/hfile -t recommend:alscf -k 3
If the job runs successfully, the als table will appear under dwb_news in Hive.
4 ItemBaseFeature
4.1 Cleaning the source data
This step vectorizes the article features. Article features are either numeric or non-numeric; the non-numeric ones are first converted to numeric values, and then all features are vectorized together.
To filter the basic article information, start Presto and query against ods_news.news_article:
start-all.sh
hive --service hiveserver2 &
hive --service metastore &
# start Presto for querying
launcher start
presto-cli --server qianfeng01:8090 --catalog hive
In Presto, run a filtering query: extract the Chinese text (stripping out the HTML tags) and count the number of images per article.
I covered how to write this SQL in detail in an earlier post:
手撕SQL之Presto查询_林柚晞的博客-CSDN博客_presto查询语句
create table dwb_news.article_base_info with(FORMAT='ORC')
as
with t1 as ( -- extract all the features
select
article_id,
length(array_join(regexp_extract_all(content, '[\u4e00-\u9fa5]+'),';')) as article_num,
(length(content) - length(replace(content, '<img src=',''))) / length('<img src=') as img_num,
type_name,
date_diff('day', cast(format_datetime(date_parse(logday, '%Y%m%d'), 'yyyy-MM-dd') as date), cast(now() as date)) as pub_gap
from ods_news.news_article
where
logday < format_datetime(now(), 'yyyyMMdd')
and
article_id <> '' and content <> ''
)
select
article_id,
max(article_num) as article_num,
max(img_num) as img_num,
max(type_name) as type_name,
max(pub_gap) as pub_gap
from t1 group by article_id;
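The two trickier expressions in this query can be mirrored in plain Python. This is an illustrative sketch (my own function names, not part of the project): `article_num` is the length of all Chinese-character runs joined with ';' (so it slightly overcounts by the separators, as in the Presto query), and `img_num` counts images by how many times the `<img src=` marker appears, using the same length-difference trick.

```python
import re

def article_num(content: str) -> int:
    """Length of all Chinese-character runs joined by ';',
    matching length(array_join(regexp_extract_all(...), ';'))."""
    runs = re.findall(r"[\u4e00-\u9fa5]+", content)
    return len(";".join(runs))

def img_num(content: str) -> int:
    """(len(content) - len(content without marker)) / len(marker),
    matching the Presto length-difference expression."""
    marker = "<img src="
    return (len(content) - len(content.replace(marker, ""))) // len(marker)

html = "<p>你好世界</p><img src=a.png><img src=b.png>"
print(article_num(html))  # 4
print(img_num(html))      # 2
```

The length-difference trick avoids regex entirely for the image count: removing every occurrence of the marker shortens the string by exactly `len(marker)` per occurrence.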
4.2 Processing logic for the data
Numeric features are handled with Euclidean distance plus min-max normalization. Min-max normalization maps each value to x' = (x - min) / (max - min), and the Euclidean distance between two feature vectors a and b is d(a, b) = sqrt(sum_i (a_i - b_i)^2).
Here is a small example:

article_id | article_num | img_num
-----------+-------------+--------
     24801 |        1194 |       8
     24802 |        1924 |      11
     24803 |        1308 |       5

article_num max: 1924, min: 1194
img_num max: 11, min: 5

article_id = 24801: article_num = (1194 - 1194) / (1924 - 1194) = 0; img_num = (8 - 5) / (11 - 5) = 3/6 = 0.5
article_id = 24802: article_num = (1924 - 1194) / (1924 - 1194) = 1; img_num = (11 - 5) / (11 - 5) = 1
article_id = 24803: article_num = (1308 - 1194) / (1924 - 1194) = 114/730 ≈ 0.156; img_num = (5 - 5) / (11 - 5) = 0

article_id | article_num | img_num
-----------+-------------+--------
     24801 |           0 |     0.5
     24802 |           1 |       1
     24803 |       0.156 |       0
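The min-max normalization worked through above can be sketched in a few lines of Python (a minimal sketch; the helper name is my own):

```python
def min_max(values):
    """Min-max normalize one feature column: x' = (x - min) / (max - min)."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

# The example columns for articles 24801, 24802, 24803:
article_nums = [1194, 1924, 1308]
img_nums = [8, 11, 5]
print([round(v, 3) for v in min_max(article_nums)])  # [0.0, 1.0, 0.156]
print([round(v, 3) for v in min_max(img_nums)])      # [0.5, 1.0, 0.0]
```

Note that each column is normalized independently, so every feature ends up on the same [0, 1] scale before the distance computation; otherwise article_num (in the thousands) would dominate img_num (single digits).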