第1关:GraphX-构建图及相关基本操作

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphX_Test_stu{def main(args:Array[String]): Unit ={//屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//设置运行环境val conf = new SparkConf().setAppName("SimpleGraph").setMaster("local")val sc = new SparkContext(conf)//设置顶点和边,注意顶点和边都是用元组定义的Array//顶点的数据类型是VD:(String,Int)val vertexArray = Array((1L,("Bob",89)),(2L,("Sunny",70)),(3L,("Tony",99)),(4L,("Helen",58)),(5L,("John",55)),(6L,("Tom",83)),(7L,("Marry",94)),(8L,("Cook",76)),(9L,("Linda",84)))//边的数据类型ED:Intval edgeArray = Array(Edge(1L,2L,5),Edge(1L,3L,9),Edge(2L,4L,4),Edge(3L,4L,6),Edge(3L,6L,8),Edge(3L,7L,4),Edge(4L,5L,7),Edge(4L,8L,6),Edge(8L,3L,7),Edge(8L,7L,2),Edge(8L,9L,1))//构造vertexRDD和edgeRDDval vertexRDD:RDD[(Long,(String,Int))] = sc.parallelize(vertexArray)val edgeRDD:RDD[Edge[Int]] = sc.parallelize(edgeArray)//构造Graph[VD,ED]val graph:Graph[(String,Int),Int] = Graph(vertexRDD, edgeRDD)//*********************图的属性//找出图中成绩大于60的顶点println("Find the vertices with scores greater than 60 in the graph")graph.vertices.filter{case (id,(name,grade)) => grade > 60}.collect.foreach{case (id,(name,grade)) => println(s"$name $grade")}println//边操作,找出图中边属性大于5的边println("Find the edge of the graph whose edge attribute is greater than 5")graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))println//triplets操作.((srcId,srcAttr),(dstID,dstAttr),attr)//列出边属性>5的tripltesprintln("Find the tripltes with edge attributes greater than 5")for (triplet <- graph.triplets.filter(t => t.attr > 5).collect){println(s"${triplet.srcAttr._1} ${triplet.dstAttr._1}")}println//Degrees操作//找出图中最大的出度、入度、度数println("Find the maximum outDegrees, inDegrees, and Degrees in the graph")def max(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int) = {if(a._2 > b._2) a else b}println("max of outDegrees" + graph.outDegrees.reduce(max)  + " max of inDegrees" + graph.inDegrees.reduce(max) + " max of Degrees" + graph.degrees.reduce(max))//********************转换操作//顶点的转换操作,顶点成绩+10println("Vertex conversion operation   vertex scores added 10")graph.mapVertices{ case (id, (name, age)) => (id, (name,age+10))}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._1}"))println//边的转换操作,边的属性println("Edge conversion operation   multiplying the attribute of the edge by 2")graph.mapEdges(e => e.attr*2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))println//********************结构操作//找出顶点成绩>60的子图println("Find subgraphs with vertex scores greater than 60")val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 60)//找出子图所有顶点println("Find all the vertices of the subgraph:")subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))println//找出子图所有边println("Find all sides of the subgraph:")subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))println//********************结构操作//连接操作val inDegrees:VertexRDD[Int] = graph.inDegreescase class User(name:String,grade:Int,inDeg:Int,outDeg:Int)//创建一个新图,顶点VD的数据类型为User,并从graph做类型转换val initialUserGraph:Graph[User,Int] = graph.mapVertices{case (id,(name,grade)) => User(name,grade,0,0)}//initialUserGraph与inDegrees、outDegrees(RDD)进行连接//并修改initialUserGraph中inDeg值、outDeg值val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees){case(id, u, inDegOpt) => User(u.name, u.grade, inDegOpt.getOrElse(0), u.outDeg)}.outerJoinVertices(initialUserGraph.outDegrees){case(id, u, outDegOpt) => User(u.name, u.grade, u.inDeg, outDegOpt.getOrElse(0))}//连接图的属性userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg:${v._2.outDeg}"))println//找出出度和入度相同的顶点println("Find the same vertex with the same degree of penetration")userGraph.vertices.filter{case (id,u) => u.inDeg == u.outDeg}.collect.foreach{case (id,property) => println(property.name)}printlnsc.stop()}
}

第2关:GraphX-求最短路径、二跳邻居操作

import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
object GraphX_Test_2_stu{def main(args:Array[String]): Unit ={//屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//设置运行环境val conf = new SparkConf().setAppName("SimpleGraph").setMaster("local")val sc = new SparkContext(conf)//设置顶点和边,注意顶点和边都是用元组定义的Array//顶点的数据类型是VD:(String,Int)val vertexArray = Array((1L,("Bob",89)),(2L,("Sunny",70)),(3L,("Tony",99)),(4L,("Helen",58)),(5L,("John",55)),(6L,("Tom",83)),(7L,("Marry",94)),(8L,("Cook",76)),(9L,("Linda",84)))//边的数据类型ED:Intval edgeArray = Array(Edge(1L,2L,5),Edge(1L,3L,9),Edge(2L,4L,4),Edge(3L,4L,6),Edge(3L,6L,8),Edge(3L,7L,4),Edge(4L,5L,7),Edge(4L,8L,6),Edge(8L,3L,7),Edge(8L,7L,2),Edge(8L,9L,1))//构造vertexRDD和edgeRDDval vertexRDD:RDD[(Long,(String,Int))] = sc.parallelize(vertexArray)val edgeRDD:RDD[Edge[Int]] = sc.parallelize(edgeArray)//构造Graph[VD,ED]val graph:Graph[(String,Int),Int] = Graph(vertexRDD, edgeRDD)//********************实用操作//找出顶点1到各顶点的最短距离println("Find the shortest distance from vertex 1 to each vertex")val sourceId:VertexId = 1L  //定义远点val initialGraph = graph.mapVertices((id,_) => if (id == sourceId) 0.0 else Double.PositiveInfinity)val sssp = initialGraph.pregel(Double.PositiveInfinity)((id,dist,newDist) => math.min(dist,newDist),triplet => {//计算权重if(triplet.srcAttr + triplet.attr < triplet.dstAttr){Iterator((triplet.dstId,triplet.srcAttr + triplet.attr))}else{Iterator.empty}},(a,b) => math.min(a,b))println(sssp.vertices.collect.mkString("\n"))printlndef sendMsgFunc(edge:EdgeTriplet[Int, Int]) = {if(edge.srcAttr <= 0){if(edge.dstAttr <= 0){// 如果双方都小于0,则不发送信息Iterator.empty}else{// srcAttr小于0,dstAttr大于零,则将dstAttr-1后发送Iterator((edge.srcId, edge.dstAttr - 1))}}else{if(edge.dstAttr <= 0){// srcAttr大于0,dstAttr<0,则将srcAttr-1后发送Iterator((edge.dstId, edge.srcAttr - 1))}else{// 双方都大于零,则将属性-1后发送val toSrc = Iterator((edge.srcId, edge.dstAttr - 1))val toDst = Iterator((edge.dstId, edge.srcAttr - 1))toDst ++ toSrc}}}val friends = Pregel(graph.mapVertices((vid, value)=> if(vid == 1) 2 else -1),// 发送初始值-1,// 指定阶数2,// 双方向发送EdgeDirection.Either)(// 将值设为大的一方vprog = (vid, attr, msg) => math.max(attr, msg),//sendMsgFunc,//(a, b) => math.max(a, b)).subgraph(vpred = (vid, v) => v >= 0)println("Confirm Vertices of friends ")friends.vertices.collect.foreach(println(_))sc.stop()}
}

educoder中Spark GraphX—构建图及相关操作相关推荐

  1. spark graphx从txt文件中读数据构建图

    程序功能:导入顶点以及边的数据,生成边RDD和顶点RDD,构建图 import org.apache.spark._ import org.apache.spark.graphx._ // To ma ...

  2. 用Spark GraphX进行图计算(详细步骤)

    SimpleGraphX 1.1 创建文件夹(10.103.105.63) spark-3.1.1-bin-hadoop2.7/Test/ mkdir graphx cd graphx mkdir s ...

  3. Educoder中Spark算子--java版本

    第1关:QueueStream 编程要求 在右侧编辑器补充代码,完成以下需求: 将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss ) 提取数据中的起始URL(切割符为空 ...

  4. Educoder中Spark任务提交

    第1关:spark-submit提交 #!/bin/bashcp -r Spark/SparkRDD/target/project.jar /root cd /opt/spark/dist/bin # ...

  5. WebView中的视频全屏的相关操作

    最近工作中,基本一直在用WebView,今天就把它整理下: WebView 顾名思义,就是放一个网页,一个看起来十分简单,但是用起来不是那么简单的控件. 首先你肯定要定义,初始化一个webview,其 ...

  6. django框架中的QuerySet详解及相关操作

    QuerySet概念 从数据库中查询出来的结果一般是一个集合,这个集合叫做 QuerySet. <QuerySet [<Goods: Goods object (8)>, <G ...

  7. 转动风车java_java实现-图的相关操作

    importjava.util.LinkedList;public classGraph {private int vertexSize;//顶点的数量 private int[] vertexs;/ ...

  8. Spark GraphX图计算入门

    一.什么是图计算 图计算,可以简单理解为以图这种数据结构为基础,整合相关算法来实现对应应用的计算模型.社交网络中人与人之间的关系,如果用计算机数据结构表示,最合适的就是图了.其中图的顶点表示社交中的人 ...

  9. Spark GraphX相关使用方法

    Spark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求.Spark GraphX ...

最新文章

  1. curl: (7) couldn‘t connect to host 解决方法
  2. php对象的三大特征,关于php中面向对象的三大特征(封装/继承/多态)
  3. 【Python】吊打pyecharts,又一超级棒的开源可视化库
  4. 【[网络流二十四题]最长不下降子序列问题】
  5. MPEG音视频编解码之MP3编解码概述
  6. 欢迎加入免费星球,一起交流大数据技术。
  7. 动手学pytorch之tensor数据(一)
  8. 8.9 NOIP模拟测试15 建设城市(city)+轰炸行动(bomb)+石头剪刀布(rps)
  9. iPhone13系列预计5499起;蔚来回应31岁企业家“自动驾驶”车祸去世;小米取消MIX4防丢失模式无卡联网服务|极客头条...
  10. 2015 NI 校招笔试机试面试
  11. 数据库中的case when ,if ,if null
  12. 为佳作喝彩: Google Play 2022 年度中国开发者最佳榜单
  13. 在线读书——孙天泽(嵌入式设计及Linux驱动开发指南——基于ARM9处理器)
  14. 【Proteus仿真】TLC5615输出256点正弦波(振幅可调)
  15. c语言写抢QQ红包软件,Android实现QQ抢红包插件
  16. 转载:徐家骏:我在华为工作十年的感悟
  17. 台式计算机里的电池有啥用,电脑主板上的电池有什么用?看完秒懂!
  18. std::pi_挤压Pi:家庭音频历险记
  19. swiper插件实现轮播图
  20. 关于CG,CV,DIP

热门文章

  1. winedit使用教程_基础教程:BCDEDIT常用命令及使用实例(图)
  2. 摩托罗拉Android 7寸,3.7寸屏600MHz处理器 摩托罗拉MT710评测
  3. 一段经典模拟退火算法代码
  4. 房子装修真的好累~~
  5. com lofter android,LOFTER
  6. uos命令_UOS与Deepin OS区别详解
  7. Phpcms v9建站详细流程
  8. Java:Java实现简单闹钟设计
  9. python服务器传输wav文件,Python处理wav文件(二进制文件读写)
  10. 微信小程序来了,整理微信小程序学习教程网站