PageRank算法

PageRank算法原理剖析及Spark实现 - 简书 (jianshu.com)

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessioncase class User(name:String,age:Int,inDeg:Int,outDeg:Int)object GraphDemo4 {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("sparkgraph").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextval users: RDD[(Long, (String, Int))] = sc.makeRDD( //元组里面不限类型Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)),(7L,("KB11",8)),(8L,("KB12",7)),(9L,("KB13",9))))val edges: RDD[Edge[Int]] = sc.makeRDD(Array(Edge(2L, 1L, 7),Edge(3L, 2L, 4),Edge(4L, 1L, 1),Edge(2L, 4L, 2),Edge(5L, 2L, 2),Edge(5L, 3L, 8),Edge(3L, 6L, 3),Edge(5L, 6L, 3),Edge(7L,8L,12),Edge(8L,9L,32),Edge(9L,7L,35)))val graph: Graph[(String, Int), Int] = Graph(users,edges)val graph1: Graph[User, Int] = graph.mapVertices{case(id,(name,age))=>{User(name,age,0,0)}}
//    graph.mapVertices{(x,y)=>User(y._1,y._2,0,0)}graph1.vertices.collect().foreach(println)println("---------------------------------")val inDegrees: VertexRDD[Int] = graph.inDegreesinDegrees.collect().foreach(println)val graph2: Graph[User, Int] = graph1.outerJoinVertices(inDegrees)((id, user, inDeg)=>{User(user.name,user.age,inDeg.getOrElse(0),0)})graph2.vertices.foreach(println)val outDegrees: VertexRDD[Int] = graph.outDegreesprintln("-------------------------")outDegrees.foreach(println)val graph3: Graph[User, Int] = graph2.outerJoinVertices(outDegrees)((id, user, outDeg)=>{User(user.name,user.age,user.inDeg,outDeg.getOrElse(0))})println("-------------------------")graph3.vertices.foreach(x=>println(x._2.name+"喜欢 "+x._2.outDeg+"人,被 "+x._2.inDeg+"人喜欢。"))
//    graph3.vertices.foreach{case (x,y)=>{println(y.name+"喜欢 "+y.outDeg+"人,被")+y.inDeg+"人喜欢")}}println("---------------pageRank-----------------------")val graph44: Graph[Double, Double] = graph.pageRank(0.0001)graph44.vertices.foreach(println)/**0,0001* (6,0.9969646507526427)* (2,0.9969646507526427)* (1,1.7924127957615184)* (5,0.5451618049228395)* (3,0.6996243163176441)* (4,0.9688717814927127)*/println("---------connectedComponents------------")val graphConn: Graph[VertexId, Int] = graph.connectedComponents()graphConn.triplets.collect().foreach(println)val emailRDD: RDD[(Long, String)] = sc.parallelize(Array((1L, "qq.com"),(3L, "163.com"),(6L, "souhu.com"),(7L,"fox.com")))val phoneRDD: RDD[(Long, String)] = sc.parallelize(Array((1L, "12345678922"),(3L, "22232342433"),(6L, "23543652577")))val graphjoin: Graph[(String, Int), Int] = graph.joinVertices(emailRDD)((id, y, z)=>{(y._1+"@"+z,y._2)})
//    graph.vertices.collect().foreach(println)println("----------------------------------------------")
//    graphjoin.vertices.collect().foreach(println)println("-----------------------------------")val graphjoin2: Graph[(String, Int), Int] = graph.outerJoinVertices(emailRDD)((id, y, z)=>{(y._1+"@"+z.getOrElse("tao.com"),y._2)})
//    graph.vertices.collect().foreach(println)/*** (4,(David@tao.com,42))* (1,(Alice@qq.com,28))* (5,(Ed@tao.com,55))* (6,(Fran@souhu.com,50))* (2,(Bob@tao.com,27))* (3,(Charlie@163.com,65))*/println("----------------------------------------------")
//    graphjoin2.vertices.collect().foreach(println)println("--------------------------------------------------")/*** (4,(David,42))* (1,(Alice,28))* (5,(Ed,55))* (6,(Fran,50))* (2,(Bob,27))* (3,(Charlie,65))*/val graphjoin3: Graph[(String, Int), Int] = graph.outerJoinVertices(phoneRDD)((id, y, z)=>{(y._1+":"+z.getOrElse("13611112222"),y._2)})
//    graph.vertices.collect().foreach(println)println("----------------------------------------------")
//    graphjoin3.vertices.collect().foreach(println)/*** (4,(David:13611112222,42))* (1,(Alice:12345678922,28))* (5,(Ed:13611112222,55))* (6,(Fran:23543652577,50))* (2,(Bob:13611112222,27))* (3,(Charlie:22232342433,65))*/}}

pregel函数

Spark GraphX 中的 pregel函数_Bamdli-CSDN博客

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionobject GraphDemo5 {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("sparkgraph").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextval users: RDD[(Long, (String, Int))] = sc.makeRDD( //元组里面不限类型Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))))val edges: RDD[Edge[Int]] = sc.makeRDD(Array(Edge(2L, 1L, 7),Edge(3L, 2L, 4),Edge(4L, 1L, 1),Edge(2L, 4L, 2),Edge(5L, 2L, 2),Edge(5L, 3L, 8),Edge(3L, 6L, 3),Edge(5L, 6L, 3)))val graph: Graph[(String, Int), Int] = Graph(users,edges)val startVertexId=5Lval initGraph: Graph[Double, Int] = graph.mapVertices {case (vid, (name, age)) => {if (vid == 5L) 0.0else Double.MaxValue}}initGraph.vertices.foreach(println)/*** (4,1.7976931348623157E308)* (6,1.7976931348623157E308)* (1,1.7976931348623157E308)* (2,1.7976931348623157E308)* (3,1.7976931348623157E308)* (5,0.0)*/println("------------------------------------------------")val pregelGraph: Graph[Double, PartitionID] = initGraph.pregel(Double.PositiveInfinity, //初始值10,EdgeDirection.Out)((vid: VertexId, vd: Double, disMsg: Double) => { //vprog: (VertexId, VD, A) => VD,val min: Double = math.min(vd, disMsg)println(s"顶点$vid, 属性$vd 收到的消息$disMsg 属性值与收到的消息比较后结果取最小值 $min")min},(edgeTriplet: EdgeTriplet[Double, PartitionID]) => {println(s"顶点${edgeTriplet.srcId} 给${edgeTriplet.dstId} 发送消息:${edgeTriplet.srcAttr} " +s"与${edgeTriplet.attr} 相加结果: ${edgeTriplet.srcAttr + edgeTriplet.attr}")if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr)Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))elseIterator.empty},(msg1: Double, msg2: Double) => {math.min(msg1, msg2)})pregelGraph.vertices.foreach(println)/*** 顶点4, 属性1.7976931348623157E308 收到的消息Infinity 属性值与收到的消息比较后结果取最小值 1.7976931348623157E308* 顶点3, 属性1.7976931348623157E308 收到的消息Infinity 属性值与收到的消息比较后结果取最小值 1.7976931348623157E308* 顶点1, 属性1.7976931348623157E308 收到的消息Infinity 属性值与收到的消息比较后结果取最小值 1.7976931348623157E308* 顶点5, 属性0.0 收到的消息Infinity 属性值与收到的消息比较后结果取最小值 0.0* 顶点6, 属性1.7976931348623157E308 收到的消息Infinity 属性值与收到的消息比较后结果取最小值 1.7976931348623157E308* 顶点2, 属性1.7976931348623157E308 收到的消息Infinity 属性值与收到的消息比较后结果取最小值 1.7976931348623157E308* 顶点5 给2 发送消息:0.0 与2 相加结果: 2.0* 顶点5 给3 发送消息:0.0 与8 相加结果: 8.0* 顶点2 给4 发送消息:1.7976931348623157E308 与2 相加结果: 1.7976931348623157E308* 顶点4 给1 发送消息:1.7976931348623157E308 与1 相加结果: 1.7976931348623157E308* 顶点2 给1 发送消息:1.7976931348623157E308 与7 相加结果: 1.7976931348623157E308* 顶点3 给2 发送消息:1.7976931348623157E308 与4 相加结果: 1.7976931348623157E308* 顶点3 给6 发送消息:1.7976931348623157E308 与3 相加结果: 1.7976931348623157E308* 顶点5 给6 发送消息:0.0 与3 相加结果: 3.0* 顶点6, 属性1.7976931348623157E308 收到的消息3.0 属性值与收到的消息比较后结果取最小值 3.0* 顶点2, 属性1.7976931348623157E308 收到的消息2.0 属性值与收到的消息比较后结果取最小值 2.0* 顶点3, 属性1.7976931348623157E308 收到的消息8.0 属性值与收到的消息比较后结果取最小值 8.0* 顶点2 给4 发送消息:2.0 与2 相加结果: 4.0* 顶点3 给6 发送消息:8.0 与3 相加结果: 11.0* 顶点2 给1 发送消息:2.0 与7 相加结果: 9.0* 顶点3 给2 发送消息:8.0 与4 相加结果: 12.0* 顶点4, 属性1.7976931348623157E308 收到的消息4.0 属性值与收到的消息比较后结果取最小值 4.0* 顶点1, 属性1.7976931348623157E308 收到的消息9.0 属性值与收到的消息比较后结果取最小值 9.0* 顶点4 给1 发送消息:4.0 与1 相加结果: 5.0* 顶点1, 属性9.0 收到的消息5.0 属性值与收到的消息比较后结果取最小值 5.0* (1,5.0)* (5,0.0)* (4,4.0)* (6,3.0)* (2,2.0)* (3,8.0)*/}}

航班飞行网图分析

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionobject FlightDemo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("flightdemo").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextval flightRDD: RDD[String] = sc.textFile("in/flight.csv")
//    flightRDD.collect().foreach(println)//找出所有机场编号 和  机场名称val airPort: RDD[(VertexId, String)] = flightRDD.map(x => x.split(",")).flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8)))).distinct()  //(12953,LGA)//找出所有航线和航线间的距离val lines: RDD[Edge[Int]] = flightRDD.map(x => x.split(",")).map(x => (x(5).toLong, x(7).toLong, x(16).toInt)).distinct().map(x => Edge(x._1, x._2, x._3))//    lines.collect().foreach(println)val graph: Graph[String, Int] = Graph(airPort,lines)
//    graph.triplets.collect().foreach(println)//求机场数量(求顶点个数)val vertices: VertexId = graph.numVertices//求航线数量(求边个数)val edges: VertexId = graph.numEdgesprintln("机场数量:"+ vertices + "航线数量:"+ edges)//求最长飞行线路(最长的边)graph.triplets.sortBy(x=>x.attr,false).take(3).foreach(x=>println(x.srcAttr+"到达"+x.dstAttr+"距离是"+x.attr))//求最短飞行线路(最短的边)graph.triplets.sortBy(x=>x.attr, true).take(3).foreach(x=>println(x.srcAttr+"到达"+x.dstAttr+"距离是"+x.attr))//入场航班最多的机场val value: RDD[(VertexId, Int)] = graph.inDegrees.sortBy(x=>x._2,false)val tuple: (VertexId, Int) = value.take(1)(0)println("进场航班最多的机场" +tuple)//出厂航班最多的机场val value2: RDD[(VertexId, Int)] = graph.outDegrees.sortBy(x=>x._2,false)val tuple2: (VertexId, Int) = value2.take(1)(0)println("出场航班最多的机场" +tuple2)//找出最重要的前三个机场graph.pageRank(0.05).vertices.sortBy(x=>x._2,false).take(3).foreach(println)println("--------------------------------------")//找出10397机场到其它机场的最便宜的航线val startAirPort=10397Lval init_graph: Graph[Double, Double] = graph.mapVertices((id, name) => {if (id == startAirPort) 0.0else Double.MaxValue}).mapEdges(e => 180 + e.attr.toDouble * 0.15)
//    init_graph.triplets.take(3).foreach(println)val pregel_graph: Graph[Double, Double] = init_graph.pregel(Double.MaxValue,Int.MaxValue,EdgeDirection.Out)((id, dist, new_dist) => {math.min(dist, new_dist)},(triple) => {if (triple.attr + triple.srcAttr < triple.dstAttr)Iterator((triple.dstId, triple.attr + triple.srcAttr)) //合并属性elseIterator.empty},(new_dist1, new_dist2) => {math.min(new_dist1, new_dist2)})pregel_graph.vertices.filter(x=>x._1==12892L).collect().foreach(println)/*** 机场数量:301航线数量:4088* HNL到达JFK距离是4983* JFK到达HNL距离是4983* HNL到达EWR距离是4963* PSG到达WRG距离是31* WRG到达PSG距离是31* ACV到达CEC距离是56* 进场航班最多的机场(10397,152)* 出场航班最多的机场(10397,153)* (10397,11.060247708032241)* (13930,10.805558753161533)* (11298,10.652656481033038)* --------------------------------------* (12892,472.05)*/}}

Spark GraphX 中的PageRank算法、pregel函数、航班飞行网图分析相关推荐

  1. Spark项目模拟——航班飞行网图分析

    文章目录 Spark项目模拟--航班飞行网图分析 需求描述 数据准备 需求思路整理 代码实现 总结 机场数量: 航线数量: 最大的边属性: 哪个机场到达航班最多: 找出最重要的飞行航线: 找出最便宜的 ...

  2. Spark GraphX-航班飞行网图分析

    文章目录 1.如下图所示的航班表,解决以下问题: 2.思路如下: 3.代码如下: 4.代码如下: 1.如下图所示的航班表,解决以下问题: 统计航班飞行网图中机场的数量 统计航班飞行网图中航线的数量 计 ...

  3. Spark GraphX 中的 pregel函数(转载)

    文章目录 pregel函数源码 与 各个参数介绍: 案例: 求顶点5 到 其他各顶点的 最短距离 pregel原理分析 一篇关于 Spark GraphX 中 pregel函数 的笔记,通过一个小案例 ...

  4. 大数据——Spark GraphX中算法介绍

    一.ConnectedComponents算法 ConnectedComponents即连通体算法用id标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id. 图关系如下时: //创建点 ...

  5. 在微博中应用PageRank算法

    这个想法很早就有了,因为我是做搜索引擎背景的,能够深刻的理解PageRank算法在搜索引擎中的重要性,绝对的核心技术之一.不过,这篇博客,并不打算介绍PageRank算法的原理,而是,让我们来看看,这 ...

  6. 搜索引擎中的PageRank算法

    PageRank算法 PageRank算法总的来说就是预先给每个网页一个PR值(下面用PR值指代PageRank值),由于PR值物理意义上为一个网页被访问概率,所以一般是 1 N \frac{1}{N ...

  7. Vue中的Diff算法 patch函数-简单Diff算法-双端Diff算法-快速Diff算法-当数据发生改变,视图如何更新?

    文章目录 Vue中的Diff算法 概述 前置知识 patch方法 简单Diff算法 总结 双端Diff算法 --vue2 快速Diff算法 --vue3 vue2和vue3 Diff算法的区别 当数据 ...

  8. R语言中使用非凸惩罚函数回归(SCAD、MCP)分析前列腺数据

    原文链接:http://tecdat.cn/?p=20828 本文使用lasso或非凸惩罚拟合线性回归,GLM和Cox回归模型的正则化,特别是_最小_最_大凹_度_惩罚_函数_(MCP)_和光滑切片绝 ...

  9. 【Spark GraphX】社交网图分析

    目录 一.数据 1.数据关系图 2. 数据说明 3.顶点表 4.边表 二.需求 三.需求实现 1.构造fans网图 2.找出年龄大于30岁的顶点 3.找出边属性大于5的边 4.将每个顶点的年龄+20 ...

最新文章

  1. TextView-- 测量文字宽度
  2. 对不起,我的代码评审毁了一个程序员!
  3. 共享一个简单的 Javacript Helper library
  4. 笔记本电源适配器的选择方法
  5. python爬虫post请求_Python爬虫之GET和POST请求
  6. 在eclipse上Checkstyle的安装和使用
  7. eclipse中经常用到的快捷键
  8. TreeView控件二(递归算法)
  9. strcompare php,PHP中的startswith()和endsWith()函数
  10. 一个form 如何做两次提交_如何做一个自信魅力的女人
  11. 3-16Pytorch与随机抽样
  12. 《TensorFlow 2.0深度学习算法实战教材》学习笔记(九、卷积神经网络)
  13. 每日一练||第二周汇总
  14. c语言程序设计基础所有知识点,《C语言程序设计》基础知识点总结.doc
  15. 贪吃蛇游戏-小程序游戏
  16. 龙芯2k1000-pmon(5)- pmon无法修改环境变量的问题
  17. android的wifi开发,android开发教程之wifi开发示例
  18. 苹果手机怎么备份通讯录?
  19. UI网页设计制作思路
  20. TextView(文本框)

热门文章

  1. 计算机组成原理期末考试:三种地址映射方式
  2. Python 下划线 单下划线 双下划线的作用
  3. 基于51单片机和霍尔传感器的测速
  4. TX2 Linux配置组播(Multicast)
  5. 2016年魅族Java研发面试总结
  6. C#中的函数重载和构造函数
  7. NOJ [1120] Reimu\'s Teleport
  8. Istio 1.12 引入 Wasm 插件配置 API 以扩展 Istio 生态
  9. S7 1200 PLC对编码器计数,断电记忆
  10. 中职计算机组装与维修试卷,中职计算机组装与维修试卷-20210601144450.docx-原创力文档...