项目背景与需求

DMP项目介绍

随着互联网的发展,互联网的广告推送逐渐诞生了AdNetwork(广告网络)、AdExchange和DSP 需求方平台。

互联网的广告资源非常丰富,除了微博,微信这样的大平台外,很多小媒体也能够提供大量优质的广告位。大量的小媒体组成了一个很大的盘子,于是有AdNetwork平台为对接广告主和媒体,为广告主提供统一的界面,联络多家媒体, 行成统一的定价。

AdNetwork 不止一家,小媒体们会选择不同的 AdNetwork,很多广告主依然面临选择困难。很多优质媒体由于不满意AdNetwork的定价策略未加入,于是产生了AdExchange广告交易平台试图统一AdNetwork

AdExchange相对于AdNetwork采用了实时的交易定价,类似于股票的撮合交易。大体流程是媒体发起广告请求给 RTB 系统, 请求广告进行展示,广告主根据自己需求决定是否竞价, 以及自己的出价。会有多个广告主同时出价, 价高者得。

AdExchangeRTB 让广告的展示价格更透明更公平,让媒体得到了最大化的广告费。但是对于广告主来说,只关心能否让合适的人看到了这些广告,同等价格内广告效果能否更好。于是DSP (需求方平台)应运而生, 主要负责和 AdExchange 交互辅助广告主进行实时竞价。

有了DSP 平台,广告主就可以针对自己想要的目标受众投放广告了。 比如说, 一个广告主是卖化妆品的, 他可以设置投放广告的标签为 20 岁上下, 女性, 时尚人士。DMP, 全称 Data Management Platform,即数据管理平台,就管理了用户的标签数据。

DMP 主要负责收集用户数据并为用户打上标签,DSP 主要通过标签筛选用户,所以需要为用户数据打上标签,这个也可以称为用户画像。

中国程序化广告生态图

DMP的主要任务

DMP的数据主要来自于:

数据来自于以往的竞价记录(以往的交易)和收集到的用户数据(第三方或自己收集)。

这里有一份pmt.json的样例数据用于测试,pmt.json是一个JSON Line 文件,共三千条数据,每一条数据都是一个独立的 JSON 字符串,共90个字段:

本文只演示简单的打标签和统一用户识别的需求,所需要用到的字段含义如下:

字段 解释
adspacetype 广告位类型,1 : Banner2 : 插屏,3 : 全屏
channelid 频道 ID
keywords 关键字
sex 用户性别
age 用户年龄
imei 手机串码
imeimd5 IMEIMD5
imeisha1 IMEISHA-1
mac 手机 MAC 地址
macmd5 MACMD5
macsha1 MACSHA-1
openudid 苹果设备的识别码
openudidmd5 OpenUDIDMD5
openudidsha1 OpenUDIDSHA-1
idfa 手机 APP 的广告码
idfamd5 IDFAMD5
idfasha1 IDFASHA-1

需求说明

首先,我们根据adspacetype,channelid,keywords,sex,age打标签,然后对后续的唯一标识字段进行合并。例如某用户在一个时间点上, 汇报了 mac, udid, 另外一个时间点汇报了 mac, uuid,这两条数据实际就归属于同一个用户,需要对这些数据进行合并。

最终结果,标签按照一定规范使用一个JSON字符串存储在一个字段中;取第一个有效ID作为这个用户的ID。

前置知识已经放在前面的文章里:

PySpark与GraphFrames的安装与使用
https://xxmdmst.blog.csdn.net/article/details/123009617

networkx快速解决连通图问题
https://xxmdmst.blog.csdn.net/article/details/123012333

PySpark求解连通图问题
https://xxmdmst.blog.csdn.net/article/details/123036398

开始编码

下面我们回到广告项目的数据处理问题,pmt.json数据集下载:https://pan.baidu.com/s/1_qx2nc2r3H5eQiSfnUxx3Q?pwd=xhzw

PySpark解决需求

首先创建spark会话对象,启动本地伪分布式集群:

from pyspark.sql import SparkSession, Row
from graphframes import GraphFramespark = SparkSession \.builder \.appName("PySpark") \.master("local[*]") \.getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir("checkpoint")
spark

读取数据并筛选需要的列:

data = spark.read.json("pmt.json")
data = data.select(["imei", "imeimd5", "imeisha1", "mac", "macmd5","macsha1", "openudid", "openudidmd5", "openudidsha1","idfa", "idfamd5", "idfasha1", "adspacetype","channelid", "keywords", "sex", "age"
])
data.take(2)

输出:

[Row(imei='', imeimd5='', imeisha1='', mac='52:54:00:b0:e5:04', macmd5='', macsha1='', openudid='ILHOYMFMOKEHZRTRCCWRMMDRALMCNAXYAYIFPHFN', openudidmd5='', openudidsha1='', idfa='', idfamd5='', idfasha1='', adspacetype=2, channelid='123510', keywords='赵立新,黄璐,梁静,老炮儿,盲山', sex='1', age='38'),Row(imei='', imeimd5='', imeisha1='', mac='', macmd5='', macsha1='', openudid='YGWVDAVUXMFWJUDMHAOWAJUGQCJRAXYOXLHSHSGE', openudidmd5='', openudidsha1='', idfa='JMLQQLMQGHKKWDJHDUQLLIJTAJSDRIPT', idfamd5='', idfasha1='', adspacetype=2, channelid='123471', keywords='美文', sex='1', age='49')]

根据前面的思路,我们首先需要对每一行数据生成一个唯一ID,再对广告位类型,频道 ID,关键字、性别和年龄等字段进行打标签,其他字段作为主键:

keyList = ["imei", "imeimd5", "imeisha1", "mac","macmd5", "macsha1", "openudid", "openudidmd5","openudidsha1",  "idfa", "idfamd5", "idfasha1"]def createTags(pair):row, rid = pairtags = [f"AD{row.adspacetype}", f"CH{row.channelid}",f"GD{row.sex}", f"AG{row.age}"]for key in row.keywords.split(","):tags.append(f"KW{key}")values = []for key in keyList:values.append(row[key])values.append(tags)values.append(rid)return valuestagsRDD = data.rdd.zipWithUniqueId().map(createTags)
print(tagsRDD.take(2))
[['', '', '', '52:54:00:b0:e5:04', '', '', 'ILHOYMFMOKEHZRTRCCWRMMDRALMCNAXYAYIFPHFN', '', '', '', '', '', ['AD2', 'CH123510', 'GD1', 'AG38', 'KW赵立新', 'KW黄璐', 'KW梁静', 'KW老炮儿', 'KW盲山'], 0], ['', '', '', '', '', '', 'YGWVDAVUXMFWJUDMHAOWAJUGQCJRAXYOXLHSHSGE', '', '', 'JMLQQLMQGHKKWDJHDUQLLIJTAJSDRIPT', '', '', ['AD2', 'CH123471', 'GD1', 'AG49', 'KW美文'], 2]]

将其转换成DataFrame并缓存起来:

tagsDF = tagsRDD.toDF(keyList+["tags", "rid"])
tagsDF.cache()
tagsDF.checkpoint()
tagsDF.take(2)
[Row(imei='', imeimd5='', imeisha1='', mac='52:54:00:b0:e5:04', macmd5='', macsha1='', openudid='ILHOYMFMOKEHZRTRCCWRMMDRALMCNAXYAYIFPHFN', openudidmd5='', openudidsha1='', idfa='', idfamd5='', idfasha1='', tags=['AD2', 'CH123510', 'GD1', 'AG38', 'KW赵立新', 'KW黄璐', 'KW梁静', 'KW老炮儿', 'KW盲山'], rid=0),Row(imei='', imeimd5='', imeisha1='', mac='', macmd5='', macsha1='', openudid='YGWVDAVUXMFWJUDMHAOWAJUGQCJRAXYOXLHSHSGE', openudidmd5='', openudidsha1='', idfa='JMLQQLMQGHKKWDJHDUQLLIJTAJSDRIPT', idfamd5='', idfasha1='', tags=['AD2', 'CH123471', 'GD1', 'AG49', 'KW美文'], rid=2)]

下面我们对所有唯一标识字段计算关联边并合并:

import pandas as pddef func(pdf):ids = pdf.ridn = len(ids)if n <= 1:return pd.DataFrame(columns=["src", "dst"])edges = []for i in range(n-1):for j in range(i+1, n):edges.append((ids[i], ids[j]))return pd.DataFrame(edges)edge_rdds = []
for key in keyList:edge = tagsDF.filter(tagsDF[key] != "") \.select(key, "rid") \.groupBy(key) \.applyInPandas(func, "src int,dst int")edge_rdds.append(edge.rdd)
edgesDF = sc.union(edge_rdds).toDF(["src", "dst"])
edgesDF.cache()
edgesDF.checkpoint()
edgesDF.show(5)

结果:

+----+----+
| src| dst|
+----+----+
|1538|1540|
|1538|2214|
|1538|2790|
|1538|4054|
|1538|1223|
+----+----+
only showing top 5 rows

然后获取顶点:

vertices = tagsDF.select(tagsDF.rid.alias("id"))
vertices.show(5)
+---+
| id|
+---+
|  0|
|  2|
|  4|
|  6|
|  8|
+---+
only showing top 5 rows

开始计算连通图:

gdf = GraphFrame(vertices, edgesDF)
components = gdf.connectedComponents()
components.show()
+---+---------+
| id|component|
+---+---------+
|  0|        0|
|  2|        2|
|  4|        4|
|  6|        0|
|  8|        8|
| 10|       10|
| 12|       12|
| 14|       14|
| 16|       16|
| 18|       18|
| 20|       20|
| 22|       22|
| 24|       24|
| 26|       26|
| 28|       28|
| 30|       30|
| 32|       32|
| 34|       34|
| 36|       36|
| 38|        8|
+---+---------+

然后表连接,获取每一行数据对应的唯一用户id标识component:

result = tagsDF.join(components, on=tagsDF.rid == components.id)
result.cache()
result.take(2)
[Row(imei='', imeimd5='', imeisha1='', mac='52:54:00:b0:e5:04', macmd5='', macsha1='', openudid='ILHOYMFMOKEHZRTRCCWRMMDRALMCNAXYAYIFPHFN', openudidmd5='', openudidsha1='', idfa='', idfamd5='', idfasha1='', tags=['AD2', 'CH123510', 'GD1', 'AG38', 'KW赵立新', 'KW黄璐', 'KW梁静', 'KW老炮儿', 'KW盲山'], rid=0, id=0, component=0),Row(imei='', imeimd5='', imeisha1='', mac='', macmd5='', macsha1='', openudid='YGWVDAVUXMFWJUDMHAOWAJUGQCJRAXYOXLHSHSGE', openudidmd5='', openudidsha1='', idfa='JMLQQLMQGHKKWDJHDUQLLIJTAJSDRIPT', idfamd5='', idfasha1='', tags=['AD2', 'CH123471', 'GD1', 'AG49', 'KW美文'], rid=2, id=2, component=2)]

接下来在对每个用户聚合:

from collections import Counter
import jsondef combineTags(pdf):mainID = Nonefor key in keyList:v = pdf[key].bfill().iat[0]if v:mainID = vbreakc = Counter()for tags in pdf.tags.values:for tag in tags:c[tag] += 1return pd.DataFrame([(mainID, json.dumps(c, ensure_ascii=False))])result2 = result.groupBy("component").applyInPandas(combineTags, schema="mainID string, tags string"
)
result2.show()
+--------------------+--------------------+
|              mainID|                tags|
+--------------------+--------------------+
|   52:54:00:b0:e5:04|{"AD2": 2, "CH123...|
|   52:54:00:83:f5:9a|{"AD3": 3, "CH123...|
|YGWVDAVUXMFWJUDMH...|{"AD2": 1, "CH123...|
|   52:54:00:04:7c:4b|{"AD1": 3, "CH123...|
|   52:54:00:3d:65:db|{"AD3": 2, "CH123...|
|   52:54:00:c4:e6:e7|{"AD3": 3, "CH123...|
|     070264565409917|{"AD1": 2, "CH123...|
|     945658800321180|{"AD1": 1, "CH123...|
|     077683806650676|{"AD1": 2, "CH123...|
|   52:54:00:7a:8b:19|{"AD1": 7, "CH123...|
|   52:54:00:cd:e2:c1|{"AD2": 1, "CH123...|
|   52:54:00:ef:8a:18|{"AD2": 1, "CH123...|
|IPCACDLYSSJUSBKPZ...|{"AD1": 1, "CH123...|
|   52:54:00:1a:e1:69|{"AD2": 4, "CH123...|
|HQPEUGDMONXDFCVIU...|{"AD3": 3, "CH123...|
|   52:54:00:50:12:35|{"AD1": 3, "CH123...|
|   52:54:00:f5:5f:75|{"AD2": 1, "CH123...|
|   52:54:00:4e:82:9a|{"AD1": 4, "CH123...|
|   52:54:00:0c:e1:84|{"AD1": 3, "CH123...|
|   52:54:00:16:29:00|{"AD3": 1, "CH123...|
+--------------------+--------------------+
only showing top 20 rows

可以看到,这样就顺利的计算出了我们需要的结果。

最终完整代码:

from pyspark.sql import SparkSession, Row
from graphframes import GraphFrame
import pandas as pd
from collections import Counter
import jsonspark = SparkSession \.builder \.appName("PySpark") \.master("local[*]") \.getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir("checkpoint")data = spark.read.json("pmt.json")
data = data.select(["imei", "imeimd5", "imeisha1", "mac", "macmd5","macsha1", "openudid", "openudidmd5", "openudidsha1","idfa", "idfamd5", "idfasha1", "adspacetype","channelid", "keywords", "sex", "age"
])
keyList = ["imei", "imeimd5", "imeisha1", "mac","macmd5", "macsha1", "openudid", "openudidmd5","openudidsha1",  "idfa", "idfamd5", "idfasha1"]def createTags(pair):row, rid = pairtags = [f"AD{row.adspacetype}", f"CH{row.channelid}",f"GD{row.sex}", f"AG{row.age}"]for key in row.keywords.split(","):tags.append(f"KW{key}")values = []for key in keyList:values.append(row[key])values.append(tags)values.append(rid)return valuestagsRDD = data.rdd.zipWithUniqueId().map(createTags)
tagsDF = tagsRDD.toDF(keyList+["tags", "rid"])
tagsDF.cache()
tagsDF.checkpoint()def createEdges(pdf):ids = pdf.ridn = len(ids)if n <= 1:return pd.DataFrame(columns=["src", "dst"])edges = []for i in range(n-1):for j in range(i+1, n):edges.append((ids[i], ids[j]))return pd.DataFrame(edges)edge_rdds = []
for key in keyList:edge = tagsDF.filter(tagsDF[key] != "") \.select(key, "rid") \.groupBy(key) \.applyInPandas(createEdges, "src int,dst int")edge_rdds.append(edge.rdd)
edgesDF = sc.union(edge_rdds).toDF(["src", "dst"])
vertices = tagsDF.select(tagsDF.rid.alias("id"))gdf = GraphFrame(vertices, edgesDF)
components = gdf.connectedComponents()
tmp = tagsDF.join(components, on=tagsDF.rid == components.id)def mapTags(pdf):mainID = Nonefor key in keyList:v = pdf[key].bfill().iat[0]if v:mainID = vbreakc = Counter()for tags in pdf.tags.values:for tag in tags:c[tag] += 1return pd.DataFrame([(mainID, json.dumps(c, ensure_ascii=False))])result = tmp.groupBy("component").applyInPandas(mapTags, schema="mainID string, tags string"
)
result = result.toPandas()
result

可以看到耗时1分32秒。

纯Python解决需求

可以看到使用分布式后,即使是伪分布式也是如此的慢,如果数据量是单机内存可以装的下的使用纯Python可以快无数倍,下面展示本需求的完整解决代码:

import json
import pandas as pd
import networkx as nx
from collections import Counter
import jsonwith open("pmt.json", encoding="u8") as f:data = [json.loads(row) for row in f]
data = pd.DataFrame(data)[["imei", "imeimd5", "imeisha1", "mac", "macmd5","macsha1", "openudid", "openudidmd5", "openudidsha1","idfa", "idfamd5", "idfasha1", "adspacetype","channelid", "keywords", "sex", "age"
]]
data.replace("",pd.NA,inplace=True)g = nx.Graph()
keyList = ["imei", "imeimd5", "imeisha1", "mac","macmd5", "macsha1", "openudid", "openudidmd5","openudidsha1",  "idfa", "idfamd5", "idfasha1"]
g.add_nodes_from(data.index)
for c in keyList:for ids in data.index.groupby(data[c]).values():n = len(ids)if n == 1:continuefor i in range(n-1):for j in range(i+1, n):g.add_edge(ids[i], ids[j])def combineTags(df):mainID = Nonefor key in keyList:v = df[key].bfill().iat[0]if not pd.isna(v):mainID = vbreakc = Counter()for v in df["adspacetype"].values:c[f"AD{v}"] += 1for v in df["channelid"].values:c[f"CH{v}"] += 1for keywords in df.keywords.values:for key in keywords:c[f"KW{key}"]+=1for v in df["sex"].values:c[f"GD{v}"] += 1for v in df["age"].values:c[f"AG{v}"] += 1return (mainID, json.dumps(c, ensure_ascii=False))result = []
for sub_g in nx.connected_components(g):sub_g = g.subgraph(sub_g)g_node = sub_g.nodes()ids = list(g_node)df_split = data.iloc[ids]result.append(combineTags(df_split))
result = pd.DataFrame(result, columns=["mainID", "tags"])
result

可以看到耗时仅2秒钟。

DMP数据处理之统一用户识别相关推荐

  1. Spring Cloud Alibaba 统一门户:基于网关的统一用户认证方案

    本讲咱们涉及以下三方面内容: 传统的用户认证方案: JWT 与 JJWT: 基于网关的统一用户认证. 传统的用户认证方案 我们直奔主题,什么是用户认证呢?对于大多数与用户相关的操作,软件系统首先要确认 ...

  2. 统一用户认证和单点登录和授权的原理与流程

    统一用户认证和单点登录和授权的原理与流程 1 前言 2 介绍 2.1 统一用户认证 2.2 单点登录 2.3 授权 3 原理 3.1 统一用户认证原理 3.2 单点登录原理 3.3 OAuth授权原理 ...

  3. rstudio拉格朗日插值法_电力窃漏电用户识别案例

    一.案例综述 案例编号:102003 案例名称:电力.热力.燃气及水生产和供应业--电力窃漏电用户识别 作者姓名(或单位.或来源):朱江 案例所属行业:D442 电力供应 案例所用软件:R 案例包含知 ...

  4. 统一用户单点登录系统

    see also:http://www.ibmtech.com.cn/dandian.html 统一用户单点登录的基本原理 一般来说,每个应用系统都拥有独立的用户信息管理功能,用户信息的格式.命名与存 ...

  5. 多平台统一用户系统设计

    0x00 引言 现在越来越多的产品都实现了在不同平台上的功能支持.比如原来的 App 为了微信上的流量,开发了小程序.原来只做微信公众号的,后来为了更好的体验开发了 App 等等.这里面临用户账号迁移 ...

  6. 统一用户认证和单点登录(SSO)解决方案

    本文以某新闻单位多媒体数据库系统为例,提出建立企业用户认证中心,实现基于安全策略的统一用户管理.认证和单点登录,解决用户在同时使用多个应用系统时所遇到的重复登录问题. 随着信息技术和网络技术的迅猛发展 ...

  7. 服务器多账户管理系统,统一用户管理解决方案

    一.方案概述 日常工作中,使用人员在使用各应用系统时都要做登录操作,需要记住每个系统的登录口令:对于系统管理员,人员的新增.调离或退休都需要到各个系统中都要进行重新配置.若操行不及时或遗漏,就会带来系 ...

  8. 统一用户及权限管理系统

    欢迎来到 MSDN > 白皮书与解决方案建议 > 经典案例研究 统一用户及权限管理系统 发布日期: 2004-09-08 解决方案概述 用户档案 行业: 政府 业务对象:徐汇区政府 商业背 ...

  9. 用户识别率提升 25 倍 | 看神策数据如何利用 ID-Mapping 激活全域营销

    ​​​​​​​ 数字媒介的多元化.消费者触媒的无序化.信息爆炸对传播有效性的消减,使企业在营销增长和品牌心智经营上备受挑战.其中,企服行业因客户多角色共同决策.决策周期长.线索流程管理复杂.续约续费不 ...

  10. Confluence与Jira整合之统一用户管理

    http://leign.iteye.com/blog/566079 Confluence与Jira整合之统一用户管理 博客分类: JAVA 配置管理MySQLHSQLDBWebXML 说到Confl ...

最新文章

  1. 撤销 git commit
  2. Delphi XE2 之 FireMonkey 入门(13) - 动画(下)
  3. 10篇写给Git初学者的最佳教程
  4. topsis综合评价法_DRG如何评价临床医师绩效——以肿瘤科为例
  5. 【Codeforces 1421 D】Hexagons,贪心,模拟
  6. NYOJ 214(二分插入)
  7. 经典中的博弈:第一章 C++的Hello,World!
  8. 六石管理学:提出分形进度的概念
  9. 超神能力:云库局面分析
  10. ansys轴对称模型之二维模型
  11. 小米路由器 内核 linux,小米路由器配置ssh登入方法教程
  12. oracle11g磁盘阵列 pdf,融会贯通 从Oracle11g到SQL Server2008 中文PDF版 16.7MB
  13. ClickHouse 来自战斗民族的OLAP利器
  14. 山水印|竹林野茶:它,被称为茶叶中瑰宝,不止抗癌那么简单
  15. linux jboss的安装路径,LINUX下JBOSS的安装及配置
  16. 大数据应用常见的6种商业模式
  17. 首家优秀型厂商,百度智能云智能对话通过信通院权威评测!
  18. 现控报告-- 分析倒立摆系统稳定性、能控性及能观性分析,设计PID控制方案(附matlab)
  19. AVR mega48 ISP下载及熔丝位修正
  20. Springboot使用Maven项目使用 Profiles和Spring Profile进行多环境配置 动态激活指定

热门文章

  1. SAS逻辑回归之多分类
  2. ibm cloud怎么使用_使用IBM Cloud中的业务规则服务构建酒店预订应用程序
  3. 虚幻4渲染编程(环境模拟篇)【第二卷:体积云天空模拟(2)---3D体纹理低云】...
  4. 自己封装一个v-model指令
  5. 搭建Hadoop高可用集群
  6. 炫酷的时间HTML页面,炫酷css3垂直时间轴特效
  7. solidity投票(ballot)合约
  8. 京东店铺所有商品API接口(JD整店商品查询API接口)
  9. 【工作规划】未来自我学习计划及发展注意事项
  10. umi hooks里的mutate使用方法