这篇文章算是对这一个月以来天池比赛《CIKM 2019 EComm AI:用户行为预测》的一个总结,我们队伍第一次打这种“海量数据”的比赛,最终成绩能保留在第一页就已经很满意了。

这次比赛的要求是根据用户历史行为预测未来行为,对于特定的用户, 为其推荐最多50个TA未来3天可能会点击的商品,也就是根据用户的历史交互行为、用户基本属性、物品基本属性,来预测测试集内的用户可能点击50个商品,也就是我们一般做推荐系统中,需要得到的那个推荐池。比赛用的数据集非常庞大,交互数据集有23G,大概6亿多条交互数据,用户有400多万,物品有一千多万。

刚开始处理这批数据的时候我的内心是崩溃的,一起不管在处理线上数据,还是我们私下训练各种推荐排序模型的时候,基本数据都没有到达过上亿的级别。这就意味着,可能这次比赛,召回层的作用会尤为重要。(推荐一般分为三个阶段,召回、排序和重排序,对于海量数据来说,召回意味着筛选,将物料池从百万级别降到百级别,这个工作听起来就很吓人了)同时,组委会还给了提示:

GNN这个东西以我们的软硬件实力,十有八九是用不起来了,所以,我们尝试了一下简单的思路,也就是本文的主题,ItemCF算法,而且我们没有优化它,甚至连参数都没有调整,只做了一些效率优化的工作。

我们采用pyspark完成整个数据的处理工作,首先,加载环境:

from pyspark import SparkContext

from pyspark import SparkConf

from pyspark.ml.feature import MinHashLSH

from pyspark.ml.linalg import Vectors

import pyspark.sql.functions as psf

from pyspark.sql import SparkSession

from pyspark.sql.types import FloatType

from pyspark.sql.types import StructType

from pyspark.sql.types import StringType

from pyspark.sql.types import StructField

from pyspark.sql.types import IntegerType

import math

import argparse

import time

import os

Path = "hdfs:///ECommAI/round2/train/"

spark = SparkSession \

.builder \

.appName("ItemCF") \

.master("yarn") \

.config("spark.executor.memory", "20g")\

.config("spark.executor.cores", "4")\

.config("spark.cores.max", "10")\

.config("spark.driver.memory","20g")\

.config("spark.executor.instances","10") \

.config("spark.sql.broadcastTimeout", "36000") \

.config("spark.local.dir", "/hadoop/spark") \

.config("spark.executor.extraJavaOptions", "-Xss80M") \

.getOrCreate()

sc = spark.sparkContext

包括了spark的上下文环境、MinHashLSH(用来加速计算近邻)、和一些基础库。数据在HDFS上。然后,加载数据(我们提前分好了训练集和测试集,但是为了省事就一起训练,得到线上提交结果)。

def PrepareTrainData():

train = spark.read.format("csv").option("delimiter", "\t").option("header", "false").schema(

actionSchema).load("/ECommAI/round2/train/*.csv")

ratingsRDD = train.rdd

return ratingsRDD

def PrepareTestData():

test = spark.read.format("csv").option("delimiter", "\t").option("header", "false").schema(

actionSchema).load("/ECommAI/round2/test/*.csv")

ratingsRDD = test.rdd

return ratingsRDD

train = PrepareTrainData()

test = PrepareTestData()

整合数据,计算物品相似度。训练数据以

的形式拿到,然后换位,聚合,得到

的数据,然后将每个

内的

转换成稀疏向量喂入MinHashLSH内,得到物品相似度矩阵,输出结果为

def get_feature_list_from_partition(iterator):

result = []

for arr in iterator:

arr_tmp = list(set(arr[1][1]))

arr_tmp.sort()

result.append((arr[0], Vectors.sparse(arr[1][0], arr_tmp, [1] * len(arr_tmp))))

return result

unionRDD = train.union(test)

userItemRDD = unionRDD.map(lambda x: (x[0], x[1]))

# 计算物品相似度

# 基于LSH的操作

max_user_id = userItemRDD.map(lambda x: int(x[0])).distinct().max()

itemUserRDD = userItemRDD.map(lambda x: (x[1], [x[0]])).reduceByKey(lambda x, y: x + y)

itemUserRDD = itemUserRDD.map(lambda x: (x[0], [max_user_id + 1, x[1]]))

item_vec_rdd = itemUserRDD.mapPartitions(get_feature_list_from_partition)

item_vec_df = item_vec_rdd.toDF(["item", "features"])

item_vec_df.show()

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)

model = mh.fit(item_vec_df)

item_sim_df = model.approxSimilarityJoin(item_vec_df, item_vec_df, 0.98, distCol="JaccardDistance") \

.select(psf.col("datasetA.item").alias("idA"),

psf.col("datasetB.item").alias("idB"),

psf.col("JaccardDistance")).orderBy(psf.col("datasetA.item"), psf.col("JaccardDistance"))

item_sim_rdd = item_sim_df.rdd.filter(lambda x: x.idA != x.idB) \

.map(lambda x: (x.idA, x.idB, 1 - x.JaccardDistance))

rdd13 = item_sim_rdd.map(lambda data: ','.join(str(d) for d in data))

进入ItemCF的候选物品计算部分,构造形如

的相似度列表,每个物品获取20个近邻物品。

k = 20

unionRDD = unionRDD.distinct()

topkSim = item_sim_rdd.map(lambda x: (x[0], (x[0], x[1], x[2]))).groupByKey().map(

lambda x: (x[0], list(x[1]))).map(lambda x: (x[0], getTopN(x[1], k))).flatMap(

lambda x: x[1]).map(lambda x: (x[0], (x[1], x[2])))

根据ItemCF公式计算物品得分(此处公式就不写了,在黄老师的新书里,Spark构建协同过滤描述的很清楚)。

R2 = topkSim.join(unionRDD.map(lambda x: (x[1], (x[0], x[2]))))

R3 = R2.map(lambda x: ((x[1][1][0], x[1][0][0]), float(x[1][1][1]) * float(x[1][0][1])))

R4 = R3.reduceByKey(lambda x, y: x + y)

为每个用户,得到50个推荐结果,并存储结果。

N = 50

R5 = R4.leftOuterJoin(unionRDD.map(lambda x: ((x[0], x[1]), 1))).filter(lambda x: x[1][1] == None).map(

lambda x: (x[0][0], (x[0][0], x[0][1], x[1][0])))

R6 = R5.groupByKey().map(lambda x: (x[0], list(x[1])))

R7 = R6.map(lambda x: (x[0], getTopN(x[1], N)))

R9 = R7.map(toCSVLine)

R9.coalesce(1).saveAsTextFile("hdfs:///ECommAI/round2/ItemCF/predict/" + str(int(time.time())))

中间的一些功能性函数:

def PrePareBestK(sc, Path):

K = sc.textFile(Path)

K = int(K.collect()[0])

return K

def find(user, user_list):

for u in user_list:

if u == user:

return True

return False

def takeSecond(elem):

return elem[2]

def getTopN(x, k):

x.sort(key=takeSecond, reverse=True)

x = x[:k]

return x

def toCSVLine(data):

output_str = str(data[0]) + "\t"

return output_str + ','.join([str(d[1]) for d in data[1]])

总的来说,就是一次基于pyspark的ItemCF线下实战,数据集用的天池比赛的,整体运行时间在8个小时左右,结果是0.01569(还在第一页)。

基于传统的推荐算法,可能上限就到这了,想得到更好的结果,可能采用一些基于深度学习的方法可能更好,我们也尝试了基于Word2Vec词向量的itemCF,YoutubeMatchModel、DeepFM方法,甚至还考虑过用关联规则而不是GNN来补充交互数据,不过都是后话了。

java itemcf_大规模电商推荐数据分析-基于ItemCF的召回相关推荐

  1. numpy 矩阵 秩_大规模电商推荐数据分析-基于矩阵分解的召回

    前面两篇文章回顾了我们团队做天池数据比赛<CIKM 2019 EComm AI:用户行为预测>的复赛方案,现在让我们的目光回到初赛.初赛数据组织和复赛一样,只是数据量差异较大.(初赛才2G ...

  2. 基于JAVA跨境电商网站计算机毕业设计源码+数据库+lw文档+系统+部署

    基于JAVA跨境电商网站计算机毕业设计源码+数据库+lw文档+系统+部署 基于JAVA跨境电商网站计算机毕业设计源码+数据库+lw文档+系统+部署 本源码技术栈: 项目架构:B/S架构 开发语言:Ja ...

  3. Java开源生鲜电商平台-Java分布式以及负载均衡架构与设计详解(源码可下载)

    Java开源生鲜电商平台-Java分布式以及负载均衡架构与设计详解(源码可下载) 说明:主要是针对一些中大型的项目需要进行分布式以及负载均衡的架构提一些思路与建议. 面对大量用户访问.高并发请求,海量 ...

  4. 掌握电商数据的4个要点,电商平台数据分析其实很简单

    从近两年爆火的淘宝网红直播再到抖音由社交到电商的转变,不难看出电商行业迎来了又一次升级,从原来的图文时代升级到了直播时代,从原来的以"货"为中心开始转向以"人" ...

  5. JAVA计算机毕业设计电商平台客流统计系统Mybatis+系统+数据库+调试部署

    JAVA计算机毕业设计电商平台客流统计系统Mybatis+系统+数据库+调试部署 JAVA计算机毕业设计电商平台客流统计系统Mybatis+系统+数据库+调试部署 本源码技术栈: 项目架构:B/S架构 ...

  6. 开课吧:电商产品数据分析的核心目标是什么?

    电商的本质是零售,所以在做电商产品数据分析时,始终围绕着"成交"这个核心目标.在这个过程中,会涉及到"人.货.场"三个概念. 人:指流量.用户或成员: 货:商品 ...

  7. [附源码]计算机毕业设计JAVA社区生鲜电商平台

    [附源码]计算机毕业设计JAVA社区生鲜电商平台 项目运行 环境配置: Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(Inte ...

  8. 从0到1搭建电商营销数据分析平台(一)

    欢迎关注公众号--<数据三分钟> 一线大厂的师兄师姐结合自己的工作实践,将数据知识浅显道来,每天三分钟,祝你成为数据达人.还有面试指导和内推机会. 电商领域数据是最重要的能源,数据的力量正 ...

  9. Java毕设项目电商平台客流统计系统(java+VUE+Mybatis+Maven+Mysql)

    Java毕设项目电商平台客流统计系统(java+VUE+Mybatis+Maven+Mysql) 项目运行 环境配置: Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(W ...

最新文章

  1. php判断前端传的多个字段与数据库匹配
  2. expected unqualified-id before numeric constant问题原因
  3. 技术开发中一些名词解释
  4. 拖动卡顿_教你4招,让你的ps永不卡顿
  5. linux终端 美化 git,linux终端美化oh-my-zsh
  6. 【BZOJ】3524: [Poi2014]Couriers
  7. hql懒加载后判断对象是否存在_JPA数据懒加载LAZY和实时加载EAGER(二) - Mr.Simm - 博客园...
  8. html5 抓取网页数据,从网页抓取数据的一般方法
  9. 系统集成项目管理工程师06《项目成本管理》
  10. 梦三国2英霸模式貂蝉攻略(玩大流)
  11. Photoshop从入门到放弃
  12. C# 舒特二次开发采集考勤记录并同步设备时间
  13. luogu 3426题解 (KMP)
  14. 【大数据处理】广州餐饮店铺爬虫并可视化,上传至hdfs
  15. GICv3软件overview手册之GICv3基本功能(3)
  16. 唯智信息:制造业如何在物流4.0时代更好地智能化发展
  17. django + MySQL + flup + Nginx 的一些相关配置文件的备份
  18. 中国企业如何玩转海外媒体推广?
  19. php页面导入excel表格,php页面导入excel表格数据:php导入excel 怎么获取excel表格数据...
  20. 高等数学(第七版)同济大学 习题8-6 个人解答

热门文章

  1. 皮一皮:顶尖黑客技术,10秒教学,不会你打我!
  2. 每日一皮:在同一个项目上工作2年的样子...
  3. 每日一皮:用户永远不知道怎么用我们的产品...
  4. 每日一皮:地铁上打瞌睡的程序员...
  5. 为什么Java进程使用的RAM比Heap Size大?
  6. 通宵加班、猝死频发,但仍建议你不要轻易买保险
  7. python协程学习——写个并发获取网站标题的工具
  8. linux 并行计算命令,Linux下的并行神器——parallel
  9. python类的函数_python 类函数
  10. 给帝国cms7.5后台文章编辑器ckeditor增加一个行距的功能插件