DRPC(分布式远程过程调用,Distributed Remote Procedure Call)是storm整合流(stream)、Spout、Bolt、Topology而形成的一种模式。引入DRPC旨在借助Storm集群实现远程的并行计算。DRPC客户端只要指定要使用的“函数”(函数实现了特定的功能),并将要计算的内容发送给storm的drpc服务,待服务端调用响应函数计算完成后,客户端会收到相应的返回结果。

DRPC的流程可以用官方给出的下图来阐述。

  1. 客户端将计算请求和参数发给DRPC Server
  2. DRPC Spout从DRPC Server接收“函数调用”所需的流,DRPC Server会赋予每个调用一个唯一的ID(request id)
  3. spout将流处理后发给拓扑中的“函数”进行计算
  4. 最终会在拓扑中的最后一个Bolt中,将计算结果返回给DRPC Server,返回结果与之前的request id想对应
  5. DRPC Server会将此次远程过程调用结果返回给客户端

Storm的DRPC服务分为客户度和服务端,服务端创建DRPC拓扑后提交到storm集群,并确保storm集群开启drpc服务。客户端相对独立的角色通过IP,DRPC服务端口,向拓扑提交计算参数,并获取计算结果。

DRPC拓扑的创建
DRPC拓扑与普通的拓扑区别仅仅在于构造拓扑所需的方法不同,storm提供了LinearDRPCTopologyBuilder方法来构造DRPC服务端。(LinearDRPCTopologyBuilder至少在0.9.6之后已经被废弃,响应的DRPC也由Trident实现)。
下面仍以旧版本的LinearDRPCTopologyBuilder来构造拓扑,至少可以了解DRPC的相关原理,也有助于理解之后的Trident。下面的代码修改于storm给出的示例代码BasicDRPCTopology。

package test.storm.starter;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!!!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "exclaim"));}}public static class StarBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "***"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);builder.addBolt(new StarBolt(), 3).shuffleGrouping();Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[]{ "hello", "goodbye" }) {System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}Thread.sleep(10000);drpc.shutdown();cluster.shutdown();} else {conf.setNumWorkers(3);//进程数为3StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());}}
}

代码注释:
ExclaimBolt继承自BaseBasicBolt,BaseBasicBolt对于发送的每个元组封装了自动的进行ack()或者fail(),确保了可靠性。
execute方法会被不停的调用,前面曾提及Bolt的接收的元组第一个字段是DRPC Server提供的request ID,第二个字段是待计算的参数。ExclaimBolt将请求的参数加上”!!!”之后,和request ID拼成新的元组发出。
StarBolt和ExclaimBolt完全类似,但是它是拓扑中的最后一个bolt,它处理后的元组将以Fields(“id”, “result”)为字段名返回给DRPC Server。

DRPC构造器:创建函数名为”exclamation”线性DRPC构造器

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");

addBolt方法将bolt依次添加到DRPC构造器列表中,其第一个参数为实现了Bolt接口的方法,第二个参数为计算的并行度。
拓扑的创建使用createRemoteTopology方法,而不是createTopology方法。

提交拓扑到生产环境
将上述代码打包后提交到集群中,为了使用drpc服务,应首先确保已经使用 storm drpc开启drpc服务。

storm dprc //开启drpc服务
storm jar basicdrpc.jar test.storm.starter.BasicDRPCTopology BasicDRPC//将当前目录下的basicdrpc.jar提交到拓扑,拓扑名为BasicDRPC

DRPC客户端

//for storm-1.0.2及以上
依赖的jar包(jar包来源于storm安装包lib目录下)
log4j-api-2.1.jar
log4j-core-2.1.jar
log4j-slf4j-impl-2.1.jar
slf4j-api-1.7.7.jar
storm-core-1.0.2.jarpackage test.TestDRPC;import java.util.Map;
import org.apache.storm.utils.DRPCClient;
import org.apache.storm.utils.Utils;public class TestDRPC {public static void main(String[] args) throws Exception {Map conf = Utils.readDefaultConfig();DRPCClient client = new DRPCClient(conf,"localhost",3772);//3772是drpc对外默认的服务端口//for storm-0.9.6//DRPCClient client = new DRPCClient("localhost",3772);String arr[] = new String []{"hello","world","storm","java","drpc"};for(String str:arr){System.out.println("DRPC result:" + client.execute("exclamation", str));}}
}

随着storm版本的更新,DRPCClient的构造方法也发生了变化,相对于storm-0.9.6需要传入当前环境的参数。
execute()方法的第一个参数是函数名,也就是LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(“exclamation”);定义的函数名。
第二个参数就是需要exclamation计算的参数。
将客户端代码打成“可执行”的jar包,假设名为testDRPC.jar

(xxx@AS6U3.amd64)[root@xx xx]# java -jar testDRPC.jar
DRPC result:hello!!!***
DRPC result:world!!!***
DRPC result:storm!!!***
DRPC result:java!!!***
DRPC result:drpc!!!***

当然你要确保执行testDRPC.jar的服务是和storm集群是可以正常通信的。

storm示例之DRPC相关推荐

  1. Storm中的DRPC简单概述

    前边我们基Hadoop实现了RPC,下面将一下Storm中的DRPC DRPC:分布式RPC,Storm中的DRPC是使用Storm实时并行计算真正强大的函数,Storm拓扑作为输入接收函数参数流,并 ...

  2. storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs

    您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...

  3. Storm示例剖析-fastWordCount

    本文介绍介绍使用storm开发项目代码,介绍spout和bolt的相关接口等.storm的业务开发主要包括spout的开发,bolt的开发以及topology的创建等. 代码框架 spout 下述sp ...

  4. Storm的本地运行模式示例

    以word count为例,本地化运行模式(不需要安装zookeeper.storm集群),maven工程, pom.xml文件如下: <project xmlns="http://m ...

  5. Storm集群使用DRPC功能Version1.0.1

    在Storm集群上开启DRPC功能, 基于Storm的1.0.1版本, 并且执行简单的例子测试. 1.DRPC概念 DRPC就是分布式远程过程调用. Storm里面引入DRPC主要是利用storm的实 ...

  6. Storm专题一、Storm DRPC 分布式计算

    本文译自:https://storm.incubator.apache.org/documentation/Distributed-RPC.html Storm里面引入DRPC主要是利用storm的实 ...

  7. Twitter Storm: DRPC学习

    学习途径 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明 网址: http://xumingming.sinaapp.com/756/twitte ...

  8. 【Storm】DRPC精解和案例分析

    一.DRPC简介和工作流程 1. DRPC 简介 分布式RPC( distributed RPC,DRPC) 用于对storm上大量的函数调用进行并行计算.对于每一次函数调用,storm集群上运行的拓 ...

  9. storm drpc

    转 http://www.cnblogs.com/panfeng412/archive/2012/07/02/storm-common-patterns-of-distributed-rpc.html ...

最新文章

  1. oracle解锁system密码,Oracle System密码忘记 密码修改、删除账号锁定lock
  2. Android JNI的第一步——从HelloWorld开始
  3. Qt修炼手册7_图形:用户自定义QGraphicsItem
  4. 求最大、次大和第3大的值
  5. 我们公司也实行了OKR
  6. DataGrid中的超级链接列使用注意点
  7. 10年资深面试官直言:80%人面试Java都会止步于此!
  8. Cmake构建_设置debug与release不同名字
  9. Oracle学习之路-- 案例分析实现行列转换的几种方式
  10. CronTrigger 示例 1
  11. 计算机网络习题集与习题解析 pdf,计算机网络习题集与答案.pdf
  12. excel多表数据自动关联
  13. Minecraft Mod 开发:4-创造模式物品栏
  14. java计算机毕业设计海康物流MyBatis+系统+LW文档+源码+调试部署
  15. 天气预报小程序的设计与实现
  16. 在QQ群和QQ空间中挂马
  17. MFC制作Windows画图程序(二)
  18. 桌面存放linux文件无法删除,桌面文件无法删除怎么办【图文教程】
  19. 09- 京东客户购买意向预测 (机器学习集成算法) (项目九) *
  20. 破解Navicat全家桶

热门文章

  1. LINQ之查询语法—let子句
  2. 滤波、信号、数字与模拟、金字塔不懂才怪教程
  3. 【双目视觉】摄像头测试
  4. python库——图形艺术
  5. 闪耀暖暖总是显示服务器连接失败,《闪耀暖暖》服务器连接失败怎么办 当前无网络链接怎么办...
  6. Android 深层链接DeepLink和应用链接AppLink:实现浏览器跳转 app
  7. java根据模板导出PDF详细教程(无bug版)
  8. virtio系列-packed virtqueue
  9. 北京下雪了,2011.02.10
  10. 米联客MZ7035A注意事项