目录

  • Data set
  • Create SparkSession and Load data
  • EDA
  • Feature Engineering
  • Splitting Dataset
  • Train Model
  • Evaluation
  • Recommend Movie

Data set

  来自一个开源电影数据集,约10万条数据。

Create SparkSession and Load data

from pyspark.sql import SparkSessionspark = SparkSession.builder.appName('RS').getOrCreate()# load dataset
df = spark.read.csv('./Data/movie_ratings_df.csv', inferSchema=True, header=True)

EDA

df.show(10)
+------+------------+------+
|userId|       title|rating|
+------+------------+------+
|   196|Kolya (1996)|     3|
|    63|Kolya (1996)|     3|
|   226|Kolya (1996)|     5|
|   154|Kolya (1996)|     3|
|   306|Kolya (1996)|     5|
|   296|Kolya (1996)|     4|
|    34|Kolya (1996)|     5|
|   271|Kolya (1996)|     4|
|   201|Kolya (1996)|     4|
|   209|Kolya (1996)|     4|
+------+------------+------+
only showing top 10 rows
print((df.count(), len(df.columns)))
(100000, 3)
df.printSchema()
root|-- userId: integer (nullable = true)|-- title: string (nullable = true)|-- rating: integer (nullable = true)
# user recording
userCount_df = df.groupBy('userId').count().orderBy('count', ascending=False)# Spark's DataFrame --> Pandas's DataFrame
user_df = userCount_df.toPandas()
user_df.head()
userId count
0 405 737
1 655 685
2 13 636
3 450 540
4 276 518
user_df.tail()
userId count
938 242 20
939 571 20
940 873 20
941 475 20
942 36 20

Feature Engineering

  • label encode

    • Instance = stringIndexer(inputCol=‘xxx’, outputCol=‘yyy’)
    • model = Instance.fit(df)
    • new_df = model.transform(df)
  • reverse operation
    • Instance = IndexToString(inputCol=‘yyy’, outputCol=‘xxx’, labels=model.labels)
    • df = Instance.transform(new_df)
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString
stringIndexer = StringIndexer(inputCol='title', outputCol='title_num')model = stringIndexer.fit(df)new_df  = model.transform(df)new_df.show(10)
+------+------------+------+---------+
|userId|       title|rating|title_num|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows
new_df.groupBy('title_num').count().orderBy('count', ascending=False).show(10)
+---------+-----+
|title_num|count|
+---------+-----+
|      0.0|  583|
|      1.0|  509|
|      2.0|  508|
|      3.0|  507|
|      4.0|  485|
|      5.0|  481|
|      6.0|  478|
|      7.0|  452|
|      8.0|  431|
|      9.0|  429|
+---------+-----+
only showing top 10 rows

Splitting Dataset

train_df, test_df = new_df.randomSplit([0.7, 0.3])print('train_df, (%d, %d)'%(train_df.count(), len(train_df.columns)))
print('test_df, (%d, %d)'%(test_df.count(), len(test_df.columns)))
train_df, (69917, 4)
test_df, (30083, 4)

Train Model

ALS

  • nonnegative=‘True’, 没有分数为负值的评分
  • coldStartStrategy=‘drop’,防止冷启动,分数为NaN
from pyspark.ml.recommendation import ALSrec = ALS(maxIter=10, regParam=0.01, userCol='userId', itemCol='title_num', ratingCol='rating', nonnegative=True,coldStartStrategy='drop')rs_model = rec.fit(train_df)

Evaluation

test_df.show(4)
+------+--------------------+------+---------+
|userId|               title|rating|title_num|
+------+--------------------+------+---------+
|     1|Apocalypse Now (1...|     3|     91.0|
|     1|    Apollo 13 (1995)|     4|     51.0|
|     1|Aristocats, The (...|     2|    575.0|
|     1|Army of Darkness ...|     4|    289.0|
+------+--------------------+------+---------+
only showing top 4 rows
test = test_df.select(['userId','title','title_num'])test_pred = rs_model.transform(test)test_pred.show()
+------+--------------------+---------+----------+
|userId|               title|title_num|prediction|
+------+--------------------+---------+----------+
|   463|That Thing You Do...|    148.0| 2.5579684|
|   251|That Thing You Do...|    148.0| 3.9390717|
|   193|That Thing You Do...|    148.0|  3.070894|
|   642|That Thing You Do...|    148.0| 3.9546824|
|   101|That Thing You Do...|    148.0| 3.6771653|
|   406|That Thing You Do...|    148.0| 3.0834832|
|   731|That Thing You Do...|    148.0| 2.8964741|
|   159|That Thing You Do...|    148.0|  5.029048|
|   606|That Thing You Do...|    148.0| 3.6251612|
|   336|That Thing You Do...|    148.0| 2.9238327|
|   330|That Thing You Do...|    148.0| 4.2745767|
|    93|That Thing You Do...|    148.0| 3.2026477|
|   654|That Thing You Do...|    148.0| 4.2684965|
|   152|That Thing You Do...|    148.0| 4.3452826|
|   760|That Thing You Do...|    148.0|  4.768305|
|   178|That Thing You Do...|    148.0| 4.1157365|
|   839|That Thing You Do...|    148.0| 4.2218776|
|   500|That Thing You Do...|    148.0| 2.9480288|
|   432|That Thing You Do...|    148.0| 2.7699072|
|   676|That Thing You Do...|    148.0| 2.6120367|
+------+--------------------+---------+----------+
only showing top 20 rows
from pyspark.ml.evaluation import RegressionEvaluatortest_pred = rs_model.transform(test_df)evaluate_result = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')rmse = evaluate_result.evaluate(test_pred)print('test rmse is %f'%rmse)
test rmse is 1.036787

Recommend Movies

nunique_movies = new_df.select('title').distinct()nunique_movies.count()
1664
# total movies
a = nunique_movies.alias('a')
user_id = 66
# user_id = 66,看过的电影
watched_movies = new_df.filter(new_df['userId'] == user_id).select('title').distinct()
b = watched_movies.alias('b')
# a join b
total_movies = a.join(b, a.title == b.title, how='left')
total_movies.show(25)
+--------------------+--------------------+
|               title|               title|
+--------------------+--------------------+
|   Annie Hall (1977)|                null|
|Heavenly Creature...|                null|
|       Psycho (1960)|                null|
|Snow White and th...|                null|
|Night of the Livi...|                null|
|When We Were King...|                null|
| If Lucy Fell (1996)|                null|
|    Fair Game (1995)|                null|
| Three Wishes (1995)|                null|
|         Cosi (1996)|                null|
|Paris, France (1993)|                null|
|Spanking the Monk...|                null|
|I'll Do Anything ...|                null|
|        Mondo (1996)|                null|
| Evil Dead II (1987)|                null|
|    Threesome (1994)|                null|
|Last Action Hero ...|                null|
|Reality Bites (1994)|                null|
|Colonel Chabert, ...|                null|
|   Blue Chips (1994)|                null|
|A Chef in Love (1...|                null|
|    Nico Icon (1995)|                null|
|English Patient, ...|English Patient, ...|
|Marvin's Room (1996)|                null|
|Crows and Sparrow...|                null|
+--------------------+--------------------+
only showing top 25 rows
user_66_not_watched_movies = total_movies.where(col('b.title').isNull()).select(a.title).distinct()print('user 66 没有看过的电影 %d'%user_66_not_watched_movies.count())
user 66 没有看过的电影 1626
user_66_not_watched_movies = user_66_not_watched_movies.withColumn('userId', lit(int(user_id)))user_66_not_watched_movies.show(10, False)
+--------------------------------------+------+
|title                                 |userId|
+--------------------------------------+------+
|Annie Hall (1977)                     |66    |
|Heavenly Creatures (1994)             |66    |
|Psycho (1960)                         |66    |
|Snow White and the Seven Dwarfs (1937)|66    |
|Night of the Living Dead (1968)       |66    |
|When We Were Kings (1996)             |66    |
|If Lucy Fell (1996)                   |66    |
|Fair Game (1995)                      |66    |
|Three Wishes (1995)                   |66    |
|Cosi (1996)                           |66    |
+--------------------------------------+------+
only showing top 10 rows

Top 10 Recommend Movies

user_66_df = model.transform(user_66_not_watched_movies)   # title >> title_numuser_66_rs = rs_model.transform(user_66_df).orderBy('prediction', ascending=False)user_66_rs.show(10)
+--------------------+------+---------+----------+
|               title|userId|title_num|prediction|
+--------------------+------+---------+----------+
|Ruby in Paradise ...|    66|    887.0|  6.453057|
|Misrables, Les (...|    66|    911.0|  5.965375|
| Apostle, The (1997)|    66|    572.0|  5.873897|
|Miami Rhapsody (1...|    66|   1022.0| 5.8026986|
|  Schizopolis (1996)|    66|   1372.0|  5.692012|
|       Harlem (1993)|    66|   1369.0|  5.661992|
|Mina Tannenbaum (...|    66|   1286.0|  5.637907|
|In the Bleak Midw...|    66|    996.0|  5.628146|
|Double vie de V...|    66|    904.0|  5.467951|
|     Faithful (1996)|    66|   1129.0| 5.2826753|
+--------------------+------+---------+----------+
only showing top 10 rows

Pyspark:电影推荐相关推荐

  1. 基于PySpark和ALS算法实现基本的电影推荐流程

    文章目录 1.PySpark简介 2.Pyspark接口用法 读取数据源 常用算子 完整的wordcount示例 3.基于PySpark和ALS的电影推荐流程 数据集背景 读取用户数据 训练模型 调用 ...

  2. 第四课.KNN电影推荐

    目录 基于近邻用户的协同过滤 基于近邻物品的协同过滤 相似度计算-Jaccard相似度 实验:基于KNN的电影推荐系统 简介 movielens 数据集 模型实现 基于近邻用户的协同过滤 基于近邻用户 ...

  3. python亲和性分析法推荐电影论文_数据挖掘-MovieLens数据集_电影推荐_亲和性分析_Aprioro算法...

    #!/usr/bin/env python2 # -*- coding: utf-8 -*- """ Created on Tue Feb  7 14:38:33 201 ...

  4. ML之RS:基于用户的CF+LFM实现的推荐系统(基于相关度较高的用户实现电影推荐)

    ML之RS:基于用户的CF+LFM实现的推荐系统(基于相关度较高的用户实现电影推荐) 目录 输出结果 实现代码 输出结果 实现代码 #ML之RS:基于CF和LFM实现的推荐系统 import nump ...

  5. 电影推荐之《哈利·波特与火焰杯》 隐私策略(Privacy policy)

    1.隐私政策涵盖您对本应用的使用. 2.电影推荐之<哈利·波特与火焰杯>不会收集.存储.分享您的任何个人信息或者与您的设备相关的信息.我们不会收集任何统计数据和分析数据,也不会跟踪用户的行 ...

  6. 基于混合云存储系统的电影推荐引擎小结

    基于混合云存储系统的电影推荐引擎 推荐算法部分是Mahout下的Taste实现的, 数据集采用GroupLens 的数据集合,将这些数据集转换到mysql数据库中 其中Taste:http://mah ...

  7. Python基于用户协同过滤算法电影推荐的一个小改进

    之前曾经推送过这个问题的一个实现,详见:Python基于用户协同过滤算法的电影推荐代码demo 在当时的代码中没有考虑一种情况,如果选出来的最相似用户和待测用户完全一样,就没法推荐电影了.所以,在实际 ...

  8. 1.3 基于协同过滤的电影推荐案例

    1.3 案例–基于协同过滤的电影推荐 学习目标 应用基于用户的协同过滤实现电影评分预测 应用基于物品的协同过滤实现电影评分预测 1 User-Based CF 预测电影评分 数据集下载 下载地址:Mo ...

  9. 基于协同过滤的电影推荐

    日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 1.4 案例--基于协同过滤的电影推荐 学习目标 应用基于用户 ...

  10. 计算机毕业设计JAVA电影推荐网站mybatis+源码+调试部署+系统+数据库+lw

    计算机毕业设计JAVA电影推荐网站mybatis+源码+调试部署+系统+数据库+lw 计算机毕业设计JAVA电影推荐网站mybatis+源码+调试部署+系统+数据库+lw 本源码技术栈: 项目架构:B ...

最新文章

  1. element vue 动态单选_软件更新丨vue-element-admin 4.0.0 beta 发布,后台集成方案
  2. python中关于操作时间的方法(一):使用time模块
  3. 第一届WebRTCon在上海举行
  4. 因为喜欢,所以选择它
  5. 数据分析学习笔记——数据可视化
  6. SAP云平台和第三方CRM解决方案(火锅)互联
  7. 打印用户在指定时间段内做过的SAP Fiori Launchpad personalization明细
  8. Sentinel控制台
  9. Qt获取本地网卡信息
  10. 【英语学习】【WOTD】 putsch 释义/词源/示例
  11. P50发布!网友:滚筒洗衣机也能打电话了
  12. PLSA隐变量主题模型的公式推导解惑
  13. poj 3420 Quad Tiling 【矩阵乘法】
  14. linux缺页异常处理--内核空间
  15. 关于空间分析的一点小思考
  16. Qt中mouseMoveEvent在MainWindow中使用
  17. java中BOM是什么_Java处理带BOM的文本情况是什么?
  18. windows系统的包管理器【choco】
  19. 西瓜视频下载软件有吗
  20. 【云原生 | Kubernetes 系列】---CephFS和OSS

热门文章

  1. 锁php_基于 Redis 实现分布式锁及对应的 PHP 实现源码
  2. Java 后端MD5加密
  3. 7 php程序的调试方法_PHP 程序员的调试技术
  4. java ssh 启动时间_java ssh项目启动异常说明
  5. eclipse生成ant build.xml打war包
  6. 使用Jest对原生TypeScript项目进行UI测试
  7. linux、mysql、nginx、tomcat 环境下压力测试的主要调试参数
  8. MapXtreme 2005 学习心得 了解新建MapXtreme项目结构(二)
  9. 三目运算符?:结合性
  10. Linux 建立ftp站点