Storm之同步服务DRPC
一、概述
DRPC (Distributed RPC -- remote procedure call分布式远程过程调用)是一种同步服务实现的机制,在Storm中客户端提交数据请求之后,立刻取得计算结果并返回给客户端。同时充分利用Storm的计算能力实现高密度的并行实时计算。
二、架构
- DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
- DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)
- DRPC设计目的是为了充分利用Storm的计算能力实现高密度的并行实时计算。(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
解释:客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使JoinResult的Bolt实现数据的聚合, ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
三、实现方式
3.1、通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现
package com.lxk.storm.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[] { "hello", "goodbye" }) {System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());}}
}
结果:
3.2、直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package com.lxk.storm.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.err.println(drpc.execute("exclamation", "aaa"));System.err.println(drpc.execute("exclamation", "bbb"));}
}
结果:
四、Storm运行模式
4.1、本地模式
public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.err.println(drpc.execute("exclamation", "aaa"));System.err.println(drpc.execute("exclamation", "bbb"));}
4.2、远程模式(集群模式)
配置
集群drpc
---------------------------------------------------
修改
$ vi conf/storm.yaml
drpc.servers:- "node03"分发配置storm.yaml文件给其他节点启动zk
主节点启动 nimbus,supervisor,drpc
从启动 supervisor
- 启动DRPC Server:bin/storm drpc &
- 通过StormSubmitter.submitTopology提交拓扑:StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
public static void main(String[] args) { DRPCClient client = new DRPCClient("node03", 3772);//通信端口 try {String result = client.execute("exclamation", "11,22");System.out.println(result);} catch (TException e) {e.printStackTrace();} catch (DRPCExecutionException e) {e.printStackTrace();}
总结:Drpc分布式远程调用帮我们
- 实现了drpcSpout用来向后发送数据,我们只需要传参即可。
- 实现了最后的JoinResult用来汇合结果,ReturnResult用来将结果返回客户端。从而达到实时的目的。
- 我们可以修改并行度,使集群的并行计算能力达到最优,主要实现并行计算。
Storm之同步服务DRPC相关推荐
- rsync同步服务实验讲解
rsync 同步服务 复制: 源所有数据 同步: 只传输变化数据 • 命令用法 – rsync [选项...] 源目录 目标目录 • 本地同步 – rsync [选项...] 本地目录1 本地目录2 ...
- Opera浏览器同步服务被黑,用户数据和存储密码泄露
8月26日晚,知名浏览器厂商Opera发布公告,表示其云同步服务遭遇黑客攻击,开启了浏览器同步功能的用户将受影响. Opera公司的一台用于存储用户同步数据的服务器被攻破,如果用户开启了跨平台数据同步 ...
- Rsync数据同步服务
Rsync数据同步服务 Rsync软件适用与unix/linux/windows等多种操作系统平台 Rsync是一款开源的,快速的,多功能的,可实现全量及增量的本地或远程数据同步备份的优秀工具,可以实 ...
- User profile synchronization service starting issues 用户配置文件同步服务启动问题
User profile synchronization service starting issues 用户配置文件同步服务启动问题 这里,尽管我删除并重建了user profile service ...
- Storm集群使用DRPC功能Version1.0.1
在Storm集群上开启DRPC功能, 基于Storm的1.0.1版本, 并且执行简单的例子测试. 1.DRPC概念 DRPC就是分布式远程过程调用. Storm里面引入DRPC主要是利用storm的实 ...
- ntpd时钟同步服务
原网址:http://blog.csdn.net/wzyzzu/article/details/46515129 ntpd时钟同步服务 目录 参考: CentOS配置时间同步NTP: http://w ...
- SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析
SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析 SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析 安徽京准公司提供原创资料!! 3) 从站时钟要从高一级设备或同一 ...
- 大数据开发平台-数据同步服务
什么是数据同步服务?顾名思义,就是在不同的系统之间同步数据.根据具体业务目的和应用场景的不同,各种数据同步服务框架的功能侧重点往往不尽相同,因而大家也会用各种大同小异的名称来称呼这类服务,比如数据传输 ...
- win2003能装mysql_win2003 安装2个mysql实例做主从同步服务配置
win2003 安装2个mysql实例做主从同步服务配置 2017年12月12日 | 萬仟网IT编程 | 我要评论 配置前的准备: 2台电脑,均安装windows2003 64位.均分三区c,d,e. ...
最新文章
- 多任务版udp聊天器
- 在C语言中,double、long、unsigned、int、char类型数据所占字节数
- Windows下安装Docker图解
- 使用AOP与注解记录Java日志
- 图像语义分割 -- UNET++
- 40个html作品,40个效果惊人的单页设计
- Robot Framework(十四) 扩展RobotFramework框架——创建测试库
- Flutter游戏:启动时的欢迎页
- Python 这么简单还用学吗?
- 删除ftp服务器上文件夹的方法,删除ftp服务器文件夹
- SAP PP销售预测转独立需求CODE
- 10周拿下腾讯数据分析师认证
- 将数据从前台传到后台方法总结
- matlab中删除矩阵中的某些行
- android原生widget 电量控制(PowerSave)设计浅析
- 【GDOI 2016 Day1】第四题 疯狂动物城
- 校招拼多多笔试题(前端工程师)
- 赴泰国的签证怎么办理
- 【C++】万年历的实现
- 建模杂谈系列142 关于MVD的思考
热门文章
- 全连接神经网络(DNN)
- 全国计算机等级考试电子版证书查询
- echarts热力背景图_echarts 热力图
- 百度“份量十足”,字节包容多样 |新年礼盒大赏时刻(附抽奖)
- 计算机信息管理专业的职业生涯规划,计算机信息管理专业职业生涯规划书
- vue的props传数组_详解vue.js之props传递参数
- CS5801AN是一个HDMI2.0b到DP1.4a转换器
- 国家电网泛在电力物联网分析—中台战略
- 关于米联客中pcie驱动编译失败的问题
- 线下分享 | 2018云和恩墨大讲堂深圳站