Spark Graphx Pregel

  • 一.Pregel概述
    • 1.什么是pregel?
    • 2.pregel应用场景
  • 二.Pregel源码及参数解释
    • 1.源码
    • 2.参数详细解释
      • (1)initialMsg
      • (2)maxIteration
      • (3)activeDirection
      • (4)vprog
      • (5)sendMsg
      • (6)mergeMsg
  • 三.Pregel计算顶点5 到 其他各顶点的 最短距离
    • 1.图信息
      • (1)顶点信息
      • (2)边信息
    • 2.Pregel原理分析
      • (1)调用pregel方法之前
      • (2)当调用pregel方法开始
      • (3)第一次迭代开始
      • (4)第二次迭代开始
      • (5)第三次迭代开始
      • (6)第四次迭代开始
      • (7)第五次迭代开始
    • 3.代码实现

一.Pregel概述

1.什么是pregel?

Pregel是Google 提出的用于大规模分布式图计算框架。Pregel是个强大的基于图的迭代算法。

2.pregel应用场景

一般pregel可以在图中进行迭代计算,如求最短路径,关键路径,n度关系等。

二.Pregel源码及参数解释

1.源码

 def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}

2.参数详细解释

(1)initialMsg

初始化消息。这个初始化消息会被用来初始化图中的每个节点的属性,在pregel调用时,会首先在图上使用mapVertices来根据initialMsg的值更新每个几点的值。至于如何更新,则由vprog参数而定,vprog函数就接收了initialMsg消息作为参数来更新对应节点的值。

(2)maxIteration

最大迭代次数

(3)activeDirection

表示边的活跃方向

  • 活跃节点:是指在某一轮迭代中,pregel会以sendMsg和mergeMsg为参数来调用graph的aggregateMessage方法后收到消息的节点

  • 活跃消息:是这轮迭代中所有被成功收到的消息

    则有的边src节点是活跃节点,有的dst节点是活跃节点,有的边两端节点都是活跃节点。如果activeDirection参数被指定为"EdgeDirection.out",则在下一轮迭代中,只接收消息的出边(src—>dst)才会执行sendMsg函数。也就是说,sendMsg回调函数会过滤掉(dst—>src)的edgeTriplet上下文参数

(4)vprog

节点变换函数。

在初始时,以及每轮迭代后,pregel会根据上一轮使用的msg和这里的vprog函数在图上调用joinVertices方法变化每个收到消息的节点。

(5)sendMsg

消息发送函数。该函数的运行参数是一个代表边的上下文,pregel在调用aggregateMessage是,会将EdgeContext转换成EdgeTriplet对象来使用,用户需要通过Iterator[(VertexID,A)]指定发送哪些消息,发送哪些节点,发送哪些内容;因为在一条边上可以发送多个消息,如sendToDst,sendToSrc,所以这里是个Iterator,每个元素是一个tuple,其中的vertexId便是接收此消息的节点id,只能是该边上的srcId或者dstId,而A就是要发送的内容

因此,如果要由src发送一条消息A到dst,则有:Iterator((dstId,A)),如果什么消息也不发送,则返回一个空的Iterator:Iterator.empty

(6)mergeMsg

邻居节点收到多条消息时的合并逻辑。

区别与vprog,mergeMsg仅能合并消息内容,但合并后并不会更新到节点中去,而vprog函数可以根据收到的消息(就是mergeMsg产生的结果)更新节点属性

三.Pregel计算顶点5 到 其他各顶点的 最短距离

1.图信息

(1)顶点信息

      (1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))

(2)边信息

      Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)

2.Pregel原理分析

  • 顶点的两个状态:

    • 钝化态:类似于休眠,不做任何处理
    • 激活态:可以进行数据的接受和发送
  • 顶点能够处于激活状态需要的条件
    • 成功收到消息
    • 成功发送任何一条消息

(1)调用pregel方法之前

先把图的各个顶点的属性初始化,即顶点5到自己的距离为0,所以设为0,其他顶点都设为正无穷大Double.PositiveInifinity

(2)当调用pregel方法开始

首先,所有顶点都将接收到一条初始消息initialMsg ,使所有顶点都处于激活态(红色标识的节点)

(3)第一次迭代开始

所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。

5—>3(0+8<Double.Infinity,成功),

5—>6(0+3<Double.Infinity,成功),

3—>2(Double.Infinity+4>Double.Infinity,失败),

3—>6(Double.Infinity+3>Double.Infinity,失败),

2—>1(Double.Infinity+7>Double.Infinity,失败),

2—>4(Double.Infinity+2>Double.Infinity,失败),

2—>5(Double.Infinity+2>Double.Infinity,失败),

4—>1(Double.Infinity+1>Double.Infinity,失败)

sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5成功地分别给顶点3和顶点6发送消息,顶点3 和 顶点6 也成功地接受到了消息。

所以此时只有5,3,6三个顶点处于激活状态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息与自身属性合并。如图所示,至此第一次迭代结束。

(4)第二次迭代开始

顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。至此第二次迭代结束

3—>2(8+4<Double.Infinity,成功),

3—>6(8+3>3,失败)

(5)第三次迭代开始

顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为激活状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。至此第三次迭代结束

3—>2(8+4=12,失败),

3—>6(8+3>3,失败)

2—>1(12+7<Double.Infinity,成功)

2—>4(12+2<Double.Infinity,成功)

(6)第四次迭代开始

顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并

2—>1(12+7=19,失败)

2—>4(12+2=14,失败)

4—>1(14+1<19,成功)

(7)第五次迭代开始

顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束

4—>1(14+1=15,失败)

结论:由上述分析过程可知,顶点5到其他各顶点距离全部算出,

5—>1 (15)

5—>2 (12)

5—>3 (8)

5—>4 (14)

5—>6 (3)

3.代码实现

package suanfaimport org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object PregelDemo {def main(args: Array[String]): Unit = {//TODO:1.创建SparkContext对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("pregeldemo")val sc = new SparkContext(conf)//TODO:2、创建顶点val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)//TODO:3、创建边,边的属性代表 相邻两个顶点之间的距离val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)//TODO:4、创建图(使用apply方式创建)val graph1 = Graph(vertexRDD, edgeRDD)/* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** *///TODO:5、调用pregel算法//todo:(1)設置頂點信息//被计算的图中 起始顶点idval srcVertexId = 5L//给每个顶点赋属性值val initialGraph: Graph[Double, Int] = graph1.mapVertices { case (vid, (name, age)) => if (vid == srcVertexId) 0.0 else Double.PositiveInfinity }println(" 1.每个顶点的属性值如下")initialGraph.vertices.collect().foreach(println)println("---------------开始调用pregel---------------")//todo:(2)調用pregel,返回的还是一个图val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(Double.PositiveInfinity,//每个点的初始值,无穷大Int.MaxValue,               //最大迭代次数EdgeDirection.Out       //发送消息的方向)(//todo:vprog:接受到的消息和自己的消息进行合并//这个顶点sendMsg发送的顶点信息// 三个参数 vprog: (VertexId, VD, A) => VD,//VertexId当前节点的顶点id,VD当前顶点的属性,A接收到的信息//返回值:当前顶点更新后的属性(vid: VertexId, vd: Double, distMsg: Double) => {println(s"----------顶点${vid}调用vprog:接受到的消息和自己的消息进行合并----------------")//即将接收到的信息和顶点属性进行比较,取最小值进行更新该顶点属性val minDist = math.min(vd, distMsg)println(s"顶点${vid},顶点属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},//todo: sendMsg:发送消息,如果自己的消息+权重<目的地的消息,则发送//一个参数 : sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)]//即表的信息//返回值:发送成功后的节点id和发送的消息的一个迭代器(edgeTriplet: EdgeTriplet[Double, PartitionID]) => {println(s"----------调用${edgeTriplet.srcId}调用sendMsg发送消息给顶点${edgeTriplet.dstId}------------")if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId}给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},//todo:mergeMsg多条接收消息,mergeMessage,取小合并多条消息// mergeMsg: (A, A) => A)(msg1: Double, msg2: Double) => {println(s"----------${msg1},${msg2}调用mergeMsg:合并多条接收消息------------")println(msg1,msg2)math.min(msg1, msg2)})}
}

输出结果:

//初始化每个节点的属性
1.每个顶点的属性值如下
(1,Infinity)
(2,Infinity)
(3,Infinity)
(4,Infinity)
(5,0.0)
(6,Infinity)
---------------开始调用pregel---------------
//最开始每个节点都会收到初始化的属性Double.PositiveInfinity,会通过调用vprog(接受到的消息和自己的消息进行合并)对节点属性进行合并
----------顶点5调用vprog:接受到的消息和自己的消息进行合并----------------
顶点5,顶点属性0.0,收到消息Infinity,合并后的属性0.0
----------顶点6调用vprog:接受到的消息和自己的消息进行合并----------------
顶点6,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点3调用vprog:接受到的消息和自己的消息进行合并----------------
顶点3,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点2调用vprog:接受到的消息和自己的消息进行合并----------------
顶点2,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点4调用vprog:接受到的消息和自己的消息进行合并----------------
顶点4,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
//---------------第一次迭代---------------
//第一次进行迭代,按照发送消息的方向发送消息,可知,第一次迭代只有顶点5发送消息到顶点6和3是满足sendMsg的条件的。即发送成功,
----------调用3调用sendMsg发送消息给顶点6------------
----------调用5调用sendMsg发送消息给顶点6------------
顶点5给顶点6 发送消息 3.0
----------调用2调用sendMsg发送消息给顶点5------------
----------调用3调用sendMsg发送消息给顶点2------------
----------调用4调用sendMsg发送消息给顶点1------------
----------调用5调用sendMsg发送消息给顶点3------------
顶点5给顶点3 发送消息 8.0
----------调用2调用sendMsg发送消息给顶点4------------
----------调用2调用sendMsg发送消息给顶点1------------
//顶点6和顶点3接收到信息后就会调用vprog进行合并属性
----------顶点6调用vprog:接受到的消息和自己的消息进行合并--------------
顶点6,顶点属性Infinity,收到消息3.0,合并后的属性3.0
----------顶点3调用vprog:接受到的消息和自己的消息进行合并----------------
顶点3,顶点属性Infinity,收到消息8.0,合并后的属性8.0
//---------------第二次迭代---------------
//经过第一次迭代后,3,5,6处于激活状态
//顶点3有出边,顶点6没有出边,可知,顶点3给顶点2发送消息成功,给顶点6发送失败
----------调用3调用sendMsg发送消息给顶点2------------
顶点3给顶点2 发送消息 12.0
----------调用3调用sendMsg发送消息给顶点6------------
//收到消息的顶点2调用vprog合并顶点属性
----------顶点2调用vprog:接受到的消息和自己的消息进行合并----------------
顶点2,顶点属性Infinity,收到消息12.0,合并后的属性12.0
//---------------第三次迭代---------------
----------调用2调用sendMsg发送消息给顶点5------------
----------调用2调用sendMsg发送消息给顶点1------------
顶点2给顶点1 发送消息 19.0
----------调用2调用sendMsg发送消息给顶点4------------
顶点2给顶点4 发送消息 14.0
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性Infinity,收到消息19.0,合并后的属性19.0
----------顶点4调用vprog:接受到的消息和自己的消息进行合并----------------
顶点4,顶点属性Infinity,收到消息14.0,合并后的属性14.0
//---------------第四次迭代---------------
----------调用4调用sendMsg发送消息给顶点1------------
顶点4给顶点1 发送消息 15.0
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性19.0,收到消息15.0,合并后的属性15.0
//---------- 第五次迭代不用发送消息,所有节点钝化 -----------------

通过对输出结果进行分析,可知,大致流程是首先在未调用pregel方法之前给每个节点一个初始值,然后通过调用pregel给每个顶点收到一条初始消息initialMsg,所有顶点处于激活状态,调用vprog对每个节点进行属性合并。然后每个激活态的顶点开始调用sendMsg根据 EdgeDirection.Out方向进行消息发送,将发送成功的顶点进行激活,其他顶点进行钝化处理,接收消息成功的顶点开始调用vprog进行合并顶点信息。这些被激活的顶点进行再次迭代,直到所有顶点钝化结束完成

Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)相关推荐

  1. XGBRegressor参数详解以及调参过程

    XGBRegressor参数详解以及调参过程 一.参数 1.通用参数booster.nthread 2.学习目标参数 (1)objective (2)eval_metric (3)seed 3.Tre ...

  2. 关于spark的sample()算子参数详解

    sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long) sample算子时用来抽样用的,其有3 ...

  3. 04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等

    1.4.Flink集群安装部署standalone+yarn 1.4.1.Standalone集群模式 1.4.2.Flink-Standalone集群重要参数详解 1.4.3.集群节点重启及扩容 1 ...

  4. Spark: sortBy和sortByKey函数详解

    在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外.在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数.sortBy是对标准的RDD进行排序,它是 ...

  5. Spark SQL原理及常用方法详解(二)

    Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...

  6. 阿里云AMD服务器ECS实例g6a、c6a和r6a性能参数详解

    阿里云AMD服务器ECS计算型c6a.通用型g6a和内存型r6a实例,CPU采用2.6GHz主频的AMD EPYCTM ROME处理器,睿频3.3GHz,计算性能稳定,云服务器吧来详细说下阿里云ECS ...

  7. 阿里云AMD服务器ECS计算型c7a、通用型g7a和内存型r7a性能参数详解

    阿里云第二代AMD服务器ECS计算型c7a.通用型g7a和内存型r7a实例,CPU采用2.55 GHz主频的AMD EPYCTM MILAN处理器,单核睿频最高3.5 GHz,算力更强,云服务器吧详细 ...

  8. CI流水线配置文件参数详解(一)

    文章目录 4. 参数详解(一) 4.1 ``script`` 4.2 ``image`` 指定使用Docker镜像.如 ``iamge:name`` ,暂时忽略. 4.3 ``before_scrip ...

  9. 内存性能参数详解(转载)

    内存性能参数详解 先说说最有效提高你机器内存性能的几个参数:CL,TRP,TRCD CAS Latency "列地址选通脉冲潜伏期" BIOS中可能的其他描述为:tCL.CAS L ...

最新文章

  1. 解决工控网络通信协议威胁的实践
  2. python判断文件是否存在、不存在则创建_python判断文件是否存在,不存在就创建一个的实例...
  3. 通过live555实现H264 RTSP直播
  4. 笔记-高项案例题-2015年下-整体管理
  5. 巧妙设备MTU的大小,轻松提网速
  6. hbase1.1.1 连接集群_Hadoop2.7.1+Hbase1.1.2集群环境搭建(10) hadoop hbase kerberos
  7. 记录一次安恒信息面试过程
  8. 《响应式Web图形设计》一7.4 减小文件尺寸
  9. HDU 1166 - 敌兵布阵
  10. 编程基本功:谁改出的问题谁解,对不对?
  11. R语言文本聚类实例——以《金庸全集》为例
  12. Linux随笔15-Ubuntu20.04允许root用户图形界面登录、chrony局域网时间同步服务、ipxe实现系统自动化安装
  13. 计算空间中点到直线的距离
  14. python识别图片手写文字_Python徒手实现识别手写数字—简易图片数据库
  15. 【USACO】 录制唱片
  16. Linux GCC 编译过程分析及常用检错的编译选项
  17. 【自制壁纸生成器】2022新年壁纸领取,换一张手机壁纸,迎接2022叭~
  18. 低代码行业风涌云起,他们靠什么跻身全球第一阵营?
  19. OpenCV-图像二值化
  20. java面试一紧张头脑就空了_我在面试、答辩时非常紧张,大脑一片空白,说话时声音发抖,心跳加快,完全放松不下来,不受控制,怎么办?...

热门文章

  1. Unity3D粒子特效
  2. 爬虫概念以及网站首页爬取
  3. MySQL 关联查询多一列统计出勤
  4. Ubuntu引导修复/Ubuntu的暴力安装方法
  5. 设计模式—装饰者模式(用麻辣烫来分析)
  6. 关于Burp Suite Intruder 的四种攻击方式
  7. juicer无法正常渲染出结果
  8. 【华为OD机试真题】完美走位(C++Cjavapython)100%通过率 超详细代码注释 代码解读
  9. 百万年薪python之路 -- 字典(dict)练习
  10. 70个python练手项目,不可多得呀,建议白嫖!!!需要留邮箱哦