HamaWhite 原创。转载请注明出处!欢迎大家增加Giraph 技术交流群: 228591158

Giraph中Aggregator的基本使用方法请參考官方文档:http://giraph.apache.org/aggregators.html 。本文重点在解析Giraph怎样实现Aggregators后文用图示的方法描写叙述了Aggregator的运行过程。

基本原理:在每一个超级步中,每一个Worker计算本地的聚集值。

超级步计算完毕后,把本地的聚集值发送给Master汇总。在MasterCompute()运行后,把全局的聚集值回发给全部的Workers。

缺点:当某个应用(或算法)使用了多个聚集器(Aggregators),Master要完毕全部聚集器的计算。由于Master要接受、处理、发送大量的数据,不管是在计算方面还是网络通信层次,都会导致Master成为系统瓶颈。

改进:採用分片聚集 (sharded aggregators) . 在每一个超级步的最后。每一个聚集器被派发给一个Worker。该Worker接受和聚集其它Workers发送给该聚集器的值。

然后Workers把自己的全部的聚集器发送给Master。这样Master就无需运行不论什么聚集,仅仅是接收每一个聚集器的终于值。在MasterCompute.compute运行后,Master不是直接把全部的聚集器发送给全部的Workers,而是发送给聚集器所属的Worker。然后每一个Worker再把其上的聚集器发送给全部的Workers.

首先给出Master <-- > Worker间, Worker <--> Worker间通信协议,在每一个类中的doRequest(ServerData serverData)方法中会解析并存储收到的消息。
1).  org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 类 . Worker --> Worker Owner
功能:每一个worker把当前超步的局部 aggregated values 发送到该Aggregator的拥有者。
2).  org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 类. Worker Owner--> Master
功能:每一个Worker把自己所拥有的Aggregator的终于 aggregated values 发送给 master。
3).  org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 类. Master --> Worker Owner.
功能:master把终于的 aggregated values 或aggregators 发送给该Aggregator的拥有者。
4).  org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 类。 Worker Owner--> Worker
功能: 发送终于的 aggregated values 到 其它workers。发送者为该Aggregator的拥有者。接受者为除发送者之外的全部workers。

Aggregator分类和 注冊

Giraph中把Aggregator分为两类:regular aggregators和persistent aggregators。

regular aggregators的值在每一个超级步開始会被重置为初始值,然而persistent aggregators的值在整个应用(算法)中一直保持。

举例来说。若LongSumAggregator在每一个顶点的compute()方法中加1。假设使用regular aggregators,在每一个超级步中就能够读取前一个超级步的參与计算的顶点总数;假设使用persistent aggregators,就能够获取前面全部超级步中參与计算的顶点总和。

在使用aggregator之前,必需要在mastes上Registering aggregators。做法:继承org.apache.giraph.master.DefaultMasterCompute类,重写 void initalize() 方法。

在该方法中注冊aggregators。语法例如以下:

registerAggregator(aggregatorName, aggregatorClass)
    registerPersistentAggregator(aggregatorName, aggregatorClass)

说明:MasterCompute.initalize()方法仅仅在第 INPUT_SUPERSTEP (-1) 超级步中运行一次。详细在 BSPServiceMaster.runMasterCompute(long superstep)方法中。在MasterCompute.compute()方法中,能够使用下述方法读取或改动聚集器的值。

getAggregatedValue(aggregatorName) //获取前一个超级步的聚集器值
     setAggregatedValue(aggregatorName, aggregatedValue) //改动聚集器的值

MasterCompute.compute()总是在Vertex.compute()前运行。 因为第 INPUT_SUPERSTEP ( -1)个超级步进行的是数据的载入和重分布过程,不计算Vertex.compute()。第0个超级步Vertex.compute()又是在MasterCompute.compute()方法后运行。故对第 -1 、 0个超级步MasterCompute.compute()方法中获得的聚集器值均为其初始值。从第1个超级步開始。MasterCompute.compute()方法才获得了全部Vertex.compute()在第0个超级步聚集的值。

1. 从第0个超级步開始。BspServiceMaster调用MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法把聚集器派发给Worker。聚集器的value为上一个超级步的全局聚集值(final aggregated values)。第一次为初始值。先给出MasterAggregatorHandler的类继承关系。例如以下:

finishSuperStep(MasterClient masterClient) 方法核心内容例如以下:

  /*** Finalize aggregators for current superstep and share them with workers*/public void finishSuperstep(MasterClient masterClient) {for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {if (aggregator.isChanged()) {// if master compute changed the value, use the one he choseaggregator.setPreviousAggregatedValue(aggregator.getCurrentAggregatedValue());// reset aggregator for the next superstepaggregator.resetCurrentAggregator();}}/*** 把聚集器发送给所属的Worker。发送内容:* 1). Name of the aggregator* 2). Class of the aggregator* 3). Value of the aggretator*/try {for (Map.Entry<String, AggregatorWrapper<Writable>> entry :aggregatorMap.entrySet()) {masterClient.sendAggregator(entry.getKey(),entry.getValue().getAggregatorClass(),entry.getValue().getPreviousAggregatedValue());}masterClient.finishSendingAggregatedValues();} catch (IOException e) {throw new IllegalStateException("finishSuperstep: " +"IOException occurred while sending aggregators", e);}}

问题1:怎样确定aggregator的Worker Owner ?
答:依据aggregator的Name来确定它所属的Worker。计算方法例如以下:

/*** 依据aggregatorName和全部的workers列表来计算aggregator所属的Worker* 參数aggregatorName:Name of the aggregator* 參数workers: Workers的list列表* 返回值:Worker which owns the aggregator*/
public static WorkerInfo getOwner(String aggregatorName,List<WorkerInfo> workers) {//用aggregatorName的HashCode()值模以 Workers的总数目int index = Math.abs(aggregatorName.hashCode() % workers.size());return workers.get(index);  //返回aggregator所属的Worker
}

问题2:Worker 怎样推断自身是否接收完自己所拥有的aggregators?
答:Master给某个Worker发送aggregators时。同一时候发送到该Worker的aggregators数目。使用的 SendAggregatorsToOwnerRequest类对消息进行封装和解析。

2. Worker接受Master发送的Aggregator,Worker把接收到的聚集体值发送给其它全部Workers,然后每一个Workers就会得到上一个超级步的全局聚集值。
由前文知道,每一个Worker都有一个ServerData对象,ServerData类中关于Aggregator的两个成员变量例如以下:

// 保存Worker在当前超步拥有的aggregators
private final OwnerAggregatorServerData ownerAggregator;
// 保存前一个超步的aggregators
private final AllAggregatorServerData allAggregatorData;

能够看到,ownerAggregatorData用来存储在当前超步Master发送给Worker的聚集器,allAggregatorData用来保存上一个超级步全局的聚集值。ownerAggregatorData和allAggregatorData值的初始化在SendAggregatorsToOwnerRequest 类中的doRequest(ServerData serverData)方法中,例如以下:

public void doRequest(ServerData serverData) {DataInput input = getDataInput();AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();try {//收到的Aggregators数目。在CountingOutputStream类中有计数器counter,//每向输出流中加入一个聚集器对象,计数加1. 发送时,在flush方法中把该值插入到输出流最前面。

int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); String aggregatorClassName = input.readUTF(); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); //Master发送给该Worker的requests总数目. count.readFields(input); aggregatorData.receivedRequestCountFromMaster(count.get(), getSenderTaskId()); } else { Class<Aggregator<Writable>> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, aggregatorClass); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); //把收到的上一次全局聚集的值赋值给allAggregatorData aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue); //ownerAggregatorData仅仅接受聚集器 serverData.getOwnerAggregatorData().registerAggregator( aggregatorName, aggregatorClass); } } } catch (IOException e) { throw new IllegalStateException("doRequest: " + "IOException occurred while processing request", e); } //接受一个 request,计数减1。同一时候把收到的Data加入到allAggregatorServerData的List<byte[]> masterData中 aggregatorData.receivedRequestFromMaster(getData()); }

每一个Worker在開始计算前。会调用BspServiceWorker类的prepareSuperStep()方法来进行聚集器值的派发和接受其它Workers发送的聚集器值。调用关系例如以下:

BspServiceWorker类的prepareSuperStep()方法例如以下:

@Override
public void prepareSuperstep() {if (getSuperstep() != INPUT_SUPERSTEP) {/** aggregatorHandler为WorkerAggregatorHandler类型,* 可參考上文中MasterAggregatorHandler的类继承关系.* workerAggregatorRequestProcessor声明为WorkerAggregatorRequestProcessor(接口)* 类型,实际为NettyWorkerAggregatorRequestProcessor的实例。* 用于Worker间发送聚集器的值。

*/ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); } }

WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法例如以下:

public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {AllAggregatorServerData allAggregatorData =serviceWorker.getServerData().getAllAggregatorData();/*** 等待直到Master发送给该Worker的聚集器都已接受完,* 返回值为Master发送给该Worker的全部Data(聚集器)*/Iterable<byte[]> dataToDistribute =allAggregatorData.getDataFromMasterWhenReady(serviceWorker.getMasterInfo());// 把从Master收到的Data(聚集器)发送给其它全部WorkersrequestProcessor.distributeAggregators(dataToDistribute);// 等待直到接受完其它Workers发送给该Workers的聚集器allAggregatorData.fillNextSuperstepMapsWhenReady(getOtherWorkerIdsSet(), previousAggregatedValueMap,currentAggregatorMap);// 仅仅是清空allAggregatorServerData的List<byte[]> masterData对象// 为下一个超级步接受Master发送的聚集器做准备allAggregatorData.reset();
}

以下详述Worker怎样判定已接收全然部Master发送的全部Request ? 主要目的在于描写叙述分布式环境下线程间怎样协作。

在AllAggregatorServerData类中定义了TaskIdsPermitBarrier类型的变量masterBarrier,用来推断是否接收完Master发送的Request. TaskIdsPermitBarrier类中主要使用wait()、notifyAll()等方法来控制。当获得的aggregatorName等于AggregatorUtils.SPECIAL_COUNT_AGGREGATOR时,会调用requirePermits(long permits,int taskId)来添加接收的arrivedTaskIds和须要等待的request数目waitingOnPermits. 接受一个Request

  /*** Require more permits. This will increase the number of times permits* were required. Doesn't wait for permits to become available.** @param permits Number of permits to require* @param taskId Task id which required permits*/public synchronized void requirePermits(long permits, int taskId) {arrivedTaskIds.add(taskId);waitingOnPermits += permits;notifyAll();}
接受一个Request后,会调用releaseOnePermit()方法把waitingOnPermits减1。

3. 在Vertex.compute()方法中。每一个Worker聚集自身的值。

计算完毕后。调用WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值给句聚集器的aggregatorName发送给该aggregator所属的Worker. Aggregator的属主Worker接受其它全部Workers发送的本地聚集值进行汇总,汇总完毕后发送给Master,供下一次超级步的MasterCompute.compute()方法使用。

finishSuperstep方法例如以下:

 /*** Send aggregators to their owners and in the end to the master** @param requestProcessor Request processor for aggregators*/public void finishSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {OwnerAggregatorServerData ownerAggregatorData =serviceWorker.getServerData().getOwnerAggregatorData();// First send partial aggregated values to their owners and determine// which aggregators belong to this workerfor (Map.Entry<String, Aggregator<Writable>> entry :currentAggregatorMap.entrySet()) {boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),entry.getValue().getAggregatedValue());if (!sent) {// If it's my aggregator, add it directlyownerAggregatorData.aggregate(entry.getKey(),entry.getValue().getAggregatedValue());}}// FlushrequestProcessor.flush();// Wait to receive partial aggregated values from all other workersIterable<Map.Entry<String, Writable>> myAggregators =ownerAggregatorData.getMyAggregatorValuesWhenReady(getOtherWorkerIdsSet());// Send final aggregated values to masterAggregatedValueOutputStream aggregatorOutput =new AggregatedValueOutputStream();for (Map.Entry<String, Writable> entry : myAggregators) {int currentSize = aggregatorOutput.addAggregator(entry.getKey(),entry.getValue());if (currentSize > maxBytesPerAggregatorRequest) {requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());}   }requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());// Wait for master to receive aggregated values before proceedingserviceWorker.getWorkerClient().waitAllRequests();ownerAggregatorData.reset();}

调用关系例如以下:

4. 大同步后,Master调用MasterAggregatorHandler类的prepareSusperStep(masterClient)方法。收集聚集器的值。方法内容例如以下:

  public void prepareSuperstep(MasterClient masterClient) {// 收集上次超级步的聚集值,为master compute 做准备for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {// 假设是 Persistent Aggregator,则累加if (aggregator.isPersistent()) {aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());}aggregator.setPreviousAggregatedValue(aggregator.getCurrentAggregatedValue());aggregator.resetCurrentAggregator();progressable.progress();}}

然后调用MasterCompute.compute()方法(可能会改动聚集器的值),在该方法内若依据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知全部Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法。则回到步骤1继续进行迭代。

说明:Job迭代结束条件有三,满足其一即可:
1) 达到最大迭代次数
2) 没有活跃顶点且没有消息在传递
3) 终止MasterCompute计算

总结:为解决在多个Aggregator条件下,Master成为系统瓶颈的问题。採取了把全部Aggregator派发给某一部分Workers。由这些Workers完毕全局的聚集值的计算与发送,Master仅仅须要与这些Workers进行简单数据通信就可以,大大减少了Master的工作量。

附加:以下用图示方法说明上述运行过程。

实验条件:
    1). 一个Master,四个Worker
    2). 两个Aggregators,记为A1和A2。

1. Master把Aggregators发送给Workers,收到Aggregator的Worker就作为该Aggregator的Owner。

下图中Master把A1发送给Worker1,A2发送给Worker3.那么Worker1就作为A1的Owner,Worker3就是A2的Owner。该步骤在MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法中完毕,使用的是SendAggregatorsToOwnerRequest 通信协议。注:每一个Owner Worker 可能有多个聚集器。

图1 Master分发Aggregator

2. Workers接受Master发送的Aggregator,然后把Aggregator发送给其它Workers。

Worker1要把A1分别发送给Worker2、Worker3和Worker4;Worker3要把A2分别发送给Worker1、Worker2和Worker4。该步骤在WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完毕,使用的是SendAggregatorsToMasterRequest 通信协议。此步骤完毕后,每一个Worker上都有了聚集器A1和A2(详细为上一个超步的全局终于聚集值)。

3. 每一个Worker调用Vertex.compute()方法開始计算,收集本地的Aggregator聚集值。对聚集体A1来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:A1、A12、 A13A14。对聚集器A2来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:A2、A22、 A23、A24。计算完毕后,每一个Worker就要把本地的聚集值发送给聚集器的Owner,聚集器的Owner在接收的时候会合并聚集。

那么A1、A12、 A13、A14要发送给Worker1进行全局聚集得到A1’,A21 、A22、 A23、A24要发送给Worker3进行全局聚集得到A2’。计算公式例如以下:


此部分採用的是SendWorkerAggregatorsRequest通信协议。Worker1和Worker3要把汇总的A1和A2的新值:A1’ 和A2’发送给Master,供下一次超级步的MasterCompute.compute()方法使用採用的是SendAggregatorsToMasterRequest通信协议。

此部分在WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完毕。步骤例如以下图所看到的:

4. Master收到Worker1发送的A1’ 和Woker3发送的A2’后,此步骤在MasterAggregatorHandler类的prepareSusperStep(masterClient)方法中完毕。然后调用MasterCompute.compute()方法,此方法可能会改动聚集器的值,如得到A1’’和A2’’。在masterCompute.compute()方法内若依据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知全部Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法。则回到步骤1继续进行迭代,继续把A1’’发送给Worker1。A2’’发送给Worker3。

完。

本人原创,转载请注明出处!

欢迎大家增加Giraph 技术交流群: 228591158

转载于:https://www.cnblogs.com/mfmdaoyou/p/7375726.html

Giraph源代码分析(九)—— Aggregators 原理解析相关推荐

  1. MediaInfo源代码分析 5:JPEG解析代码分析

    ===================================================== MediaInfo源代码分析系列文章列表: MediaInfo源代码分析 1:整体结构 Me ...

  2. ST电机库v5.4.4源代码分析(1): FOC原理(结合ST电机库)

    编者:沉尸 (5912129@qq.com) 本文字描述电机FOC的原理性内容,大部分取材于网络,但是我对于某些细节进行了比较详细的描述.因为很多最初的出处链接没有记录下来,所以没有标明,忘谅解!本文 ...

  3. java基础流程分析,及原理解析,因为bu满,而qian行

    基本功 =>同样的流程 别人的解释*=>基础二 面向对象的特征 四个基本特征:抽象,继承, 封装, 多态 抽象: 就好比用程序描述一个人,肯定得抽象的通过(身高,体重,年龄 , 胖瘦)这些 ...

  4. Https丢包分析及底层原理解析

    一.背景 生产服务器为阿里云,应用系统出现请求第三方服务偶发性接口异常,数据查询不回来,查看日志,分析得出,请求接口超过最大连接时间30秒,正常1.2秒左右,能返回数据.经过确认,第三方https连接 ...

  5. kcp 介绍与源代码分析_KCP-GO源码解析

    原标题:KCP-GO源码解析 原文作者:张伯雨 golang技术社区 概念 ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一. ...

  6. MediaInfo源代码分析 4:Inform()函数

    ===================================================== MediaInfo源代码分析系列文章列表: MediaInfo源代码分析 1:整体结构 Me ...

  7. MediaInfo源代码分析 3:Open()函数

    ===================================================== MediaInfo源代码分析系列文章列表: MediaInfo源代码分析 1:整体结构 Me ...

  8. MediaInfo源代码分析 2:API函数

    ===================================================== MediaInfo源代码分析系列文章列表: MediaInfo源代码分析 1:整体结构 Me ...

  9. MediaInfo源代码分析 1:整体结构

    ===================================================== MediaInfo源代码分析系列文章列表: MediaInfo源代码分析 1:整体结构 Me ...

最新文章

  1. 转载:PHP JSON_ENCODE 不编码中文汉字的方法
  2. 包(package)
  3. TCP连接过程:三次握手与四次握手—Vecloud微云
  4. 发起一个ajax请求,发送ajax请求
  5. Java并发编程系列之CountDownLatch用法及详解
  6. linux python默认安装目录_非root用户在linux服务器自己目录下安装需要的python版本及其模块...
  7. 悔不当初:回顾进化之路
  8. 最原创的验证码产生过程,桃花朵朵开
  9. ios13 无法传参_iOS13个人热点功能频遭投诉
  10. python并行计算for循环_在python中并行化这个嵌套的for循环
  11. 我逛了下 JDK 一条街,发现了不少好东西!
  12. docker中的hassio升级_趣说Docker
  13. 白话之jsonp跨域原理分析
  14. 46. magento cron
  15. 上海瀚示中文显示电子拣货标签 智能仓储物流工匠级革新
  16. 关于小学计算机论文题目,经典小学计算机课论文选题 小学计算机课论文题目怎样定...
  17. Reloading current route in Angular 5 / Angular 6 / Angular 7
  18. html5如何快速选择工具,PS快速选择工具怎么使用?快捷键是什么?
  19. 工作经验|lambada处理集合的常用10种实战骚操作,我都记录下来了
  20. 【Visual C++】游戏开发五十六 浅墨DirectX教程二十三 打造游戏GUI界面(一)

热门文章

  1. python 对10个数进行排序
  2. goland设置goproxy是参数时Environment时的设置方法
  3. 2022-3-27学习博客
  4. Orangepi 4B
  5. IDEA Android Studio 配置
  6. Axure中继器的使用
  7. 视频创作没有音效素材?只需16行Python代码让你用都用不完,步骤非常详细
  8. 处理器运算能力单位(TOPS)
  9. python for finance第二版_python书籍分享
  10. Vant Weapp 0.5.11 发布,有赞小程序 UI 组件库