前文回顾:

PySpark与GraphFrames的安装与使用
https://xxmdmst.blog.csdn.net/article/details/123009617

networkx快速解决连通图问题
https://xxmdmst.blog.csdn.net/article/details/123012333

前面我讲解了PySpark图计算库的使用以及纯python解决连通图问题的两个示例。这篇文章我们继续对上次的连通图问题改用PySpark实现。

需求1:找社区

刘备和关羽有关系,说明他们是一个社区,刘备和张飞也有关系,那么刘备、关羽、张飞归为一个社区,以此类推。

对于这个连通图问题使用Pyspark如何解决呢?

首先,我们创建spark对象:

from pyspark.sql import SparkSession, Row
from graphframes import GraphFramespark = SparkSession \.builder \.appName("PySpark") \.master("local[*]") \.getOrCreate()
sc = spark.sparkContext
# 设置检查点目录
sc.setCheckpointDir("checkpoint")

然后构建数据:

data = [['刘备', '关羽'],['刘备', '张飞'],['张飞', '诸葛亮'],['曹操', '司马懿'],['司马懿', '张辽'],['曹操', '曹丕']
]
data = spark.createDataFrame(data, ["人员", "相关人员"])
data.show()
+------+--------+
|  人员|相关人员|
+------+--------+
|  刘备|    关羽|
|  刘备|    张飞|
|  张飞|  诸葛亮|
|  曹操|  司马懿|
|司马懿|    张辽|
|  曹操|    曹丕|
+------+--------+

很明显原始数据就是图计算所要求的边数据,只修改一下列名即可:

edges = data.toDF("src", "dst")
edges.printSchema()
root|-- src: string (nullable = true)|-- dst: string (nullable = true)

下面我们开始构建顶点数据:

vertices = (edges.rdd.flatMap(lambda x: x).distinct().map(lambda x: Row(x)).toDF(["id"])
)
vertices.show()
+------+
|    id|
+------+
|诸葛亮|
|  刘备|
|  曹操|
|司马懿|
|  曹丕|
|  关羽|
|  张飞|
|  张辽|
+------+

下面使用spark的图计算 计算连通图:

g = GraphFrame(vertices, edges)
result = g.connectedComponents().orderBy("component")
result.show()
+------+------------+
|    id|   component|
+------+------------+
|司马懿|           0|
|  张辽|           0|
|  曹丕|           0|
|  曹操|           0|
|  关羽|635655159808|
|  刘备|635655159808|
|  张飞|635655159808|
|诸葛亮|635655159808|
+------+------------+

可以看到结果中已经顺利将一个社区的成员通过一个相同的component标识出来,成功解决需求。

需求2:统一用户识别

abcde这5个字段表示mac地址,ip地址,device_id,imei等唯一标识,tags表示用户的标签。由于某些原因,同一用户的唯一标识字段总是有几个字段存在缺失,现在要求将同一个用户的数据都能识别出来,同时将每个用户的标签进行合并。原始数据和结果模型示例如下:

首先,我们构建数据:

df = spark.createDataFrame([['a1', None, 'c1', None, None, 'tag1'],[None, None, 'c1', 'd1', None, 'tag2'],[None, 'b1', None, 'd1', None, 'tag3'],[None, 'b1', 'c1', 'd1', 'e1', 'tag4'],['a2', 'b2', None, None, None, 'tag1'],[None, 'b4', 'c4', None, 'e4', 'tag1'],['a2', None, None, 'd2', None, 'tag2'],[None, None, 'c2', 'd2', None, 'tag3'],[None, 'b3', None, None, 'e3', 'tag1'],[None, None, 'c3', None, 'e3', 'tag2'],
], list("abcde")+["tags"])
df.show()

结果:

+----+----+----+----+----+----+
|   a|   b|   c|   d|   e|tags|
+----+----+----+----+----+----+
|  a1|null|  c1|null|null|tag1|
|null|null|  c1|  d1|null|tag2|
|null|  b1|null|  d1|null|tag3|
|null|  b1|  c1|  d1|  e1|tag4|
|  a2|  b2|null|null|null|tag1|
|null|  b4|  c4|null|  e4|tag1|
|  a2|null|null|  d2|null|tag2|
|null|null|  c2|  d2|null|tag3|
|null|  b3|null|null|  e3|tag1|
|null|null|  c3|null|  e3|tag2|
+----+----+----+----+----+----+

接下来的思路依然跟上次一样,首先为每一行数据分配一个唯一id,然后对每个唯一标识的列,根据是否一样构建行与行之间的连接关系,所有的唯一标识列产生的连接关系共同作为图计算的边。

下面使用RDD的zipWithUniqueId方法为每一行产生一个唯一ID,并将这个ID移动到最前(由于这个数据后面可能会多次被频繁使用所以缓存起来):

tmp = df.rdd.zipWithUniqueId().map(lambda x: (x[1], x[0]))
tmp.cache()
tmp.first()
(0, Row(a='a1', b=None, c='c1', d=None, e=None, tags='tag1'))

根据唯一id构建顶点数据:

vertices = tmp.map(lambda x: Row(x[0])).toDF(["id"])
vertices.show()
+---+
| id|
+---+
|  0|
|  1|
|  7|
|  2|
|  8|
|  3|
|  4|
| 10|
|  5|
| 11|
+---+

接下来,构建边数据:

def func(p):for k, ids in p:ids = list(ids)n = len(ids)if n <= 1:continuefor i in range(n-1):for j in range(i+1, n):yield (ids[i], ids[j])edges = []
keylist = list("abcde")
for key in keylist:data = tmp.mapPartitions(lambda area: [(row[key], i) for i, row in area if row[key]])edgeRDD = data.groupByKey().mapPartitions(func)edges.append(edgeRDD)
edgesDF = sc.union(edges).toDF(["src", "dst"])
edgesDF.cache()
edgesDF.show()
+---+---+
|src|dst|
+---+---+
|  8|  4|
|  7|  2|
|  0|  1|
|  0|  2|
|  1|  2|
|  4| 10|
|  1|  7|
|  1|  2|
|  7|  2|
|  5| 11|
+---+---+

可以看到所有的行号关系已经被成功获取。

下面使用图计算 计算出属于同一用户的行:

gdf = GraphFrame(vertices, edgesDF)
components = gdf.connectedComponents()
components.show()
+---+---------+
| id|component|
+---+---------+
|  0|        0|
|  1|        0|
|  7|        0|
|  2|        0|
|  8|        4|
|  3|        3|
|  4|        4|
| 10|        4|
|  5|        5|
| 11|        5|
+---+---------+

有了行号和所归属的组唯一标识,我们可以通过表连接获取原始数据的每一行所归属的component:

result = tmp.cogroup(components.rdd) \.map(lambda pair: pair[1][0].data[0] + Row(pair[1][1].data[0])) \.toDF(df.schema.names+["component"])
result.cache()
result.show()
+----+----+----+----+----+----+---------+
|   a|   b|   c|   d|   e|tags|component|
+----+----+----+----+----+----+---------+
|  a1|null|  c1|null|null|tag1|        0|
|null|null|  c1|  d1|null|tag2|        0|
|null|  b1|  c1|  d1|  e1|tag4|        0|
|null|  b4|  c4|null|  e4|tag1|        3|
|  a2|null|null|  d2|null|tag2|        4|
|null|  b3|null|null|  e3|tag1|        5|
|null|  b1|null|  d1|null|tag3|        0|
|  a2|  b2|null|null|null|tag1|        4|
|null|null|  c2|  d2|null|tag3|        4|
|null|null|  c3|null|  e3|tag2|        5|
+----+----+----+----+----+----+---------+

可以看到我们已经成功的进行同一用户识别了,剩下的只需要分组并使用pandas的逻辑合并数据:

def func(pdf):row = pdf[keylist].bfill().head(1)row["tags"] = pdf.tags.str.cat(sep=",")return rowresult.groupBy("component").applyInPandas(func, schema="a string, b string, c string, d string, e string, tags string"
).show()
+----+---+---+----+----+-------------------+
|   a|  b|  c|   d|   e|               tags|
+----+---+---+----+----+-------------------+
|  a1| b1| c1|  d1|  e1|tag1,tag2,tag4,tag3|
|null| b4| c4|null|  e4|               tag1|
|  a2| b2| c2|  d2|null|     tag2,tag1,tag3|
|null| b3| c3|null|  e3|          tag1,tag2|
+----+---+---+----+----+-------------------+

可以看到已经顺利得到需要的结果。

注意:applyInPandas要求返回的结果必须是pandas的datafream对象,所以相对之前的逻辑由.iloc[0]改成了.head(1)

如果你的spark不是3.X版本,没有applyInPandas方法,用原生rdd的方法则会麻烦很多:

def func(pair):component, rows = pairkeyList = list("abcde")ids = {}for row in rows:for key in keylist:v = getattr(row, key)if v:ids[key] = vids.setdefault("tags", []).append(row.tags)result = []for key in keylist:result.append(ids.get(key))result.append(",".join(ids["tags"]))return resultresult2 = result.rdd.groupBy(lambda row: row.component).map(func).toDF(df.schema)
result2.cache()
result2.show()

结果也一样:

+----+---+---+----+----+-------------------+
|   a|  b|  c|   d|   e|               tags|
+----+---+---+----+----+-------------------+
|  a1| b1| c1|  d1|  e1|tag1,tag2,tag4,tag3|
|null| b4| c4|null|  e4|               tag1|
|  a2| b2| c2|  d2|null|     tag2,tag1,tag3|
|null| b3| c3|null|  e3|          tag1,tag2|
+----+---+---+----+----+-------------------+

PySpark求解连通图问题相关推荐

  1. networkx快速解决连通图问题

    需求1:找社区 有一份数据部分如下,比如:刘备和关羽有关系,说明他们是一个团伙,刘备和张飞也有关系,那么刘备.关羽.张飞归为一个社区,以此类推. 在python中这是典型的查找连通图的问题,直接的思路 ...

  2. paper survey之——多机器人协作介绍(Multi-Robot System, MRS)

    多机器人系统通过交流协作和分享信息改进了单个机器人的性能,如任务执行效率.健壮性.灵活性和容错性,同时涵盖了分布式决策,编队控制.区域覆盖及其相关应用. 本博文根据主要是对目前多机器人相关的任务和pa ...

  3. 双连通图强连通图概念解释以及tarjan算法求解该类问题总结

    最近看了看类的相关题,感觉简单的题过于模板,但是对于难题的转化,如果对与这方面的概念不清楚,很难写,故总结一下. PS:博客里部分内容会和离散数学中的图论知识有联系,如果没有了解过相关知识可能比较难理 ...

  4. 生成树算法求解网络容量最大连通图

    生成树算法求解网络容量最大连通图 问题描述 图相关概念 最大生成树prim算法 问题描述 图一 单波传输容量 最大传输距离 总容量 100 Gb/s 3000 km 8 Tb/s 200 Gb/s 1 ...

  5. 三十七、Prim算法--求解最小生成树

    一.Prim算法介绍 普利姆(Prim)算法求最小生成树,也就是在包含 n 个顶点的连通图中,找出只有(n-1)条边包含所有 n 个顶点的 连通子图,也就是所谓的极小连通子图 普利姆的算法如下: 设 ...

  6. 基于Warshall算法的连通图及欧拉图判定方法

    1736年欧拉解决了哥尼斯堡七桥问题.他在这一具体问题的基础上进一步研究,最终找到了一个简便的原则可以鉴别一个图(多重图)能否一笔画成. 本文中,笔者使用布尔矩阵来存储一个无向图,并结合集合论中&qu ...

  7. [Spark]PySpark入门学习教程---介绍(1)

    一 安装指引 (91条消息) [Hadoop] mac搭建hadoop3.X 伪分布模式_小墨鱼的专栏-CSDN博客https://zengwenqi.blog.csdn.net/article/de ...

  8. 交替最小二乘矩阵分解_使用交替最小二乘矩阵分解与pyspark建立推荐系统

    交替最小二乘矩阵分解 pyspark上的动手推荐系统 (Hands-on recommender system on pyspark) Recommender System is an informa ...

  9. 贪婪算法在求解最短路径中的应用(JAVA)--Dijkstra算法

    最短路径问题最经典的算法就是Dijkstra算法,虽然不如Floyd算法能够求全源的最短路径,但是在效率上明显强于Floyd算法. 想了解Floyd算法的读者可以参考动态规划在求解全源最短路径中的应用 ...

最新文章

  1. laravel php跨域请求,laravel开发中跨域的解决方案
  2. Mongo使用navicat解除14天限制
  3. c语言树莓派音乐播放器,使用web端来控制我的树莓派播放音乐
  4. routing zuul_金三银四跳槽季快到了:送上Spring cloud全家桶系列之Zuul
  5. 塑料浮船坞行业调研报告 - 市场现状分析与发展前景预测
  6. php购物车paypal代码,PayPal购物车HTML代码
  7. Navicat for MySQ破译版
  8. java获取行政区划编码(省市区县居委5级)
  9. Session的活化与钝化
  10. uniapp截取部分区域
  11. 斯皮尔曼相关系数范围_Spearman Rank(斯皮尔曼等级)相关系数及MATLAB实现
  12. android模拟器玩手游,电脑上玩手游PC安卓模拟器哪个好用?哪个手机模拟器最好...
  13. 增量Lint检测实现原理
  14. Unity Shader学习-高光反射
  15. 5G对广播电视的影响以及应用-论文
  16. DIY蓝牙键盘(2) - 理解HID报文描述符
  17. oracle ora 02437,给表追加主键-----报错ORA-02437: 无法验证 (DENGCHAO.TEST) - 违反主键
  18. iApp屏蔽浏览器元素
  19. 解决SQL Server 导入System.Web.dll程序集,报错问题
  20. 关于近期微信小游戏马甲过包的一点感悟

热门文章

  1. java 行 javac不行_javac可以java不行
  2. python判断质数_使用Python语言判断质数(素数)的简单方法讲解
  3. 把公网ip通过DNSPOD进行域名解析
  4. [自定义SurfaceView] 气泡效果
  5. Android消息机制详解
  6. 5个设计师必备的页面设计工具!
  7. 缺失值和重复值的处理
  8. KMP快速计算next与nextval
  9. LCA算法以及原理详解
  10. Python学习日记(十八) 序列化模块