1. PageRank

http://blog.csdn.net/hguisu/article/details/7996185

2. Connected Components

3. Triangle Counting

例子:

users.txt

1
2
3
4
5
6
7
1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys

followers.txt

1
2
3
4
5
6
7
8
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7

算法实战:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main.scala
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.{SparkConf, SparkContext}
object graphx_algorism {
  System.setProperty("hadoop.home.dir","E:/zhuangji/winutil/")
  def main(args:Array[String]):Unit={
    val conf=new SparkConf().setMaster("local[2]").setAppName("graph_algorism").set("spark.cores.max","10"//set spark.cores.max 可以设置核数
    val sc=new SparkContext(conf)
    // graph初始化,从文件中读
    val graph=GraphLoader.edgeListFile(sc,"E:/Java_WS/ScalaDemo/data/followers.txt")
    val users=sc.textFile("E:/Java_WS/ScalaDemo/data/users.txt").map{
      line=>val fields=line.split(",")
        (fields(0).toLong,fields(1))
    }
     
    // 1.
    //PageRank
    val ranks=graph.pageRank(0.001).vertices  // 0.001 是PageRank 的参数,尚未知道是什么意思
    ranks.collect.foreach(println)
    val ranksByUsername=users.join(ranks).map{
      case(id,(username,rank))=>(username,rank)
    }
    println(ranksByUsername.collect().mkString("\n"))
     
    //2.
    // Connected Components: LianTongTi
    val cc=graph.connectedComponents().vertices
    println(cc.collect)
    val ccByUsername=users.join(cc).map{
      case(id,(username,cc))=>(username,cc)
    }
    println(ccByUsername.collect().mkString("\n"))
     
    //3.
    //Triangle Count
    val graphT=GraphLoader.edgeListFile(sc,"E:/Java_WS/ScalaDemo/data/followers.txt",true).partitionBy(PartitionStrategy.RandomVertexCut)
    val triCounts=graphT.triangleCount().vertices
    val triCountByUsername=users.join(triCounts).map{case(id,(username,tc))=>(username,tc)}
    println(triCountByUsername.collect().mkString("\n"))
}

图论简介

图的组成

离散数学中非常重要的一个部分就是图论,下面是一个无向连通图

顶点(vertex)

上图中的A,B,C,D,E称为图的顶点。

顶点与顶点之间的连线称之为边。

图的数学表示

读大学的时候,一直没有想明白为什么要学劳什子的线性代数。直到这两天看《数学之美》一书时,才发觉,线性代数在一些计算机应用领域,那简直就是不可或缺啊。

我们比较容易理解的平面几何和立体几何(一个是二维,一个是三维),而线性代数解决的其实是一个高维问题,由于无法直觉的感受到,所以很难。如果想比较通俗的理解一下数学为什么有这么多的分支及其内在关联,强烈推荐读一下《数学桥 对高等数学的一次观赏之旅》。

在数学中,用什么来表示图呢,答案就是线性代数里面的矩阵,想想看,图的关联矩阵,图的邻接矩阵。总之就是矩阵啦,线性代数一下子有用了。下面是一个具体的例子。

图的并行化处理

刚才说到图可以用矩阵来表示,图的并行化问题在某种程度上就被转化为矩阵运算的并行化问题。

那么以矩阵的乘法为例,看看其是否可以并行化处理。

以矩阵 A X B 为例,说明并行化处理过程。

将上述的矩阵A和B划分为四个部分,如下图所示

首次对齐之后

子矩阵相乘

相乘之后,A的子矩阵左移,B的子矩阵上移

计算结果合并

图的并行化处理框架,从Pregel说起

上一节的重点有两点

  1. 图用矩阵来表示,对图的运算就是矩阵的运算
  2. 矩阵乘法运算可以并行化,动态演示其并行化的原理

你说ok,我明白了。哪有没有一种合适的并行化处理框架可以用来进行图的计算呢,那你肯定想到了MapReduce。

MapReduce尽管也是一个不错的并行化处理框架,但在图计算方面,有许多缺点,主要是计算的中间过程需要存储到硬盘,效率很低。

Google针对图的并行处理,专门提出了一个了不起的框架Pregel。其执行时的动态视图如下所示。

Pregel有如下优点

  • 级联可扩性好 scalability
  • 容错性强
  • 能够很好的表示各种图的常用算法

Pregel的计算模型

计算模型如下图所示,重要的有三个

  1. 作用于每个顶点的处理逻辑 vertexProgram
  2. 消息发送,用于相邻节点间的通讯 sendMessage
  3. 消息合并逻辑 messageCombining

Pregel在Spark中的实现

非常感谢你能坚持看到现在,这篇博客内容很多,有点难。我想还是上一幅图将其内在逻辑整一下再继续说下去。

该图要表示的意思是这样的,Graphx利用了Spark这样了一个并行处理框架来实现了图上的一些可并行化执行的算法。

本篇博客要表达的意思就是上面加红的这句话,请诸位看官仔细理解。

  • 算法是否能够并行化与Spark本身无关
  • 算法并行化与否的本身,需要通过数学来证明
  • 已经证明的可并行化算法,利用Spark来实现会是一个错的选择,因为Graphx支持pregel的图计算模型

Graphx中的重要概念

Graph

毫无疑问,图本身是graphx中一个非常重要的概念。

成员变量

graph中重要的成员变量分别为

  1. vertices
  2. edges
  3. triplets

为什么要引入triplets呢,主要是和Pregel这个计算模型相关,在triplets中,同时记录着edge和vertex. 具体代码就不罗列了。

成员函数

函数分成几大类

  1. 对所有顶点或边的操作,但不改变图结构本身,如mapEdges, mapVertices
  2. 子图,类似于集合操作中的filter subGraph
  3. 图的分割,即paritition操作,这个对于Spark计算来说,很关键,正是因为有了不同的Partition,才有了并行处理的可能, 不同的PartitionStrategy,其收益不同。最容易想到的就是利用Hash来将整个图分成多个区域。
  4. outerJoinVertices 顶点的外连接操作

图的运算和操作 GraphOps

图的常用算法是集中抽象到GraphOps这个类中,在Graph里作了隐式转换,将Graph转换为GraphOps

implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops

支持的操作如下

  1. collectNeighborIds
  2. collectNeighbors
  3. collectEdges
  4. joinVertices
  5. filter
  6. pickRandomVertex
  7. pregel
  8. pageRank
  9. staticPageRank
  10. connectedComponents
  11. triangleCount
  12. stronglyConnectedComponents

RDD

RDD是Spark体系的核心,那么Graphx中引入了哪些新的RDD呢,有俩,分别为

  1. VertexRDD
  2. EdgeRDD

较之EdgeRdd,VertexRDD更为重要,其上的操作也很多,主要集中于Vertex之上属性的合并,说到合并就不得不扯到关系代数和集合论,所以在VertexRdd中能看到许多类似于sql中的术语,如

  • leftJoin
  • innerJoin

至于leftJoin, innerJoin, outerJoin的区别,建议谷歌一下,不再赘述。

Graphx场景分析

图的存储和加载

在进行数学计算的时候,图用线性代数中的矩阵来表示,那么如何进行存储呢?

学数据结构的时候,老师肯定说过好多的办法,不再啰嗦了。

不过在大数据的环境下,如果图很巨大,表示顶点和边的数据不足以放在一个文件中怎么办? 用HDFS

加载的时候,一台机器的内存不足以容下怎么办? 延迟加载,在真正需要数据时,将数据分发到不同机器中,采用级联方式。

一般来说,我们会将所有与顶点相关的内容保存在一个文件中vertexFile,所有与边相关的信息保存在另一个文件中edgeFile。

生成某一个具体的图时,用edge就可以表示图中顶点的关联关系,同时图的结构也表示出来了。

GraphLoader

graphLoader是graphx中专门用于图的加载和生成,最重要的函数就是edgeListFile,定义如下。

def edgeListFile(sc: SparkContext,path: String,canonicalOrientation: Boolean = false,minEdgePartitions: Int = 1,edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int] ={val startTime = System.currentTimeMillis// Parse the edge data table directly into edge partitionsval lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)val edges = lines.mapPartitionsWithIndex { (pid, iter) =>val builder = new EdgePartitionBuilder[Int, Int]iter.foreach { line =>if (!line.isEmpty && line(0) != '#') {val lineArray = line.split("\\s+")if (lineArray.length < 2) {logWarning("Invalid line: " + line)}val srcId = lineArray(0).toLongval dstId = lineArray(1).toLongif (canonicalOrientation && srcId > dstId) {builder.add(dstId, srcId, 1)} else {builder.add(srcId, dstId, 1)}}}Iterator((pid, builder.toEdgePartition))}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))edges.count()logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,vertexStorageLevel = vertexStorageLevel)} // end of edgeListFile

应用举例之PageRank

什么是PageRank

PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。它由Larry Page 和 Sergey Brin在20世纪90年代后期发明。PageRank实现了将链接价值概念作为排名因素。
PageRank将对页面的链接看成投票,指示了重要性。

pageRank的核心思想

”在互联网上,如果一个网页被很多其它网页所链接,说明它受到普遍的承认和依赖,那么它的排名就很高。“  (摘自数学之美第10章)

你说这也太简单了吧,不是跟没说一个样吗,怎么用数学来表示呢?

呵呵,起初我也这么想的,后来多看了几遍之后,明白了一点点。分析步骤用文字表述如下,

  1. 网页和网页之间的关系用图来表示
  2. 网页A和网页B之间的连接关系表示任意一个用户从网页A到转到网页B的可能性(概率)
  3. 所有网页的排名用一维向量来B来表示

所有网页之间的连接用矩阵A来表示,所有网页排名用B来表示。

pageRank如何进行并行化

好了,上面的数学阐述说明了“网页排名的计算可以最终抽象为矩阵相乘”,而在开始的时候已经证明过矩阵相乘可以并行化处理。

理论研究结束了,接下来的就是工程实现了,借用Pregel模型,PageRank中定义的各主要函数分别如下。

vertexProgram

def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {val (oldPR, lastDelta) = attrval newPR = oldPR + (1.0 - resetProb) * msgSum(newPR, newPR - oldPR)}

sendMessage

def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {if (edge.srcAttr._2 > tol) {Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))} else {Iterator.empty}}

messageCombiner

def messageCombiner(a: Double, b: Double): Double = a + b

一点点启示

通过pageRank这个例子,我们能够搞清楚如何将平素学习的数学理论用以解决实际问题。

“学习的东西总是有价值的,至于用的上用不上,全靠造化了”

完整代码

// Connect to the Spark cluster
val sc = new SparkContext("spark://master.amplab.org", "research")
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt").map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {case (uid, deg, Some(attrList)) => attrList// Some users may not have attributes so we set them as emptycase (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {case (uid, attrList, Some(pr)) => (pr, attrList.toList)case (uid, attrList, None) => (0.0, attrList.toList)
}println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

小结

本篇讲来讲去就在强调一个问题,Spark是一个分布式并行计算框架。能不能用Spark,其实大体取决于问题的数学模型本身,如果可以并行化处理,则用之,切不可削足适履。

另一个用张图来总结一下提到的数学知识吧。

再一次强烈推荐《数学桥》

graphx 基础算法相关推荐

  1. 基础算法整理(1)——递归与递推

    程序调用自身的编程技巧称为递归( recursion).递归做为一种算法在程序设计语言中广泛应用. 一个过程或函数在其定义或说明中有直接或间接调用自身的一种方法,它通常把一个大型复杂的问题层层转化为一 ...

  2. 暑期集训2:ACM基础算法 练习题G:POJ - 1298

    2018学校暑期集训第二天--ACM基础算法 练习题G  --  POJ - 1298 The Hardest Problem Ever Julius Caesar lived in a time o ...

  3. 暑期集训2:ACM基础算法 练习题C:CF-1008A

    2018学校暑期集训第二天--ACM基础算法 练习题A  --   CodeForces - 1008A Romaji Vitya has just started learning Berlanes ...

  4. 暑期集训2:ACM基础算法 练习题B:CF-1008B

    2018学校暑期集训第二天--ACM基础算法 练习题B  --   CodeForces - 1008B Turn the Rectangles There are nn rectangles in ...

  5. 暑期集训2:ACM基础算法 练习题A:CF-1008C

    2018学校暑期集训第二天--ACM基础算法 练习题A  --  CodeForces - 1008C Reorder the Array You are given an array of inte ...

  6. 暑期集训2:ACM基础算法 例2:POJ-2456

    2018学校暑期集训第二天--ACM基础算法 例二  --   POJ - 2456 Aggressive cows Farmer John has built a new long barn, wi ...

  7. 暑期集训2:ACM基础算法 例1:POJ-1064

    2018学校暑期集训第二天--ACM基础算法 例一  --  POJ - 1064 Cable master Inhabitants of the Wonderland have decided to ...

  8. 第02期 基础算法(Leetcode)刻意练习开营计划

    背景 如果说 Java 是自动档轿车,C 就是手动档吉普.数据结构与算法呢?是变速箱的工作原理.你完全可以不知道变速箱怎样工作,就把自动档的车子从 A 开到 B,而且未必就比懂得的人慢.写程序这件事, ...

  9. 【基础算法】算法,从排序学起(一)

    本文目录 1.导言 2.谈谈排序 2.1 何为排序?(What is sorting?) 2.2 排序的应用(Why sorting?) 2.3 常见排序算法的种类(How to sort?) 3.基 ...

  10. 计算机及网络应用基础思维导图_计算机基础/算法/面试题 PDF+思维导图下载

    之前为了面试,整理了九大应付面试的思维导图 + 一份 630 页的程序员内功修炼手册 + 一份计算机基础/算法/Java技术栈/Linux C++技术栈的资料.当时我就是靠着这份思维导图以及整理的 P ...

最新文章

  1. linux curl 命令 http请求、下载文件、ftp上传下载
  2. python runner是什么_HttpRunner-01-初识
  3. Synbak 2.1 发布,系统备份工具
  4. Diffie-Hellman密钥交换协议
  5. xml约束和实际场景使用
  6. 线性一致性理解Linearizability
  7. 富士康立讯精密可能仍在苹果汽车代工商候选名单中
  8. java怎么复制别人的数据库_数据库实现主从复制
  9. 桶排序,冒泡排序,快速排序三者比较(例子说名)
  10. Debian 8 Jessie desktop on arm
  11. 吴恩达机器学习课程资源(笔记、中英文字幕视频、课后作业,提供百度云镜像!)
  12. 高级API 快速入门之第八章 多线程02
  13. 音乐播放器代码和网页播放器代码
  14. 带宽下载速度单位换算
  15. 入门安全测试需要知道什么?需要掌握哪些知识?
  16. 安全多方计算 # 个人笔记
  17. HTTP contentType
  18. Java定义一个抽象类科学家_Java程序设计作业
  19. 阅文JAVA后端笔试
  20. 成都百词斩2018web前端秋招笔试题

热门文章

  1. iOS底层探索之类的加载(一):read_images分析
  2. 历时3个月终克2.1大礼包
  3. wordpress后台加载速度异常缓慢排查记录(原创)
  4. 【BABY夜谈大数据】基于内容的推荐
  5. 【风马一族_php】NO2_php基础知识
  6. Spark Streaming揭秘 Day6 关于SparkStreaming Job的一些思考
  7. LeetCode 刷题隔天忘怎么办?
  8. 「leetcode」1. 两数之和:map等候多时了
  9. B - Sort the Array
  10. Ps2021教程,如何在photoshop中调整建筑物透视效果?