作者 | 白松

Giraph介绍:

Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google and described in a 2010 paper. Both systems are inspired by the Bulk Synchronous Parallelmodel of distributed computation introduced by Leslie Valiant. Giraph adds several features beyond the basic Pregel model, including master computation, sharded aggregators, edge-oriented input, out-of-core computation, and more. With a steady development cycle and a growing community of users worldwide, Giraph is a natural choice for unleashing the potential of structured datasets at a massive scale.

原理:

Giraph基于Hadoop而建,将MapReduce中Mapper进行封装,未使用reducer。在Mapper中进行多次迭代,每次迭代等价于BSP模型中的SuperStep。一个Hadoop Job等价于一次BSP作业。基础结构如下图所示。

每部分的功能如下:

1. ZooKeeper: responsible for computation state

–partition/worker mapping

–global state: #superstep

–checkpoint paths, aggregator values, statistics

2. Master: responsible for coordination

–assigns partitions to workers

–coordinates synchronization

–requests checkpoints

–aggregates aggregator values

–collects health statuses

3. Worker: responsible for vertices

–invokes active vertices compute() function

–sends, receives and assigns messages

–computes local aggregation values

说明

(1)实验环境

三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker.

测试例子:官网自带的SSSP程序,数据是自己模拟生成。

运行命令:Hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5

(2)为节约空间,下文中所有代码均为核心代码片段。

(3)core-site.xml中hadoop.tmp.dir的路径设为:/home/hadoop/hadooptmp

(4)写本文是多次调试完成的,故文中的JobID不一样,读者可理解为同一JobID.

(5)后续文章也遵循上述规则。

org.apache.giraph.graph.GraphMapper类

Giraph中自定义org.apache.giraph.graph.GraphMapper类来继承Hadoop中的 org.apache.hadoop.mapreduce.Mapper类,覆写了setup()、map()、cleanup()和run()方法。GraphMapper类的说明如下:

“This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.”

BSP的运算逻辑被封装在GraphMapper类中,其拥有一GraphTaskManager对象,用来管理Job的tasks。每个GraphMapper对象都相当于BSP中的一个计算节点(compute node)。

在GraphMapper类中的setup()方法中,创建GraphTaskManager对象并调用其setup()方法进行一些初始化工作。如下:

map()方法为空,因为所有操作都被封装在了GraphTaskManager类中。在run()方法中调用GraphTaskManager对象的execute()方法进行BSP迭代计算。

org.apache.giraph.graph.GraphMapper类

功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.

下面讲述setup()方法,代码如下:

依次介绍每个方法的功能:

1、locateZookeeperClasspath(zkPathList)

找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。

2、startZooKeeperManager(),初始化和配置ZooKeeperManager。

定义如下:

3、org.apache.giraph.zk.ZooKeeperManager 类

功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.

ZooKeeperManager类的setup()定义如下:

createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目录下为每个task创建一个文件,文件内容为空。文件名为本机的Hostname+taskPartition,如下截图:

运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。

getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。

首先获取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目录下文件,如果当前目录下有文件,则把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。扫描taskDirectory目录后,若hostNameTaskMap的size大于serverCount(等于GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT变量,定义为1),就停止外层的循环。外层循环的目的是:因为taskDirectory下的文件每个task文件时多个task在分布式条件下创建的,有可能task 0在此创建server List时,别的task还没有生成后task文件。Giraph默认为每个Job启动一个ZooKeeper服务,也就是说只有一个task会启动ZooKeeper服务。

经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好),然后退出for循环,发现hostNameTaskMap的size等于1,直接退出while循环。那么此处就选了test162 0。

最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0

onlineZooKeeperServers(),根据zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder来创建ZooKeeper服务进程,然后Task 0 再通过socket连接到ZooKeeper服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记master任务已完成。worker一直在进行循环检测master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0即worker等待直到master上的ZooKeeper服务已经启动完成。

启动ZooKeeper服务的命令如下:

4、determineGraphFunctions()。

GraphTaskManager类中有CentralizedServiceMaster对象和CentralizedServiceWorker 对象,分别对应于master和worker。每个BSP compute node扮演的角色判定逻辑如下:

a) If not split master, everyone does the everything and/or running ZooKeeper.

b) If split master/worker, masters also run ZooKeeper

c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.

该判定在GraphTaskManager 类中的静态方法determineGraphFunctions()中定义,片段代码如下:

默认的,Giraph会区分master和worker。会在master上面启动zookeeper服务,不会在worker上启动ZooKeeper服务。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

更多技术文章可以查看“了解更多”

启动zookeeper_Giraph源码分析(一)—启动ZooKeeper服务相关推荐

  1. 【Android 插件化】VirtualApp 源码分析 ( 启动应用源码分析 | HomePresenterImpl 启动应用方法 | VirtualCore 启动插件应用最终方法 )

    文章目录 一.启动应用源码分析 1.HomeActivity 启动应用点击方法 2.HomePresenterImpl 启动应用方法 3.VirtualCore 启动插件应用最终方法 一.启动应用源码 ...

  2. storm启动supervisor源码分析-supervisor.clj

    storm启动supervisor源码分析-supervisor.clj supervisor是storm集群重要组成部分,supervisor主要负责管理各个"工作节点".sup ...

  3. storm启动nimbus源码分析-nimbus.clj

    storm启动nimbus源码分析-nimbus.clj nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus ...

  4. 3supervisor启动worker源码分析-worker.clj

    supervisor启动worker源码分析-worker.clj supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见 ...

  5. supervisor启动worker源码分析-worker.clj

    supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-superv ...

  6. Spring Boot 2.x 启动全过程源码分析(全)

    上篇<Spring Boot 2.x 启动全过程源码分析(一)入口类剖析>我们分析了 Spring Boot 入口类 SpringApplication 的源码,并知道了其构造原理,这篇我 ...

  7. Spring Boot 2.x 启动全过程源码分析(上)入口类剖析

    转载自   Spring Boot 2.x 启动全过程源码分析(上)入口类剖析 Spring Boot 的应用教程我们已经分享过很多了,今天来通过源码来分析下它的启动过程,探究下 Spring Boo ...

  8. Linux内核源码分析--内核启动之(4)Image内核启动(setup_arch函数)(Linux-3.0 ARMv7)【转】...

    原文地址:Linux内核源码分析--内核启动之(4)Image内核启动(setup_arch函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.c ...

  9. Wifi模块—源码分析Wifi启动(Android P)

    一.前言 Android P在wifi这块改动挺大的,Wifi到AndoidO之后不再使用jni,所以AndroidP也一样不再使用jni来实现Java代码与本地的C/C++代码交互,而是使用HIDL ...

最新文章

  1. Software-testing-foundations-homework3
  2. 输入序列连续的序列检测
  3. ESP32 入门教学,不入门,不教学
  4. 编程体系结构(02):Java异常体系
  5. 企业级 SpringBoot 教程 (二)Spring Boot配置文件详解
  6. 计算机采用逻辑元件的发展顺序是,全国计算机等级考试一级选择题以及答案
  7. 线性代数 第六版 答案
  8. html5吻胸小游戏,html5气球大战小游戏代码
  9. Sublime text3 安装PyV8
  10. appium 配置 打开应用闪退
  11. vue使用高德地图为信息窗体再添加点击事件
  12. 众里寻她千百度,wordperss 热键
  13. < 谈谈对 SPA(单页面应用)的理解 >
  14. 向U盘中安装Linux系统的经验(不是制作安装盘)
  15. 自学python(2):利用opencv实现读图,显示,画框,裁剪的python代码
  16. serialplot虚拟串口示波器使用方法
  17. 基于优化的多核局部费舍尔判别分析的故障分类
  18. 【正点原子I.MX6U-MINI应用篇】1、编写第一个应用App程序helloworld
  19. C#串口通信数据不完整
  20. 打印出每个学生的总分和平均分,每门课的总分和平均分

热门文章

  1. .net测试篇之单元测试/集成测试神器Autofixture
  2. [NewLife.XCode]脏数据
  3. 【招聘(南京)】南京纳龙科技有限公司招高级.net开发工程师
  4. C# 未来新特性:静态委托和函数指针
  5. .Net Core跨平台应用研究-HelloArm(串口篇)
  6. 基于Asp.Net Core打造轻量级内部服务治理RPC(一)
  7. 分布式事务一致性解决方案
  8. 手把手引进门之 ASP.NET Core Entity Framework Core(官方教程翻译版 版本3.2.5)
  9. 塑造Visual Studio的未来
  10. [C#7] 1.Tuples(元组)