一、配置开发环境

storm有两种操作模式: 本地模式和远程模式。使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远程模式的时候你提交的topology会在一个集群的机器上执行。

建议使用maven,只需要加上storm的依赖就可以了。

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope></dependency>

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh.storm</groupId><artifactId>storm-helloworld</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>storm-helloworld</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass></manifest></archive></configuration></plugin></plugins></build>
</project>

二、HelloWorld关联代码

ExclamationTopology.java

package cn.ljh.storm.helloworld;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;public class ExclamationTopology {public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("word", new TestWordSpout(), 1);builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test3", conf, builder.createTopology());Utils.sleep(20000);cluster.killTopology("test3");cluster.shutdown();}}
}

TestWordSpout.java

package cn.ljh.storm.helloworld;import org.apache.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TestWordSpout extends BaseRichSpout {public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);SpoutOutputCollector _collector;public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;}public void nextTuple() {Utils.sleep(100);final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};final Random rand = new Random();final String word = words[rand.nextInt(words.length)];_collector.emit(new Values(word));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}

ExclamationBolt.java

package cn.ljh.storm.helloworld;import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class ExclamationBolt extends BaseRichBolt {OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}public void execute(Tuple tuple) {_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));_collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

PrintBolt.java

package cn.ljh.storm.helloworld;import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class PrintBolt extends BaseRichBolt {private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}public void execute(Tuple tuple) {LOG.info(tuple.getString(0) + " Hello World!");_collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

三、实际运行

storm有本地模式和远程模式。

1、本地模式

本地模式一般用于测试和开发阶段,直接在Eclipse执行ExclamationTopology的main函数进行。

本地模式的代码中有设置睡眠时间,到时间后主动kill topoloyg。

Utils.sleep(20000);

开始设置的时间是10S,运行log中没有期待的输出,反而出现以下错误。

org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 
    0x15c8a2872ac000f, likely client has closed socketat org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.1.0.jar:1.1.0]at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.1.0.jar:1.1.0]at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

后面设置时间为20S,运行log中也有上面错误,但是有期待的输出。

原因是机器比较慢,还没初始化完就到时间跳出了,所以把睡眠时间设置大些。

2、远程模式

集群模式需要先创建一个包含程序代码以及代码所依赖的依赖包的jar包(有关storm的jar包不用包括, 这些jar包会在工作节点上自动被添加到classpath里面去)。如果使用maven, 那么插件:Maven Assembly Plugin可以帮你打包,只要把下面的配置加入pom.xml。

<plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass></manifest></archive></configuration></plugin>

然后运行mvn assembly:assembly就可以打包了.

(1)用storm提交topology

storm jar storm-helloworld-0.0.1-SNAPSHOT-jar-with-dependencies.jar cn.ljh.storm.helloworld.ExclamationTopology ExclamationTest

运行提交命令后,出现如下log,说明提交成功。

查看集群的进程jps,两个Supervisor节点出现了worker进程

在Nimbus节点的/usr/local/storm/data/nimbus/inbox下面有提交的jar

UI界面显示提交topology

(2)终止一个topology

要终止一个topology, 执行:

storm kill {stormname}

其中{stormname}是提交topology给storm集群的时候指定的名字。

storm不会马上终止topology。相反,它会先终止所有的spout,让它们不再发射任何新的tuple, storm会等Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS秒之后才杀掉所有的工作进程。这会给topology足够的时 间来完成所有我们执行storm kill命令的时候还没完成的tuple。

(3)更新一个运行中的topology

为了更新一个正在运行的topology, 唯一的选择是杀掉正在运行的topology然后重新提交一个新的。

至此HelloWorld示例完成。

四、常见配置

有很多topology级的配置可以设。 以”TOPOLOGY”打头的配置是topology级别的配置,可以覆盖全局级别的配置。下面是一些比较常见的:

1)Config.TOPOLOGY_WORKER设置:  这个设置用多少个工作进程来执行这个topology。比如,如果你把它设置成25, 那么集群里面一共会有25个java进程来执行这个topology的所有task。如果你的这个topology里面所有组件加起来一共有150的并行 度,那么每个进程里面会有6个线程(150 / 25 = 6)。

2)Config.TOPOLOGY_ACKERS: 这个配置设置acker线程的数目。Ackers是Storm的可靠性API的一部分。

3)Config.TOPOLOGY_MAX_SPOUT_PENDING:  这个设置一个spout task上面最多有多少个没有处理的tuple(没有ack/failed)回复, 我们推荐你设置这个配置,以防止tuple队列爆掉。

4)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 这个配置storm的tuple的超时时间  – 超过这个时间的tuple被认为处理失败了。这个设置的默认设置是30秒,对于大多数的topology都已经足够了。

5)Config.TOPOLOGY_SERIALIZATIONS: 为了在你的tuple里面使用自定义类型,你可以用这个配置注册自定义serializer。

转载于:https://www.cnblogs.com/hd3013779515/p/6965311.html

Storm入门(三)HelloWorld示例相关推荐

  1. Struts2 入门教程 HelloWorld示例

    一.创建项目引入jar包 登录https://struts.apache.org/download下载jar包 从这个路径下拷贝下图的jar包:\struts-2.5.20\lib 代码实现 Web. ...

  2. mongoDB 入门指南、示例

    http://www.cnblogs.com/hoojo/archive/2011/06/01/2066426.html mongoDB 入门指南.示例 上一篇:简单介绍mongoDB 一.准备工作 ...

  3. 脑残式网络编程入门(三):HTTP协议必知必会的一些知识

    为什么80%的码农都做不了架构师?>>>    本文原作者:"竹千代",原文由"玉刚说"写作平台提供写作赞助,原文版权归"玉刚说&q ...

  4. Koa入门教程之示例应用

    Koa入门教程之示例应用 Koa范例 一个包含一些小示例的存储库,这些示例说明了如何使用Koa创建Web应用程序和其他HTTP服务器. 源码地址 https://github.com/koajs/ex ...

  5. 《Storm入门》中文版

    本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 本书的译文仅限于学习和研究之用,没有原作者和译者的授权不能用于商业用途. 译者序 ...

  6. vue(vue-cli+vue-router)+babel+webpack项目搭建入门(三)

    vue(vue-cli+vue-router)+babel+webpack项目搭建入门<三> 本系列文章将介绍基于vue+webpack的前端项目的构建过程.文章分为四章内容,第一章介绍开 ...

  7. redis入门(三)

    文章目录 @[toc] redis入门(三) 目录 前言 事务 原理 Lua脚本 安装 脚本命令 EVAL EVALSHA lua和redis互操作 SCRIPT EXISTS SCRIPT FLUS ...

  8. cala开发编程入门Hello World示例

    Scala开发编程入门Hello World示例 1.下载Scala http://www.scala-lang.org/downloads Windows scala-2.9.1.final.zip ...

  9. SpringBoot——入门(HelloWorld和探究HelloWorld)

    一.简介 Spring Boot 是由 Pivotal 团队提供的全新框架,其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定 ...

最新文章

  1. Android 使用 setImageResource 清空图片
  2. Banner图的播放
  3. debugger vsm_ProteusVSM常见问题解答
  4. MySQL / 各种锁
  5. data类型的Url的格式
  6. ipa在线安装搭建_三种越狱工具安装方法
  7. 灵魂一问-如何彻底防止APK反编译?成功定级腾讯T3-2
  8. oracle9i 全库备份,Windows下Oracle9i数据库文件如何自动备份?
  9. MySQL存储过程(五)——存储过程查看、删除和修改
  10. 简单理解下内存的几大区域
  11. 新建文本html,创建邮件模板时html内容和文本内容哪种好
  12. web测试点和app测试点
  13. python写的串口助手_Python实现的简单的单片机串口助手程序
  14. ThoughtWorks思特沃克2018校园招聘之春招家庭作业 - 无人机
  15. Linux 查看网络流量 iftop
  16. 成长与发展---怎么给博士军团当好“博导”?(PL)
  17. php+日期周几,如何判断php一个日期是周几
  18. Android车载导航的一些困境
  19. Android Kotlin之Flow数据流
  20. 【HAOI2014】贴海报

热门文章

  1. artDIalog 弹出层
  2. Android-获取系统的应用程序的信息
  3. 原创-WINDOWS SERVER 2008 WEB服务器安装配置
  4. LightOj 1027 A Dangerous Maze
  5. Spring AOP 前置通知
  6. eclipse svn新增文件不显示在文件列表,只有修改文件可以提交!
  7. BZOJ 2442: [Usaco2011 Open]修剪草坪( dp )
  8. 请问android直接post请求登录地址成功后,webview还是现实登录界面
  9. 操作数组的常用方式二-----排序、查找
  10. asp.net 去除字符串右侧的最后一个字符