一、概述

DRPC (Distributed RPC -- remote procedure call分布式远程过程调用)是一种同步服务实现的机制,在Storm中客户端提交数据请求之后,立刻取得计算结果并返回给客户端。同时充分利用Storm的计算能力实现高密度的并行实时计算。

二、架构

  • DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
  • DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。
  • DRPC设计目的是为了充分利用Storm的计算能力实现高密度的并行实时计算。(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

解释:客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使JoinResult的Bolt实现数据的聚合, ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

三、实现方式

 3.1、通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现

package com.lxk.storm.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[] { "hello", "goodbye" }) {System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());}}
}

结果:

3.2、直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package com.lxk.storm.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.err.println(drpc.execute("exclamation", "aaa"));System.err.println(drpc.execute("exclamation", "bbb"));}
}

结果:

四、Storm运行模式

4.1、本地模式

public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.err.println(drpc.execute("exclamation", "aaa"));System.err.println(drpc.execute("exclamation", "bbb"));}

 4.2、远程模式(集群模式)

配置

集群drpc
---------------------------------------------------
修改
$ vi conf/storm.yaml
drpc.servers:- "node03"分发配置storm.yaml文件给其他节点启动zk
主节点启动 nimbus,supervisor,drpc
从启动 supervisor
  • 启动DRPC Server:bin/storm drpc &
  • 通过StormSubmitter.submitTopology提交拓扑:StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

public static void main(String[] args) {     DRPCClient client = new DRPCClient("node03", 3772);//通信端口  try {String result = client.execute("exclamation", "11,22");System.out.println(result);} catch (TException e) {e.printStackTrace();} catch (DRPCExecutionException e) {e.printStackTrace();}

总结:Drpc分布式远程调用帮我们

  1. 实现了drpcSpout用来向后发送数据,我们只需要传参即可。
  2. 实现了最后的JoinResult用来汇合结果,ReturnResult用来将结果返回客户端。从而达到实时的目的。
  3. 我们可以修改并行度,使集群的并行计算能力达到最优,主要实现并行计算。

Storm之同步服务DRPC相关推荐

  1. rsync同步服务实验讲解

    rsync 同步服务 复制: 源所有数据 同步: 只传输变化数据 • 命令用法 – rsync [选项...] 源目录 目标目录 • 本地同步 – rsync [选项...] 本地目录1 本地目录2 ...

  2. Opera浏览器同步服务被黑,用户数据和存储密码泄露

    8月26日晚,知名浏览器厂商Opera发布公告,表示其云同步服务遭遇黑客攻击,开启了浏览器同步功能的用户将受影响. Opera公司的一台用于存储用户同步数据的服务器被攻破,如果用户开启了跨平台数据同步 ...

  3. Rsync数据同步服务

    Rsync数据同步服务 Rsync软件适用与unix/linux/windows等多种操作系统平台 Rsync是一款开源的,快速的,多功能的,可实现全量及增量的本地或远程数据同步备份的优秀工具,可以实 ...

  4. User profile synchronization service starting issues 用户配置文件同步服务启动问题

    User profile synchronization service starting issues 用户配置文件同步服务启动问题 这里,尽管我删除并重建了user profile service ...

  5. Storm集群使用DRPC功能Version1.0.1

    在Storm集群上开启DRPC功能, 基于Storm的1.0.1版本, 并且执行简单的例子测试. 1.DRPC概念 DRPC就是分布式远程过程调用. Storm里面引入DRPC主要是利用storm的实 ...

  6. ntpd时钟同步服务

    原网址:http://blog.csdn.net/wzyzzu/article/details/46515129 ntpd时钟同步服务 目录 参考: CentOS配置时间同步NTP: http://w ...

  7. SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析

    SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析 SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析 安徽京准公司提供原创资料!! 3) 从站时钟要从高一级设备或同一 ...

  8. 大数据开发平台-数据同步服务

    什么是数据同步服务?顾名思义,就是在不同的系统之间同步数据.根据具体业务目的和应用场景的不同,各种数据同步服务框架的功能侧重点往往不尽相同,因而大家也会用各种大同小异的名称来称呼这类服务,比如数据传输 ...

  9. win2003能装mysql_win2003 安装2个mysql实例做主从同步服务配置

    win2003 安装2个mysql实例做主从同步服务配置 2017年12月12日 | 萬仟网IT编程 | 我要评论 配置前的准备: 2台电脑,均安装windows2003 64位.均分三区c,d,e. ...

最新文章

  1. 多任务版udp聊天器
  2. 在C语言中,double、long、unsigned、int、char类型数据所占字节数
  3. Windows下安装Docker图解
  4. 使用AOP与注解记录Java日志
  5. 图像语义分割 -- UNET++
  6. 40个html作品,40个效果惊人的单页设计
  7. Robot Framework(十四) 扩展RobotFramework框架——创建测试库
  8. Flutter游戏:启动时的欢迎页
  9. Python 这么简单还用学吗?
  10. 删除ftp服务器上文件夹的方法,删除ftp服务器文件夹
  11. SAP PP销售预测转独立需求CODE
  12. 10周拿下腾讯数据分析师认证
  13. 将数据从前台传到后台方法总结
  14. matlab中删除矩阵中的某些行
  15. android原生widget 电量控制(PowerSave)设计浅析
  16. 【GDOI 2016 Day1】第四题 疯狂动物城
  17. 校招拼多多笔试题(前端工程师)
  18. 赴泰国的签证怎么办理
  19. 【C++】万年历的实现
  20. 建模杂谈系列142 关于MVD的思考

热门文章

  1. 全连接神经网络(DNN)
  2. 全国计算机等级考试电子版证书查询
  3. echarts热力背景图_echarts 热力图
  4. 百度“份量十足”,字节包容多样 |新年礼盒大赏时刻(附抽奖)
  5. 计算机信息管理专业的职业生涯规划,计算机信息管理专业职业生涯规划书
  6. vue的props传数组_详解vue.js之props传递参数
  7. CS5801AN是一个HDMI2.0b到DP1.4a转换器
  8. 国家电网泛在电力物联网分析—中台战略
  9. 关于米联客中pcie驱动编译失败的问题
  10. 线下分享 | 2018云和恩墨大讲堂深圳站