参照上图

构建DRPC拓扑图的拓扑构造器:

package backtype.storm.drpc;import backtype.storm.Constants;
import backtype.storm.ILocalDRPC;
import backtype.storm.coordination.BatchBoltExecutor;
import backtype.storm.coordination.CoordinatedBolt;
import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
import backtype.storm.coordination.CoordinatedBolt.IdStreamSpec;
import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
import backtype.storm.coordination.IBatchBolt;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.topology.*;
import backtype.storm.tuple.Fields;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;// Trident subsumes the functionality provided by this class, so it's deprecated
@Deprecated
public class LinearDRPCTopologyBuilder {String _function;List<Component> _components = new ArrayList<Component>();public LinearDRPCTopologyBuilder(String function) {_function = function;}public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {return addBolt(new BatchBoltExecutor(bolt), parallelism);}public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {return addBolt(bolt, 1);}@Deprecatedpublic LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {if (parallelism == null)parallelism = 1;Component component = new Component(bolt, parallelism.intValue());_components.add(component);return new InputDeclarerImpl(component);}@Deprecatedpublic LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {return addBolt(bolt, null);}public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {return addBolt(new BasicBoltExecutor(bolt), parallelism);}public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {return addBolt(bolt, null);}public StormTopology createLocalTopology(ILocalDRPC drpc) {return createTopology(new DRPCSpout(_function, drpc));}public StormTopology createRemoteTopology() {return createTopology(new DRPCSpout(_function));}private StormTopology createTopology(DRPCSpout spout) {final String SPOUT_ID = "spout";final String PREPARE_ID = "prepare-request";TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SPOUT_ID, spout);builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);int i = 0;for (; i < _components.size(); i++) {Component component = _components.get(i);Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();if (i == 1) {source.put(boltId(i - 1), SourceArgs.single());} else if (i >= 2) {source.put(boltId(i - 1), SourceArgs.all());}IdStreamSpec idSpec = null;if (i == _components.size() - 1&& component.bolt instanceof FinishedCallback) {idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID,PrepareRequest.ID_STREAM);}BoltDeclarer declarer = builder.setBolt(boltId(i),new 
CoordinatedBolt(component.bolt, source, idSpec),component.parallelism);for (Map conf : component.componentConfs) {declarer.addConfigurations(conf);}if (idSpec != null) {declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM,new Fields("request"));}if (i == 0 && component.declarations.isEmpty()) {declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);} else {String prevId;if (i == 0) {prevId = PREPARE_ID;} else {prevId = boltId(i - 1);}for (InputDeclaration declaration : component.declarations) {declaration.declare(prevId, declarer);}}if (i > 0) {declarer.directGrouping(boltId(i - 1),Constants.COORDINATED_STREAM_ID);}}IRichBolt lastBolt = _components.get(_components.size() - 1).bolt;OutputFieldsGetter getter = new OutputFieldsGetter();lastBolt.declareOutputFields(getter);Map<String, StreamInfo> streams = getter.getFieldsDeclaration();if (streams.size() != 1) {throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");}String outputStream = streams.keySet().iterator().next();List<String> fields = streams.get(outputStream).get_output_fields();if (fields.size() != 2) {throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. 
The first should be the request id, and the second should be the result.");}<span style="color:#ff0000;"><span style="background-color: rgb(153, 153, 153);">builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))</span>.fieldsGrouping(boltId(i - 1), outputStream,new Fields(fields.get(0))).fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM,new Fields("request"));i++;<span style="background-color: rgb(153, 153, 153);">builder.setBolt(boltId(i), new ReturnResults()).noneGrouping(</span>boltId(i - 1));return builder.createTopology();</span>}private static String boltId(int index) {return "bolt" + index;}private static class Component {public IRichBolt bolt;public int parallelism;public List<Map> componentConfs;public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();public Component(IRichBolt bolt, int parallelism) {this.bolt = bolt;this.parallelism = parallelism;this.componentConfs = new ArrayList();}}private static interface InputDeclaration {public void declare(String prevComponent, InputDeclarer declarer);}private class InputDeclarerImpl extendsBaseConfigurationDeclarer<LinearDRPCInputDeclarer> implementsLinearDRPCInputDeclarer {Component _component;public InputDeclarerImpl(Component component) {_component = component;}@Overridepublic LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.fieldsGrouping(prevComponent, fields);}});return this;}@Overridepublic LinearDRPCInputDeclarer fieldsGrouping(final String streamId,final Fields fields) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.fieldsGrouping(prevComponent, streamId, fields);}});return this;}@Overridepublic LinearDRPCInputDeclarer globalGrouping() {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) 
{declarer.globalGrouping(prevComponent);}});return this;}@Overridepublic LinearDRPCInputDeclarer globalGrouping(final String streamId) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.globalGrouping(prevComponent, streamId);}});return this;}@Overridepublic LinearDRPCInputDeclarer shuffleGrouping() {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.shuffleGrouping(prevComponent);}});return this;}@Overridepublic LinearDRPCInputDeclarer shuffleGrouping(final String streamId) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.shuffleGrouping(prevComponent, streamId);}});return this;}@Overridepublic LinearDRPCInputDeclarer localOrShuffleGrouping() {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.localOrShuffleGrouping(prevComponent);}});return this;}@Overridepublic LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.localOrShuffleGrouping(prevComponent, streamId);}});return this;}@Overridepublic LinearDRPCInputDeclarer noneGrouping() {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.noneGrouping(prevComponent);}});return this;}@Overridepublic LinearDRPCInputDeclarer noneGrouping(final String streamId) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.noneGrouping(prevComponent, streamId);}});return this;}@Overridepublic LinearDRPCInputDeclarer allGrouping() {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) 
{declarer.allGrouping(prevComponent);}});return this;}@Overridepublic LinearDRPCInputDeclarer allGrouping(final String streamId) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.allGrouping(prevComponent, streamId);}});return this;}@Overridepublic LinearDRPCInputDeclarer directGrouping() {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.directGrouping(prevComponent);}});return this;}@Overridepublic LinearDRPCInputDeclarer directGrouping(final String streamId) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.directGrouping(prevComponent, streamId);}});return this;}@Overridepublic LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.customGrouping(prevComponent, grouping);}});return this;}@Overridepublic LinearDRPCInputDeclarer customGrouping(final String streamId,final CustomStreamGrouping grouping) {addDeclaration(new InputDeclaration() {@Overridepublic void declare(String prevComponent, InputDeclarer declarer) {declarer.customGrouping(prevComponent, streamId, grouping);}});return this;}private void addDeclaration(InputDeclaration declaration) {_component.declarations.add(declaration);}@Overridepublic LinearDRPCInputDeclarer addConfigurations(Map conf) {_component.componentConfs.add(conf);return this;}}
}
package backtype.storm.drpc;import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.generated.DRPCRequest;
import backtype.storm.generated.DistributedRPCInvocations;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.ServiceRegistry;
import backtype.storm.utils.Utils;
import com.alibaba.fastjson.JSON;
import org.apache.thrift7.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class DRPCSpout extends BaseRichSpout {public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);SpoutOutputCollector _collector;List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();String _function;String _local_drpc_id = null;private static class DRPCMessageId {String id;int index;public DRPCMessageId(String id, int index) {this.id = id;this.index = index;}}public DRPCSpout(String function) {_function = function;}public DRPCSpout(String function, ILocalDRPC drpc) {_function = function;_local_drpc_id = drpc.getServiceId();}@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {_collector = collector;if (_local_drpc_id == null) {int numTasks = context.getComponentTasks(context.getThisComponentId()).size();int index = context.getThisTaskIndex();int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);if (servers == null || servers.isEmpty()) {throw new RuntimeException("No DRPC servers configured for topology");}if (numTasks < servers.size()) {for (String s : servers) {_clients.add(new DRPCInvocationsClient(s, port));}} else {int i = index % servers.size();_clients.add(new DRPCInvocationsClient(servers.get(i), port));}}}@Overridepublic void close() {for (DRPCInvocationsClient client : _clients) {client.close();}}@Overridepublic void nextTuple() {boolean gotRequest = false;if (_local_drpc_id == null) {for (int i = 0; i < _clients.size(); i++) {DRPCInvocationsClient client = _clients.get(i);try {DRPCRequest req = client.fetchRequest(_function);if (req.get_request_id().length() > 0) {Map returnInfo = new HashMap();returnInfo.put("id", req.get_request_id());returnInfo.put("host", client.getHost());returnInfo.put("port", client.getPort());gotRequest = true;_collector.emit(new Values(req.get_func_args(),JSON.toJSONString(returnInfo)),new DRPCMessageId(req.get_request_id(), i));break;}} catch 
(TException e) {LOG.error("Failed to fetch DRPC result from DRPC server", e);}}} else {DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);if (drpc != null) { // can happen during shutdown of drpc while// topology is still uptry {DRPCRequest req = drpc.fetchRequest(_function);if (req.get_request_id().length() > 0) {Map returnInfo = new HashMap();returnInfo.put("id", req.get_request_id());returnInfo.put("host", _local_drpc_id);returnInfo.put("port", 0);gotRequest = true;_collector.emit(new Values(req.get_func_args(),JSON.toJSONString(returnInfo)),new DRPCMessageId(req.get_request_id(), 0));}} catch (TException e) {throw new RuntimeException(e);}}}if (!gotRequest) {Utils.sleep(1);}}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {DRPCMessageId did = (DRPCMessageId) msgId;DistributedRPCInvocations.Iface client;if (_local_drpc_id == null) {client = _clients.get(did.index);} else {client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);}try {client.failRequest(did.id);} catch (TException e) {LOG.error("Failed to fail request", e);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("args", "return-info"));}
}

ReturnResults源码

package backtype.storm.drpc;import backtype.storm.Config;
import backtype.storm.generated.DistributedRPCInvocations;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.ServiceRegistry;
import backtype.storm.utils.Utils;
import com.alibaba.fastjson.JSON;
import org.apache.thrift7.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class ReturnResults extends BaseRichBolt {public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);OutputCollector _collector;boolean local;Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {_collector = collector;local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");}@Overridepublic void execute(Tuple input) {String result = (String) input.getValue(0);String returnInfo = (String) input.getValue(1);if (returnInfo != null) {Map retMap = (Map) JSON.parse(returnInfo);final String host = (String) retMap.get("host");final int port = Utils.getInt(retMap.get("port"));String id = (String) retMap.get("id");DistributedRPCInvocations.Iface client;if (local) {client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);} else {List server = new ArrayList() {{add(host);add(port);}};if (!_clients.containsKey(server)) {_clients.put(server, new DRPCInvocationsClient(host, port));}client = _clients.get(server);}try {client.result(id, result);_collector.ack(input);} catch (TException e) {LOG.error("Failed to return results to DRPC server", e);_collector.fail(input);}}}@Overridepublic void cleanup() {for (DRPCInvocationsClient c : _clients.values()) {c.close();}}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

一张图让你了解Storm的DRPC实现原理相关推荐

  1. 原来10张图就可以搞懂分布式链路追踪系统原理

    分布式系统为什么需要链路追踪? 随着互联网业务快速扩展,软件架构也日益变得复杂,为了适应海量用户高并发请求,系统中越来越多的组件开始走向分布式化,如单体架构拆分为微服务.服务内缓存变为分布式缓存.服务 ...

  2. 【Linux】一张图让你读懂Linux内核运行原理

    参考资料 http://makelinux.net/kernel_map/

  3. 15张图呈现数据库事务背后的并发原理

    本文分享自华为云社区<将数据库9种锁.3种读.4种隔离级别一次性串联起来,用15张图呈现背后数据库事务背后的并发原理>,作者: breakDawn. 前段时间开发时,正好遇到了2个进程同时 ...

  4. 一张图剖析企业大数据平台的核心架构

    我们先来看看这张图,这是某公司使用的大数据平台架构图,大部分公司应该都差不多: 从这张大数据的整体架构图上看来,大数据的核心层应该是:数据采集层.数据存储与分析层.数据共享层.数据应用层,可能叫法有所 ...

  5. GIS大数据开发框架一张图

    文章目录 1 前言 2 一张图 3 参考资料 1 前言 由于自己经验并不丰富,理解也不深刻,但是被hadoop.spark.flink.geospark.sedona之间的关系搞得头晕,于是搜集了网络 ...

  6. 不带头节点的链表有哪些缺点_23张图!万字详解「链表」,从小白到大佬!

    链表和数组是数据类型中两个重要又常用的基础数据类型. 数组是连续存储在内存中的数据结构,因此它的优势是可以通过下标迅速的找到元素的位置,而它的缺点则是在插入和删除元素时会导致大量元素的被迫移动,为了解 ...

  7. 3dmax图像采样器抗锯齿_内幕揭秘!同样的场景同一张图,用3DMAX网渲平台进行二次渲染时间竟然相差3个小时之多!...

    一个分辨率:4000*2000的室内客餐厅,3dmax版本是2014版本,渲染器版本为vray3.63,机器:阿里云1台服务器,这个同样的场景同样的参数同一张图,用3dmax网渲平台进行二次渲染发现时 ...

  8. 17张图揭密支付宝系统架构

    支付宝的系统架构图,仅供参考.不管是不是支付行业,都值得我们参考,学习. image image image image image image image image image image ima ...

  9. 参加海峡两岸城市地理信息系统论坛2010 年会(一张图、规划信息化和空间句法的碎碎念)...

    上周末去清华建筑学院开了个会,叫做海峡两岸城市地理信息系统论坛2010 年会,主题很大,但是内容比较集中一些,就是围绕着GIS与城市规划.一天下来听了20个报告,挺佩服主办方的时间控制,这么密集的报告 ...

最新文章

  1. atexit()函数
  2. 视频导切台RGBlink 控制软件下载与测试
  3. python求众数程序_python求众数问题实例
  4. No sleep, no sex, no life,程序员这次忍不了了
  5. 电网操作:线路、主变、母线操作讲解
  6. 在双系统(Windows与Ubuntu)下删除Ubuntu启动项
  7. C/C++进程文件锁 之 fcntl函数的用法总结(非阻塞O_NONBLOCK)
  8. 常用WebService一览表(一)
  9. linux查看报警信息,linux_监控zabbix微信报警详细步骤
  10. java 高级api_Java常用API-高级
  11. 什么软件可以测试电脑硬盘速度,硬盘速度测试软件
  12. scratch课程案例——蜘蛛森林
  13. sourceTree 添加 ssh key 方法
  14. Kattis Simon Says
  15. 必做: 1041、1024、1077、2218、1183(较难)
  16. Excel怎么快速计算人数
  17. Linux实用技巧——paste横向合并文件内容
  18. 较全面的常见的OJ评判结果以及它们表示的意思
  19. 监控眼 android,监控眼XMEye
  20. [GridView]解决js-xlsx导出Excel时数字太长变为科学计数的问题

热门文章

  1. React入门(4)--react提升项目性能的Api(pureComponent、memo、useMemo、useCallback)
  2. 定时 关闭和开启 FTP SERVICE 服务 命令
  3. 物联网中传感器和执行器的类型
  4. 微软最牛MS08-067漏洞各系统补丁下载地址
  5. 什么样的产品 网站值得我们坚持下去?
  6. iframe打开base64格式的PDF显示空白
  7. android自动微信支付平台,Android微信APP支付开发要点
  8. chrome中任意修改网页内容
  9. 解决联想电脑所有浏览器主页被篡改成百度网址导航
  10. 用Java实现生命游戏