概述:
下面内容说的是在TPC-DS 1T数据集上用web_sales表ws_bill_customer_sk, ws_ship_customer_sk作为起始点和结束点,以ws_quantity为权重跑Spark GraphX(2.0.0以上版本)程序的正确姿势。用下面程序跑可以避免Spark GraphX在大数据情况下的各种bug, 在程序效率,gc稳定性上都有增强。

数据特征:
1T TPC-DS数据集中web_sales表(顶点:11924692, 边:720000376)

强连通子图算法(对Spark GraphX内置算法进行修改)可以做的改造如下:
1. 修复GraphX官方还未修复的关于源码核心类VertexRDD的bug,打patch到部署的GraphX包中
2. 解决堆溢出问题,通过用checkpoint截断RDD lineage的方法解决线程栈溢出等诸多bug
3. 控制总内存,用于RDD存储内存和JVM内部各代的比例,初始最小堆大小等参数,使driver端和executor端的JVM GC和full GC时间占用控制在总时间的5%左右
4. 增加用于提升系统稳定的参数
5. 使用图的顶点表RDD和边表RDD序列化功能,在内存充足和内存不足的情况下都能有效减少内存,减少和磁盘交互数据量。
6. 合理划分数据partition数,cores数和parallelism的比例,使中间过程数据大小合理,CPU能充分利用处理task,CPU数和并发处理的线程比例合理
7. 根据强连通子图只关注连通,不关注权重的特点和连通性与连通次数无关的特点提前过滤无效数据。
8. 升级核心消息传递接口到aggregateMessages

Fast unfolding社团算法(对参考1中的算法做修改)可以做的改造如下:
1. 修改核心消息传递接口aggregateMessages的参数减少消息传播过程数据量,减少每层迭代所用时间
2. 增加堆外内存量消除fetch shuffle metadata问题
3. 增加executor内存中存储内存的比例,解决executor存储内存不足的问题
4. 及时释放无用RDD,控制运行时内存无效内存占用。

算法运行时间:
Spark 2.x.x环境下,顶点1千万,边7亿的图,社团发现算法2小时,强连通子图算法30分钟。中间数据量为6T左右。图算法通信量大,过程数据量大。不少算法内存消耗大。数据膨胀率能到100,就是20G的原始数据,有可能2T的中间数据,800G内存消耗。

**********
概念理解
**********
有向图强连子图:任意uv之间可连接到。用于衡量可达性,无权重。
有向图完全子图:任意uv之间必须直接相连。用于解释团的概念。
团:完全子图是一个团,其他团不能完全包含这个团。团用于衡量局部密度。
三角数量:clique团是NP-complete问题,用三角数量来简化图结构的度量
局部聚类系数:

************强连通子图********************
export SPARK_HOME=/opt/ihenan/spark-2.0.0-bin-ihenan
unset HIVE_HOME

bin/spark-shell \
--master spark://172.17.199.3:7077 \
--name "scc_1000_itr" \
--driver-memory 10g \
--driver-java-options='-XX:NewRatio=4' \
--total-executor-cores 20 --executor-memory 22g --executor-cores 2 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.default.parallelism=50 \
--conf spark.yarn.executor.memoryOverhead=3000 \
--conf spark.executor.extraJavaOptions="-Xms12g -Xss16m"  \
--conf spark.network.timeout=300s \
--conf spark.shuffle.io.retryWait=20s \
--conf spark.shuffle.io.maxRetries=6 \
--jars /home/ihenanhadoop/ihenan_huxl/louvain_pro_memser_scc_ckp.jar 2> ~/ihenan_huxl/scc_1000_log.txt

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
sc.setCheckpointDir("/tmp/spark/checkpoint1000")
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sqlDF = sql("SELECT distinct ws_bill_customer_sk, ws_ship_customer_sk FROM tpcds_bin_partitioned_orc_1000.web_sales where ws_bill_customer_sk is not null and ws_ship_customer_sk is not null")
sqlDF.map(t => t(0) + " " + t(1)).repartition(10).rdd.saveAsTextFile("gin1000")
var graph = GraphLoader.edgeListFile(sc, "gin1000", false, 20, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
var sccGraph = SCC.run(graph, 10)

def sortedConnectedComponents(
 connectedComponents: Graph[VertexId, _])
 : Seq[(VertexId, Long)] = {
 val componentCounts = connectedComponents.vertices.map(_._2).
 countByValue
 componentCounts.toSeq.sortBy(_._2).reverse
}
val componentCounts = sortedConnectedComponents(sccGraph)
componentCounts.size
componentCounts.take(10).foreach(println)

************fast unfolding社团发现****************
export SPARK_HOME=/opt/ihenan/spark-2.0.0-bin-ihenan
unset HIVE_HOME

bin/spark-shell \
--master spark://172.17.199.3:7077 \
--name community_detection \
--driver-memory 10G \
--total-executor-cores 120 --executor-memory 30G --executor-cores 5 \
--conf spark.default.parallelism=200 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.executor.extraJavaOptions="-Xms22g"  \
--conf spark.memory.storageFraction=0.8 \
--conf spark.shuffle.io.maxRetries=5 \
--conf spark.shuffle.io.retryWait=10s \
--conf spark.yarn.executor.memoryOverhead=5000 \
--jars /home/ihenanhadoop/ihenan_huxl/louvain_pro_memser_scc_ckp.jar 2> ~/ihenan_huxl/cd_1000_log.txt

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sqlDF = hiveContext.sql("SELECT ws_bill_customer_sk, ws_ship_customer_sk, ws_quantity FROM tpcds_bin_partitioned_orc_1000.web_sales  where ws_bill_customer_sk is not null and ws_ship_customer_sk is not null and ws_quantity is not null")
var edgeRDD = sqlDF.map(t => Edge(t.getAs[Long]("ws_bill_customer_sk"), t.getAs[Long]("ws_ship_customer_sk"), t.getAs[Int]("ws_quantity").toLong)).rdd
edgeRDD = edgeRDD.coalesce(500, shuffle = true)
val graph = Graph.fromEdges(edgeRDD, None).groupEdges(_ + _)
val runner = new HDFSLouvainRunner(2000,1,"cd1000")
runner.run(sc, graph)

参考资料:
1. 参与DARPA的XDATA项目和图计算项目的Sotera公司的程序
https://github.com/Sotera/distributed-graph-analytics
2. 解释louvain fast unfolding输出结果的意义:
http://blog.csdn.net/sparkexpert/article/details/50389975
3. 解决RDD lineage非常长超过线程栈大小的问题:
https://github.com/JerryLead/blogs/blob/master/BigDataSystems/Spark/StackOverflowDiagnosis/GraphX_StackOverflow_Cause_Diagnosis.md
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-td5649.html
4. 。。。

Spark GraphX下强连通子图和社团发现算法在1T TPC-DS数据集下执行方法、优化和性能估算相关推荐

  1. 聚类技术---复杂网络社团检测_基于Plato高性能图计算框架的社团发现算法

    近年来,图作为一种表示和分析大数据的有效方法,因为特别适合用作 社交网络.推荐系统.网络安全.文本检索和生物医疗等领域至关重要的 数据分析和挖掘工具, 而受到广泛关注. 这里的"图" ...

  2. 社团发现算法-BGLL算法(附代码实现)

    一.社团发现算法 人们发现许多实际网络均具有社团结构, 即整个网络由若干个社团组成,社团之间的连接相对稀疏.社团内部的连接相对稠密.社团发现则是利用图拓扑结构中所蕴藏的信息从复杂网络 中解析出其模块化 ...

  3. 开源复杂网络分析软件中社团发现算法总结

    复杂网络研究中的一个重要部分就是社团发现(Community Detection)算法的研究,密歇根大学物理学系教授Mark Newman就主要在社团发现方面做出了很多贡献.今天简单总结一下几个开源复 ...

  4. Louvain 社团发现算法学习(我的java实现+数据用例)

    为了大家方便,直接把数据放在github了: https://github.com/qq547276542/Louvain 算法介绍: Louvain 算法是基于模块度的社区发现算法,该算法在效率和效 ...

  5. 【项目笔记】若干基本社团发现算法介绍

    两个衡量指标:边介数 & 模块度 边介数计算: 以下用图来自:https://blog.csdn.net/weixin_44704845/article/details/102686597 选 ...

  6. Spark—GraphX编程指南

    GraphX编程指南 GraphX 是新的图形和图像并行计算的Spark API.从整理上看,GraphX 通过引入 弹性分布式属性图(Resilient Distributed Property G ...

  7. 数学建模——K-means聚类算法与社团发现

    将之前的课程报告整理一下,以具体的应用案例介绍K-means算法与其他方法结合的应用场景. let's go~ 1  概述 复杂网络是指规模庞大,具有自组织.自相似.小世界以及无标度特性的网络[1]. ...

  8. 华为诺亚ICLR 2020满分论文:基于强化学习的因果发现算法

    2019-12-30 13:04:12 人工智能顶会 ICLR 2020 将于明年 4 月 26 日于埃塞俄比亚首都亚的斯亚贝巴举行,不久之前,大会官方公布论文接收结果:在最终提交的 2594 篇论文 ...

  9. 除了基于模块度之外的其它社团检测算法

    一.模块度的局限性 (1)判断网络是否具有较强的社团结构一种方法是把一个给定网络与该网络相应的随机化模型做对比.通常做法是通过随机重连方式生成许多具有相同度序列的随机化网络,并计算这些网络的模块度的均 ...

最新文章

  1. Linux: TLB 查询流程
  2. 宏转录组方法_高级转录组分析和R语言数据可视化第十二期 (线上线下同时开课)...
  3. fc-ae-1553_什么是AE-L,AF-L和*按钮,它们的作用是什么?
  4. 【数据结构】(面试题)使用两个栈实现一个队列(详细介绍)
  5. 什么原因导致百度搜索比Google要少很多?
  6. Java限流之 —— Nginx限流
  7. android语法高亮插件,安卓语法高亮编辑器 HighlightTextEditor
  8. centos6.5 x86_64安装rsyslog + loganalyzer
  9. linux 0.11根文件系统,构建一个最小Linux根文件系统
  10. bios还原默认设置
  11. 软件工程(吕云翔第二版)部分简答题答案
  12. 2022中兴捧月图像去噪赛道结果小结
  13. 稳定同位素示踪技术在内源性物质代谢调控中的应用
  14. 微分几何笔记(3) —— Frenet标架及Frenet方程组
  15. 《孙子兵法战略运筹之谋攻篇》
  16. 手把手教你高效快捷的创建Swift Framework
  17. openssl加密解密
  18. 百家号在电脑上如何查看作者的其它文章
  19. 3分钟教会你用KaTeX在csdn博客中编辑数学公式
  20. 02-网络为什么要分层

热门文章

  1. 正确重启计算机的方法,电脑一开机就会出现 重启并选择正确的启动设备或在选定的启动设...
  2. matlab开环调速,直流电动机开环调速MATLAB系统仿真
  3. 5G/NR 随机接入过程之PRACH时域资源
  4. 用python画小猪佩奇的编码_如何用Python代码画小猪佩奇
  5. 当年也是翩翩少年,如今落得秃顶大叔,程序员秃顶算工伤吗?
  6. “永恒之蓝”第一弹-关于防范感染勒索蠕虫病毒的紧急通知
  7. 戴尔电脑怎么录屏?这6个方法教你轻松录屏
  8. 电脑重装系统,微信备份与恢复聊天记录,保存的文件。微信聊天记录迁移
  9. 高德地图实现租房学习
  10. Cannot execute request on any known server或DiscoveryClient_UNKNOWN/DESKTOP-MQ8D0C9:8761