大数据基础之Spark——Spark pregel详细过程,一看就懂
Pregel概述
- Pregel是Google提出的用于大规模分布式图计算框架
- 图遍历(BFS)
- 单源最短路径(SSSP)
- PageRank计算 - Pregel的计算由一系列迭代组成,称为supersteps
- Pregel迭代过程
- 每个顶点从上一个superstep接收入站消息
- 计算顶点新的属性值
- 在下一个superstep中想相邻的顶点发送消息
- 当没有剩余消息是,迭代结束
Pregel原理分析
pregel函数源码以及各个参数的简介:
def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}
参数 | 说明 |
---|---|
initialMsg | 图初始化的时候,开始模型计算的时候,所有节点都会收到一个消息 |
maxIterations | 最大迭代次数 |
activeDirection | 规定了发送消息的方向 |
vprog | 节点接收改消息将聚合后的数据和本节点进行属性的合并 |
sendMsg | 激活状态节点调用该方法发送消息 |
mergeMsg | 如果一个节点接收到多条消息,先用mergeMsg来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用改函数 |
案例:求5顶点到1顶点的最短距离
- 顶点的两个状态:
- 钝化态:类似于休眠,不做任何处理
- 激活态:可以进行数据的接受和发送 - 顶点能够处于激活状态需要的条件
- 成功收到消息
- 成功发送任何一条消息
首先,给5节点的初始值为0,其他值的初始值为正无穷大
然后从5节点出发,每次用发送方的值+权重和接受方的值相比较,取小值作为接受方的值,然后再以此方式发送给下一节点,如果发送方的值+权重大于接受方的值,则无法发送给下一节点。根据这种方法得到的结果如下图:
最终得到的5顶点到1顶点的最短距离为15。
代码实现:
val conf = new SparkConf().setMaster("local[2]").setAppName("mytst")val sc = SparkContext.getOrCreate(conf)val vect = sc.parallelize(Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))))val edges = sc.parallelize(Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)))val graphx = Graph(vect,edges)// 设置起始顶点val srcVectId = 5Lval initialGraph = graphx.mapVertices({case (vid,(name,age))=>if (vid==5L) 0.0 else Double.PositiveInfinity})// 调用pregelval pregelGraph = initialGraph.pregel(Double.PositiveInfinity, //每个点的初始值,无穷大Int.MaxValue, //最大迭代次数EdgeDirection.Out //发送信息的方向)( //vprog(接受到的消息和自己的消息进行合并)(vid:VertexId,vd:Double,distMsg:Double)=>{val minDist = math.min(vd,distMsg)println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},(edgeTriplet:EdgeTriplet[Double,PartitionID])=>{ //发送消息,如果自己的消息+权重<目的地的消息,则发送if(edgeTriplet.srcAttr+edgeTriplet.attr<edgeTriplet.dstAttr){println(s"顶点${edgeTriplet.srcId} 给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr))}else{Iterator.empty}},(msg1:Double,msg2:Double)=>math.min(msg1,msg2) //多条接收消息,mergeMessage,取小合并多条消息)// 输出结果pregelGraph.triplets.foreach(println)println(pregelGraph.vertices.collect.mkString(","))// 关闭资源sc.stop()
迭代结果展示:
//------------------------------------------ 各个顶点接受初始消息initialMsg ------------------------------------------
顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
//------------------------------------------ 第一次迭代 ------------------------------------------
顶点5 给 顶点6 发送消息 3.0
顶点5 给 顶点3 发送消息 8.0
顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
顶点6,属性Infinity,收到消息3.0,合并后的属性3.0
//------------------------------------------ 第二次迭代 ------------------------------------------
顶点3 给 顶点2 发送消息 12.0
顶点2,属性Infinity,收到消息12.0,合并后的属性12.0
//------------------------------------------ 第三次迭代 ------------------------------------------
顶点2 给 顶点4 发送消息 14.0
顶点2 给 顶点1 发送消息 19.0
顶点1,属性Infinity,收到消息19.0,合并后的属性19.0
顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
//------------------------------------------ 第四次迭代 ------------------------------------------
顶点4 给 顶点1 发送消息 15.0
顶点1,属性19.0,收到消息15.0,合并后的属性15.0
//------------------------------------------ 第五次迭代不用发送消息 ------------------------------------------
大数据基础之Spark——Spark pregel详细过程,一看就懂相关推荐
- 【大数据基础】vmware+ununtu安装详细过程
环境配置 1. 虚拟机VMware下载与安装 首先安装vmware workstation 密钥直接利用科技生成. 检查vmware是否成功安装 2. VMware安装Ubuntu18.04 LTS ...
- Spark大数据开发学习:Spark基础入门
在诸多的大数据技术框架当中,Spark可以说是占据了非常重要的地位,继Hadoop之后,基于实时数据处理需求的不断上升,Spark开始占据越来越大的市场份额,而Spark,也成为大数据的必学知识点.今 ...
- 【计算机大数据毕设之基于spark+hadoop的大数据分析论文写作参考案例】
[计算机大数据毕设之基于spark+hadoop的大数据分析论文写作参考案例-哔哩哔哩] https://b23.tv/zKOtd3L 目 录 一 引言1 二 系统分析2 2.1 必要性和可行性 ...
- 大数据改变世界,Spark改变大数据——中国Spark技术峰会见闻
作者:张相於,当当网推荐系统开发经理 责编:周建丁(zhoujd@csdn.net) 笔者5月15日参加了"中国云计算技术大会"中的"中国Spark技术峰会", ...
- 大数据基础学习三:Ubuntu下安装VMware Tools超详细步骤及需要注意的问题(以ubuntu-18.04.3、Mware Workstation 15.1.0 Pro为例)
大数据基础学习三:Ubuntu下安装VMware Tools超详细步骤及需要注意的问题 (以ubuntu-18.04.3.Mware Workstation 15.1.0 Pro for Window ...
- spark启动的worker节点是localhost_Spark大数据在线培训:Spark运行原理解析
在大数据技术框架当中,Spark是继Hadoop之后的又一代表性框架,也是学习大数据当中必学的重点技术框架.在这些年的发展当中,Spark所占据的市场地位,也在不断拓展.今天的Spark大数据在线培训 ...
- 大数据项目(基于spark)--新冠疫情防控指挥作战平台项目
大数据项目(基于spark)–新冠疫情防控指挥作战平台项目 文章目录 第一章 项目介绍 1.1 项目背景 1.2 项目架构 1.3 项目截图 1.4 功能模块 第二章 数据爬取 2.1 数据清单 2. ...
- CDH大数据平台搭建之SPARK集群搭建
CDH大数据平台搭建之SPARK集群搭建 一.安装规划 二.下载 三.安装及配置 1.解压 2.修改配置文件 四.启动 五.JPS检查节点 一.安装规划 参考CHD大数据平台搭建之集群规划 二.下载 ...
- 我眼中的大数据(五)——Spark
CSDN话题挑战赛第2期 参赛话题:大数据技术分享 Hadoop MapReduce虽然已经可以满足大数据的应用场景,但是其执行速度和编程复杂度并不让人们满意.Spark因其拥有更快的执行速度和更友好 ...
- 大数据基础——Hadoop大数据平台搭建
文章目录 前言 Hadoop大数据平台搭建 一.Hadoop原理和功能介绍 二.Hadoop安装部署 三.Hadoop常用操作 总结 前言 分布式机器学习为什么需求大数据呢?随着海量用户数据的积累,单 ...
最新文章
- 【组队学习】【31期】动手学数据分析
- mysql slave 线程 简书_MySQL主从复制(Master-Slave)实践
- qt 判断ctrl键被按下_惊雷!证监会公告,又一家千亿白马股被按下“暂停键”...
- 【整理】使用云计算服务面临的几个安全问题
- PhpStorm配置SVN的完整方法
- 微信支付 企业转账 小程序发红包 提现 发红包 企业支付等遇到的问题
- php js跨域上传文件,Jquery实现跨域异步上传文件步骤详解
- Redhat 6 git 服务器配置(gitweb)
- 毕设题目:Matlab DTMF双音多频
- PDFLIB C++修改及使用说明
- 基于SpringBoot在线电影订票系统
- 使用OpenSSL生成证书 pem文件的生成 celery Security
- 关闭windows自动更新N种方法
- [C++杂谈]:MFC中使用excel2007读写excel表格
- 有赞实时数仓建设实践与经验
- dw超链接标签_使用Dreamweaver制作网页超链接的详细图文教程--系统之家
- 0002计算机组成原理与体系结构02
- 锐龙r5 5600h核显什么水平 r5 5600h属于什么级别
- Ubuntu18.04安装ROS最后的rosdep update不成功解决方案(亲测可行)
- Kafka 压缩、限流和 SASL_PLAIN 、 SASL_SCRAM-SHA-256简单认证
热门文章
- html title中加图标,科技常识:HTML中title前面小图标的实现_如何给网页标题添加icon小图标...
- 多模态语义检索 | 基于 MetaSpore 快速部署 HuggingFace 预训练模型
- 哪些技术会决定前端开发者的未来发展?
- JAVAEE和项目开发——JSP详解
- VBA判断win操作系统是32位还是64位
- A2Billing 代码分析
- 实现一个简单的抽奖系统
- 转转支付网关之注解式HTTP客户端
- C语言程序设计简谐运动的模拟,C语言课程设计简谐振动实验的模拟.doc
- matlab信息隐藏图片,信息隐藏 实验二 BMP 图像信息隐藏