Storm-drpc
Getting Started with Storm
storm实战构建大数据实时计算
DRPC拓扑
分布式远程过程调用(DRPC),它利用Storm的分布式特性执行远程过程调用(RPC)。对每一次函数调用,Storm集群上运行的拓扑接收调用方法名称和参数作为输入流,经过一系列的计算,最终将计算结果作为输出流返回发射出去。
LinearDRPCTopologyBuilder
LinearDRPCTopologyBuilder是Storm提供的一个线形Topology builder,它可以自动完成几乎所有的DRPC步骤。它包括
- 设置Spout
- 返回结果到DRPC服务器
- 为Bolt提供有限的聚合元组的能力。
//org.apache.storm.drpc.LinearDRPCTopologyBuilderpublic class LinearDRPCTopologyBuilder {//构造函数,指定function名称public LinearDRPCTopologyBuilder(String function);//添加元组, 以及并发数public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism);//创建本地拓扑:需要实现ILocalDRPC的drpc参数public StormTopology createLocalTopology(ILocalDRPC drpc){return createTopology(new DRPCSpout(_function, drpc));}//创建远程拓扑,不需要参数public StormTopology createRemoteTopology(){return createTopology(new DRPCSpout(_function));}//创建topologyprivate StormTopology createTopology(DRPCSpout spout);
}
重点分析下createTopology(DRPCSpout spout)
,其中spout
发送元组信息如下:
@Override
public void nextTuple() {DRPCRequest req = client.fetchRequest(_function);if(req.get_request_id().length() > 0) {Map returnInfo = new HashMap();returnInfo.put("id", req.get_request_id());returnInfo.put("host", client.getHost());returnInfo.put("port", client.getPort());//1.1请求参数 ,1.2.请求request_id+ip+host, //2..request_id_collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));break;}
}
private StormTopology createTopology(DRPCSpout spout){final String SPOUT_ID = "spout";final String PREPARE_ID = "prepare-request";TopologyBuilder builder = new TopologyBuilder();//预先创建一个spoutbuilder.setSpout(SPOUT_ID, spout);//创建一个PrepareRequest(生成一个请求id,为return-info创建一个流,为args创建一个流)builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);//省略中间处理步骤//创建CoordinatedBoltBoltDeclarer declarer = builder.setBolt(boltId(i),new CoordinatedBolt(component.bolt, source, idSpec),component.parallelism);//创建direct groupingsdeclarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); //省略中间处理步骤//JoinResult 将结果与return-info拼接起来builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)).fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))).fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));i++;//ReturnResults:连接到DRPCServer,返回结构builder.setBolt(boltId(i), new ReturnResults()).noneGrouping(boltId(i-1));return builder.createTopology();}
}
本地模式DRPC
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.security.InvalidParameterException;
import java.util.Map;public class Chap03DrpcApp {public static void main(String[] args) {//创建一个LocalDRPC对象,模拟DRPC服务器LocalDRPC drpc = new LocalDRPC();LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");builder.addBolt(new AdderBolt(), 2);Config conf = new Config();conf.setDebug(true);LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpcder-topology", conf,builder.createLocalTopology(drpc));String result = drpc.execute("add", "1+-1");System.out.println("result==="+result);result = drpc.execute("add", "1+1+5+10");System.out.println("result==="+result);cluster.shutdown();drpc.shutdown();}static class AdderBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {//input.getString(0) ==== requestIdString[] numbers = input.getString(1).split("\\+");Integer added = 0;if(numbers.length<2){throw new InvalidParameterException("Should be at least 2 numbers");}for(String num : numbers){added += Integer.parseInt(num);}collector.emit(new Values(input.getValue(0),added));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id","result"));}}
}
远程模式DRPC
- 创建topology
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");builder.addBolt(new AdderBolt(), 2);Config conf = new Config();conf.setDebug(false);//以远程模式启动StormSubmitter.submitTopology("remote",conf,builder.createRemoteTopology());}
- 开启drpc服务配置:只需要在
nimbus
开启即可。
drpc.servers:- "s156"
部署topology,
storm jar xxx.jar xxx.MainClazz
调用client
public void testDRPC() throws Exception {Config conf = new Config();//远程服务器地址, 默认端口3772DRPCClient client = new DRPCClient(conf,"s159", 3772);//调用远程方法,并获取结果String result = client.execute("remoteDRPC", "hello");}
LinearDRPCTopologyBuilder
只能处理“线性的”DRPC拓扑,若如果DRPC调用中包含复杂的带有分支和合并的Bolt拓扑,需要使用CoordinatedBolt
来完成这种非线性拓扑的计算。
Storm-drpc相关推荐
- Storm DRPC环境搭建笔记
Storm DRPC环境搭建笔记 By Mickey.Pro 1. 安装系统 CentOS 6.3 64bit minimal http://www.osyunwei.com/archives/475 ...
- Storm DRPC 使用及访问C++ Bolt问题的解决方法
原创文章,欢迎转载,转载请注明出处:http://blog.csdn.net/jmppok/article/details/16840231 参考1: storm下运行C++程序(一) 参考2: St ...
- Twitter Storm: DRPC学习
学习途径 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明 网址: http://xumingming.sinaapp.com/756/twitte ...
- storm DRPC问题
一.配置集群storm.yaml文件,配置drpc.server. 二.开启drpc服务,storm drpc. 三.编写DrpcTopology程序.如下: <span style=" ...
- storm DRPC例子
1,DRPC原理 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数.实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流.每个函数调用被DRPC服务器 ...
- storm drpc学习
storm drpc 是什么?咋一听觉得挺高大上的,其实也就是那么一回事.就是storm的topology 提供了很多函数,并且函数名唯一,函数里面封装了一些算法操作.只需要在调用的时候指定函数名和传 ...
- storm drpc
转 http://www.cnblogs.com/panfeng412/archive/2012/07/02/storm-common-patterns-of-distributed-rpc.html ...
- storm DRPC指南
storm DRPC指南 @(STORM)[storm] storm DRPC指南 一什么是DRPC 二LocalDPRC的例子 三RemoteDRPC将DPRC拓扑提交至集群 1启动DRPC服务器 ...
- Storm DRPC 使用
来源:http://blog.csdn.net/jmppok/article/details/16839363 1. DRPC介绍 Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理 ...
- storm drpc实例
为什么80%的码农都做不了架构师?>>> 序 本文主要演示一下storm drpc实例 配置 version: '2' services:supervisor:image: s ...
最新文章
- Java中的显示锁 ReentrantLock 和 ReentrantReadWriteLock
- windows server 2003 IIS6.0部署PHP
- Entropy Broker 2.0 发布,加密安全随机数
- JS作用域理解(声明提升)
- IT项目需求分析的重点关注事项
- emqx—mqtt消息服务器
- java setlt;intgt;_java使用Nagao算法实现新词发现、热门词的挖掘
- Apache Hudi 在 B 站构建实时数据湖的实践
- 「代码随想录」本周学习小结!(动态规划系列四)
- UNIX环境高级编程之第5章:标准I/O库-习题
- 洛谷P4548 [CTSC2006]歌唱王国(概率生成函数)
- 现在的技术人啊,1小时200的活都看不上了吗?
- 《查拉图斯特拉如是说》摘抄
- android,手机 遥控,
- Angelababy否认已怀身孕:我怎么不知道
- 研究超音波应用 利用声音作为新的通讯协议
- WIFI遥控小车 —— 基于ESP8266_01
- 2021-4-16美股交易第一课(暗盘)
- 为什么现在的以太网中,同轴电缆已被双绞线取代?
- 【观察】新数据时代下的浪潮存储,背后的跨越、突破与探索
热门文章
- PHP笔试题(六)_易可易
- 关于LCD的烧屏问题
- 全国计算机等级考试电子版证书查询
- 流媒体发展新趋势 p2p网络技术 p2p穿透 p2p音视频解决方案
- 基于P2P的流媒体技术概述
- CF786B Legacy(线段树优化建图)
- 今天公布!英语四六级考试成绩
- 100个人围成一圈c语言,C语言 约瑟夫圈问题:N个人围成一圈,从第一个人开始按顺序报数并编号1,2,3,……N,然后开始从第一个人转圈报数,凡是报到3的退出圈子。则剩下的最后一个人编号是多少。...
- 新世纪直升机技术的发展趋势
- java输出排列整齐,java 每次输出有单个数字和好几位数字时候 排列不整齐。。这个不知道有没有办法解决的?...