Getting Started with Storm
storm实战构建大数据实时计算

DRPC拓扑

分布式远程过程调用(DRPC),它利用Storm的分布式特性执行远程过程调用(RPC)。对每一次函数调用,Storm集群上运行的拓扑接收调用方法名称和参数作为输入流,经过一系列的计算,最终将计算结果作为输出流返回发射出去。

LinearDRPCTopologyBuilder

LinearDRPCTopologyBuilder是Storm提供的一个线形Topology builder,它可以自动完成几乎所有的DRPC步骤。它包括

  • 设置Spout
  • 返回结果到DRPC服务器
  • 为Bolt提供有限的聚合元组的能力。
//org.apache.storm.drpc.LinearDRPCTopologyBuilderpublic class LinearDRPCTopologyBuilder {//构造函数,指定function名称public LinearDRPCTopologyBuilder(String function);//添加元组,   以及并发数public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism);//创建本地拓扑:需要实现ILocalDRPC的drpc参数public StormTopology createLocalTopology(ILocalDRPC drpc){return createTopology(new DRPCSpout(_function, drpc));}//创建远程拓扑,不需要参数public StormTopology createRemoteTopology(){return createTopology(new DRPCSpout(_function));}//创建topologyprivate StormTopology createTopology(DRPCSpout spout);
}

重点分析下createTopology(DRPCSpout spout),其中spout发送元组信息如下:

@Override
public void nextTuple() {DRPCRequest req = client.fetchRequest(_function);if(req.get_request_id().length() > 0) {Map returnInfo = new HashMap();returnInfo.put("id", req.get_request_id());returnInfo.put("host", client.getHost());returnInfo.put("port", client.getPort());//1.1请求参数 ,1.2.请求request_id+ip+host, //2..request_id_collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));break;}
}
 private StormTopology createTopology(DRPCSpout spout){final String SPOUT_ID = "spout";final String PREPARE_ID = "prepare-request";TopologyBuilder builder = new TopologyBuilder();//预先创建一个spoutbuilder.setSpout(SPOUT_ID, spout);//创建一个PrepareRequest(生成一个请求id,为return-info创建一个流,为args创建一个流)builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);//省略中间处理步骤//创建CoordinatedBoltBoltDeclarer declarer = builder.setBolt(boltId(i),new CoordinatedBolt(component.bolt, source, idSpec),component.parallelism);//创建direct groupingsdeclarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); //省略中间处理步骤//JoinResult 将结果与return-info拼接起来builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)).fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))).fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));i++;//ReturnResults:连接到DRPCServer,返回结构builder.setBolt(boltId(i), new ReturnResults()).noneGrouping(boltId(i-1));return builder.createTopology();}
}

本地模式DRPC

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.security.InvalidParameterException;
import java.util.Map;public class Chap03DrpcApp {public static void main(String[] args) {//创建一个LocalDRPC对象,模拟DRPC服务器LocalDRPC drpc = new LocalDRPC();LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");builder.addBolt(new AdderBolt(), 2);Config conf = new Config();conf.setDebug(true);LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpcder-topology", conf,builder.createLocalTopology(drpc));String result = drpc.execute("add", "1+-1");System.out.println("result==="+result);result = drpc.execute("add", "1+1+5+10");System.out.println("result==="+result);cluster.shutdown();drpc.shutdown();}static class AdderBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {//input.getString(0) ==== requestIdString[] numbers = input.getString(1).split("\\+");Integer added = 0;if(numbers.length<2){throw new InvalidParameterException("Should be at least 2 numbers");}for(String num : numbers){added += Integer.parseInt(num);}collector.emit(new Values(input.getValue(0),added));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id","result"));}}
}

远程模式DRPC

  • 创建topology
 public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");builder.addBolt(new AdderBolt(), 2);Config conf = new Config();conf.setDebug(false);//以远程模式启动StormSubmitter.submitTopology("remote",conf,builder.createRemoteTopology());}
  • 开启drpc服务配置:只需要在nimbus开启即可。
drpc.servers:- "s156"
  • 部署topology, storm jar xxx.jar xxx.MainClazz

  • 调用client

 public void testDRPC() throws Exception {Config conf = new Config();//远程服务器地址, 默认端口3772DRPCClient client = new DRPCClient(conf,"s159", 3772);//调用远程方法,并获取结果String result = client.execute("remoteDRPC", "hello");}

LinearDRPCTopologyBuilder只能处理“线性的”DRPC拓扑,若如果DRPC调用中包含复杂的带有分支和合并的Bolt拓扑,需要使用CoordinatedBolt来完成这种非线性拓扑的计算。

Storm-drpc相关推荐

  1. Storm DRPC环境搭建笔记

    Storm DRPC环境搭建笔记 By Mickey.Pro 1. 安装系统 CentOS 6.3 64bit minimal http://www.osyunwei.com/archives/475 ...

  2. Storm DRPC 使用及访问C++ Bolt问题的解决方法

    原创文章,欢迎转载,转载请注明出处:http://blog.csdn.net/jmppok/article/details/16840231 参考1: storm下运行C++程序(一) 参考2: St ...

  3. Twitter Storm: DRPC学习

    学习途径 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明 网址: http://xumingming.sinaapp.com/756/twitte ...

  4. storm DRPC问题

    一.配置集群storm.yaml文件,配置drpc.server. 二.开启drpc服务,storm drpc. 三.编写DrpcTopology程序.如下: <span style=" ...

  5. storm DRPC例子

    1,DRPC原理 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数.实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流.每个函数调用被DRPC服务器 ...

  6. storm drpc学习

    storm drpc 是什么?咋一听觉得挺高大上的,其实也就是那么一回事.就是storm的topology 提供了很多函数,并且函数名唯一,函数里面封装了一些算法操作.只需要在调用的时候指定函数名和传 ...

  7. storm drpc

    转 http://www.cnblogs.com/panfeng412/archive/2012/07/02/storm-common-patterns-of-distributed-rpc.html ...

  8. storm DRPC指南

    storm DRPC指南 @(STORM)[storm] storm DRPC指南 一什么是DRPC 二LocalDPRC的例子 三RemoteDRPC将DPRC拓扑提交至集群 1启动DRPC服务器 ...

  9. Storm DRPC 使用

    来源:http://blog.csdn.net/jmppok/article/details/16839363 1. DRPC介绍 Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理 ...

  10. storm drpc实例

    为什么80%的码农都做不了架构师?>>>    序 本文主要演示一下storm drpc实例 配置 version: '2' services:supervisor:image: s ...

最新文章

  1. Java中的显示锁 ReentrantLock 和 ReentrantReadWriteLock
  2. windows server 2003 IIS6.0部署PHP
  3. Entropy Broker 2.0 发布,加密安全随机数
  4. JS作用域理解(声明提升)
  5. IT项目需求分析的重点关注事项
  6. emqx—mqtt消息服务器
  7. java setlt;intgt;_java使用Nagao算法实现新词发现、热门词的挖掘
  8. Apache Hudi 在 B 站构建实时数据湖的实践
  9. 「代码随想录」本周学习小结!(动态规划系列四)
  10. UNIX环境高级编程之第5章:标准I/O库-习题
  11. 洛谷P4548 [CTSC2006]歌唱王国(概率生成函数)
  12. 现在的技术人啊,1小时200的活都看不上了吗?
  13. 《查拉图斯特拉如是说》摘抄
  14. android,手机 遥控,
  15. Angelababy否认已怀身孕:我怎么不知道
  16. 研究超音波应用 利用声音作为新的通讯协议
  17. WIFI遥控小车 —— 基于ESP8266_01
  18. 2021-4-16美股交易第一课(暗盘)
  19. 为什么现在的以太网中,同轴电缆已被双绞线取代?
  20. 【观察】新数据时代下的浪潮存储,背后的跨越、突破与探索

热门文章

  1. PHP笔试题(六)_易可易
  2. 关于LCD的烧屏问题
  3. 全国计算机等级考试电子版证书查询
  4. 流媒体发展新趋势 p2p网络技术 p2p穿透 p2p音视频解决方案
  5. 基于P2P的流媒体技术概述
  6. CF786B Legacy(线段树优化建图)
  7. 今天公布!英语四六级考试成绩
  8. 100个人围成一圈c语言,C语言 约瑟夫圈问题:N个人围成一圈,从第一个人开始按顺序报数并编号1,2,3,……N,然后开始从第一个人转圈报数,凡是报到3的退出圈子。则剩下的最后一个人编号是多少。...
  9. 新世纪直升机技术的发展趋势
  10. java输出排列整齐,java 每次输出有单个数字和好几位数字时候 排列不整齐。。这个不知道有没有办法解决的?...