1. Theoretical knowledge

1.1 The most mainstream recommendation algorithm: collaborative filtering

1.2 An overview of other recommendation algorithms

1.3 Outline of the recommendation system

1.4 Environment notes:

This project computes on the vector data generated from the user-profile work. As for the whole series, I will do my best to finish writing up the two earlier projects. Project 0 and Project 1 may still have a few rough edges, and I will polish them when I have time (I really did build everything from scratch end to end and hit quite a few bugs along the way).

Project 0: Big data development on a single-node virtual machine (full 40,000 words)_林柚晞的博客-CSDN博客

Project 1: Real-time data warehouse data ingestion_林柚晞的博客-CSDN博客

I will organize and publish the near-real-time data warehouse and user-profile projects later.

2. Deployment

2.1 Generating the source data table for ItemCF (item-based collaborative filtering)

ItemCF is based on the event table (which holds user ids and the users' behaviors on pages, such as views, clicks and likes).

The table below stores the user behavior records (clicks and other actions on the site) from a 3-day window.

start-all.sh

hive --service metastore &

hive --service hiveserver2 &

hive

Start Presto:

launcher start

presto-cli --server qianfeng01:8090 --catalog hive

show schemas;     # list the databases

The table is created in Presto as follows:

create table dwb_news.user_acticle_action comment 'user article action data' with(format='ORC')
as
with t1 as (
    select
    distinct_id as uid,
    article_id as aid,
    case when(event = 'AppPageView') then '点击'
    else action_type end as action,
    logday as action_date
    from ods_news.event
    where event in ('NewsAction', 'AppPageView')
    and logday >= format_datetime(cast('2022-03-21' as timestamp), 'yyyyMMdd')
    and logday < format_datetime(cast('2022-03-23' as timestamp), 'yyyyMMdd')
    and article_id <> ''
)
select uid, aid, action, max(action_date) as action_date from t1 where action <> ''
group by uid, aid, action;

select uid, aid, action, action_date from dwb_news.user_acticle_action limit 10;

I ran the verification query in Hive.

 

In fact we only need the first three columns, converted into (uid, aid, score).

Each behavior is given a hand-picked score, e.g. click: 0.1, share: 0.15, comment: 0.2, collect: 0.25, like: 0.3. Whenever a user performs a behavior on an article it is converted into a score (for example, a user who both clicked and shared the same article contributes 0.1 + 0.15 = 0.25 before time decay), and the five weights add up to 1.

On top of the score there is also a time function that lowers the weight: the longer ago a behavior happened, the lower its weight, because the user's interest fades over time.

Here we use a UDF to turn behaviors into scores.

The diagram shows the conversions between the different data types (top to bottom); the final DataFrame is then joined with the rating table.

2.2 UDF: read the source data and build the rating table

Open IDEA and create a new Maven project.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.qf.bigdata</groupId><artifactId>recommend</artifactId><version>1.0</version><properties><scala.version>2.11.12</scala.version><play-json.version>2.3.9</play-json.version><maven-scala-plugin.version>2.10.1</maven-scala-plugin.version><scala-maven-plugin.version>3.2.0</scala-maven-plugin.version><maven-assembly-plugin.version>2.6</maven-assembly-plugin.version><spark.version>2.4.5</spark.version><scope.type>compile</scope.type><json.version>1.2.3</json.version><hbase.version>1.3.6</hbase.version><hadoop.version>2.8.1</hadoop.version><!--compile provided--></properties><dependencies><!--json 包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${json.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>${scope.type}</scope></dependency><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.6</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>com.github.scopt</groupId><artifactId>scopt_2.11</artifactId><version>4.0.0-RC2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-avro_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>2.3.7</version><scope>${scope.type}</scope><exclusions><exclusion><groupId>javax.mail</groupId><artifactId>mail</artifactId></exclusion><exclusion><groupId>org.eclipse.jetty.aggregate</groupId><artifactId>*</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version><scope>${scope.type}</scope></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-hadoop2-compat</artifactId><version>${hbase.version}</version><scope>${scope.type}</scope></de
pendency><dependency><groupId>org.jpmml</groupId><artifactId>jpmml-sparkml</artifactId><version>1.5.9</version></dependency></dependencies><repositories><repository><id>alimaven</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><updatePolicy>never</updatePolicy></releases><snapshots><updatePolicy>never</updatePolicy></snapshots></repository></repositories><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>${maven-assembly-plugin.version}</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>${scala-maven-plugin.version}</version><executions><!-- 先编译scala,防止 cannot find symbol --><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-archetype-plugin</artifactId><version>2.2</version></plugin><plugin><groupId>org.codehaus.mojo</groupId><artifactId>build-helper-maven-plugin</artifactId><version>1.8</version><executions><!-- Add src/main/scala to eclipse build path --><execution><id>add-source</id><phase>generate-sources</phase><goals><goal>add-source</goal></goals><configuration><sources><source>src/main/java</source></sources></configuration></execution><!-- Add src/test/scala to eclipse build path --><execution><id>add-test-source</id><phase>generate-test-sources</phase><goals><goal>add-test-source</goal></goals><configuration><sources><source>src/test/java</source></sources></configuration></execution></executions></plugin></plugins></build>
</project>

Then let Maven import the dependencies.

The project is written with Spark, so it also needs Scala support in the IDE.

Right-click the project folder -> Add Framework Support.

My IDEA already has the Scala plugin installed (File -> Settings -> Plugins -> select Scala).

Add a scala folder under main.

Put hive-site.xml, hdfs-site.xml and core-site.xml into resources (these are the HDFS and Hive configuration files from the virtual machine).

The code below lives in that scala folder.

2.2.1 Setting up the basic project skeleton

The key pieces are the Config class and the Spark utility class described below.

The config class dispatches argument parsing by the calling class's name, and also carries settings such as the HBase ZooKeeper address and port.

2.2.1.1 The Config class

Responsibilities: environment parameter, matching on the app name, and parsing the command-line arguments.

package comqf.bigdata.conf

import org.slf4j.LoggerFactory

case class Config(env: String = "",
                  hBaseZK: String = "",
                  hBasePort: String = "2181",
                  hFileTmpPath: String = "",
                  tableName: String = "recommend:news-cf",
                  irisPath: String = "",
                  proxyUser: String = "root",
                  topK: Int = 10)

object Config {
  private val logger = LoggerFactory.getLogger(Config.getClass.getSimpleName)

  /**
   * Parse the command-line arguments.
   * @param obj  : used to decide, by class name, which options to register
   * @param args : the actual argument values
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    //1. Get the program name
    val programName = obj.getClass.getSimpleName.replace("$", "")
    //2. Works like the getopts command
    //2.1 Build the parser
    val parser = new scopt.OptionParser[Config](s"ItemCF ${programName}") {
      head(programName, "v1.0")
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      opt[String]('x', "proxyUser").optional().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "ItemCF" => {
          logger.info(s"ItemCF is starting ---------------------------->")
          opt[String]('z', "hBaseZK").required().action((x, config) => config.copy(hBaseZK = x)).text("hBaseZK")
          opt[String]('p', "hBasePort").required().action((x, config) => config.copy(hBasePort = x)).text("hBasePort")
          opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")
          opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")
          opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")
        }
        case _ =>
      }
    }
    //2.2 Parse
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None => {
        logger.error(s"cannot parse args")
        System.exit(-1)
        null
      }
    }
  }
}
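For reference, a small sketch of how the parsing is driven (ItemCF is the driver object defined in 2.2.4; the argument values here are placeholders that mirror the spark-submit flags used later):

val args = Array(
  "-e", "prod",              // env
  "-z", "192.168.10.101",    // hBaseZK
  "-p", "2181",              // hBasePort
  "-f", "/tmp/hfile",        // hFileTmpPath
  "-t", "recommend:itemcf",  // tableName
  "-k", "3"                  // topK
)
val params: Config = Config.parseConfig(ItemCF, args)
println(params.tableName)    // recommend:itemcf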

2.2.1.2 The Spark utility class

(It decides between development and production mode from the env and appName passed in; the two modes use different configuration.)

package comqf.bigdata.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {
  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  /**
   * Build the SparkSession, choosing between production mode and development mode.
   */
  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" => {
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case "dev" => {
        conf.setMaster("local[*]").setAppName(appName + "_dev").set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case _ => {
        logger.error("not match env")
        System.exit(-1)
        null
      }
    }
  }
}
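Usage is a one-liner; in dev mode Spark runs with a local[*] master:

val spark = SparkUtils.getSparkSession("dev", "itemcf app")   // local mode with Hive support
// ... run the job ...
spark.stop()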

2.2.2 Conversions

2.2.2.1 An enumeration holding the user-behavior constants

(For some reason the mind map came out mirrored, haha.)

package comqf.bigdata.constant

object Action extends Enumeration {
  type Action = Value
  val CLICK = Value("点击")
  val SHARE = Value("分享")
  val COMMENT = Value("评论")
  val COLLECT = Value("收藏")
  val LIKE = Value("点赞")

  // Print all constants
  def showAll = this.values.foreach(println)

  // Look up an enumeration value by its name
  def withNameOpt(name: String): Option[Value] = this.values.find(_.toString == name)
}
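A quick look at how the lookup behaves:

Action.withNameOpt("点击")   // Some(CLICK)
Action.withNameOpt("浏览")   // None -- unknown behaviors later fall through to a 0.0 weight in RateUDF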

2.2.2.2 A constants class for project-wide constants

package comqf.bigdata.constant

// Constants class: holds project-wide constants
object Constant {
  // The "aging" window used by the time-decay function: 100 days
  val ARTICLE_AGING_TIME = 100
}

2.2.2.3 A date utility class

(It simply computes how long ago a behavior happened.)

package comqf.bigdata.utils

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

object DateUtils {
  // String -> Date
  def string2date(date: String): Date = {
    val fmt = new SimpleDateFormat("yyyyMMdd")
    fmt.parse(date)
  }

  // Number of days between the given date and now
  def diffDay2Now(date: String) = {
    val now: Calendar = Calendar.getInstance()        // current time
    val today: Long = now.getTimeInMillis             // current time in milliseconds
    val current: Long = string2date(date).getTime
    val between: Long = (today - current) / (1000 * 3600 * 24)   // difference in days
    Integer.parseInt(String.valueOf(between))
  }
}
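For example (assuming, purely for illustration, that today is 2022-03-23):

DateUtils.diffDay2Now("20220321")   // 2 : the behavior happened two days ago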

2.2.2.4 The RateUDF

There is also a function that multiplies the behavior weight by the time weight.

package comqf.bigdata.udfs

import comqf.bigdata.utils
import comqf.bigdata.constant.{Action, Constant}
import comqf.bigdata.utils.DateUtils

object RateUDF {
  // Weight of a behavior
  def getActionWeight(action: String) = {
    Action.withNameOpt(action).getOrElse() match {
      case Action.CLICK => 0.1f
      case Action.LIKE => 0.15f
      case Action.COLLECT => 0.2f
      case Action.SHARE => 0.25f
      case Action.COMMENT => 0.3f
      case _ => 0.0f
    }
  }

  // Whatever value is passed in, this function maps it into the range 0-1
  private def sigmoid(d: Double): Double = 1 / (1 + Math.exp(1.0 - d))

  def getDateWeight(date: String) = {
    try {
      //1. interval = (aging window) - (days since the behavior happened)
      var interval: Int = Constant.ARTICLE_AGING_TIME - DateUtils.diffDay2Now(date)
      if (interval < 0) interval = 1 // the behavior is older than the window in which the data is still valuable
      val x: Double = interval.toDouble - 7
      sigmoid(x * 0.8).toFloat
    } catch {
      case e: Exception =>
        e.printStackTrace()
        0.0f
    }
  }

  def action2rate(action: String, date: String): Float = {
    // behavior weight * time weight
    val rate = getActionWeight(action) * getDateWeight(date)
    return rate
  }
}
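A quick illustration of how the two weights combine (the dates and resulting values are only approximate, since they depend on the day the code is run):

// A click from yesterday: diffDay2Now = 1, interval = 99, x = 92,
// sigmoid(0.8 * 92) ≈ 1.0, so the rate is close to the full click weight.
RateUDF.action2rate("点击", "20220322")   // ≈ 0.1f

// A like from more than 100 days ago: interval is clamped to 1, x = -6,
// sigmoid(-4.8) ≈ 0.003, so the rate is almost 0 (≈ 0.15f * 0.003).
RateUDF.action2rate("点赞", "20211201")   // ≈ 0.0005f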

2.2.3 The ModelData class

This is the parent class shared by the collaborative-filtering models.

package comqf.bigdata.transformer
import comqf.bigdata.udfs.RateUDF
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactoryclass ModelData(spark:SparkSession,env:String) {private val logger = LoggerFactory.getLogger(ModelData.getClass.getSimpleName)// 注册udf函数spark.udf.register("action2rate", RateUDF.action2rate _)/*** 生成评分表*/def generateEachAction() = {spark.sql(//1. 读取原始数据并建立了虚表s"""|select uid, aid, action, action_date from dwb_news.user_acticle_action|""".stripMargin).createOrReplaceTempView("source_data")//2. 计算spark.sql(s"""|select uid, aid, action, action_date,|action2rate(action, action_date) as rate|from source_data|""".stripMargin).createOrReplaceTempView("source_data_rate")}/*** 将原始数据转换为(uid, aid, rate)* rete:一个用户对一个文章的所有的行为的评分之和* 原始数据:* uid  |  aid  | action | action_date* ------+-------+--------+-------------* 3713 | 21957 | 点赞   | 20211225* 3187 | 3976  | 收藏   | 20211225* 2554 | 14202 | 分享   | 20211225* 1937 | 18172 | 点击   | 20211225* 4500 | 23407 | 分享   | 20211225** 处理之后:* uid  |  aid  |  rate* ------+-------+------* 3713 | 21957 | 13* 3187 | 3976 | 14*/def getUserRatingData():DataFrame = {//1. 生成每个评分的评分表generateEachAction()//2. 计算评分spark.sql(s"""|select|cast(uid as bigint) as uid,|cast(aid as bigint) as aid,|cast(sum(rate) as double) as rate|from source_data_rate group by uid, aid order by uid|""".stripMargin)}/*** 关联训练和相似的dataframe,从而获取到文章的相似度的评分*/def joinRateDFAndSimDF(trainning: Dataset[Row], simDF:Dataset[Row]) = {//1. 创建评分表trainning.createOrReplaceTempView("user_rate")simDF.createOrReplaceTempView("sim_item")//2. 执行sqlspark.sql(s"""|select|t1.uid, t1.aid, t1.rate,|t2.aid as aid2, t2.sim_aid, t2.sim, t1.rate * t2.sim as rsp|from user_rate as t1 left join sim_item as t2 on t1.aid = t2.aid|where t2.sim is not null|""".stripMargin)}/*** 为用户推荐topk的内容,同时多虑已经有的行为的内容*/def recommendAllUser(joinDF: DataFrame, topK: Int) = {joinDF.createOrReplaceTempView("rate_sim")spark.sql(s"""|with t1 as(-- 用户对于相似文章的预测评分:预测值|select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate|from rate_sim group by uid, sim_aid|),|t2 as ( -- 剔除一部分已经阅读|select t1.* from t1|left join user_rate as ur on t1.uid = ur.uid and t1.sim_aid = ur.aid|where ur.rate is not null|),|t3 as ( -- 排名|select|uid, sim_aid, pre_rate,|row_number() over(partition by uid order by pre_rate desc) as rank|from t2|)|select|cast(uid as int) as uid,|cast(sim_aid as int) as sim_aid,|cast(pre_rate as double) as pre_rate|from t3 where rank <= ${topK}|""".stripMargin)}
}object ModelData {def apply(spark: SparkSession, env: String): ModelData = new ModelData(spark, env)
}
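A quick sketch of how a driver uses this class (it mirrors the first steps of the ItemCF driver below; "dev" is just an example env value):

val modelData = ModelData(spark, "dev")
val rateDF = modelData.getUserRatingData()   // DataFrame with columns uid (bigint), aid (bigint), rate (double)
rateDF.show(5)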

2.2.4 The item-based collaborative filtering driver (ItemCF)

Its main jobs are:

(1) obtain the configuration parameters

(2) read the data and convert the types

(3) join the training data with the similarity data and score the test data

package comqf.bigdataimport comqf.bigdata.conf.Config
import comqf.bigdata.transformer.ItemCFModelData
import comqf.bigdata.utils.{HBaseUtils, SparkUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory/*** 基于物品的协同过滤*/
object ItemCF {private val logger = LoggerFactory.getLogger(ItemCF.getClass.getSimpleName)def main(args: Array[String]): Unit = {//1. 准备工作Logger.getLogger("org").setLevel(Level.WARN)val params:Config = Config.parseConfig(ItemCF, args)System.setProperty("HADOOP_USER_NAME", params.proxyUser)logger.warn("job is running, please wait for a moment")val spark:SparkSession = SparkUtils.getSparkSession(params.env, "itemcf app")import spark.implicits._//2. 基础数据处理//2.1 获取到ItemCF的模型对象val modelData = ItemCFModelData(spark, params.env)//2.2 将原始的数据转换为(uid, aid, rate)val rateDF:DataFrame = modelData.getUserRatingData()logger.warn("rateDF ---------------------------------------->")rateDF.show()//2.3 将得到的数据分为两部分:1. 测试数据; 2. 训练数据val Array(training, test) = rateDF.randomSplit(Array(0.6, 0.4))training.cache()//2.4 将dataframe转换坐标矩阵:源数据的矩阵val rateMatrix = modelData.rateDF2Matrix(training)//2.5 求相似矩阵——底层就是利用了求余弦相似度val simMatrix: CoordinateMatrix = rateMatrix.toRowMatrix().columnSimilarities()//2.6 相似度矩阵对象转换dataframeval simDF = modelData.simMatrix2DF(simMatrix)logger.warn("simDF ---------------------------------------->")simDF.show()//2.7 将评分的训练用的df和相似的df关联起来val joinDF = modelData.joinRateDFAndSimDF(training, simDF)logger.warn("joinDF ---------------------------------------->")joinDF.show()training.unpersist()joinDF.cache()//2.8 使用测试数据和之前的散点数据对文章进行预测评分val predictDF = modelData.predictTestData(joinDF, test)logger.warn("predictDF ---------------------------------------->")predictDF.show()joinDF.unpersist()//2.9 计算推荐效果好不好//2.9.1 创建评估器val evaluator = new RegressionEvaluator().setLabelCol("rate") // 真实值.setPredictionCol("pre_rate") // 预测值//2.9.2 计算误差//    val rmse: Double = evaluator.setMetricName("rmse").evaluate(predictDF)//    logger.warn(s"itemcf rmse:${rmse}")//2.10 取用户topkval recommendDF = modelData.recommendAllUser(joinDF, params.topK)logger.warn("recommendDF ---------------------------------------->")recommendDF.show()//2.11 将结果先在HDFS存放一份,然后再存HBase,因为我自己已经生成一份表了,然后就自己创建了一个新表名recommendDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwb_news.itemcf1")//2.12 将数据保存到HBase//2.12.1 获取到hbase工具类val hBaseUtils: HBaseUtils = HBaseUtils(spark, params.hBaseZK, params.hBasePort)logger.warn("hBaseUtils ---------------------------------------->")//2.12.2 df --> rddval convertDF = modelData.recommendDataConvert(recommendDF)val hfileRDD = modelData.itemcf2RDD(convertDF)//2.12.3 保存到hbasehBaseUtils.loadHFile2HBase(hfileRDD,params.tableName,params.hFileTmpPath)//释放资源spark.stop()logger.info("job successful")}
}

2.2.5 The model-data class for item-based collaborative filtering

ItemCFModelData extends ModelData; it defines the conversion functions used by the ItemCF driver.

package comqf.bigdata.transformerimport comqf.bigdata.utils.HBaseUtils
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactoryimport scala.collection.mutable.ListBuffer/*** 基于物品的协同过滤策略的模型数据类*/
class ItemCFModelData(spark:SparkSession, env:String) extends ModelData(spark:SparkSession, env:String) {/*** 将推荐算法结果转换为RDD* 需要先建立hbase的表* 行建:uid* 列簇:f1* 列名:itemcf* 值:推荐的分值*/def itemcf2RDD(convertDF: DataFrame) = {convertDF.rdd.sortBy(x => x.get(0).toString).flatMap(row => {//1. 获取到原始数据值val uid: String = row.get(0).toString/** [(sim_aid, pre_rate), (sim_aid, pre_rate), ...]* |* sim_aid:pre_rate, sim_aid:pre_rate, ...*/val items: String = row.getAs[Seq[Row]](1).map(item => {item.getInt(0).toString + ":" + item.getDouble(1).formatted("%.4f")}).mkString(",")//2. 创建集合准备存放这个结果val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]//3. 存放val kv = new KeyValue(Bytes.toBytes(uid), Bytes.toBytes("f1"), Bytes.toBytes("itemcf"), Bytes.toBytes(items))//4. 将kv添加到listBufferlistBuffer.append((new ImmutableBytesWritable(), kv))listBuffer})}/*** (1, 2, 11) --> (uid, (sim_aid, pre_rate))* (1, 3, 12)*             ---> (uid, [(sim_aid, pre_rate), (sim_aid, pre_rate)])*             e.g.*             (1, [(2,11), (3,12), ...])*/def recommendDataConvert(recommendDF: DataFrame) = {import spark.implicits._recommendDF.rdd.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2)))).groupByKey().mapValues(sp => {var seq: Seq[(Int, Double)] = Seq[(Int, Double)]()sp.foreach(tp => {seq :+= (tp._1, tp._2)})seq.sortBy(_._2)}).toDF("uid", "recommendactions")}private val logger = LoggerFactory.getLogger(ItemCFModelData.getClass.getSimpleName)/*** 通过测试数据预测结果*/def predictTestData(joinDF: DataFrame, test: Dataset[Row]) = {//1. 建立虚表joinDF.createOrReplaceTempView("rate_sim")test.createOrReplaceTempView("test_data")//2. 执行sql/** rsp:用户对于与原文中相似的文章的评分* sim:用户对于原文章的评分*/spark.sql(s"""|with t1 as( -- 用户对于相似文章的预测评分:预测值|select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate|from rate_sim group by uid, sim_aid|),|t2 as ( -- 用户对于原文中的评分:真实值|select uid, aid, rate from test_data|)|select t2.*, t1.pre_rate from t2 inner join t1 on t2.aid = t1.sim_aid and t1.uid = t2.uid|where t1.pre_rate is not null|""".stripMargin)}/*** 将矩阵转换为一个Dataframe*/def simMatrix2DF(simMatrix: CoordinateMatrix) = {//1. 获取到矩阵内部的数据:RDDval transformerRDD: RDD[(String, String, Double)] = simMatrix.entries.map {case MatrixEntry(row: Long, column: Long, sim: Double) => (row.toString, column.toString, sim)}//2. rdd-->dataframeval simDF: DataFrame = spark.createDataFrame(transformerRDD).toDF("aid", "sim_aid", "sim")//3. 合并结果simDF.union(simDF.select("aid", "sim_aid", "sim"))}/*** 将评分数据表转化为评分矩阵** uid aid rate                    uid/aid   1     2     3* 1   1   0.8                     1        0.8   0.1* 1   2   0.1                     2        0.6* 2   1   0.6          ->         3        0.8* 3   1   0.8                     4                    0.25* 4   3   0.25*/def rateDF2Matrix(df: DataFrame) = {//1. Row --> MatrixEntryval matrixRDD: RDD[MatrixEntry] = df.rdd.map {case Row(uid: Long, aid: Long, rate: Double) => MatrixEntry(uid, aid, rate)}//2. 返回分布式矩阵new CoordinateMatrix(matrixRDD)}}object ItemCFModelData {def apply(spark: SparkSession, env: String): ItemCFModelData = new ItemCFModelData(spark, env)
}
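To make the matrix step concrete, here is a tiny self-contained sketch (the toy numbers are taken from the comment in rateDF2Matrix, not from project data) of how a (uid, aid, rate) matrix is turned into item-to-item cosine similarities:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// rows = users, columns = articles, values = ratings
val entries = spark.sparkContext.parallelize(Seq(
  MatrixEntry(1, 1, 0.8), MatrixEntry(1, 2, 0.1),
  MatrixEntry(2, 1, 0.6), MatrixEntry(3, 1, 0.8),
  MatrixEntry(4, 3, 0.25)
))
val rateMatrix = new CoordinateMatrix(entries)

// Cosine similarity between every pair of columns (articles);
// only the upper triangle (aid < sim_aid) is produced.
val simMatrix = rateMatrix.toRowMatrix().columnSimilarities()
simMatrix.entries.collect().foreach(println)   // MatrixEntry(aid, sim_aid, sim)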

2.2.6 HBaseUtils

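Both drivers call HBaseUtils(spark, hBaseZK, hBasePort) and then loadHFile2HBase(hfileRDD, tableName, hFileTmpPath). Below is a minimal sketch of such a helper, assuming the HFile bulk-load approach (HFileOutputFormat2 + LoadIncrementalHFiles); the class used in the original project may differ in its details:

package comqf.bigdata.utils

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

class HBaseUtils(spark: SparkSession, zk: String, port: String) {
  // HBase client configuration pointing at the ZooKeeper quorum passed on the command line
  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zk)
  conf.set("hbase.zookeeper.property.clientPort", port)

  /**
   * Write the sorted (rowkey, KeyValue) RDD to a temporary HFile directory
   * and bulk-load the HFiles into the target table (e.g. recommend:itemcf).
   */
  def loadHFile2HBase(hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)],
                      tableName: String,
                      hFileTmpPath: String): Unit = {
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))
    val regionLocator = connection.getRegionLocator(TableName.valueOf(tableName))

    // Let HFileOutputFormat2 configure the output (compression, table name, ...)
    val job = Job.getInstance(conf)
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

    // 1. Write the HFiles to the temporary HDFS path
    hfileRDD.saveAsNewAPIHadoopFile(
      hFileTmpPath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

    // 2. Bulk-load the generated HFiles into HBase
    new LoadIncrementalHFiles(conf)
      .doBulkLoad(new Path(hFileTmpPath), connection.getAdmin, table, regionLocator)

    connection.close()
  }
}

object HBaseUtils {
  def apply(spark: SparkSession, zk: String, port: String): HBaseUtils = new HBaseUtils(spark, zk, port)
}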

2.3 Testing

With the code above in place we can read all of the data and create the table in HBase.

(1) Package the project into a jar.

If packaging fails with errors, check that the package names in the source files are correct!

(2) In the target folder, take the jar with the longest name (the jar-with-dependencies build) and drag it onto the virtual machine.

(3) On the virtual machine:

start-all.sh

hive --service hiveserver2 &

hive --service metastore &

zkServer.sh start

start-hbase.sh

hbase shell

# inside the hbase shell

create_namespace 'recommend'

create 'recommend:itemcf', 'f1'

(4) Run the Spark job

${SPARK_HOME}/bin/spark-submit \
--jars /usr/local/hive/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name itemcf \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 3G \
--num-executors 1 \
--class comqf.bigdata.ItemCF \
/data/jar/recommend-1.0-jar-with-dependencies.jar \
-e prod -x root -z 192.168.10.101 -p 2181 -f /tmp/hfile -t recommend:itemcf -k 3

I just want to note that my virtual machine kept running into all kinds of bugs, and I had to restart it several times.

After the job finishes there is an itemcf table in Hive (the screenshot above was taken a long while ago; this time Hive would not cooperate).

3. ALSCF

  1. Load the raw data from dwb_news.user_article_action and convert it into ratings.

  2. Use the ALS algorithm from Spark MLlib to factorize it into a user-vector matrix and an item-vector matrix.

  3. Evaluate the model with RMSE.

  4. Generate a recommendation list for each user.

  5. Write the results back to HBase, into recommend:alscf.

3.1 Changes to the Config class; parsing is still dispatched by the program name

package comqf.bigdata.confimport org.slf4j.LoggerFactorycase class Config(env:String = "",hBaseZK:String = "",hBasePort:String = "2181",hFileTmpPath:String = "",tableName:String = "recommend:news-cf",irisPath:String = "",proxyUser:String = "root",topK:Int = 10)
object Config {private val logger = LoggerFactory.getLogger(Config.getClass.getSimpleName)/*** 解析参数* @param obj : 用于判断解析参数类的类型* @param args : 具体的参数值*/def parseConfig(obj: Object, args: Array[String]): Config = {//1. 获取到程序名称val programName = obj.getClass.getSimpleName.replace("$", "")//2. 类似于getopts命令//2.1 得到解析器val parser = new scopt.OptionParser[Config](s"ItemCF ${programName}") {head(programName, "v1.0")opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")opt[String]('x', "proxyUser").optional().action((x, config) => config.copy(proxyUser = x)).text("proxy username")programName match {case "ItemCF" => {logger.info(s"ItemCF is staring ---------------------------->")opt[String]('z', "hBaseZK").required().action((x, config) => config.copy(hBaseZK = x)).text("hBaseZK")opt[String]('p', "hBasePort").required().action((x, config) => config.copy(hBasePort = x)).text("hBasePort")opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")}case "AlsCF" =>logger.info(s"AlsCF is staring ---------------------------->")opt[String]('z', "hBaseZK").required().action((x, config) => config.copy(hBaseZK = x)).text("hBaseZK")opt[String]('p', "hBasePort").required().action((x, config) => config.copy(hBasePort = x)).text("hBasePort")opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")case _ =>}}//2.2 解析parser.parse(args, Config()) match {case Some(conf) => confcase None => {logger.error(s"cannot parse args")System.exit(-1)null}}}
}

3.2 AlsCF

package comqf.bigdataimport comqf.bigdata.AlsCF.logger
import comqf.bigdata.ItemCF.logger
import comqf.bigdata.conf.Config
import comqf.bigdata.transformer.{AlsCFModelData, ItemCFModelData}
import comqf.bigdata.utils.{HBaseUtils, SparkUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory/*** 基于model的协同过滤:矩阵分解*/
object AlsCF {private val logger = LoggerFactory.getLogger(AlsCF.getClass.getSimpleName)def main(args: Array[String]): Unit = {//1. 准备工作//1.1 基本准备Logger.getLogger("org").setLevel(Level.WARN)val params:Config = Config.parseConfig(AlsCF, args)System.setProperty("HADOOP_USER_NAME", params.proxyUser)logger.warn("job is running, please wait for a moment")val spark:SparkSession = SparkUtils.getSparkSession(params.env, "alscf app")import spark.implicits._//1.2 将spark的运算进行checkpoint, 因为als迭代很深,DAG过深,RDD的lineage很长,造成内存溢出。spark.sparkContext.setCheckpointDir("/checkpoint/als")//2. 基础数据处理//2.1 获取到ItemCF的模型对象val modelData = AlsCFModelData(spark, params.env)//2.2 将原始数据转换(uid, aid, rate)val rateDF: DataFrame = modelData.getUserRatingData()logger.warn("rateDF ---------------------------------------->")rateDF.show()//2.3 将得到的总数居分为两部分:1. 测试数据, 2.训练数据val Array(training, test) = rateDF.randomSplit(Array(0.6, 0.4))training.cache()//2.4 ALS训练模型//2.4.1 als的配置设置val als = new ALS().setMaxIter(6) // 设置交替最小二乘法的迭代次数,次数越大猜测的值就越接近真实,但是资源消耗越大.setRegParam(0.01) // 防止过渡拟合.setUserCol("uid") // 用户了列.setItemCol("aid") // 物品列.setRatingCol("rate") // 评分列.setColdStartStrategy("drop") // 表示删除这些用户和物品的数据.setNonnegative(true) // 设置非负数.setImplicitPrefs(true) // 开启隐式反馈.setRank(10) // topk//2.4.2 训练出的模型val model: ALSModel = als.fit(training)training.unpersist()//2.5 预测出结果val predictDF: DataFrame = model.transform(test)logger.warn("predictDF ---------------------------------------->")predictDF.show()//2.6 为用户取topk:(id, recommendations)val recommendDF: DataFrame = model.recommendForAllUsers(params.topK)logger.warn("recommendDF ---------------------------------------->")recommendDF.show()//2.7 过滤掉自己不要的值val filterDF = modelData.recommendFilterAlsDataForAllUsers(rateDF, recommendDF)logger.warn("filterDF ---------------------------------------->")filterDF.show()//2.8 先在HDFS保存filterDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwb_news.als")//2.9 再在HBase保存val hBaseUtils: HBaseUtils = HBaseUtils(spark, params.hBaseZK, params.hBasePort)logger.warn("hBaseUtils ---------------------------------------->")val convertDF = modelData.recommendDataConvert(filterDF)val hfileRDD = modelData.alscf2RDD(convertDF)hBaseUtils.loadHFile2HBase(hfileRDD, params.tableName, params.hFileTmpPath)//释放资源spark.stop()logger.info("job successful")}
}
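Step 3 of the plan (evaluating the model with RMSE) is not shown in the driver above. A minimal sketch of what it could look like, reusing the predictDF produced by model.transform(test) (ALSModel writes its predictions to a column named "prediction" by default):

import org.apache.spark.ml.evaluation.RegressionEvaluator

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rate")            // the true rating derived from user behaviors
  .setPredictionCol("prediction") // default output column of ALSModel.transform
val rmse: Double = evaluator.evaluate(predictDF)
logger.warn(s"alscf rmse: ${rmse}")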

3.3 The ALS model-data class (AlsCFModelData)

package comqf.bigdata.transformerimport org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.explodeimport scala.collection.mutable.ListBuffer/*** 为als算法模型提供模型数据*/
class AlsCFModelData(spark:SparkSession, env:String) extends ModelData(spark:SparkSession, env:String) {/*** 将推荐算法的结果转换RDD* 先要新建hbase的表* 行建:uid* 列簇:f1* 列明:als* 值:推荐的分值*/def alscf2RDD(convertDF: DataFrame) = {convertDF.rdd.sortBy(x => x.get(0).toString).flatMap(row => {//1. 获取到原始数据值val uid: String = row.get(0).toString/** [(sim_aid, pre_rate), (sim_aid, pre_rate), ...]* |* sim_aid:pre_rate, sim_aid:pre_rate, ...*/val alses: String = row.getAs[Seq[Row]](1).map(als => {als.getInt(0).toString + ":" + als.getDouble(1).formatted("%.4f")}).mkString(",")//2. 创建集合准备存放这个结果val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]//3. 存放val kv = new KeyValue(Bytes.toBytes(uid), Bytes.toBytes("f1"), Bytes.toBytes("alscf"), Bytes.toBytes(alses))//4. 将kv添加到listBufferlistBuffer.append((new ImmutableBytesWritable(), kv))listBuffer})}import spark.implicits._/*** 过滤als算法推荐的数据中我们不要的数据*/def recommendFilterAlsDataForAllUsers(rateDF: DataFrame, recommendDF: DataFrame) = {//1. recommendations数组转化为多列:(uid, recommendations, pre_aid, pre_rate)val transDF: DataFrame = recommendDF.withColumn("recommendations", explode($"recommendations")).withColumn("pre_aid", $"recommendations.aid").withColumn("pre_rate", $"recommendations.rating")//2. 创建虚表rateDF.createOrReplaceTempView("user_rating")transDF.createOrReplaceTempView("als_pred")//3. 过滤掉已有的行为spark.sql(s"""|select|cast(t1.uid as int) as uid,|cast(t1.pre_aid as int) as pre_aid,|cast(t1.pre_rate as double) as pre_rate|from als_pred as t1 left join user_rating as t2 on t1.pre_aid = t2.aid and t1.uid = t2.id|where t2.rate is not null|""".stripMargin)}/*** (1, 2, 11) --> (uid, (sim_aid, pre_rate))* (1, 3, 12)*             ---> (uid, [(sim_aid, pre_rate), (sim_aid, pre_rate)])*             e.g.*             (1, [(2,11), (3,12), ...])*/def recommendDataConvert(recommendDF: DataFrame) = {import spark.implicits._recommendDF.rdd.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2)))).groupByKey().mapValues(sp => {var seq: Seq[(Int, Double)] = Seq[(Int, Double)]()sp.foreach(tp => {seq :+= (tp._1, tp._2)})seq.sortBy(_._2)}).toDF("uid", "recommendactions")}}object AlsCFModelData {def apply(spark: SparkSession, env: String): AlsCFModelData = new AlsCFModelData(spark, env)
}

For testing:

start-all.sh

hive --service metastore &

hive --service hiveserver2 &

zkServer.sh start

start-hbase.sh

hbase shell

Then create the corresponding table in HBase:

create 'recommend:alscf', 'f1'

Package the program in IDEA into a jar and drag it onto the virtual machine.

As before, upload the jar with the longest name (delete the previous jar of the same name first and then upload; actually deleting is not strictly necessary, a jar with the same name is simply overwritten).

${SPARK_HOME}/bin/spark-submit \
--jars /usr/local/hive/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name als \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 3G \
--num-executors 1 \
--class comqf.bigdata.AlsCF \
/data/jar/recommend-1.0-jar-with-dependencies.jar \
-e prod -x root -z 192.168.10.101 -p 2181 -f /tmp/hfile -t recommend:alscf -k 3

If it runs normally, the dwb_news database in Hive will contain the als table.

4. ItemBaseFeature

4.1 Cleaning the raw data

This step vectorizes the article features. The features are split into numeric and non-numeric types; the non-numeric ones are converted to numeric first, and then the features are vectorized.

Filtering of the articles' basic information is done with Presto queries over ods_news.news_article.

start-all.sh

hive --service hiveserver2 &

hive --service metastore &

#开启presto进行查询

launcher start

presto-cli --server qianfeng01:8090 --catalog hive

The filtering query in Presto extracts the Chinese text (stripping out the HTML tags) and counts the number of images.

I covered how these SQL statements are written in an earlier post:

Hand-written SQL: Presto queries_林柚晞的博客-CSDN博客_presto查询语句

create table dwb_news.article_base_info with(FORMAT='ORC')
as
with t1 as( -- 查询出所有的特征
    select
    article_id,
    length(array_join(regexp_extract_all(content, '[\u4e00-\u9fa5]+'),';')) as article_num,
    (length(content) - length(replace(content, '<img src=',''))) / length('<img src=') as img_num,
    type_name,
    date_diff('day', cast(format_datetime(date_parse(logday, '%Y%m%d'), 'yyyy-MM-dd') as date), cast(now() as date)) as pub_gap
    from ods_news.news_article
    where
    logday < format_datetime(now(), 'yyyyMMdd')
    and
    article_id <> '' and content <> ''
)
select
article_id,
max(article_num) as article_num,
max(img_num) as img_num,
max(type_name) as type_name,
max(pub_gap) as pub_gap
from t1 group by article_id;

4.2 Processing logic for the data

Processing of the numeric features (mainly Euclidean-style handling plus min-max normalization).

The formula would not render in my markdown, but min-max normalization is simply x' = (x - min) / (max - min).

Take a small example:

article_id | article_num | img_num
-----------+-------------+---------
24801      | 1194        | 8
24802      | 1924        | 11
24803      | 1308        | 5

Max article_num: 1924

Max img_num: 11

Min article_num: 1194

Min img_num: 5

article_id = 24801, normalized article_num: (1194 - 1194) / (1924 - 1194) = 0; normalized img_num: (8 - 5) / (11 - 5) = 3/6 = 0.5

article_id = 24802, article_num: (1924 - 1194) / (1924 - 1194) = 1; img_num: (11 - 5) / (11 - 5) = 1

article_id = 24803, article_num: (1308 - 1194) / (1924 - 1194) = 114 / 730 ≈ 0.156; img_num: (5 - 5) / (11 - 5) = 0

article_id | article_num | img_num
-----------+-------------+---------
24801      | 0           | 0.5
24802      | 1           | 1
24803      | 0.156       | 0
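As a sketch, the same min-max normalization can be done in Spark SQL over the dwb_news.article_base_info table built in 4.1 (output column names are only examples; the guard against max = min is omitted):

// Compute the global min/max once, then rescale each article's numeric features to [0, 1].
spark.sql(
  s"""
     |with stats as (
     |  select
     |    min(article_num) as min_an, max(article_num) as max_an,
     |    min(img_num)     as min_in, max(img_num)     as max_in
     |  from dwb_news.article_base_info
     |)
     |select
     |  a.article_id,
     |  (a.article_num - s.min_an) / (s.max_an - s.min_an) as article_num_norm,
     |  (a.img_num     - s.min_in) / (s.max_in - s.min_in) as img_num_norm
     |from dwb_news.article_base_info a cross join stats s
     |""".stripMargin).show(5)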
