不多说,直接上干货!

    Hadoop 上运行的是 MapReduce 作业,而在 Storm 上运行的是拓扑 Topology,这两者之间是非常不同的。一个关键的区别是:一个MapReduce 作业最终会结束,而一个 Topology 拓扑会永远运行(除非手动杀掉)

Topology拓扑

  从字面上解释Topology,就是网络拓扑,是指用传输介质互连各种设备的物理布局,是构成网络的成员间特定的物理的(即真实的),或者逻辑的,即虚拟的排列方式。拓扑是一种不考虑物体的大小、形状等物理属性,而只使用点或者线描述多个物体实际位置与关系的抽象表示方法。拓扑不关心事物的细节,也不在乎相互的比例关系,只是以图的形式表示一定范围内多个物体之间的相互关系。从Storm角度考虑,它不是网络拓扑,但是又类似于网络拓扑的结构,所以取名Topology。
那么Storm的Topology指的是类似于网络拓扑图的一种虚拟结构。Storm的拓扑Topology类似于MapReduce任务,一个关键的区别是MapReduce任务运行一段时间后最终会完成,而Storm拓扑一直运行(直到杀掉它)。一个拓扑是由Spout和Bolt组成的图,Spout和Bolt之间通过流分组连接起来。图1形象地描述了Topology中的Spout和Bolt之间的关系。

               

                      图1    Spout和Bolt的关系图

  通过对图1的理解可以看出,Topology是由Spout、Bolt、数据载体Tuple等构成的一定规则的网络拓扑图。Storm提供了TopologyBuilder类来创建Topology。打个比方,TopologyBuilder是Topology的骨架,Spout、Bolt是Topology的肉和血液。TopologyBuilder类的主要方法如图2所示。

            

                    图 2    TopologyBuilder类的主要方法

   TopologyBuilder实际上是封装了Topology的Thrift接口,也就是说Topology实际上是通过Thrift定义的一个结构,TopologyBuilder将这个对象建立起来,然后Nimbus实际上运行一个Thrift服务器,用于接收用户提交的结构。由于采用Thrift实现,所以用户可以用其他语言建立Topology,这样就提供了比较方便的多语言操作支持。

 Topology实例
  下面从一个简单的例子开始介绍Topology的构建和定义,通过此案例能够基本理解Storm,并且能够构建一个简单的Topology。本实例使用Topology来统计一个句子中单词出现的频率。下面详细介绍如何设计和运行Topology,以及一些注意事项。

  1. 设计Topology结构
  在编写代码之前,首先要设计Topology。在理清数据处理逻辑之后,创建Topology就非常简单了。统计单词词频的Topology的大致结构如图3所示。可以将Topology分成3个部分:一是数据源KafkaSpout,负责发送语句;二是数据处理者SplitSentenceBolt,负责切分语句;三是数据再处理者WordCountBolt,负责累加单词的频率

                         图 3      Topology的结构

 2. 设计数据流
  设计的Topology是从KafkaSpout中读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个Bolt负责获取句子后划分成单词,一个Bolt分别对应计算每一个单词出现的次数,然后Tuple在Spout和Bolt之间传递,如图3-15所示。

          

                       图4    Topology内部数据流图

  

          

   3. 代码实现
(1)构建Maven环境
  为了开发Topology,需要把Storm相关的JAR包添加到CLASSPATH中,要么手动添加所有相关的JAR包,要么使用Maven来管理所有的依赖。Storm的JAR包发布在Clojars(一个Maven库),如果使用Maven,需要把下面的配置代码添加在项目的pom.xml中。

<repository><id>clojars.org</id><url>http:// clojars.org/repo</url>
</repository>
<dependency><groupId>storm</groupId><artifactId>storm</artifactId><version>0.8.2</version><scope>test</scope>
</dependency>

(2)定义Topology
  定义Topology的内部逻辑,代码如下:

  

SpoutConf?ig kafkaConf?ig = new SpoutConf?ig(brokerHosts, "storm-sentence", "", "storm");
kafkaConf?ig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1,new KafkaSpout(kafkaConf?ig), 10);// id, spout, parallelism_hint
builder.setBolt(2, new SplitSentence(), 10) .shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20) .f?ieldsGrouping(2, new Fields("word"));

  声明的Topology的Spout是从Kafka中读取句子,Spout用setSpout方法插入一个独特的ID到Topology中。Topology中的每个节点必须给予一个ID,ID是由其他Bolt用于订阅该节点的输出流,KafkaSpout在Topology中的ID为1。
setBolt用于在Topology中插入Bolt。在Topology中定义的第一个Bolt是切割句子的Bolt,该Bolt(即SplitSentence)将句子流转成单词流;setBolt的最后一个参数是Bolt的并行量,因为SplitSentence是10个并发,所以在Storm集群中有10个线程并行执行。当Topology遇到性能瓶颈时,可以通过增加Bolt并行数量来解决。setBolt方法返回一个对象,用来定义Bolt的输入。例如,SplitSentence约定使用组件ID为1的输出流,1是指已经定义的KafkaSpout。SplitSentence会消耗KafkaSpout发出的每一个元组。
  SplitSentence的关键方法是execute,它将句子拆分成单词,并发出每个单词作为新的元组。另一个重要的方法是declareOutputFields,其中声明了Bolt输出元组的架构,这个方法声明它发出一个域为“word”的元组。
SplitSentence对句子中的每个单词发射一个新的Tuple,WordCount在内存中维护每个单词出现次数的映射,WordCount每收到一个单词,都会更新内存中的统计状态。

  SplitSentence的实现代码如下:

public class SplitSentence implements IBasicBolt{public void prepare(Map conf, TopologyContext context) {}public void execute(Tuple tuple, BasicOutputCollector collector) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {collector.emit(new Values(word));}}public void cleanup() {}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

  WordCount的实现代码如下:

public class WordCount implements IBasicBolt {private Map<String, Integer> _counts = new HashMap<String, Integer>();public void prepare(Map conf, TopologyContext context) {}public void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);int count;if(_counts.containsKey(word)) {count = _counts.get(word);} else {count = 0;}count++;_counts.put(word, count);collector.emit(new Values(word, count));}public void cleanup() {}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}

  4. Topology运行
  Topology运行有两种模式:本地模式和分布式模式。这两种模式的接口区别很大,使用场景也不相同。另外,下面还将介绍Topology的运行流程、方法调用过程以及并行度等。
  1. Topology运行模式
  Topology的运行模式可以分为本地模式和分布式模式,模式可以在配置文件中和代码中设置。
  (1)本地模式
  Storm用一个进程中的线程来模拟所有的Spout和Bolt。本地模式对开发和测试来说比较有用。storm-starter中的Topology是以本地模式运行的,可以看到Topology中的每一个组件发射的消息。示例代码如下:

Config conf = new Conf?ig();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

  首先,这段代码通过定义一个LocalCluster对象来定义一个进程内的集群。提交Topology给这个虚拟的集群和提交Topology给分布式集群相同。通过调用submitTopology方法来提交Topology,共有3个参数:要运行的Topology的名称、一个配置对象,以及要运行的Topology本身。
  Topology是以名称来唯一区别的,可以用这个名称来杀掉该Topology,而且必须显式地杀掉,否则它会一直运行。

conf对象可以配置内容很多,下面两个是最常见的:
  TOPOLOGY_WORKERS (setNumWorkers):定义希望集群分配多少个工作进程来执行这个Topology。Topology中的每个组件都需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进程中。每一个工作进程包含一些节点的一些工作线程。例如,指定300个线程,60个进程,那么每个工作进程中要执行6个线程,而这6个线程可能属于不同的组件(Spout或Bolt)。可以调整每个组件的并行度以及这些线程所在的进程数量来调整Topology的性能。
  TOPOLOGY_DEBUG (setDebug):当它设置为true时,Storm会记录下每个组件发射的每条消息。这在本地环境调试Topology时很有用,但是在生产环境中如果这么做,则会影响性能。

(2)分布式模式
  Storm由若干节点组成。提交Topology给Nimbus时,也会提交Topology代码。Nimbus负责分发代码和给Topolgoy分配工作进程。如果一个工作进程挂掉了,Nimbus节点会将其重新分配到其他节点。分布式模式提交拓扑的代码如下:

StormSubmitter.submitTopology(topologyName, topologyConf?ig,  builder.createTopology());

  在Storm代码编写完成之后,需要打包成JAR包放到Nimbus中运行。在打包时,不需要把依赖的JAR都打进去,否则运行时会出现重复的配置文件错误导致Topology无法运行,因为在Topology运行之前,会加载本地的storm.yaml配置文件。
  在Nimbus运行的命令如下。

storm  jar  StormTopology.jar  mainclass  args

  2. Topology运行流程
  在Topology的运行流程中,有几点需要特别说明。
  1)提交Topology后,Storm会把代码先存放到Nimbus节点的inbox目录下;之后,把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,此目录中同时还有序列化之后的Topology代码文件。
  2)在设定Topology关联的Spout和Bolt时,可以同时设置当前Spout和Bolt的Executor和Task数量。在默认情况下,一个Topology的Task总和与Executor的总和一致。之后,系统根据Worker的数量,尽量将这些Task平均分配到不同的Worker上执行。Worker在哪个Supervisor节点上运行是由Storm本身决定的。
  3)在任务分配好之后,Nimbus节点将任务的信息提交到ZooKeeper集群,同时在ZooKeeper集群中有Workerbeats,这里存储了当前Topology所有Worker进程的心跳信息。
  4)Supervisor节点不断轮询ZooKeeper集群,在ZooKeeper的assignments中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容来领取自己的任务,启动Worker进程运行。
  5)一个Topology运行之后,不断通过Spout来发送流,通过Bolt来不断处理接收到的流,流是无界的。最后一步会不间断地执行,除非手动结束该Topology。

  3. Topology的方法调用流程
  Topology中的流处理时,调用方法的过程如图3-16所示。
  Topology方法调用的过程有如下一些要点:
  1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。
  2)open方法和prepare方法被调用多次。在入口函数中设定的setSpout或者setBolt中的并行度参数是指Executor的数量,是负责运行组件中的Task的线程数量,此数量是多少,上述两个方法就会被调用多少次,在每个Executor运行时调用一次。
  3)nextTuple方法和execute方法是一直运行的,nextTuple方法不断发射Tuple,Bolt的execute不断接收Tuple进行处理。只有这样不断地运行,才会产生无界的Tuple流,体现实时性。这类似于Java线程的run方法。
  4)提交一个Topology之后,Storm创建Spout/Bolt实例并进行序列化。之后,将序列化的组件发送给所有任务所在的节点(即Supervisor节点),在每一个任务上反序列化组件。
  5)Spout和Bolt之间、Bolt和Bolt之间的通信,通过ZeroMQ的消息队列实现。
  6)图3-16没有列出ack和fail方法,在一个Tuple成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理该Tuple。

             

                        图5    Topology流处理过程图      

    

  4. Topology并行度
  在Topology的执行单元中,有几个和并行度相关的概念。
  (1)Worker
  每个Worker都属于一个特定的Topology,每个Supervisor节点的Worker可以有多个,每个Worker使用一个单独的端口,Worker对Topology中的每个组件运行一个或者多个Executor线程来提供Task的执行服务。
  (2)Executor
  Executor是产生于Worker进程内部的线程,会执行同一个组件的一个或者多个Task。
  (3)Task
  实际的数据处理由Task完成。在Topology的生命周期中,每个组件的Task数量不会变化,而Executor的数量却不一定。Executor数量小于等于Task的数量,在默认情况下,二者是相等的。
  在运行一个Topology时,可以根据具体的情况来设置不同数量的Worker、Task、Executor,设置的位置也可以在多个地方。
  1)Worker设置:可以设置yaml中的topology.workers属性。在代码中通过Conf?ig的setNumWorkers方法设定。
  2)Executor设置:通过Topology的入口类中的setBolt、setSpout方法的最后一个参数指定,如果不指定,则使用默认值1。
  3)Task设置:在默认情况下,和executor数量一致。在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的Task数量。

  5. 终止Topology
  在Nimbus启动的节点上,使用下面的命令来终止一个Topology的运行。
  storm kill topologyName
  执行kill之后,通过UI界面查看Topology状态,其先变成KILLED状态,清理完本地目录和ZooKeeper集群中与当前Topology相关的信息之后,此Topology将彻底消失。

  6.Topology跟踪
  提交Topology后,可以在Storm UI界面查看整个Topology运行的过程。

   

         

         

  如下

Storm概念学习系列之Topology拓扑相关推荐

  1. Storm概念学习系列之storm-starter项目(完整版)(博主推荐)

    这是书籍<从零开始学Storm>赵必厦 2014年出版的配套代码! storm-starter项目包含使用storm的各种各样的例子.项目托管在GitHub上面,其网址为: http:// ...

  2. Storm概念学习系列之storm的特性

    不多说,直接上干货! storm的特性 Storm 是一个开源的分布式实时计算系统,可以简单.可靠地处理大量的数据流. Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快 ...

  3. Storm概念学习系列之Stream消息流 和 Stream Grouping 消息流组

    不多说,直接上干货! Stream消息流是Storm中最关键的抽象,是一个没有边界的Tuple序列. Stream Grouping 消息流组是用来定义一个流如何分配到Tuple到Bolt. Stre ...

  4. Storm概念学习系列之Task任务

    每一个Spout/Bolt的线程称为一个Task. Task任务 Task是运行Spout或Bolt的单元,每一个Spout/Bolt的线程称为一个Task. 在Storm 0.8及之后的版本中,Ta ...

  5. storm安装笔记以及提交拓扑任务

    Storm -- Distributed and fault-tolerant realtime computation 这是一个分布式的.容错的实时计算系统 把Storm依赖组件的版本贴出来供各位参 ...

  6. Zookeeper概念学习系列之分布式事务

    不多说,直接上干货! 初学者来说,肯定会有这么一个疑问.为什么会在zookeeper里牵扯到分布式事务? zookeeper到底是什么? zookeeper实际上是yahoo开发的,用于分布式中一致性 ...

  7. Hadoop HDFS概念学习系列之熟练掌握HDFS的Shell访问(十五)

    调用文件系统(FS)Shell命令应使用 $HADOOP_HOME/bin/hadoop fs  *** 的形式!!! 所有的FS Shell命令使用URI路径作为参数. URI格式是scheme:/ ...

  8. Beam概念学习系列之Pipeline 数据处理流水线

    不多说,直接上干货! Pipeline 数据处理流水线 Pipeline将Source PCollection ParDo.Sink组织在一起形成了一个完整的数据处理的过程. Beam概念学习系列之P ...

  9. (转)STORM启动与部署TOPOLOGY

    STORM启动与部署TOPOLOGY 启动ZOOPKEEPER zkServer.sh start 启动NIMBUS storm nimbus & 启动SUPERVISOR storm sup ...

最新文章

  1. Extjs 基础篇—— Function基础
  2. R语言tidyr包pivot_longer函数、pivot_wider函数数据表变换实战(长表到宽表、宽表到长表)
  3. Visual Studio中没有为此解决方案配置选中要生成的项目
  4. 旧文 | 舒迅:产品经理必读的九步法
  5. 遇到oracle错误1012,跟着感觉走,解决安装RAC过程中OCR完整性错误,待深入剖析...
  6. 内存映射和独立存贮器
  7. cuteftp 9 显示中文乱码
  8. 维护和维修涉密计算机网络 必须严格采取,安全保密管理员主要负责涉密网络的日常安全保密管理工作,包括()。A.涉密网络的日常运行维护工 - 普法考试题库问答...
  9. 零基础写java网络爬虫
  10. blackscholes matlab,基于MATLAB的Black-Scholes-Merton欧式期权定价模型的计算研究
  11. 查看知乎404问题解决办法
  12. 数据建模 Database Modeling:概念 (Conceptual) vs 逻辑 (Logical) vs 物理数据 (Physical) 模型
  13. Aspose.3D使用教程:使用 Java 将 FBX 转换为 RVM 或 RVM 转换为 FBX 文件
  14. 服务器put请求获取不到参数
  15. 营销理论扫盲贴:4P/4C/4R/4S
  16. termux命令行美化oh my zsh
  17. znpc改版前后网址修改办法
  18. tiri单片机_51单片机的SCON寄存器与C程序解析
  19. iBase4J nginx配置
  20. 用python批量下载modis数据的速度怎么样_批量下载MODIS数据

热门文章

  1. String转换为int类型
  2. OpenResty+Lua+redis+mysql实现高性能高可用限流缓存
  3. 零宽断言 python_正则表达式-零宽断言
  4. win7 能下node什么版本_微软从未公开的win10版本,3GB+极度精简,老爷机有救了
  5. jquery 树形框 横_利用jQuery设计横/纵向菜单
  6. cad小插件文字刷_文字狗最佳排版神器 小恐龙公文排版助手Office WPS插件
  7. 计算机网络有哪些技能知识,网络基础知识及操作技能.ppt
  8. B端产品经理,应从哪些方面理解业务?
  9. 方法论:写好一份产品需求的系统化思考模型
  10. 女人的安全感到底是什么?