storm入门——本地模式helloworld
创建maven项目,在pom.xml中加入以下配置:
<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><type>jar</type><version>0.9.3-rc1</version></dependency>
创建SimpleSpout类用于获取数据流:
1 package com.hirain.storm.helloworld; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 13 public class SimpleSpout extends BaseRichSpout{ 14 15 /** 16 * 17 */ 18 private static final long serialVersionUID = 1L; 19 20 //用来发射数据的工具类 21 private SpoutOutputCollector collector; 22 23 private static String[] info = new String[]{ 24 "comaple\t,12424,44w46,654,12424,44w46,654,", 25 "lisi\t,435435,6537,12424,44w46,654,", 26 "lipeng\t,45735,6757,12424,44w46,654,", 27 "hujintao\t,45735,6757,12424,44w46,654,", 28 "jiangmin\t,23545,6457,2455,7576,qr44453", 29 "beijing\t,435435,6537,12424,44w46,654,", 30 "xiaoming\t,46654,8579,w3675,85877,077998,", 31 "xiaozhang\t,9789,788,97978,656,345235,09889,", 32 "ceo\t,46654,8579,w3675,85877,077998,", 33 "cto\t,46654,8579,w3675,85877,077998,", 34 "zhansan\t,46654,8579,w3675,85877,077998,"}; 35 36 Random random=new Random(); 37 38 39 /** 40 * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 41 */ 42 public void nextTuple() { 43 try { 44 String msg = info[random.nextInt(11)]; 45 // 调用发射方法 46 collector.emit(new Values(msg)); 47 // 模拟等待100ms 48 Thread.sleep(100); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 } 53 /** 54 * 初始化collector 55 */ 56 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 57 this.collector = collector; 58 59 } 60 61 62 /** 63 * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 64 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 65 */ 66 public void declareOutputFields(OutputFieldsDeclarer declarer) { 67 declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 68 } 69 70 }
创建SimpleBolt类,用于处理数据:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.tuple.Tuple; 8 import backtype.storm.tuple.Values; 9 10 11 12 public class SimpleBolt extends BaseBasicBolt { 13 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 1L; 18 19 public void execute(Tuple input,BasicOutputCollector collector) { 20 try { 21 String msg = input.getString(0); 22 if (msg != null){ 23 //System.out.println("msg="+msg); 24 collector.emit(new Values(msg + "msg is processed!")); 25 } 26 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 31 } 32 33 public void declareOutputFields( 34 OutputFieldsDeclarer declarer) { 35 declarer.declare(new Fields("info")); 36 37 } 38 39 }
创建main方法配置storm的topology并启动本地模式运行:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 8 public class SimpleTopology { 9 10 11 public static void main(String[] args) { 12 try { 13 // 实例化TopologyBuilder类。 14 TopologyBuilder topologyBuilder = new TopologyBuilder(); 15 // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 16 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); 17 // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 18 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); 19 Config config = new Config(); 20 config.setDebug(true); 21 if (args != null && args.length > 0) { 22 config.setNumWorkers(1); 23 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); 24 } else { 25 // 这里是本地模式下运行的启动代码。 26 config.setMaxTaskParallelism(1); 27 LocalCluster cluster = new LocalCluster(); 28 cluster.submitTopology("simple", config, topologyBuilder.createTopology()); 29 } 30 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }
以上为storm的简单的helloworld,仅供参考
转载于:https://www.cnblogs.com/zhangyukun/p/4031066.html
storm入门——本地模式helloworld相关推荐
- Storm入门(三)HelloWorld示例
一.配置开发环境 storm有两种操作模式: 本地模式和远程模式.使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远程模式的时候你提交的t ...
- 在线实时大数据平台Storm本地模式运行的一个小发现
1.现象:生产中分别部署了两台服务器,独立运行storm,然后拓扑程序提交是本地模式,发现不用启动storm和zookeeper也可以运行: #jps 没有下面进程 QuorumPeerMain ...
- Storm入门到精通(四)---本地实例Demo
单词实时计数 maven项目的结构: 一.Pom.xml [html] view plain copy <project xmlns="http://maven.apache.org/ ...
- storm入门教程 第一章 前言[转]
1.1 实时流计算 互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率.正因为大家对信息实时响应.实时交互的需求,软件行业除了个人操作系统之外,数据库 ...
- Storm入门之第一章
原书下载地址 译者:吴京润 编辑:方腾飞 译者注:本文翻译自<Getting Started With Storm>,本书中所有Storm相关术语都用斜体英文表示. 这些术语的字面意义 ...
- Storm入门(一)原理介绍
问题导读: 1.hadoop有master与slave,Storm与之对应的节点是什么? 2.Storm控制节点上面运行一个后台程序被称之为什么? 3.Supervisor的作用是什么? 4.Topo ...
- strom-1.1.0模拟单词统计功能,Spout编写,Bolt编写,TopologyDriver编写,本地模式运行,集群模式运行,集群模式下看输出结果
统计文本中的单词出现的频率,其中文本内容如下: 创建项目 项目结构如下: 创建pom.xml,代码如下: <?xml version="1.0" encoding=" ...
- java jni helloword_JNI入门教程之HelloWorld篇
JNI入门教程之HelloWorld篇 来源:互联网 宽屏版 评论 2008-05-31 09:07:11 本文讲述如何使用JNI技术实现HelloWorld,目的是让读者熟悉JNI的机制并编写第 ...
- 《Storm入门》中文版
本文翻译自<Getting Started With Storm>译者:吴京润 编辑:郭蕾 方腾飞 本书的译文仅限于学习和研究之用,没有原作者和译者的授权不能用于商业用途. 译者序 ...
最新文章
- pandas.read_csv(path_features_known_csv, header=None)的用法
- IC卡读卡器web开发,支持IE,Chrome,Firefox,Safari,Opera等主流浏览 器
- 通过jquery获取td下的input标签的值,并且改变onclick的参数值
- MOQ TIP1:简介加基础
- jQuery-DOM节点插入总结
- 深入理解Java:内省(Introspector)
- ProcessOnLoading
- dw cs 5 安装失败解决方案
- 熊猫烧香病毒样本分析
- Pygame安装方法(Windows10, Python-3.7.2)
- 一个例子说明贝叶斯定理
- 如何不用绿幕,从视频中移除背景?
- 第一周-2.3成绩排序
- xcode 使用xparse,xccov解析xcresult文件,查看代码覆盖率,导出日志,提取附件等
- linux打开浏览器密码取消,Deepin下打开谷歌chrome浏览器提示解锁登录密钥环的解决方法...
- 银行提供了整存整取定期存储业务,其存取分为一年、二年、三年、五年,到期凭存单支取本息
- 使用ctrl+alt+delete没有任务管理器选项,鼠标右键点击任务栏显示任务管理器不可用
- html网页转换swf格式,[转]html网页 swf播放器使用代码
- 吉他谱——浪花一朵朵
- 【Linux】ifconfig命令详解
热门文章
- 眼科裂隙灯是否伤眼?
- hive的udf,udaf,udtf各自依賴兩種class(转载+分析整理)
- python中使用ZADD方法报错AttributeError: 'int' object has no attribute 'items'
- HTTP 与HTTPS的结构
- winowsformshost 的构造函数执行符合指定的绑定约束的_C# 应该允许为 struct 定义无参构造函数...
- Disruptor并发框架--学习笔记
- 国内视频云市场转入整合阶段
- 网络分流器-网络分流器-网络安全评估探讨
- java使用jeids实现redis2.6的list操作(4)
- 如何正确的在一个循环中删除ArrayList中的元素。