Spark Pregel参数说明

Pregel是个强大的基于图的迭代算法,也是Spark中的一个迭代应用aggregateMessage的典型案例,用它可以在图中方便的迭代计算,如最短路径、关键路径、n度关系等。然而对于之前对图计算接触不多的童鞋来说,这个api还算是一个比较重量组的接口,不太容易理解。 Spark中的Pregel定义如下:

def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}

各个参数的意义详细解释如下:
initialMsg: 初始化消息,这个初始消息会被用来初始化图中的每个节点的属性,在pregel进行调用时,会首先在图上使用mapVertices来根据initialMsg的值更新每个节点的值,至于如何更新,则由vprog参数而定,vprog函数就接收了initialMsg消息做为参数来更新对应节点的值

maxIterations: 最大迭代次数

activeDirection: 表示边的活跃方向,什么是活跃方向呢,首先要解释一下活跃消息与活跃顶点的概念,活跃节点是指在某一轮迭代中,pregel会以sendMsg和mergeMsg为参数来调用graph的aggregateMessage方法后收到消息的节点,活跃消息就是这轮迭代中所有被收成功收到的消息。这样一来,有的边的src节点是活跃节点,有的dst节点是活跃节点,而有的边两端节点都是活跃节点。如果activeDirection参数指定为“EdgeDirection.Out”,则在下一轮迭代时,只有接收消息的出边(src—>dst)才会执行sendMsg函数,也就是说,sendMsg回调函数会过滤掉”dst—>src”的edgeTriplet上下文参数

vprog: 节点变换函数,在初始时,以及每轮迭代后,pregel会根据上一轮使用的msg和这里的vprod函数在图上调用joinVertices方法变化每个收到消息的节点,注意这个函数除初始时外,都是仅在接收到消息的节点上运行,这一点可以从源码中看到,源码中用的是joinVertices(message)(vprog),因此,没有收到消息的节点在join之后就滤掉了

sendMsg: 消息发送函数,该函数的运行参数是一个代表边的上下文,pregel在调用aggregateMessages时,会将EdgeContext转换成EdgeTriplet对象(ctx.toEdgeTriplet)来使用,用户需要通过Iterator[(VertexId,A)]指定发送哪些消息,发给那些节点,发送的内容是什么,因为在一条边上可以发送多个消息,如sendToDst,如sendToSrc,所以这里是个Iterator,每一个元素是一个tuple,其中的vertexId表示要接收此消息的节点的id,它只能是该边上的srcId或dstId,而A就是要发送的内容,因此如果是需要由src发送一条消息A给dst,则有:Iterator((dstId,A)),如果什么消息也不发送,则可以返回一个空的Iterator:Iterator.empty

mergeMsg: 邻居节点收到多条消息时的合并逻辑,注意它区别于vprog函数,mergeMsg仅能合并消息内容,但合并后并不会更新到节点中去,而vprog函数可以根据收到的消息(就是mergeMsg产生的结果)更新节点属性。

代码示例:
最短路径实现

package BooksCodeimport org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}object ShortestPath_Pregel {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("ShortestPath_Pregel").setMaster("local")val sc = new SparkContext(conf)val graph:Graph[Long,Double] = GraphGenerators.logNormalGraph(sc,numVertices = 10).mapEdges(e =>e.attr.toDouble)val sourceId = 5Lval initialGraph = graph.mapVertices((id,_)=>if(id==sourceId) 0.0 else Double.PositiveInfinity)
//    println(initialGraph.vertices.collect.mkString("\n"))println(initialGraph.edges.distinct().collect.mkString("\n"))println("################################################")val sssp = initialGraph.pregel(Double.PositiveInfinity)(// verte program(id,dist,newDisst) =>{println((id,dist,newDisst))math.min(dist,newDisst)} ,//Send Messagetriplet => {if (triplet.srcAttr + triplet.attr < triplet.dstAttr){Iterator((triplet.dstId,triplet.srcAttr + triplet.attr))}else {Iterator.empty}},(a,b) => math.min(a,b)                //Merge message)//println(sssp.vertices.collect.mkString("\n"))}}

Graphx中pregel详解及具体应用分析(以最短路径为例)相关推荐

  1. 原创:Spark中GraphX图运算pregel详解

    原创:Spark中GraphX图运算pregel详解 由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释: package com.txq.spark.test import ...

  2. 函数中{}输出格式详解(C#)

    Console.WriteLine()函数中{}输出格式详解(C#) Console.WriteLine()函数的格式一直没怎么注意.今天同事问起Console.WriteLine({0:D3},a) ...

  3. Java中CAS详解

    转载自  Java中CAS详解 在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁 锁机制存在以下问题: (1)在多线程竞争下,加锁.释放锁会导致比较多的上下文切换 ...

  4. 【转】图形流水线中坐标变换详解:模型矩阵、视角矩阵、投影矩阵

    转自:图形流水线中坐标变换详解:模型矩阵.视角矩阵.投影矩阵_sherlockreal的博客-CSDN博客_视角矩阵 图形流水线中坐标变换详解:模型矩阵.视角矩阵.投影矩阵 图形流水线中坐标变换过程 ...

  5. oracle itl解析,oracle数据块dump文件中ITL详解

    oracle数据块dump文件中ITL详解 dump出Oracle block后,可以看到事物槽,包含有事物槽号(ITL),XID,UBA,FLAG,LCK,SCN. 本文主要讨论FLAG标记的规则, ...

  6. GraphX中Pregel单源点最短路径(转)

    原文链接:GraphX中Pregel单源点最短路径 GraphX中的单源点最短路径例子,使用的是类Pregel的方式. 核心部分是三个函数: 1.节点处理消息的函数  vprog: (VertexId ...

  7. android中getSystemService详解

    原文地址:android中getSystemService详解作者:邹斌 http://blog.sina.com.cn/s/blog_71d1e4fc0100o8qr.html http://blo ...

  8. Oracle中CONCAT详解

    Oracle中CONCAT详解 1.什么是CONCAT 新的改变 我们对Markdown编辑器进行了一些功能拓展与语法支持,除了标准的Markdown编辑器功能,我们增加了如下几点新功能,帮助你用它写 ...

  9. Java中LinkedList详解

    Java中LinkedList详解 LinkedList底层是双向链表 单向链表 双向链表 LinkedList新增的方法 主要增加了针对头结点与尾结点进行操作的方法, 即针对第一个元素和最后一个元素 ...

最新文章

  1. 构建单层单向RNN网络对MNIST数据集分类
  2. java实现rsa欧几里得算法求d_RSA 加密算法的 java 实现
  3. Java解释XML文件的小例子
  4. python get post请求_使用python封装get+post请求
  5. java getparametermap_重写getParameterMap后,报错 ,
  6. 架构的变迁,从分层架构先聊起
  7. 【mysql基础知识】通过Navicat控制小数点位数,以及填充0后不显示的问题
  8. 永磁同步电机矢量控制中的双闭环是什么意思_三菱伺服控制器与变频器区别,三菱伺服控制器优势在哪?...
  9. POST的Response数据问题
  10. 电子技术基础(数字部分)(第六版) 康华光 课后习题答案
  11. 精品软件试用及分享 屏幕录像专家注册版 附下载地址
  12. 机器视觉编码靶标自动提取和解码Coded Target/  Marker Detector
  13. 必备技能!聊聊二维码扫码登录的原理
  14. 雅诗兰黛公司将收购Dr. Jart+
  15. 深度|人脸图像的技术原理及在电商业务中的应用
  16. Xsolla对话成都游戏茶馆CEO
  17. C基础 | 【05】(内存结构以及复合类型)
  18. 最短路(弗洛伊德——最短路)
  19. 加密算法详解AES/HmacSHA1/DES
  20. 计算机类中文核心期刊目录

热门文章

  1. 要不要学Python?Python真的有那么厉害?
  2. Vue设置button的disable属性
  3. 分享一个Java超市积分管理系统项目的制作方法。
  4. 在线文档编辑工具哪个更好?
  5. verdi直接打开list文件
  6. TUTK 手机客户端注意事项
  7. TUTK[MediaSDK][iOS]没有声音或声音异常
  8. 道哥:我人生有两大选择,为的却都是同一件事
  9. 基于javaweb仿京东商城管理系统的设计与实现(含论文和程序源码及数据库文件)
  10. 回归分析(三)——多项式回归解决非线性问题