第四部分-推荐系统-数据ETL

  • 本模块完成数据清洗,并将清洗后的数据load到Hive数据表里面去

前置准备:

spark +hive

vim $SPARK_HOME/conf/hive-site.xml <?xml version="1.0"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration><property><name>hive.metastore.uris</name><value>thrift://hadoop001:9083</value></property></configuration>
  • 启动Hive metastore server

[root@hadoop001 conf]# nohup hive --service metastore &

[root@hadoop001 conf]# netstat -tanp | grep 9083
tcp 0 0 0.0.0.0:9083 0.0.0.0:* LISTEN 24787/java
[root@hadoop001 conf]#

测试:
[root@hadoop001 ~]# spark-shell --master local[2]

scala> spark.sql("select * from liuge_db.dept").show;
+------+-------+-----+
|deptno|  dname|  loc|
+------+-------+-----+
|     1|  caiwu| 3lou|
|     2|  renli| 4lou|
|     3|  kaifa| 5lou|
|     4|qiantai| 1lou|
|     5|lingdao|4 lou|
+------+-------+-----+

==》保证Spark SQL 能够访问到Hive 的元数据才行。

然而我们采用的是standalone模式:需要启动master worker
[root@hadoop001 sbin]# pwd
/root/app/spark-2.4.3-bin-2.6.0-cdh5.7.0/sbin
[root@hadoop001 sbin]# ./start-all.sh

[root@hadoop001 sbin]# jps
26023 Master
26445 Worker

Spark常用端口

8080 spark.master.ui.port    Master WebUI
8081    spark.worker.ui.port    Worker WebUI
18080   spark.history.ui.port   History server WebUI
7077    SPARK_MASTER_PORT       Master port
6066    spark.master.rest.port  Master REST port
4040    spark.ui.port           Driver WebUI

这个时候打开:http://hadoop001:8080/

开始项目Coding

IDEA+Scala+Maven进行项目的构建

步骤一: 新建scala项目后,可以参照如下pom进行配置修改

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.csylh</groupId><artifactId>movie-recommend</artifactId><version>1.0</version><inceptionYear>2008</inceptionYear><properties><scala.version>2.11.8</scala.version><spark.version>2.4.3</spark.version></properties><repositories><repository><id>scala-tools.org</id><name>Scala-Tools Maven2 Repository</name><url>http://scala-tools.org/repo-releases</url></repository></repositories><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.1</version></dependency><!--// 0.10.2.1--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.39</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><!--<sourceDirectory>src/main/scala</sourceDirectory>--><!--<testSourceDirectory>src/test/scala</testSourceDirectory>--><plugins><plugin><!-- see http://davidb.github.com/scala-maven-plugin --><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.3</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.13</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><!-- If you have classpath issue like NoDefClassError,... --><!-- useManifestOnlyJar>false</useManifestOnlyJar --><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin></plugins></build>
</project>

步骤二:新建com.csylh.recommend.dataclearer.SourceDataETLApp

import com.csylh.recommend.entity.{Links, Movies, Ratings, Tags}
import org.apache.spark.sql.{SaveMode, SparkSession}/*** Description:*    hadoop001  file:///root/data/ml/ml-latest 下的文件*    ====>  SparkSQL ETL*    ===>  load data to Hive数据仓库** @Author: 留歌36* @Date: 2019-07-12 13:48*/
object SourceDataETLApp{def main(args: Array[String]): Unit = {// 面向SparkSession编程val spark = SparkSession.builder()
//          .master("local[2]").enableHiveSupport() //开启访问Hive数据, 要将hive-site.xml等文件放入Spark的conf路径.getOrCreate()val sc = spark.sparkContext// 设置RDD的partitions 的数量一般以集群分配给应用的CPU核数的整数倍为宜, 4核8G ,设置为8就可以// 问题一:为什么设置为CPU核心数的整数倍?// 问题二:数据倾斜,拿到数据大的partitions的处理,会消耗大量的时间,因此做数据预处理的时候,需要考量会不会发生数据倾斜val minPartitions = 8//  在生产环境中一定要注意设置spark.sql.shuffle.partitions,默认是200,及需要配置分区的数量val shuffleMinPartitions = "8"spark.sqlContext.setConf("spark.sql.shuffle.partitions",shuffleMinPartitions)/*** 1*/import spark.implicits._val links = sc.textFile("file:///root/data/ml/ml-latest/links.txt",minPartitions) //DRIVER.filter(!_.endsWith(",")) //EXRCUTER.map(_.split(",")) //EXRCUTER.map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt)) //EXRCUTER.toDF()println("===============links===================:",links.count())links.show()// 把数据写入到HDFS上links.write.mode(SaveMode.Overwrite).parquet("/tmp/links")// 将数据从HDFS加载到Hive数据仓库中去spark.sql("drop table if exists links")spark.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet")spark.sql("load data inpath '/tmp/links' overwrite into table links")/*** 2*/val movies = sc.textFile("file:///root/data/ml/ml-latest/movies.txt",minPartitions).filter(!_.endsWith(",")).map(_.split(",")).map(x => Movies(x(0).trim.toInt, x(1).trim.toString, x(2).trim.toString)).toDF()println("===============movies===================:",movies.count())movies.show()// 把数据写入到HDFS上movies.write.mode(SaveMode.Overwrite).parquet("/tmp/movies")// 将数据从HDFS加载到Hive数据仓库中去spark.sql("drop table if exists movies")spark.sql("create table if not exists movies(movieId int,title String,genres String) stored as parquet")spark.sql("load data inpath '/tmp/movies' overwrite into table movies")/*** 3*/val ratings = sc.textFile("file:///root/data/ml/ml-latest/ratings.txt",minPartitions).filter(!_.endsWith(",")).map(_.split(",")).map(x => Ratings(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toDouble, x(3).trim.toInt)).toDF()println("===============ratings===================:",ratings.count())ratings.show()// 把数据写入到HDFS上ratings.write.mode(SaveMode.Overwrite).parquet("/tmp/ratings")// 将数据从HDFS加载到Hive数据仓库中去spark.sql("drop table if exists ratings")spark.sql("create table if not exists ratings(userId int,movieId int,rating Double,timestamp int) stored as parquet")spark.sql("load data inpath '/tmp/ratings' overwrite into table ratings")/*** 4*/val tags = sc.textFile("file:///root/data/ml/ml-latest/tags.txt",minPartitions).filter(!_.endsWith(",")).map(x => rebuild(x))  // 注意这个坑的解决思路.map(_.split(",")).map(x => Tags(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toString, x(3).trim.toInt)).toDF()tags.show()// 把数据写入到HDFS上tags.write.mode(SaveMode.Overwrite).parquet("/tmp/tags")// 将数据从HDFS加载到Hive数据仓库中去spark.sql("drop table if exists tags")spark.sql("create table if not exists tags(userId int,movieId int,tag String,timestamp int) stored as parquet")spark.sql("load data inpath '/tmp/tags' overwrite into table tags")}/*** 该方法是用于处理不符合规范的数据* @param input* @return*/private def rebuild(input:String): String ={val a = input.split(",")val head = a.take(2).mkString(",")val tail = a.takeRight(1).mkStringval tag = a.drop(2).dropRight(1).mkString.replaceAll("\"","")val output = head + "," + tag + "," + tailoutput}
}

再有一些上面主类引用到的case 对象,你可以理解为Java 实体类

package com.csylh.recommend.entity/*** Description: 数据的schema** @Author: 留歌36* @Date: 2019-07-12 13:46*/
case class Links(movieId:Int,imdbId:Int,tmdbId:Int)
package com.csylh.recommend.entity/*** Description: TODO** @Author: 留歌36* @Date: 2019-07-12 14:09*/
case class Movies(movieId:Int,title:String,genres:String)
package com.csylh.recommend.entity/*** Description: TODO** @Author: 留歌36* @Date: 2019-07-12 14:10*/
case class Ratings(userId:Int,movieId:Int,rating:Double,timestamp:Int)
package com.csylh.recommend.entity/*** Description: TODO** @Author: 留歌36* @Date: 2019-07-12 14:11*/
case class Tags(userId:Int,movieId:Int,tag:String,timestamp:Int)

步骤三:将创建的项目进行打包上传到服务器
mvn clean package -Dmaven.test.skip=true

[root@hadoop001 ml]# ll -h movie-recommend-1.0.jar
-rw-r--r--. 1 root root 156K 10月 20 13:56 movie-recommend-1.0.jar
[root@hadoop001 ml]#

步骤四:提交运行上面的jar,编写shell脚本

[root@hadoop001 ml]# vim etl.sh
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

$SPARK_HOME/bin/spark-submit
–class com.csylh.recommend.dataclearer.SourceDataETLApp
–master spark://hadoop001:7077
–name SourceDataETLApp
–driver-memory 10g
–executor-memory 5g
/root/data/ml/movie-recommend-1.0.jar

步骤五:sh etl.sh 即可

先把数据写入到HDFS上
创建Hive表
load 数据到表

sh etl.sh之前:

[root@hadoop001 ml]# hadoop fs -ls /tmp
19/10/20 19:26:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive[root@hadoop001 ml]# hadoop fs -ls /user/hive/warehouse
19/10/20 19:27:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@hadoop001 ml]#

sh etl.sh之后:
这里的shell 是 ,spark on standalone,后面会spark on yarn。其实也没差,都可以

[root@hadoop001 ~]# hadoop fs -ls /tmp
19/10/20 19:43:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 6 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/tags
[root@hadoop001 ~]# hadoop fs -ls /user/hive/warehouse
19/10/20 19:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/tags
[root@hadoop001 ~]#

这样我们就把数据etl到我们的数据仓库里了,接下来,基于这份基础数据做数据加工

有任何问题,欢迎留言一起交流~~
更多文章:基于Spark的电影推荐系统:https://blog.csdn.net/liuge36/column/info/29285

基于Spark的电影推荐系统(推荐系统~2)相关推荐

  1. 基于Spark的电影推荐系统(电影网站)

    第一部分-电影网站: 软件架构: SpringBoot+Mybatis+JSP 项目描述:主要实现电影网站的展现 和 用户的所有动作的地方 技术选型: 技术 名称 官网 Spring Boot 容器 ...

  2. 基于Spark的电影推荐系统(推荐系统~1)

    第四部分-推荐系统-项目介绍 行业背景: 快速:Apache Spark以内存计算为核心 通用 :一站式解决各个问题,ADHOC SQL查询,流计算,数据挖掘,图计算 完整的生态圈 只要掌握Spark ...

  3. 基于Spark的电影推荐系统(推荐系统~4)

    第四部分-推荐系统-模型训练 本模块基于第3节 数据加工得到的训练集和测试集数据 做模型训练,最后得到一系列的模型,进而做 预测. 训练多个模型,取其中最好,即取RMSE(均方根误差)值最小的模型 说 ...

  4. 基于Spark的电影推荐系统(推荐系统~5)

    第四部分-推荐系统-离线推荐 本模块基于第4节得到的模型,开始为用户做离线推荐,推荐用户最有可能喜爱的5部电影. 说明几点 1.主要分为两个模块.其一是为 单个随机用户 做推荐,其二是为 所有用户做推 ...

  5. 电影推荐系统 python简书_基于Spark的电影推荐系统(实战简介)

    ## 写在前面 一直不知道这个专栏该如何开始写,思来想去,还是暂时把自己对这个项目的一些想法 和大家分享 的形式来展现.有什么问题,欢迎大家一起留言讨论. 这个项目的源代码是在https://gith ...

  6. 基于Spark的电影推荐系统(推荐系统~7)

    第四部分-推荐系统-实时推荐 本模块基于第4节得到的模型,开始为用户做实时推荐,推荐用户最有可能喜爱的5部电影. 说明几点 1.数据来源是 testData 测试集的数据.这里面的用户,可能存在于训练 ...

  7. 基于Spark实现电影点评系统用户行为分析—RDD篇(一)

    文章目录 1.项目背景 2.数据描述 3.代码实现 1.项目背景 电影推荐系统(MovieLens)是美国明尼苏达大学(Minnesota)计算机科学与工程学院的GroupLens项目组创办的,是一个 ...

  8. 基于Spark实现电影点评系统用户行为分析—DataFrame篇(二)

    文章目录 1.介绍 2.业务统计 3.代码实现 1.介绍 Spark SQL有三种不同实现方式:(1)使用DataFrame与RDD结合的方式.(2)纯粹使用DataFrame的方式.(3)使用Dat ...

  9. 基于spark的电影数据分析

    目 录 摘 要 I Abstract II 1 绪论 1 1.1 选题背景及意义 1 1.2 研究现状 2 1.3 研究内容及论文组织结构 2 2 关键技术和工具环境 4 2.1 IDEA简介 4 2 ...

  10. 基于Spark MLlib平台的协同过滤算法---电影推荐系统

    协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,协同过滤算法按照数据使用 ...

最新文章

  1. 深度 | 人工智能全局概览:通用智能的当前困境和未来可能
  2. 关于css的基础知识点
  3. sklearn自学指南(part17)--稳健回归-异常值和建模误差
  4. 区块链以信用为基础,所以目前在中国不可行.
  5. 软件工程综合实践专题——个人博客作业1
  6. Koa 中间件的执行
  7. 基于ARM IP的看门狗设计与功能验证
  8. 点击类选择器,获取索引
  9. Java使用poi导出Excel之格式设置
  10. pictureBox sizemode=zoom时图片像素坐标
  11. Linux的网卡配置 vi /etc/sysconfig/network-scripts/ifcfg-ens33
  12. Centos7下为nvidia显卡安装驱动
  13. 不执著才叫看破,不完美才叫人生。
  14. MOS管损坏典型问题分析
  15. ENGINEER01 - 分区规划和使用,LVM逻辑卷
  16. 一篇简易的MODBUS 转 profibus 网关 将ABB ACS800变频器接入 profibus 总线案例
  17. mac jade 安装
  18. JS JQuery添加、替换、删除元素class属性
  19. Android 制作一款短视频app软件可在线观看(视频实时更新)
  20. 研究型论文_基于特征值分布和人工智能的网络入侵检测系统的研究与实现

热门文章

  1. 树莓派4 PWM控制风扇转速
  2. 从吃喝玩乐到学习,71个良心网站,看完你会回来点收藏
  3. 下载和安装R、RStudio !~~~
  4. php递归算法的简单示例,php递归函数 php递归算法经典实例大全 | 帮助信息-动天数据...
  5. 使用tushare获取A股数据
  6. 屏幕距离和坐便转换工具_简单好用的视频分辨率转换器推荐
  7. linux 单网卡 双ip,Linux操作系统单网卡双IP的设置
  8. 数据库精选 60 道高频面试题(含答案),值得收藏
  9. 腾讯全球首款JS代码坦克 CodeTank
  10. Python3使用BFS实现湖北省到全国省级行政区