实验指导:

19.1 实验目的

1.了解Spark的图计算框架GraphX的基本知识;

2.能利用GraphX进行建图;

3.能利用GraphX进行基本的图操作;

4.理解GraphX图操作的算法。

19.2 实验要求

要求实验结束时,每位学生能完成正确运行Spark GraphX的示例程序,正确上传到集群中运行得到正确的实验结果。

实验结束时能对实验代码进行一定的理解。

19.3 实验原理

Spark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求。

社交网络中人与人之间有很多关系链,例如Twitter、Facebook、微博、微信,这些都是大数据产生的地方,都需要图计算,现在的图处理基本都是分布式的图处理,而并非单机处理,Spark GraphX由于底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。

图的分布式或者并行处理其实是把这张图拆分成很多的子图,然后我们分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。

适用范围:图计算。

19.4 实验步骤

本实验主要可以分为在IDEA安装Scala的插件、编写Scala的程序、打包提交程序和查看程序运行结果等几个步骤。详细步骤如下:

确认好上述“操作前提”后,读者可按下述步骤执行

19.4.1 在Intellij IDEA 中安装Scala的插件

注:IntelliJ IDEA对Scala编程比较方便。如不习惯请参考网上Eclipse版本。

点击File -> Plugins -> 输入”Scala” 搜索 -> 点击搜索条目为Scala的项目-> 点击右侧绿色按钮Install -> 安装完成之后重启IDEA。

19.4.2 新建Scala Module

点击当前IDEA中的project然后右键点击new -> module -> 选择Scala -> next -> 输入module name “sparkgraphx”。

点击新建的sparkgraphx 的module 然后右键点击”Add Framework Support” 选择maven支持。

19.4.3 添加maven依赖

在pom.xml 文件中添加如下依赖

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.10</artifactId><version>1.5.1</version></dependency>
</dependencies>

右键选项点击Reimport导入依赖。(需要稍等一段时间导入maven依赖)。

19.4.4 新建Scala程序

打开项目目录然后右键新建package命名为test,然后新建Scala class 将Kindt由class改为object。输入名字为GraphXExample。

输入代码:

package testimport org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object GraphXExample {def main(args: Array[String]) {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")val sc = new SparkContext(conf)val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(5L, 2L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)println("属性演示")println("**********************************************************")println("找出图中年龄大于30的顶点:")graph.vertices.filter { case (id, (name, age)) => age > 30}.collect.foreach {case (id, (name, age)) => println(s"$name is $age")}println("找出图中属性大于5的边:")graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))printlnprintln("列出边属性>5的tripltes:")for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")}printlnprintln("找出图中最大的出度、入度、度数:")def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {if (a._2 > b._2) a else b}println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))printlnprintln("**********************************************************")println("转换操作")println("**********************************************************")println("顶点的转换操作,顶点age + 10:")graph.mapVertices{ case (id, (name, age)) => (id, (name, age+10))}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))printlnprintln("边的转换操作,边的属性*2:")graph.mapEdges(e=>e.attr*2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))printlnprintln("**********************************************************")println("结构操作")println("**********************************************************")println("顶点年纪>30的子图:")val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)println("子图所有顶点:")subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))printlnprintln("子图所有边:")subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))printlnprintln("**********************************************************")println("连接操作")println("**********************************************************")val inDegrees: VertexRDD[Int] = graph.inDegreescase class User(name: String, age: Int, inDeg: Int, outDeg: Int)val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0)}val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)}.outerJoinVertices(initialUserGraph.outDegrees) {case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg,outDegOpt.getOrElse(0))}println("连接图的属性:")userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg}  outDeg: ${v._2.outDeg}"))printlnprintln("出度和入度相同的人员:")userGraph.vertices.filter {case (id, u) => u.inDeg == u.outDeg}.collect.foreach {case (id, property) => println(property.name)}printlnprintln("**********************************************************")println("聚合操作")println("**********************************************************")println("找出5到各顶点的最短:")val sourceId: VertexId = 5Lval initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)val sssp = initialGraph.pregel(Double.PositiveInfinity)((id, dist, newDist) => math.min(dist, newDist),triplet => {if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))} else {Iterator.empty}},(a,b) => math.min(a,b))println(sssp.vertices.collect.mkString("\n"))sc.stop()}
} 

19.4.5 程序运行

输入命令运行:

[root@master ~]# cd /usr/cstor /spark/
[root@master spark-1.6.0/]# bin/spark-submit --class test.GraphXExample ~/sparkgraphx.jar 

19.5 实验结果

程序正确运行结果如图19-1所示:

图19-1

实验操作:

步骤1:搭建Spark集群

配置Spark集群(独立模式):

前提:1、请自行配置各节点之间的免密登录,并在/etc/hosts中写好hostname与IP的对应,这样方便配置文件的相互拷贝。2、因为下面实验涉及Spark集群使用HDFS,所以按照之前的实验预先部署好HDFS。

在master机上操作:确定存在spark。

[root@master ~]# ls /usr/cstor
spark/
[root@master ~]#

在master机上操作:进入/usr/cstor目录中。

[root@master ~]# cd /usr/cstor
[root@master cstor]#

进入配置文件目录/usr/cstor/spark/conf, 先拷贝并修改slave.templae为slave。

[root@master ~]# cd /usr/cstor/spark/conf
[root@master cstor]# cp  slaves.template slaves

然后用vim命令编辑器编辑slaves文件

[root@master cstor]# vim slaves

编辑slaves文件将下述内容添加到slaves文件中。

slave1
slave2
slave3

上述内容表示当前的Spark集群共有三台slave机,这三台机器的机器名称分别是slave1~3。

在spark-conf.sh中加入JAVA_HOME。

[root@master cstor]# vim /usr/cstor/spark/sbin/spark-config.sh

加入以下内容

export JAVA_HOME=/usr/local/jdk1.7.0_79

将配置好的Spark拷贝至slaveX、client。(machines在目录/root/data/2下,如果不存在则自己新建一个)

使用for循环语句完成多机拷贝。

[root@master ~]# cd /root/data/2
[root@master ~]# cat  machines
slave1
slave2
slave3
client
[root@master ~]# for  x  in  `cat  machines` ; do  echo  $x ; scp  -r  /usr/cstor/spark/  $x:/usr/cstor/; done;

在master机上操作:启动Spark集群。

[root@master local]# /usr/cstor/spark/sbin/start-all.sh

步骤2:配置集成开发环境

配置Spark集群使用HDFS:

首先关闭集群(在master上执行)

[root@master ~]# /usr/cstor/spark/sbin/stop-all.sh

将Spark环境变量模板复制成环境变量文件。

[root@master ~]# cd /usr/cstor/spark/conf
[root@master conf]# cp spark-env.sh.template spark-env.sh

修改Spark环境变量配置文件spark-env.sh。

[root@master conf]$ vim spark-env.sh

在sprak-env.sh配置文件中添加下列内容。

export HADOOP_CONF_DIR=/usr/cstor/hadoop/etc/hadoop

重新启动spark

[root@master local]# /usr/cstor/spark/sbin/start-all.sh

步骤3:编写Scala程序

步骤4:打包程序

步骤5:运行程序

在xshell中运行后会有乱码出现,不知道什么原因,还未找到解决方案

但是我们在idea中可以运行查看,是没有任何问题的

实验十九 Spark实验:GraphX相关推荐

  1. 实验十八 Spark实验:Spark Streaming

    实验指导: 18.1 实验目的 1. 了解Spark Streaming版本的WordCount和MapReduce版本的WordCount的区别: 2. 理解Spark Streaming的工作流程 ...

  2. ArcGIS实验教程——实验十九:网络分析(最短路径实现)

    ArcGIS实验视频教程合集:<ArcGIS实验教程从入门到精通>(附配套实验数据) 一.实验描述 网络分析模块用于实现基于网络数据集的网络分析功能,包括路径分析.服务区分析.最近设施点分 ...

  3. Arduino实验十九 使用ULN2003达林阵列驱动电机风扇

    学习任务: 学会使用旋转编码器 组件: Arduion主板 ULN2003达林阵列驱动 触摸开关 风扇组件 直流电机 杜邦线 面包板 USB数据线 下图是风扇组件 下图是ULN2003达林阵列组件 实 ...

  4. 201671010426 孙锦喆 实验十四 团队项目评审课程学习总结

    项目 内容 这个作业属于哪个课程 2016计算机科学与工程学院软件工程(西北师范大学) 这个作业的要求在哪里 实验十四 团队项目评审&课程学习总结 作业学习目标 (1)掌握软件项目评审会流程 ...

  5. 【黑金原创教程】【FPGA那些事儿-驱动篇I 】实验十二:串口模块① — 发送

    实验十二:串口模块① - 发送 串口固然是典型的实验,想必许多同学已经作烂,不过笔者还要循例介绍一下.我们知道串口有发送与接收之分,实验十二的实验目的就是实现串口发送,然而不同的是 ... 笔者会用另 ...

  6. c语言实验报告9 四川师范大学,四川师范大学数学与软件科学学院程序设计实验报告实验十...

    四川师范大学数学与软件科学学院程序设计实验报告实验十 一.实验目的 (1) 掌握位运算的基本概念和方法,以及有关位运算的常见算法和基本应用: (2) 掌握文件和文件指针概念,以及文件的一般定义.操作方 ...

  7. c语言对分查找实验报告,C语言实验指导.doc

    C语言实验指导.doc C语言实验指导(要求认真填写实验报告中的各项内容,不得空白或填写未发现问题)实验一 顺序结构程序设计实验二 选择结构程序设计实验三 循环结构程序设计实验四 一维数组及其应用实验 ...

  8. 【黑金原创教程】【FPGA那些事儿-驱动篇I 】实验二十九:LCD模块

    实验二十九:LCD模块 据说Alinx 301支持 7"TFT,好奇的朋友一定疑惑道,它们3.2"TFT以及7"TFT等两者之间究竟有何区别呢?答案很简单,前者自带控制器 ...

  9. stm32l0的停止模式怎么唤醒_「正点原子STM32Mini板资料连载」第十九章 待机唤醒实验...

    1)实验平台:正点原子STM32mini开发板 2)摘自<正点原子STM32 不完全手册(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 第十九章 待机唤醒实验 本章我们将向 ...

最新文章

  1. 阿里云 rds 在windows 里面恢复
  2. ssh闲置一段时间自动登出问题的解决
  3. java垃圾回收机制_JVM的垃圾回收机制——垃圾回收算法
  4. 使用Shiro的JdbcRealm实现查询数据库进行身份认证
  5. 28岁学python转行_28岁转行程序员,学Java还是Python?码农:想快点月薪过万就选它...
  6. Apple 的 CEO和Google的CEO在星巴克聊什么呢?
  7. DOS口令启用停用的管理员密码
  8. python 工程 ——文件、包、__init__及导入方法
  9. 《Python cookbook》笔记二
  10. mysql授权许可_分析MySQL的授权许可
  11. Gradle与Makefile构建工具的对比
  12. 快速合并所有txt文档
  13. 《社会企业开展应聘文职人员培训规范》团体标准在新华书店上架
  14. 《中国化工贸易》征稿函
  15. 撰写论文时常用的研究方法有哪些?
  16. 基于有源钳位三电平的有源电力滤波器(ANPC-APF)MATLAB仿真,包括自建的DSOGI锁相模块和PQ谐波检测模块。 可简单解释。
  17. 傅里叶特征学习高频:Fourier Features Let Networks Learn High Frequency Functions in Low Dimensional Domains
  18. 【历史上的今天】12 月 31 日:千年虫问题;DNA 计算之父出生;微机先驱 Cromemco 成立
  19. relative和absolute的使用(详细+案例)
  20. 软件项目管理课后习题——第7章软件项目的质量管理与配置管理

热门文章

  1. 中兴面试-2022-7-13
  2. 很喜欢的一个人:马云(从顽皮少年到商界大侠)
  3. 商业模式新生代之开放商业模式
  4. Dijkstra 算法-《数据结构》严蔚敏
  5. 《数据结构》C语言版 严蔚敏版本 学习笔记
  6. java opencv磨皮算法_深度学习AI美颜系列 - AI美颜磨皮算法[转]
  7. Quartus-II两种方式实现D触发器及时序仿真和波形验证
  8. Lifekeeper Linux
  9. win7系统定时删除数据的批处理命令_win7系统使用批处理删除文件详细教程
  10. 2023年股票质押违约处置研究报告