学完 PySpark 以后,第一感受是:用它写出来的代码确实比纯 Python 实现来得简洁。

先上代码

第一步,用 PySpark SQL 处理原始数据,其中包含冷却得分的计算(不是今天的重点,后面不再细说)

import findspark

# Locate the local Spark installation so the pyspark imports below resolve.
findspark.init()
import pathlib
import sys
import json
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
import pyspark.sql.functions as psf
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import IntegerType
import math
import argparse
import time
import os

# Make the project root (two directory levels above this file's directory)
# importable for the project-local imports that follow.
projectdir = str(pathlib.Path(os.path.abspath(__file__)).parent.parent.parent)
sys.path.append(projectdir)
# NOTE: these re-import names already imported above; harmless, kept as in the original.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from collections import defaultdict


def get_dataframe(
    path=r"file:///home/admin/pyspark_script3/data/4days/history_behavior_data/*.csv",
    train_end="20210430",
    test_date="20210501",
    train_limit=50,
):
    """Load the raw behaviour CSV files and split them into train and test DataFrames.

    Uses the module-level ``spark`` session (created under ``__main__``).
    The files are tab-separated with a header row; no schema is inferred, so
    every column (user_id, video_id, is_watch, is_share, is_collect,
    is_comment, watch_start_time, watch_label, pt_d) is read as a string.

    :param path: glob/URI of the input CSV files.
    :param train_end: last partition day (``yyyymmdd``, inclusive) of the training set.
    :param test_date: partition day (``yyyymmdd``) used as the test set.
    :param train_limit: cap on the number of training rows (the original debug
        sample of 50 rows is the default); ``None`` keeps every row. A limit
        without an ordering picks an arbitrary subset of rows.
    :return: ``(train_df, test_df)``.
    """
    df = (spark.read.format('csv')
          .option('sep', '\t')
          .option('header', True)
          .load(path))
    # Keep registering the 'df' view, as the original SQL-based split did.
    df.createOrReplaceTempView('df')
    # pt_d is a yyyymmdd string, so string comparison follows date order.
    train_df = df.filter(psf.col('pt_d') <= train_end)
    if train_limit is not None:
        train_df = train_df.limit(train_limit)
    test_df = df.filter(psf.col('pt_d') == test_date)
    return train_df, test_df


def compute_cooling_score(records, ref_date="20210430"):
    """Sum the time-decayed ("cooled") scores of all behaviour records of one (user, video).

    Each record is the string ``"is_share,is_collect,is_comment,watch_label,pt_d"``.
    For every record:

    * day score = ``2*is_share + 2*is_collect + 2*is_comment + 2*watch_label/9 + 1``
      (the original note gives its range as [1, 9], assuming binary flags and
      watch_label in 0..9 -- TODO confirm against the data spec);
    * cooled score = ``1 + day_score * exp(-alpha * day_diff)``, where
      ``day_diff`` is the number of calendar days from ``pt_d`` to ``ref_date``.

    ``alpha = ln(9) / 3``, so after 3 days the decay factor is exactly 1/9.

    :param records: iterable of record strings (e.g. a Spark ``collect_list``).
    :param ref_date: reference day (``yyyymmdd``) the decay is measured from.
    :return: the sum of the cooled scores over all records (0.0 for no records).
    """
    from datetime import datetime

    ref_day = datetime.strptime(ref_date, "%Y%m%d")
    alpha = -(math.log(1.0 / 9.0)) / (3 * 1.0)
    total = 0.0
    for record in records:
        share, collect, comment, watch_label, pt_d = record.split(",")
        day_score = (int(share) * 2 + int(collect) * 2 + int(comment) * 2
                     + 2 * float(watch_label) / 9.0) + 1
        # Real calendar difference: integer subtraction of yyyymmdd values is
        # wrong across month boundaries (20210501 - 20210430 == 71).
        day_diff = float((ref_day - datetime.strptime(pt_d, "%Y%m%d")).days)
        total += float(1 + day_score * math.exp(-alpha * day_diff))
    return float(total)


def build_model(df, ref_date="20210430"):
    """Aggregate behaviour rows into one cooled interest score per (user, video).

    Uses the module-level ``spark`` session. All behaviour records of a
    (user_id, video_id) pair contribute to its score (see ``compute_cooling_score``).

    :param df: behaviour DataFrame as returned by ``get_dataframe``.
    :param ref_date: reference day (``yyyymmdd``) for the time decay.
    :return: DataFrame ``user_id, video_id, score`` ordered by user_id, video_id.
    """
    from pyspark.sql.types import DoubleType

    df.createOrReplaceTempView("df")
    # Explicit return type: without one Spark declares the UDF as StringType.
    spark.udf.register(
        "compute_score",
        lambda records: compute_cooling_score(records, ref_date),
        DoubleType(),
    )
    score_sql = """
        select user_id, video_id,
               compute_score(collect_list(concat_ws(',', is_share, is_collect, is_comment, watch_label, pt_d))) as score
        from df
        group by user_id, video_id
        order by user_id, video_id
    """
    # Example rows: |1000442|11926|1.2311204247835448|
    return spark.sql(score_sql)

第二步,计算电影之间的相似度:先将数据转换成稀疏向量,再输入 MinHashLSH 模型,得到物品相似度矩阵

# Item-based collaborative filtering: helpers and the run_main entry point.


def sparse_indices(user_ids):
    """Return the distinct user ids as ascending ints, ready for ``Vectors.sparse``.

    User ids arrive as strings and must be converted before sorting: a
    lexicographic sort puts '100' before '99', and ``SparseVector`` rejects
    indices that are not strictly increasing.
    """
    return sorted({int(user_id) for user_id in user_ids})


def top_n(rows, n):
    """Return the ``n`` rows with the largest third field, highest first.

    Rows are ``(key, other_id, score)`` triples. The result equals
    ``sorted(rows, key=score, reverse=True)[:n]`` (ties keep input order);
    the input is not mutated.
    """
    import heapq
    from operator import itemgetter

    return heapq.nlargest(n, rows, key=itemgetter(2))


def to_csv_line(data):
    """Format ``(user, [(user, item, score), ...])`` as the user, a TAB, then comma-joined items."""
    user, recs = data
    return str(user) + "\t" + ",".join(str(rec[1]) for rec in recs)


def _item_similarity(user_items, num_hash_tables=5, threshold=0.98):
    """Compute item-item Jaccard similarity with MinHash LSH.

    Each item becomes a binary sparse vector over user ids (1.0 for every
    user who interacted with it).

    :param user_items: RDD of ``(user_id, video_id)`` pairs; user ids must be
        integer-like strings because they are used directly as vector indices.
    :param num_hash_tables: ``numHashTables`` passed to ``MinHashLSH``.
    :param threshold: keep pairs whose approximate Jaccard distance is below this.
    :return: RDD of ``(idA, idB, similarity)`` with ``idA != idB`` and
        ``similarity = 1 - JaccardDistance``.
    """
    # User ids index the vector directly, so its length is the largest id + 1.
    vector_size = user_items.map(lambda x: int(x[0])).max() + 1

    def to_vector(item_and_users):
        item, users = item_and_users
        indices = sparse_indices(users)
        return item, Vectors.sparse(vector_size, indices, [1.0] * len(indices))

    item_vec_df = (user_items.map(lambda x: (x[1], x[0]))
                   .groupByKey()
                   .map(to_vector)
                   .toDF(["item", "features"]))
    model = MinHashLSH(inputCol="features", outputCol="hashes",
                       numHashTables=num_hash_tables).fit(item_vec_df)
    item_sim_df = (model.approxSimilarityJoin(item_vec_df, item_vec_df, threshold,
                                              distCol="JaccardDistance")
                   .select(psf.col("datasetA.item").alias("idA"),
                           psf.col("datasetB.item").alias("idB"),
                           psf.col("JaccardDistance")))
    # The self-join also pairs every item with itself; drop those rows and
    # turn distance into similarity (larger = more similar).
    return (item_sim_df.rdd
            .filter(lambda r: r.idA != r.idB)
            .map(lambda r: (r.idA, r.idB, 1 - r.JaccardDistance)))


def _recommend(user_scores, item_sim_rdd, k, n):
    """Score videos for every user and keep the best ``n`` the user has not seen.

    interest(user, j) = sum over videos i of the user of score(user, i) * sim(i, j),
    using only the ``k`` most similar neighbours j of each i.

    :param user_scores: RDD of ``(user_id, video_id, score)`` rows.
    :param item_sim_rdd: RDD of ``(item_i, item_j, similarity)`` triples.
    :return: RDD of ``(user_id, [(user_id, video_id, interest), ...])`` with at
        most ``n`` entries per user, highest interest first, excluding videos
        that already appear for that user in ``user_scores``.
    """
    # (item_i, (item_j, sim)) for the k nearest neighbours of each item.
    top_k_sim = (item_sim_rdd.groupBy(lambda x: x[0])
                 .flatMap(lambda kv: top_n(kv[1], k))
                 .map(lambda x: (x[0], (x[1], x[2]))))
    # (item_i, (user, score)) for every interaction.
    watched = user_scores.map(lambda x: (x[1], (x[0], x[2])))

    def weigh(joined):
        _, ((item_j, sim), (user, score)) = joined
        return (user, item_j), float(score) * float(sim)

    interest = top_k_sim.join(watched).map(weigh).reduceByKey(lambda a, b: a + b)
    # Do not recommend videos the user has already interacted with.
    seen = user_scores.map(lambda x: ((x[0], x[1]), 1))
    unseen = interest.subtractByKey(seen)
    return (unseen.map(lambda x: (x[0][0], (x[0][0], x[0][1], x[1])))
            .groupByKey()
            .mapValues(lambda recs: top_n(recs, n)))


def run_main(k=5, n=10, output_dir=None):
    """Run item-CF on the training split and return the top-``n`` videos per user.

    Uses ``get_dataframe`` and ``build_model`` from this module (which rely on
    the module-level ``spark`` session). The test split is loaded but unused.

    :param k: neighbours kept per item when scoring.
    :param n: recommendations kept per user.
    :param output_dir: optional directory URI; when given, the recommendations
        are saved as text lines (user, TAB, comma-separated video ids) under
        ``<output_dir>/<unix timestamp>``.
    :return: RDD of ``(user_id, [(user_id, video_id, interest), ...])``. It is
        lazy unless ``output_dir`` is given, so collect/save it to run the job.
    """
    # Step 1: one cooled interest score per (user, video) from the training split.
    train, _test = get_dataframe()
    user_scores = build_model(train).rdd
    user_items = user_scores.map(lambda x: (x[0], x[1]))
    if user_items.isEmpty():
        # Nothing to learn from; RDD.max() below raises on an empty RDD.
        return user_items.context.emptyRDD()

    # Step 2: item-item similarity (sparse vectors -> MinHashLSH -> Jaccard similarity).
    item_sim_rdd = _item_similarity(user_items)

    # Step 3: interest = similarity * cooling score; recommend videos by interest.
    recommendations = _recommend(user_scores.distinct(), item_sim_rdd, k, n)
    if output_dir is not None:
        target = output_dir.rstrip("/") + "/" + str(int(time.time()))
        recommendations.map(to_csv_line).coalesce(1).saveAsTextFile(target)
    return recommendations
if __name__ == '__main__':
    # Local Spark session; get_dataframe/build_model read the module-level `spark`.
    conf = SparkConf().setMaster("local[4]").setAppName("movie_features_handle")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    sc = spark.sparkContext
    try:
        run_main()
    finally:
        # Stop the session even when the job fails, so the local context is released.
        spark.stop()

附录1:

从topkSim开始,我用语言注释说不清楚,所以单独拎出来运行,结合图片理解

1.topkSim拿出来单独运行了一下,看数据结构,如下,此段不需要出现在推荐算法代码中

topkSim = item_sim_rdd.map(lambda x: (x[0], (x[0], x[1], x[2])))
'''
+-----+----------------------------------+
|_1   |_2                                |
+-----+----------------------------------+
|10249|[10249, 12968, 1.0]               |
|10249|[10249, 6693, 0.33333333333333326]|
|10249|[10249, 41040, 0.25]              |
|10249|[10249, 41270, 0.1428571428571428]|
|11318|[11318, 47815, 1.0]               |
|11318|[11318, 6228, 1.0]                |
|11318|[11318, 44601, 1.0]               |
|11318|[11318, 6693, 0.33333333333333326]|
|12968|[12968, 10249, 1.0]               |
|12968|[12968, 6693, 0.33333333333333326]|
+-----+----------------------------------+'''
topkSim = item_sim_rdd.map(lambda x: (x[0], (x[0], x[1], x[2]))).groupByKey()
'''
+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1   |_2                                                                                                                                                                                                                                                                 |
+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|11318|[[[11318, 44601, 1.0], [11318, 6228, 1.0], [11318, 47815, 1.0], [11318, 6693, 0.33333333333333326]], 0, 4]                                                                                                                                                         |
|9308 |[[[9308, 44786, 1.0], [9308, 18399, 1.0], [9308, 842, 1.0], [9308, 39583, 1.0], [9308, 45507, 1.0], [9308, 41270, 0.1428571428571428]], 0, 6]                                                                                                                      |
|33680|[[[33680, 31294, 1.0], [33680, 4280, 1.0], [33680, 28149, 1.0], [33680, 3139, 1.0], [33680, 36100, 1.0], [33680, 48824, 1.0], [33680, 22472, 0.5]], 0, 7]                                                                                                          |
|46749|[[[46749, 29786, 1.0], [46749, 33564, 1.0], [46749, 35839, 1.0], [46749, 37639, 1.0], [46749, 31328, 1.0], [46749, 22472, 0.5]], 0, 6]                                                                                                                             |
|39583|[[[39583, 44786, 1.0], [39583, 9308, 1.0], [39583, 18399, 1.0], [39583, 45507, 1.0], [39583, 842, 1.0], [39583, 41270, 0.1428571428571428]], 0, 6]                                                                                                                 |
|26744|[[[26744, 40027, 1.0]], 0, 1]                                                                                                                                                                                                                                      |
|6693 |[[[6693, 6228, 0.33333333333333326], [6693, 10249, 0.33333333333333326], [6693, 11318, 0.33333333333333326], [6693, 12968, 0.33333333333333326], [6693, 44601, 0.33333333333333326], [6693, 47815, 0.33333333333333326], [6693, 41040, 0.16666666666666663]], 0, 7]|
|28149|[[[28149, 33680, 1.0], [28149, 48824, 1.0], [28149, 3139, 1.0], [28149, 31294, 1.0], [28149, 36100, 1.0], [28149, 4280, 1.0], [28149, 22472, 0.5]], 0, 7]                                                                                                          |
|35839|[[[35839, 31328, 1.0], [35839, 37639, 1.0], [35839, 46749, 1.0], [35839, 29786, 1.0], [35839, 33564, 1.0], [35839, 22472, 0.5]], 0, 6]                                                                                                                             |
|10249|[[[10249, 12968, 1.0], [10249, 6693, 0.33333333333333326], [10249, 41040, 0.25]], 0, 3]                                                                                                                                                                            |
+-----+-------------------------------------------------------------------------------------------
'''
.map(lambda x: (x[0], list(x[1])))
'''
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1   |_2                                                                                                                                                                                                                                                                                             |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|11318|[[11318, 44601, 1.0], [11318, 47815, 1.0], [11318, 6228, 1.0], [11318, 6693, 0.33333333333333326]]                                                                                                                                                                                             |
|9308 |[[9308, 39583, 1.0], [9308, 842, 1.0], [9308, 44786, 1.0], [9308, 45507, 1.0], [9308, 18399, 1.0], [9308, 41270, 0.1428571428571428]]                                                                                                                                                          |
|33680|[[33680, 36100, 1.0], [33680, 3139, 1.0], [33680, 48824, 1.0], [33680, 4280, 1.0], [33680, 28149, 1.0], [33680, 31294, 1.0], [33680, 22472, 0.5]]                                                                                                                                              |
|46749|[[46749, 35839, 1.0], [46749, 33564, 1.0], [46749, 37639, 1.0], [46749, 29786, 1.0], [46749, 31328, 1.0], [46749, 22472, 0.5]]                                                                                                                                                                 |
|39583|[[39583, 9308, 1.0], [39583, 45507, 1.0], [39583, 18399, 1.0], [39583, 44786, 1.0], [39583, 842, 1.0], [39583, 41270, 0.1428571428571428]]                                                                                                                                                     |
|26744|[[26744, 40027, 1.0], [26744, 6693, 0.33333333333333326]]                                                                                                                                                                                                                                      |
|6693 |[[6693, 44601, 0.33333333333333326], [6693, 6228, 0.33333333333333326], [6693, 40027, 0.33333333333333326], [6693, 10249, 0.33333333333333326], [6693, 47815, 0.33333333333333326], [6693, 12968, 0.33333333333333326], [6693, 26744, 0.33333333333333326], [6693, 11318, 0.33333333333333326]]|
|28149|[[28149, 36100, 1.0], [28149, 4280, 1.0], [28149, 3139, 1.0], [28149, 31294, 1.0], [28149, 48824, 1.0], [28149, 33680, 1.0], [28149, 22472, 0.5]]                                                                                                                                              |
|35839|[[35839, 37639, 1.0], [35839, 31328, 1.0], [35839, 33564, 1.0], [35839, 46749, 1.0], [35839, 29786, 1.0], [35839, 22472, 0.5]]                                                                                                                                                                 |
|10249|[[10249, 12968, 1.0], [10249, 6693, 0.33333333333333326], [10249, 41040, 0.25]]                                                                                                                                                                                                                |
+-----+----------------------------------------------------------------------------------
.map(lambda x: (x[0], getTopN(x[1], k)))
'''
+-----+---------------------------------------------------------------------------------------------------------------------+
|_1   |_2                                                                                                                   |
+-----+---------------------------------------------------------------------------------------------------------------------+
|26744|[[26744, 40027, 1.0], [26744, 6693, 0.33333333333333326]]                                                            |
|6228 |[[6228, 11318, 1.0], [6228, 47815, 1.0], [6228, 44601, 1.0]]                                                         |
|4280 |[[4280, 28149, 1.0], [4280, 31294, 1.0], [4280, 3139, 1.0], [4280, 48824, 1.0], [4280, 33680, 1.0]]                  |
|11318|[[11318, 47815, 1.0], [11318, 44601, 1.0], [11318, 6228, 1.0]]                                                       |
|35839|[[35839, 37639, 1.0], [35839, 31328, 1.0], [35839, 29786, 1.0], [35839, 46749, 1.0], [35839, 33564, 1.0]]            |
|28149|[[28149, 31294, 1.0], [28149, 33680, 1.0], [28149, 48824, 1.0], [28149, 3139, 1.0], [28149, 4280, 1.0]]              |
|9308 |[[9308, 45507, 1.0], [9308, 39583, 1.0], [9308, 44786, 1.0], [9308, 842, 1.0], [9308, 18399, 1.0]]                   |
|41040|[[41040, 12968, 0.25], [41040, 10249, 0.25], [41040, 6693, 0.16666666666666663], [41040, 41270, 0.09999999999999998]]|
|10249|[[10249, 12968, 1.0], [10249, 6693, 0.33333333333333326], [10249, 41040, 0.25], [10249, 41270, 0.1428571428571428]]  |
|22472|[[22472, 31294, 0.5], [22472, 46749, 0.5], [22472, 33564, 0.5], [22472, 29786, 0.5], [22472, 48824, 0.5]]            |
+-----+---------------------------------------------------------------------------------------------------------------------+
'''
.flatMap(lambda x: x[1])#把上面的列表第二列每一行中的每一个【】变成一列
'''
+-----+-----+-------------------+
|_1   |_2   |_3                 |
+-----+-----+-------------------+
|37639|31328|1.0                |
|37639|35839|1.0                |
|37639|29786|1.0                |
|37639|46749|1.0                |
|37639|33564|1.0                |
|26744|40027|1.0                |
|26744|6693 |0.33333333333333326|
|24774|26381|1.0                |
|47815|11318|1.0                |
|47815|6228 |1.0                |
+-----+-----+-------------------+
'''
.map(lambda x: (x[0], (x[1], x[2])))
'''
+-----+------------+
|_1   |_2          |
+-----+------------+
|22472|[37639, 0.5]|
|22472|[29786, 0.5]|
|22472|[46749, 0.5]|
|22472|[4280, 0.5] |
|22472|[33680, 0.5]|
|44786|[39583, 1.0]|
|44786|[9308, 1.0] |
|44786|[45507, 1.0]|
|44786|[842, 1.0]  |
|44786|[18399, 1.0]|
+-----+------------+
'''

2.r2,r3也单独拿出来理解,如下图

 

3.r5单独运行

b=unionRDD.map(lambda x: ((x[0], x[1]), 1))
'''
+----------------+---+
|_1 (uid, iid)   |_2 |
+----------------+---+
|[1936005, 41040]|  1|
|[2332195, 41270]|  1|
|  [19390, 11907]|  1|
|[3735127, 18399]|  1|
|[3123010, 26381]|  1|
|[1828321, 41270]|  1|
| [1936005, 6693]|  1|
| [2542704, 6693]|  1|
|[3735127, 44786]|  1|
|[2598452, 41040]|  1|
|[1936005, 10249]|  1|
| [2008802, 3139]|  1|
|[2542704, 11318]|  1|
|[2608535, 41270]|  1|
+----------------+---+
'''
R5 = R4.leftOuterJoin(b)
'''
+----------------+----------------------+
|_1              |_2                    |
+----------------+----------------------+
|[4247864, 26744]|[1.641493899711393, 1]|
|[5709682, 6693] |[0.20518673746392407,]|
|[5709682, 10249]|[0.3077801061958862,] |
|[3735127, 9308] |[6.155602123917724, 1]|
|[2598452, 6693] |[0.20518673746392407,]|
+----------------+----------------------+
'''
说明:_2 的第二个字段为空(None),表示该 (用户, 视频) 不在 b 中,即用户没看过这个视频;R5 中的 filter(x[1][1] == None) 保留的正是这些行。

4.最后的r7

到此就结束了。我脱离代码在纸上整理了一遍代码思路,字太丑,就不上传了。

原文参考:https://zhuanlan.zhihu.com/p/84095942

本文只是换成了华为比赛的数据,推荐算法的思路和代码与原文基本一致。

[pyspark]itemcf协同过滤推荐算法------应用华为比赛数据实现(包含转化为稀疏向量,lsh模型,杰卡德距离)相关推荐

  1. 05-机器学习_(协同过滤推荐算法与应用)---没用

    机器学习算法day03_协同过滤推荐算法及应用 课程大纲 协同过滤推荐算法原理 协同过滤推荐算法概述 协同过滤推荐算法思想 协同过滤推荐算法分析 协同过滤推荐算法要点 协同过滤推荐算法实现 协同过滤推 ...

  2. Java语言开发在线购物推荐网 购物商城推荐系统 基于用户、物品的协同过滤推荐算法 SSM(Spring+SpringMVC+Mybatis)开发框架 大数据、人工智能、机器学习项目开发

    Java语言开发在线购物推荐网 购物商城推荐系统 基于用户.物品的协同过滤推荐算法 SSM(Spring+SpringMVC+Mybatis)开发框架 大数据.人工智能.机器学习项目开发ShopRec ...

  3. Java语言开发在线音乐推荐网 音乐推荐系统 网易云音乐爬虫 基于用户、物品的协同过滤推荐算法 SSM(Spring+SpringMVC+Mybatis)框架 大数据、人工智能、机器学习项目开发

    Java语言开发在线音乐推荐网 音乐推荐系统 网易云音乐爬虫 基于用户.物品的协同过滤推荐算法 SSM(Spring+SpringMVC+Mybatis)框架 大数据.人工智能.机器学习项目开发Mus ...

  4. Python+Django+Mysql个性化图书推荐系统 图书在线推荐系统 基于用户、项目、内容的协同过滤推荐算法(带设计报告)

    Python+Django+Mysql个性化图书推荐系统 图书在线推荐系统 基于用户.项目.内容的协同过滤推荐算法 WebBookRSM.Python python实现协同过滤推荐算法实现 源代码下载 ...

  5. Java语言springboot开发框架实现个性化美食推荐网 在线美食推荐系统 基于用户、物品的协同过滤推荐算法实现

    Java语言springboot开发框架实现个性化美食推荐网 在线美食推荐系统 基于用户.物品的协同过滤推荐算法实现WebFoodRecSystem 一.项目简介 1.开发工具和使用技术 IDEA/E ...

  6. Python+Django+Mysql开发在线美食推荐网 协同过滤推荐算法在美食网站中的运用 基于用户、物品的协同过滤推荐算法 个性化推荐算法、机器学习、分布式大数据、人工智能开发

    Python+Django+Mysql开发在线美食推荐网 协同过滤推荐算法在美食网站中的运用 基于用户.物品的协同过滤推荐算法 个性化推荐算法.机器学习.分布式大数据.人工智能开发 FoodRecom ...

  7. 使用Java+SSM(Spring+SpringMVC+Mybatis)开发在线美食推荐网 美食推荐系统 美食天下美食爬虫 基于用户、物品的协同过滤推荐算法实现 大数据、人工智能、机器学习项目开发

    使用Java+SSM(Spring+SpringMVC+Mybatis)开发在线美食推荐网 美食推荐系统 美食天下美食爬虫 基于用户.物品的协同过滤推荐算法实现 大数据.人工智能.机器学习项目开发Fo ...

  8. 使用Java语言开发在线电影推荐网 电影推荐系统 豆瓣电影爬虫 基于用户、物品的协同过滤推荐算法实现 SSM(Spring+SpringMVC+Mybatis)开发框架 机器学习、人工智能、大数据开发

    使用Java语言开发在线电影推荐网 电影推荐系统 豆瓣电影爬虫 基于用户.物品的协同过滤推荐算法实现 SSM(Spring+SpringMVC+Mybatis)开发框架 机器学习.人工智能.大数据开发 ...

  9. Python+Django+Mysql开发在线购物推荐网 协同过滤推荐算法在购物网站中的运用 个性化推荐算法开发 基于用户、物品的协同过滤推荐算法 机器学习、分布式大数据、人工智能开发

    Python+Django+Mysql开发在线购物推荐网 协同过滤推荐算法在购物网站中的运用 个性化推荐算法开发 基于用户.物品的协同过滤推荐算法 机器学习.分布式大数据.人工智能开发 ShopRec ...

最新文章

  1. 14年阿里巴巴管理经验总监:教你管理7步心法(演讲全文)
  2. java中utilities类_servletutilities属于哪个java包
  3. 事件驱动模式--Reactor
  4. Typescript中使用Axios
  5. One Bug of WatiN?
  6. 二进制逆向工程师_利用Ghidra逆向分析Go二进制程序(下篇)
  7. 排序算法中平均时间复杂度_操作系统中的作业排序(算法,时间复杂度和示例)...
  8. 论文写作——paper Note
  9. mongodb 学习笔记 04 -- 游标、索引
  10. javascript中数组遍历问题
  11. 可以十倍地提高.NET 应用程序的速度集群存储器对象缓存控件NCache
  12. Java多线程:线程同步与关键字synchronized
  13. 牢记公式,ardupilot EKF2就是纸老虎(五)!
  14. 美国大学计算机理论专业phd,专家:美国大学计算机专业PHD申请难度有多大
  15. SEM测试样品减薄及表面复型
  16. bootloader学习笔记---第一篇以stm32为例
  17. 如何用电脑画平面坐标图_在Word中如何画坐标图?
  18. 详细探究一下何为数字孪生技术,它的来源与价值又为何?
  19. Bzoj1758: [Wc2010]重建计划
  20. Wault Finance闪贷攻击溯源

热门文章

  1. EXCEL中快速在下方插入空白行,并且与下方空白单元格合并
  2. 【COMSOL】Marzas 材料模型 C 源文件代码解析
  3. 中望3D 2021 倒圆角
  4. Unity中GPUInstance详解
  5. python安装包报错解决方案
  6. php mysql字符串截取比较读取_MySQL字符串截取 和 截取字符进行查询
  7. 架构师之路工作量化与细化
  8. Fate原理(面试必备)
  9. 20180802 (个别内置方法)
  10. MCU设计之 - 启动模式(Boot0Boot1)