欢迎转载,转载请注明出处,徽沪一郎.

本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容?

BasicDRPCTopology

main函数

DRPC 分布式远程调用(这个说法有意思,远程调用本来就是分布的,何须再加个D, <头文字D>看多了, :)

public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[]{ "hello", "goodbye" }) {System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();}else {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());}}

问题: 上面的代码中只是添加了一个bolt,并没有设定Spout. 我们知道一个topology中最起码得有一个Spout,那么这里的Spout又隐身于何处呢?

关键的地方就在builder.createLocalTopology, 调用关系如下

  • LinearDRPCTopologyBuilder::createLocalTopology

    •   LinearDRPCTopologyBuilder::createTopology()

      •   LinearDRPCTopologyBuilder::createTopology(new DRPCSpout(_function))

原来DRPCTopology中使用的Spout是DRPCSpout.

LinearDRPCTopology::createTopology

既然代码已经读到此处,何不再进一步看看createTopology的实现.

简要说明一下该段代码的处理逻辑:

  1. 设置DRPCSpout
  2. 以bolt为入参,创建CoordinatedBolt
  3. 添加JoinResult Bolt
  4. 添加ReturnResult Bolt: ReturnResultBolt连接到DRPCServer,并返回结果
    private StormTopology createTopology(DRPCSpout spout) {final String SPOUT_ID = "spout";final String PREPARE_ID = "prepare-request";TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SPOUT_ID, spout);builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);int i=0;for(; i<_components.size();i++) {Component component = _components.get(i);Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();if (i==1) {source.put(boltId(i-1), SourceArgs.single());} else if (i>=2) {source.put(boltId(i-1), SourceArgs.all());}IdStreamSpec idSpec = null;if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);}BoltDeclarer declarer = builder.setBolt(boltId(i),new CoordinatedBolt(component.bolt, source, idSpec),component.parallelism);for(Map conf: component.componentConfs) {declarer.addConfigurations(conf);}if(idSpec!=null) {declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));}if(i==0 && component.declarations.isEmpty()) {declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);} else {String prevId;if(i==0) {prevId = PREPARE_ID;} else {prevId = boltId(i-1);}for(InputDeclaration declaration: component.declarations) {declaration.declare(prevId, declarer);}}if(i>0) {declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); }}IRichBolt lastBolt = _components.get(_components.size()-1).bolt;OutputFieldsGetter getter = new OutputFieldsGetter();lastBolt.declareOutputFields(getter);Map<String, StreamInfo> streams = getter.getFieldsDeclaration();if(streams.size()!=1) {throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");}String outputStream = streams.keySet().iterator().next();List<String> fields = streams.get(outputStream).get_output_fields();if(fields.size()!=2) {throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");}builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)).fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))).fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));i++;builder.setBolt(boltId(i), new ReturnResults()).noneGrouping(boltId(i-1));return builder.createTopology();}

Bolt

处理逻辑: 在接收到的每一个单词后面添加'!'.

 public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}

运行

java -cp $(lein classpath) storm.starter.BasicDRPCTopology

转载于:https://www.cnblogs.com/hseagle/p/3511185.html

Apache Storm技术实战之2 -- BasicDRPCTopology相关推荐

  1. Apache Storm技术实战之3 -- TridentWordCount

    欢迎转载,转载请注明出处. 介绍TridentTopology的使用,重点分析newDRPCStream和stateQuery的实现机理. 使用TridentTopology进行数据处理的时候,经常会 ...

  2. Apache Spark技术实战之6 -- spark-submit常见问题及其解决

    除本人同意外,严禁一切转载,徽沪一郎. 概要 编写了独立运行的Spark Application之后,需要将其提交到Spark Cluster中运行,一般会采用spark-submit来进行应用的提交 ...

  3. 《大数据架构和算法实现之路:电商系统的技术实战》——1.5 相关软件:R和Mahout...

    本节书摘来自华章计算机<大数据架构和算法实现之路:电商系统的技术实战>一书中的第1章,第1.5节,作者 黄 申,更多章节内容可以访问云栖社区"华章计算机"公众号查看. ...

  4. Spotify如何对Apache Storm进行规模扩展

    [编者的话]Spotify是一家音乐流媒体服务商,最新的数据显示他们已经有6000万用户.Spotify内部使用Apache Storm来构建实时类系统,包括广告定位.音乐推荐以及数据可视化等.本文来 ...

  5. BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略

    BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...

  6. storm apache java_Apache Ignite与Apache Storm(深入)

    Apache Ignite和Apache Storm在很多方面都是两种截然不同的技术 - 特别是因为Storm有一个非常具体的用例,而Ignite在同一个屋檐下有相当多的工具 . 据我了解,Ignit ...

  7. apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)

    apache ignite 在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处 ...

  8. kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

    kite 使用 go 从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以 ...

  9. 使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)

    在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处理. 本文的一部分摘自 书 ...

最新文章

  1. 进程间通信IPC之--共享内存
  2. python程序员面试宝典 勘误_《前端面试江湖》勘误合集(二)
  3. 粒子群优化RBF神经网络源码程序
  4. R语言实战应用精讲50篇(三)-多重线性回归系列之模型评估与诊断应用案例
  5. 用友为什么要进军PLM市场
  6. MathExam任务一
  7. 我的MIDAS中间层服务器运行效果图
  8. java的websocket_java 实现websocket的两种方式实例详解
  9. 如何修复无效的目标版本:Maven Build中的1.7、1.8、1.9或1.10错误
  10. 【电路补习笔记】5、三极管的参数与选型
  11. 如何绘制逻辑图 — 7.逻辑的表达:业务逻辑
  12. vant-UI组件初使用:浅谈 - 解说篇
  13. 1-4 多文档界面处理(2)
  14. Eclipse中 *.properties 文件编码设置
  15. VSCode插件Code Runner用于C++
  16. ORA-01438:value larger than specified precision allowed for this column
  17. MongoDB实验练习题
  18. Contents mismatch at: 08000000H (Flash=FFH Required=00H) ! Too many errors to display !
  19. 一个女孩写给一个男孩子的信
  20. 程序设计入门——C语言 翁恺 第1周编程练习

热门文章

  1. openCV+ASM+LBP+Gabor实现人脸识别(GT人脸库)
  2. Cordova WP8 插件开发
  3. CentOS 6.4 中yum命令安装php5.2.17
  4. ruby中数组的常用函数
  5. 用OFFICE 2007发送的文章
  6. 一个用户故事的样例(极限编程)
  7. No New-Net
  8. 论文解读——Improving Object Detection With One Line of Code
  9. 基础算法 -- 贪心算法
  10. mysql索引b树和hash_B树索引和Hash索引的应用场景和区别(转载)