目录

1、Pregel API:

2、代码实现:

使用pregal实现找出源顶点到每个节点最小花费

使用pregel实现找出源节点到每个节点的最大深度


1、Pregel API:

图本身就是内在的递归的数据结构,因为一个顶点的属性可能依赖于其neighbor,而neighbor的属性又依赖于他们的neighbour。所以很多重要的图算法都会迭代计算每个顶点的属性,直到达到一个稳定状态。

GraphX中的Pregel操作符是一个批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于图的拓扑结构(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作为edge triplet的一个函数并行计算的,message的计算可以使用source和dest顶点的属性。没有收到message的顶点在super step中被跳过。迭代会在么有剩余的信息之后停止,并返回最终的图。

pregel的定义:

def pregel[A]

(initialMsg: A,//在第一次迭代中每个顶点获取的起始

msgmaxIter: Int = Int.MaxValue,//迭代计算的次数

activeDir: EdgeDirection = EdgeDirection.Out

)(

vprog: (VertexId, VD, A) => VD,//顶点的计算函数,在每个顶点运行,根据顶点的ID,属性和获取的inbound message来计算顶点的新属性值。顶一次迭代的时候,inbound message为initialMsg,且每个顶点都会执行一遍该函数。以后只有上次迭代中接收到信息的顶点会执行。

sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//应用于顶点的出边(out edges)用于接收顶点发出的信息

mergeMsg: (A, A) => A//合并信息的算法

)

算法实现的大致过程:

var g = mapVertices((vid, vdata) => vprog(vid, vdata, initMsg)).cache //第一步是根据initMsg在每个顶点执行一次vprog算法,从而每个顶点的属性都会迭代一次。

var messages = g.mapReduceTriplets(sendMsg, mergeMsg)

var messagesCount = messages.count

var i = 0

while(activeMessages > 0 && i < maxIterations){

g = g.joinVertices(messages)(vprog).cache

val oldMessages = messages

messages = g.mapReduceTriplets(

sendMsg,

mergeMsg,

Some((oldMessages, activeDirection))

).cache()

activeMessages = messages.count

i += 1

}

g

pregel算法的一个实例:将图跟一些一些初始的score做关联,然后将顶点分数根据出度大小向外发散,并自己保留一份:

//将图中顶点添加上该顶点的出度属性

val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){

case (vid, name, deg) => (name, deg match {

case Some(deg) => deg+0.0

case None => 1.25}

)

}//将图与初始分数做关联

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))//将图与初始分数做关联

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))

算法的第一步:将0.0(也就是传入的初始值initMsg)跟各个顶点的值相加(还是原来的值),然后除以顶点的出度。这一步很重要,不能忽略。 并且在设计的时候也要考虑结果会不会被这一步所影响。

解释来源:https://www.jianshu.com/p/d9170a0723e4

2、代码实现:

使用pregal实现找出源顶点到每个节点最小花费

package homeWorkimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.graphx.util.GraphGeneratorsobject MapGraphX5 {def main(args: Array[String]): Unit = {//设置运行环境val conf = new SparkConf().setAppName("Pregel API GraphX").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")// 构建图val myVertices = sc.parallelize(Array((1L, 0), (2L, 0), (3L, 0), (4L, 0),(5L, 0)))val myEdges = sc.makeRDD(Array(Edge(1L, 2L, 2.5),Edge(2L, 3L, 3.6), Edge(3L, 4L, 4.5),Edge(4L, 5L, 0.1), Edge(3L, 5L, 5.2)))val myGraph = Graph(myVertices, myEdges)//设置源顶点val sourceId: VertexId = 1L//初始化数据集,是源顶点就为0.0,不是就设置为double的正无穷大val initialGraph = myGraph.mapVertices((id, _) =>if (id == sourceId) 0.0 else Double.PositiveInfinity)/*def pregel[A](initialMsg : A,maxIterations : scala.Int = { /* compiled code */ },activeDirection : org.apache.spark.graphx.EdgeDirection = { /* compiled code */ })(vprog : scala.Function3[org.apache.spark.graphx.VertexId, VD, A, VD],sendMsg : scala.Function1[org.apache.spark.graphx.EdgeTriplet[VD, ED],scala.Iterator[scala.Tuple2[org.apache.spark.graphx.VertexId, A]]],mergeMsg : scala.Function2[A, A, A])(implicit evidence$6 : scala.reflect.ClassTag[A]): org.apache.spark.graphx.Graph[VD, ED] = { /* compiled code */ }
*/val sssp: Graph[Double, Double] = initialGraph.pregel(//initialMsDouble.PositiveInfinity//maxIterations和activeDirection使用默认值)(//vprog   更改数据集(id, dist, newDist) => math.min(dist, newDist),//sendMsgtriplet => { // Send Message//寻找1L顶点到每个顶点的最小花费if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {//满足sum(起始顶点+边值) 小于 终止顶点当前数据集中的值,就把sum发送给终止顶点,更新数据集的数据Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))} else {Iterator.empty}},//mergeMsg    选择当前数据和发送数据的最小值传送(a, b) => math.min(a, b))sssp.vertices.collect.foreach(println(_))}
}

使用pregel实现找出源节点到每个节点的最大深度

package pregelimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph}object Demo2 {def main(args: Array[String]): Unit = {//设置运行环境val conf = new SparkConf().setAppName("Pregol Api GraphX").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")// 构建图val myVertices = sc.parallelize(Array((1L, "张三"), (2L, "李四"), (3L, "王五"), (4L, "钱六"),(5L, "领导")))val myEdges = sc.makeRDD(Array( Edge(1L,2L,"朋友"),Edge(2L,3L,"朋友") , Edge(3L,4L,"朋友"),Edge(4L,5L,"上下级"),Edge(3L,5L,"上下级")))val myGraph = Graph(myVertices,myEdges)val g =  myGraph.mapVertices((vid,vd)=>0)var newGraph: Graph[Int, String] = g.pregel(0)((id, attr, maxValue) => maxValue,triplet => { // Send Messageif (triplet.srcAttr + 1 > triplet.dstAttr) {Iterator((triplet.dstId, triplet.srcAttr + 1))} else {Iterator.empty}},(a: Int, b: Int) => math.max(a, b))newGraph.vertices.collect.foreach(println(_))}}

sparkGraphX 图操作:pregel(加强的aggregateMessages)相关推荐

  1. SparkGraphX图计算(一)

    SparkGraphX基本介绍 一.什么是图 二.什么是SparkGraphX 三.常见的图算法 1.PageRank算法 2.最短路径算法 3.社群发现 4.推荐算法ALS和SVD++ 四.Grap ...

  2. 原创:Spark中GraphX图运算pregel详解

    原创:Spark中GraphX图运算pregel详解 由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释: package com.txq.spark.test import ...

  3. 根据传入坐标和图片URL地址对图片进行切图操作、将图片转化成Base64位码

    目录 1.根据传入坐标和图片URL地址对图片进行切图操作 2.将图片转化成Base64位编码.根据传入坐标 算出切点坐标 在开发过程的学习记录,此两个工具类主要是对图像的处理(切图),对文件的想换转化 ...

  4. redmine 贴图操作

    在说明.概述或者描述的标签右侧,会发现一列操作如图: 这里是用来做编辑框里面的文字排版用的,其中,留意到这个图标: 下面说下具体的贴图操作: 1.先在上传文件的位置把你要黏贴的图片选择进来. 2.点击 ...

  5. 知识图谱学习笔记-图操作

    一.自定义图 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.graphx.{Edge, Verte ...

  6. 值得收藏的时间复杂度速查表:数据结构操作、排序算法、图操作、堆操作

    时间复杂度速查表 这篇文章覆盖了计算机科学里面常见算法的时间和空间的大 OBig-O 复杂度. 在参加面试前,我们经常需要花费很多时间从互联网上查找各种搜索和排序算法的优劣,了节省大家的时间,我收集了 ...

  7. UML类图操作(二)

    在上一篇中讲了类图的操作以及介绍,当然类不是单独存在的,类与类之间存在着关系,UML提供了类之间的关系的表达方式. UML类图操作(一) 下面简单介绍设置操作,双击关系线,然后找到Detail 1.关 ...

  8. UML类图操作(一)

    UML类图操作(二) 类图(Class Diagram)用于描述系统中所包含的类以及它们之间的相互关系,帮助人们简化对系统的理解,它是系统分析和设计阶段的重要产物,也是系统编码和测试的重要模型依据.在 ...

  9. 前端开发 网络图片的下载与切图操作 0228 需操作

    普通图片直接下载 右键下载即可 背景图片的查找与下载 通过查找样式找到图片的路径 然后下载 切图操作 需要使用photoshop软件 下载地址 链接: https://pan.baidu.com/s/ ...

最新文章

  1. 在DataGridView控件中加入ComboBox下拉列表框的实现
  2. python爬虫提取人名_python爬虫—爬取英文名以及正则表达式的介绍
  3. c语言如何快速看懂别人的程序,探究如何快速看懂单片机程序方法
  4. 金蝶结账时显示系统错误h80004005_干货!超详细操作流程!金蝶、用友日常账务处理大全!...
  5. c# xls 复制一行_编写干净的C#代码技巧
  6. mysql sharding 知乎_分库分表系列(1)-shardingsphere核心概念
  7. mysql切换数据库命令_MySQL数据库的基础使用命令大全
  8. win任务栏计算机变未知,深度技术Win7电脑任务栏图标显示异常的解决方法
  9. lol比尔吉沃特服务器未响应,LOL比尔吉沃特9月30日网络波动公告 引起卡机掉线丢包状况...
  10. 微信小程序实现车牌号录入
  11. NeurIPS 2019 | DetNAS:首个搜索物体检测Backbone的方法
  12. Seq2Seq模型中的集束搜索(Beam Search)
  13. 各系统安装NetFrameWork3.5 安装
  14. Excel相关操作(二)Springboot框架使用easyexcel工具
  15. torch.norm-L2范数
  16. 在线制作简易业务流程图
  17. 什么是软件产品----老吴说产品
  18. citus多CN部署
  19. 微星B450M安装ubuntu 18.04 BIOS更改启动顺序
  20. 2021腾讯校招笔试

热门文章

  1. Spring Boot进阶(13):如何获取@ApiModelProperty(value = “序列号“, name = “uuid“)中的value值 | 超级详细,建议收藏
  2. 数据挖掘——决策树和K近邻
  3. Computer-System Structures八大思想
  4. 手持式超声波气象站旱作节水灌溉
  5. 亲影:您的相册需要一个专属管家
  6. h5/5+APP消息推送神器:Goeasy.js
  7. HDUOJ 4513 吉哥系列故事——完美队形II
  8. 32位低功耗MCU的设计
  9. php扩容方案,PHP程序员玩转Linux系列-腾讯云硬盘扩容挂载
  10. 解决office的PPT和WPS的PPT不兼容的问题