Spark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求。Spark GraphX由于底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。图的分布式或者并行处理其实是把这张图拆分成很多的子图,然后我们分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。

Spark GraphX基本操作:

[plain] view plaincopy
  1. import org.apache.spark.SparkContext
  2. import org.apache.spark._
  3. import org.apache.spark.graphx._
  4. import org.apache.spark.graphx.Graph
  5. import org.apache.spark.graphx.Edge
  6. import org.apache.spark.graphx.VertexRDD
  7. import org.apache.spark.graphx.util.GraphGenerators
  8. import org.apache.spark.graphx.GraphLoader
  9. import org.apache.spark.storage.StorageLevel
  10. import org.apache.spark.rdd.RDD
  11. object SparkGraphx1 {
  12. def main(args: Array[String]) {
  13. val sc = new SparkContext("spark://centos.host1:7077", "Spark Graphx")
  14. //创建点RDD
  15. val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  16. (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  17. (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
  18. //创建边RDD
  19. val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  20. Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  21. Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
  22. //定义一个默认用户,避免有不存在用户的关系
  23. val defaultUser = ("John Doe", "Missing")
  24. //构造Graph
  25. val graph = Graph(users, relationships, defaultUser)
  26. //点RDD、边RDD过滤
  27. val fcount1 = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
  28. println("postdocs users count: " + fcount1)
  29. val fcount2 = graph.edges.filter(edge => edge.srcId > edge.dstId).count
  30. println("srcId > dstId edges count: " + fcount2)
  31. val fcount3 = graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
  32. println("srcId > dstId edges count: " + fcount3)
  33. //Triplets(三元组),包含源点、源点属性、目标点、目标点属性、边属性
  34. val triplets: RDD[String] = graph.triplets.map(triplet => triplet.srcId + "-" +
  35. triplet.srcAttr._1 + "-" + triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr._1)
  36. triplets.collect().foreach(println(_))
  37. //度、入度、出度
  38. val degrees: VertexRDD[Int] = graph.degrees;
  39. degrees.collect().foreach(println)
  40. val inDegrees: VertexRDD[Int] = graph.inDegrees
  41. inDegrees.collect().foreach(println)
  42. val outDegrees: VertexRDD[Int] = graph.outDegrees
  43. outDegrees.collect().foreach(println)
  44. //构建子图
  45. val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
  46. subGraph.vertices.collect().foreach(println(_))
  47. subGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
  48. .collect().foreach(println(_))
  49. //Map操作,根据原图的一些特性得到新图,原图结构是不变的,下面两个逻辑是等价的,但是第一个不会被graphx系统优化
  50. val newVertices = graph.vertices.map { case (id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")) }
  51. val newGraph1 = Graph(newVertices, graph.edges)
  52. val newGraph2 = graph.mapVertices((id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")))
  53. //构造一个新图,顶点属性是出度
  54. val inputGraph: Graph[Int, String] =
  55. graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
  56. //根据顶点属性为出度的图构造一个新图,依据PageRank算法初始化边与点
  57. val outputGraph: Graph[Double, Double] =
  58. inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
  59. //图的反向操作,新的图形的所有边的方向相反,不修改顶点或边性属性、不改变的边的数目,它可以有效地实现不必要的数据移动或复制
  60. var rGraph = graph.reverse
  61. //Mask操作也是根据输入图构造一个新图,达到一个限制制约的效果
  62. val ccGraph = graph.connectedComponents()
  63. val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
  64. val validCCGraph = ccGraph.mask(validGraph)
  65. //Join操作,原图外连出度点构造一个新图  ,出度为顶点属性
  66. val degreeGraph2 = graph.outerJoinVertices(outDegrees) { (id, attr, outDegreeOpt) =>
  67. outDegreeOpt match {
  68. case Some(outDeg) => outDeg
  69. case None => 0 //没有出度标识为零
  70. }
  71. }
  72. //缓存。默认情况下,缓存在内存的图会在内存紧张的时候被强制清理,采用的是LRU算法
  73. graph.cache()
  74. graph.persist(StorageLevel.MEMORY_ONLY)
  75. graph.unpersistVertices(true)
  76. //GraphLoader构建Graph
  77. var path = "/user/hadoop/data/temp/graph/graph.txt"
  78. var minEdgePartitions = 1
  79. var canonicalOrientation = false // if sourceId < destId this value is true
  80. val graph1 = GraphLoader.edgeListFile(sc, path, canonicalOrientation, minEdgePartitions,
  81. StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
  82. val verticesCount = graph1.vertices.count
  83. println(s"verticesCount: $verticesCount")
  84. graph1.vertices.collect().foreach(println)
  85. val edgesCount = graph1.edges.count
  86. println(s"edgesCount: $edgesCount")
  87. graph1.edges.collect().foreach(println)
  88. //PageRank
  89. val pageRankGraph = graph1.pageRank(0.001)
  90. pageRankGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.pr")
  91. pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
  92. //Connected Components
  93. val connectedComponentsGraph = graph1.connectedComponents()
  94. connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.cc")
  95. connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
  96. //TriangleCount主要用途之一是用于社区发现 保持sourceId小于destId
  97. val graph2 = GraphLoader.edgeListFile(sc, path, true)
  98. val triangleCountGraph = graph2.triangleCount()
  99. triangleCountGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.tc")
  100. triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
  101. sc.stop()
  102. }
  103. }

—————————————————————————————————————————————————————————————————————————————

Spark GraphX的一些其他有用操作:

[plain] view plaincopy
  1. import org.apache.spark._
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.graphx._
  4. import org.apache.spark.graphx.Graph
  5. import org.apache.spark.graphx.util.GraphGenerators
  6. import org.apache.spark.rdd.RDD
  7. object SparkGraphx {
  8. def main(args: Array[String]) {
  9. val sc = new SparkContext("spark://centos.host1:7077", "Spark Graphx")
  10. //通过GraphGenerators构建一个随机图
  11. val numVertices = 100
  12. val numEParts = 2
  13. val mu = 4.0
  14. val sigma = 1.3
  15. val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(
  16. sc, numVertices, numEParts, mu, sigma).mapVertices((id, _) => id.toDouble)
  17. graph.triplets.collect.foreach(triplet => println(triplet.srcId + "-" + triplet.srcAttr + "-" +
  18. triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr))
  19. //mapReduceTriplets函数使用样例
  20. //计算年龄大于自己的关注者的总人数和总年龄
  21. val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  22. //Map函数
  23. triplet => {
  24. if (triplet.srcAttr > triplet.dstAttr) {
  25. Iterator((triplet.dstId, (1, triplet.srcAttr)))
  26. } else {
  27. Iterator.empty
  28. }
  29. },
  30. //Reduce函数
  31. (a, b) => (a._1 + b._1, a._2 + b._2)
  32. )
  33. //计算年龄大于自己的关注者的平均年龄
  34. val avgAgeOfOlderFollowers: VertexRDD[Double] =
  35. olderFollowers.mapValues((id, value) => value match {case (count, totalAge) => totalAge / count })
  36. avgAgeOfOlderFollowers.collect.foreach(println(_))
  37. //定义一个Reduce函数来计算图中最大度的点
  38. def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  39. if (a._2 > b._2) a else b
  40. }
  41. val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
  42. println(s"maxInDegree: $maxInDegree")
  43. val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
  44. println(s"maxOutDegree: $maxOutDegree")
  45. val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
  46. println(s"maxDegrees: $maxDegrees")
  47. //计算邻居相关函数,这些操作是相当昂贵的,需要大量的重复信息作为他们的通信,因此相同的计算还是推荐用mapReduceTriplets
  48. val neighboorIds:VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Out)
  49. val neighboors:VertexRDD[Array[(VertexId, Double)]] = graph.collectNeighbors(EdgeDirection.Out);
  50. //Pregel API。计算单源最短路径
  51. val graph1 = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapEdges(e => e.attr.toDouble)
  52. //定义一个源值 点
  53. val sourceId: VertexId = 42
  54. //初始化图的所有点,除了与指定的源值点相同值的点为0.0以外,其他点为无穷大
  55. val initialGraph = graph1.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
  56. //Pregel有两个参数列表,第一个参数列表包括的是:初始化消息、迭代最大数、边的方向(Out)。第二个参数列表包括的是:用户定义的接受消息、计算消息、联合合并消息的函数。
  57. val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  58. //点程序
  59. (id, dist, newDist) => math.min(dist, newDist),
  60. //发送消息
  61. triplet => {
  62. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  63. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  64. } else {
  65. Iterator.empty
  66. }
  67. },
  68. //合并消息
  69. (a, b) => math.min(a, b)
  70. )
  71. println(sssp.vertices.collect.mkString("\n"))
  72. //aggregateUsingIndex操作
  73. val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
  74. val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
  75. val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
  76. val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
  77. sc.stop()
  78. }
  79. }

本文转自

转自:http://blog.csdn.net/fighting_one_piece/article/details/39668267

http://blog.csdn.net/fighting_one_piece/article/details/39673193,所有权力归原作者所有。

Spark GraphX相关使用方法相关推荐

  1. educoder中Spark GraphX—构建图及相关操作

    第1关:GraphX-构建图及相关基本操作 import org.apache.log4j.{Level, Logger} import org.apache.spark.graphx._ impor ...

  2. Spark GraphX下强连通子图和社团发现算法在1T TPC-DS数据集下执行方法、优化和性能估算

    概述: 下面内容说的是在TPC-DS 1T数据集上用web_sales表ws_bill_customer_sk, ws_ship_customer_sk作为起始点和结束点,以ws_quantity为权 ...

  3. 《Spark GraphX in Action》书评及作者访谈

    \ 关键要点 \ 图数据分析与传统数据分析间的差异是什么? \ 如何使用Apache Spark GraphX软件库和GraphFrames这样的API进行图数据处理. \ 使用图数据分析的热门用例. ...

  4. 快刀初试:Spark GraphX在淘宝的实践

    (本文由团队中梧苇和我一起撰写,并由团队中的林岳,岩岫,世仪等多人Review,发表于程序员的8月刊,由于篇幅原因,略作删减,本文为完整版) 对于网络科学而言,世间万物都可以抽象成点,而事物之间的关系 ...

  5. 明风:分布式图计算的平台Spark GraphX 在淘宝的实践

    快刀初试:Spark GraphX在淘宝的实践 作者:明风 (本文由团队中梧苇和我一起撰写,并由团队中的林岳,岩岫,世仪等多人Review,发表于程序员的8月刊,由于篇幅原因,略作删减,本文为完整版) ...

  6. Spark—GraphX编程指南

    GraphX编程指南 GraphX 是新的图形和图像并行计算的Spark API.从整理上看,GraphX 通过引入 弹性分布式属性图(Resilient Distributed Property G ...

  7. Spark GraphX在淘宝的实践

    原文链接:http://rec-sys.net/forum.php?mod=viewthread&tid=398 由于Spark GraphX性能良好,又有丰富的功能和运算符,能在海量数据上自 ...

  8. 顶会论文阅读-22年CCF A级别spark graphX研究

    这篇文章是从dblp上面自行下载的唐老师发的A类文章,主要讲的是对spark源码当中sparkgraphX模块的优化: incgraph:基于Spark GraphX的分布式增量图计算模型和框架: 原 ...

  9. spark GraphX官方文档翻译--转载

    6. Spark GraphX 6.1 概述 GraphX是spark的一个新组件用于图和并行图计算.在一个高水平,GraphX通过引进一个新的图抽象扩展了spark RDD:带有顶点和边属性的有向多 ...

最新文章

  1. oracle erp 财务入门,Oracle ERP EBS 顾问财务模块基本业务操作专题
  2. python为什么这么火 知乎-没想到吧!Google 排名第一的编程语言,为什么会这么火?...
  3. 基于子类的动态代理:
  4. MSIL学习笔记(01):ilasm和ildasm
  5. 基于MATLAB的FFT傅立叶分析
  6. 数字图像处理的就业前景
  7. ustc linux 网络通,USTC 网络通脚本
  8. opencv 的norm_22、OpenCV用卷积Filter2D进行滤波器
  9. JVM性能分析与定位
  10. 导致集群重启_园区网核心交换机S7706异常重启导致无线网络故障
  11. matlab中的求导函数diff
  12. Office文件转PDF的解决方案
  13. VMWare安装CentOS8及黑屏问题解决方法
  14. 程序员投入时间和精力实现财富增长之道,这可能会伴随你程序员整个生涯(请不要连续点赞)
  15. Pixracer V1.0编译固件
  16. html左侧导航菜单多级,css3多级菜单导航栏、侧边菜单栏
  17. 关于ubuntu的详细介绍
  18. 阿丹学理财之P2P投资
  19. 事务transactional详解
  20. Build:Download maven-metadata.xml...打包一直卡在这里

热门文章

  1. The Google File System
  2. 剖析微软Hyper-V的最佳部署方式
  3. TreeView 操作应用
  4. java算法在工作,我在北京找工作(三):java实现算法2 直接插入排序+不可变类...
  5. C语言课后习题(63)
  6. libssl-dev linux下载,libssl
  7. 强势回归丨2021数据库大咖讲坛(第1期):数据库高可用容灾方案的实践与探索
  8. PostgreSQL的clog—从事务回滚速度谈起
  9. 9个问题,带你掌握流程控制语句中的java原理
  10. SecSolar:为代码“捉虫”,让你能更专心写代码