欢迎转载,转载请注明出处。

介绍TridentTopology的使用,重点分析newDRPCStream和stateQuery的实现机理。

使用TridentTopology进行数据处理的时候,经常会使用State来保存一些状态,这些保存起来的State通过stateQuery来进行查询。问题恰恰在这里产生,即对state进行更新的Stream和尔后进行stateQuery的Stream并非同一个,那么它们之间是如何关联起来的呢。

在TridentTopology中,有一些Processor可能会同处于一个Bolt中,这些Processor形成一个processing chain, 那么Tuple又是如何在这些Processor之间进行传递的呢。

TridentWordCount

编译和运行

lein compile storm.starter.trident.TridentWordCount
java -cp $(lein classpath) storm.starter.trident.TridentWordCount 

main函数

public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}}else {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, buildTopology(null));}} 

buildTopology

 public static StormTopology buildTopology(LocalDRPC drpc) {FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);TridentTopology topology = new TridentTopology();TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16);topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();} 

示意图

在整个topology中,有两个不同的spout。

运行结果该如何理解

此图有好几个问题

  1. PartitionPersistProcessor和StateQueryProcessor同处于一个bolt,该bolt为SubtopologyBolt
  2. SubtopologyBolt有来自多个不同Stream的输入,根据不同的Streamid找到对应的InitialReceiver
  3. drpcspout在执行的时候,是一直不停的emit消息到SubtopologyBolt,还是发送完一次消息就停止发送

不同的tuple,其sourcestream不一样,根据SourceStream,找到对应的InitialReceiver

    Map<String, InitialReceiver> _roots = new HashMap(); 

状态更新

进行状态更新的Processor名为PartitionPersistProcessor

execute

记录哪些tuple需要进行状态更新

finishBatch

状态真正更新是发生在finishBatch阶段

persistentAggregate

PartitionPersistProcessor

  • SubtopologyBolt::execute

    • PartitionPersistProcessor::finishBatch

      •   _updater::updateState

        • Snapshottable::update

当状态更新的时候,状态查询是否会发生?

状态查询

进行状态查询的Processor名为StateQueryProcessor

execute

finishBatch

查询的时候,首先调用batchRetrieive来获得最新的状态更新结果,再对每个最新的结果使用_function来进行处理。

调用层次

  • SubtopologyBolt::finishBatch

    •   StateQueryProcessor::finishBatch

      • _function.batchRetrieve
      • _function.execute   将处理过的结果发送给下一跳进行处理

消息的传递

TridentTuple

如何决定bolt内部的哪个processor来处理接收到的消息,这个是根据不同的Stream来判断InitialReceiver完成。

当SubtopologyBolt接收到最原始的tuple时,根据streamid找到InitialReceiver后,InitialReceiver在receive函数中作的第一件事情就是根据tuple来创建一个tridenttuple,tridenttuple会被处在同一个SubtopologyBolt中的processor一一处理,处理的结果是保存在tridenttuple和processorcontext中。

ProcessorContext

ProcessorContext记录两个重要的信息,即当前的batchId和batchState.

public class ProcessorContext {public Object batchId;public Object[] state;public ProcessorContext(Object batchId, Object[] state) {this.batchId = batchId;this.state = state;}
}

TridentCollector

tridentcollector在emit的时候将消息由各个TupleReceiver进行处理。目前仅有BridgeReceiver实现了该接口。

BridgeReceiver负责将消息发送给另外的Bolt进行处理。这里说的“另外的Bolt”是指Vanilla Topology中的Bolt.

Apache Storm技术实战之3 -- TridentWordCount相关推荐

  1. Apache Storm技术实战之2 -- BasicDRPCTopology

    欢迎转载,转载请注明出处,徽沪一郎. 本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容? BasicDRPCTopolo ...

  2. Apache Spark技术实战之6 -- spark-submit常见问题及其解决

    除本人同意外,严禁一切转载,徽沪一郎. 概要 编写了独立运行的Spark Application之后,需要将其提交到Spark Cluster中运行,一般会采用spark-submit来进行应用的提交 ...

  3. 《大数据架构和算法实现之路:电商系统的技术实战》——1.5 相关软件:R和Mahout...

    本节书摘来自华章计算机<大数据架构和算法实现之路:电商系统的技术实战>一书中的第1章,第1.5节,作者 黄 申,更多章节内容可以访问云栖社区"华章计算机"公众号查看. ...

  4. Spotify如何对Apache Storm进行规模扩展

    [编者的话]Spotify是一家音乐流媒体服务商,最新的数据显示他们已经有6000万用户.Spotify内部使用Apache Storm来构建实时类系统,包括广告定位.音乐推荐以及数据可视化等.本文来 ...

  5. BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略

    BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...

  6. storm apache java_Apache Ignite与Apache Storm(深入)

    Apache Ignite和Apache Storm在很多方面都是两种截然不同的技术 - 特别是因为Storm有一个非常具体的用例,而Ignite在同一个屋檐下有相当多的工具 . 据我了解,Ignit ...

  7. apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)

    apache ignite 在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处 ...

  8. kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

    kite 使用 go 从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以 ...

  9. 使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)

    在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处理. 本文的一部分摘自 书 ...

最新文章

  1. 计算机多媒体运用的ppt课件,《计算机多媒体》PPT课件.ppt
  2. python pandas写入数据后保存_python读取MySQL数据使用pandas写入到csv,并保存列名
  3. python算法与数据结构-希尔排序算法
  4. python List中元素两两组合
  5. 动画分析步骤“三步曲”
  6. 广州市岑村教练场考科目二,惊险通过,经验总结
  7. 服务器性能优化和Mysql性能优化
  8. Python中直接查看对象值和使用print()输出的区别
  9. endnote导入参考文献及国标(Chinese standard)
  10. 【扩频通信】基于matlab GUI扩频通信系统仿真【含Matlab源码 772期】
  11. 利用html创建pdf文件
  12. 100套计算机毕设源码+论文 免费分享 【2020最新版】
  13. 1024info .php,GitHub - dingusxp/code1024
  14. 今日头条适配方案_ 今日头条大改版,小程序强势登场
  15. ubuntu DNS修改
  16. hong书网页版x-s、x-t
  17. elementUi中的el-select/el-input去掉border边框
  18. 人工湖对环境温度的调节问题
  19. 第五章:Sharding-JDBC 自定义分片算法
  20. 我们熟悉的106短信的水好深啊

热门文章

  1. 微软ASP.NET站点部署指南(11):部署SQL Server数据库更新
  2. 对事件循环的一点理解
  3. 微软公布测试版Visual Studio for Mac和Visual Studio 2017 for Windows
  4. GIT入门笔记(11)- 多种撤销修改场景和对策--实战练习
  5. 苹果电脑如何读写ntfs格式磁盘
  6. VSS 2005 配置简明手册
  7. inline函数的作用
  8. 使用Word 2007写blog
  9. 1631. 最小体力消耗路径
  10. 单例设计模式共享数据分析、解决,call_once