3. Storm编程框架
实例分析lifeCycle:
1 package cn.itcast.storm.spout; 2 import java.util.Map; 3 import java.util.Random; 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 import backtype.storm.utils.Utils; 13 public class RandomWordSpout extends BaseRichSpout { 14 private static final long serialVersionUID = -4287209449750623371L; 15 16 private static final Log log = LogFactory.getLog(RandomWordSpout.class); 17 private SpoutOutputCollector collector; 18 19 private String[] words = new String[]{"storm", "hadoop", "hive", "flume"}; 20 21 private Random random = new Random(); 22 23 public RandomWordSpout() { 24 log.warn("RandomWordSpout constructor method invoked"); 25 } 26 @Override 27 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 28 log.warn("RandomWordSpout open() method invoked"); 29 this.collector = collector; 30 } 31 @Override 32 public void declareOutputFields(OutputFieldsDeclarer declarer) { 33 log.warn("RandomWordSpout declareOutputFields() method invoked"); 34 declarer.declare(new Fields("str")); 35 } 36 @Override 37 public void nextTuple() { 38 log.warn("RandomWordSpout nextTuple() method invoked"); 39 Utils.sleep(500); 40 String str = words[random.nextInt(words.length)]; 41 collector.emit(new Values(str)); 42 } 43 @Override 44 public void activate() { 45 log.warn("RandomWordSpout activate() method invoked"); 46 } 47 @Override 48 public void deactivate() { 49 log.warn("RandomWordSpout deactivate() method invoked"); 50 } 51 }
1 package cn.itcast.storm.bolt; 2 import java.util.Map; 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.BasicOutputCollector; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.base.BaseBasicBolt; 9 import backtype.storm.tuple.Fields; 10 import backtype.storm.tuple.Tuple; 11 import backtype.storm.tuple.Values; 12 public class TransferBolt extends BaseBasicBolt { 13 private static final long serialVersionUID = 4223708336037089125L; 14 private static final Log log = LogFactory.getLog(TransferBolt.class); 15 16 public TransferBolt() { 17 log.warn("TransferBolt constructor method invoked"); 18 } 19 20 @Override 21 public void prepare(Map stormConf, TopologyContext context) { 22 log.warn("TransferBolt prepare() method invoked"); 23 } 24 @Override 25 public void declareOutputFields(OutputFieldsDeclarer declarer) { 26 log.warn("TransferBolt declareOutputFields() method invoked"); 27 declarer.declare(new Fields("word")); 28 } 29 @Override 30 public void execute(Tuple input, BasicOutputCollector collector) { 31 log.warn("TransferBolt execute() method invoked"); 32 String word = input.getStringByField("str"); 33 collector.emit(new Values(word)); 34 } 35 }
1 package cn.itcast.storm.bolt; 2 import java.io.FileWriter; 3 import java.io.IOException; 4 import java.util.Map; 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.BasicOutputCollector; 9 import backtype.storm.topology.OutputFieldsDeclarer; 10 import backtype.storm.topology.base.BaseBasicBolt; 11 import backtype.storm.tuple.Tuple; 12 public class WriterBolt extends BaseBasicBolt { 13 private static final long serialVersionUID = -6586283337287975719L; 14 15 private static final Log log = LogFactory.getLog(WriterBolt.class); 16 17 private FileWriter writer = null; 18 19 public WriterBolt() { 20 log.warn("WriterBolt constructor method invoked"); 21 } 22 @Override 23 public void prepare(Map stormConf, TopologyContext context) { 24 log.warn("WriterBolt prepare() method invoked"); 25 try { 26 writer = new FileWriter("/home/" + this); 27 } catch (IOException e) { 28 log.error(e); 29 throw new RuntimeException(e); 30 } 31 } 32 @Override 33 public void declareOutputFields(OutputFieldsDeclarer declarer) { 34 log.warn("WriterBolt declareOutputFields() method invoked"); 35 } 36 37 @Override 38 public void execute(Tuple input, BasicOutputCollector collector) { 39 log.warn("WriterBolt execute() method invoked"); 40 String s = input.getString(0); 41 try { 42 writer.write(s); 43 writer.write("\n"); 44 writer.flush(); 45 } catch (IOException e) { 46 log.error(e); 47 throw new RuntimeException(e); 48 } 49 } 50 }
package cn.itcast.storm.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import cn.itcast.storm.bolt.TransferBolt; import cn.itcast.storm.bolt.WriterBolt; import cn.itcast.storm.spout.RandomWordSpout; public class TopoMain {private static final Log log = LogFactory.getLog(TopoMain.class);public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("random", new RandomWordSpout(), 2);builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));Config conf = new Config();conf.setNumWorkers(2);conf.setDebug(true);log.warn("submitting topology...");StormSubmitter.submitTopology("life-cycle", conf, builder.createTopology());log.warn("topology submitted !");} }
- Spout方法调用顺势
- declareOutputFields()(调用一次)
- open() (调用一次)
- activate() (调用一次)
- nextTuple() (循环调用 )
- deactivate() (手动调用)
- Bolt方法调用顺序
- declareOutputFields() (调用一次)
- prepare() (调用一次)
- execute() (循环执行)
[root@master work]# storm jar lifeCycle1.jar cn.itcast.storm.topology.TopoMain
Running:/usr/local/jdk/bin/java -client -Dstorm.options=-Dstorm.home=/usr/local/apache-storm-0.9.4-Dstorm.log.dir=/usr/local/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file=-cp /usr/local/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:....-Dstorm.jar=lifeCycle1.jar cn.itcast.storm.topology.TopoMain
464[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout constructor method invoked #初始化对象,执行构造方法
490[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt constructor method invoked
505[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt constructor method invoked
515[main] WARN cn.itcast.storm.topology.TopoMain- submitting topology...
516[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt declareOutputFields()method invoked
906[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt declareOutputFields() method invoked
909[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout declareOutputFields() method invoked
1106[main] INFO backtype.storm.StormSubmitter-Jar not uploaded to master yet.Submitting jar...
1117[main] INFO backtype.storm.StormSubmitter-Uploading topology jar lifeCycle1.jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
1361[main] INFO backtype.storm.StormSubmitter-Successfully uploaded topology jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
1362[main] INFO backtype.storm.StormSubmitter-Submitting topology life-cycle in distributed mode with conf {"topology.workers":2,"topology.debug":true}
1568[main] INFO backtype.storm.StormSubmitter-Finished submitting topology: life-cycle
1568[main] WARN cn.itcast.storm.topology.TopoMain- topology submitted !
2015-05-16T17:57:18.295+0800 b.s.d.worker [INFO]Worker6ae03c97-dac4-4ef3-9f10-227de1219b16for storm life-cycle-4-1431770222 on 1360b011-2e64-4964
-9f6c-d849db954ff2:6703 has finished loading
2015-05-16T17:57:18.797+0800 b.s.d.executor [INFO]Preparing bolt transfer:(5)
2015-05-16T17:57:18.798+0800 b.s.d.executor [INFO]Preparing bolt writer:(11)
2015-05-16T17:57:18.812+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
2015-05-16T17:57:18.813+0800 b.s.d.executor [INFO]Prepared bolt writer:(11)
2015-05-16T17:57:18.820+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
2015-05-16T17:57:18.821+0800 b.s.d.executor [INFO]Prepared bolt transfer:(5)
2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt __system:(-1)
2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt transfer:(7)
2015-05-16T17:57:18.839+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
2015-05-16T17:57:18.839+0800 b.s.d.executor [INFO]Prepared bolt transfer:(7)
2015-05-16T17:57:18.840+0800 b.s.d.executor [INFO]Preparing bolt __acker:(1)
2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Opening spout random:(3)
2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Preparing bolt writer:(9)
2015-05-16T17:57:18.842+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
2015-05-16T17:57:18.842+0800 b.s.d.executor [INFO]Prepared bolt writer:(9)
2015-05-16T17:57:18.846+0800 b.s.d.executor [INFO]Prepared bolt __acker:(1)
2015-05-16T17:57:18.848+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout open() method invoked
2015-05-16T17:57:18.854+0800 b.s.d.executor [INFO]Opened spout random:(3)
2015-05-16T17:57:18.867+0800 b.s.d.executor [INFO]Prepared bolt __system:(-1)
2015-05-16T17:57:18.873+0800 b.s.d.executor [INFO]Activating spout random:(3)
2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout activate() method invoked
2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout nextTuple() method invoked
2015-05-16T17:57:19.159+0800 b.s.d.executor [INFO]Processing received message source: random:4, stream: default, id:{},[hadoop]
2015-05-16T17:57:19.160+0800 c.i.s.b.TransferBolt[WARN]TransferBolt execute() method invoked
2015-05-16T17:57:19.161+0800 b.s.d.task [INFO]Emitting: transfer default [hadoop]
2015-05-16T17:57:19.162+0800 b.s.d.executor [INFO]Processing received message source: transfer:7, stream: default, id:{},[hadoop]
2015-05-16T17:57:19.162+0800 c.i.s.b.WriterBolt[WARN]WriterBolt execute() method invoked
转载于:https://www.cnblogs.com/51runsky/p/4572823.html
3. Storm编程框架相关推荐
- Storm编程模型总结
目录 前言: 1.Storm编程模型 2.对应的的WordCount案例 总结: 目录 前言: 对于Storm的编程模型有必要做一个详细的介绍(配合WC案例来介绍) 1.Storm编程模型 上图中组件 ...
- 从0开始搭建编程框架——主框架和源码
一个良好的结构是"对修改关闭,对扩展开放"的.(转载请指明出于breaksoftware的csdn博客) 这个过程就像搭建积木.框架本身需要有足够的向内扩展能力以使自身有进化能力, ...
- 从0开始搭建编程框架——思考
需求来源于问题.(转载请指明出于breaksoftware的csdn博客) 之前有个人做前端开发的同学在群里问"C语言能做什么?能写网页么?",然后大家就开始基于这个问题展开争辩. ...
- 网络与服务器编程框架库 acl_3.0.13 发布
2019独角兽企业重金招聘Python工程师标准>>> acl 3.0.13 版本 (项目主页:https://sourceforge.net/projects/acl/,技术文章主 ...
- python3 socketserver模块 网络服务编程框架
socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层 API进行封装,Python的封装就是--socketserver模块.它是网络服务编 ...
- Win32汇编基本编程框架
Win32汇编编程框架如下: .386 .model flat,stdcall option casemap:none <一些include语句> .stack [堆栈段的大小] .dat ...
- 第五章-分布式并行编程框架MapReduce
第五章-分布式并行编程框架MapReduce 文章目录 第五章-分布式并行编程框架MapReduce MapReduce概述 分布式并行编程 MapReduce模型和函数 MapReduce体系结构 ...
- 网络与服务器编程框架库 acl_3.0.12 发布
2019独角兽企业重金招聘Python工程师标准>>> acl 3.0.12 版本 (项目主页:https://sourceforge.net/projects/acl/, 技术文 ...
- 吴恩达《优化深度神经网络》精炼笔记(3)-- 超参数调试、Batch正则化和编程框架...
AI有道 不可错过的AI技术公众号 关注 重要通知 本公众号原名"红色石头的机器学习之路"已经改名为"AI有道",请大家留意并继续关注本公众号!谢谢! 上节课我 ...
最新文章
- 基于java多线程来实现生产者和消费者的实例
- MySQL 调优基础:Linux内存管理 Linux文件系统 Linux 磁盘IO Linux网络
- intellij idea 1314 插件推荐及快速上手建议 (已更新!)
- ym——Android之ListView性能优化
- sklearn自学指南(part49)--字典学习
- Python——assert(断言)主进程级的终止判断语句
- Andorid Binder进程间通信---Binder本地对象,实体对象,引用对象,代理对象的引用计数...
- 面向对象(Python):学习笔记之多态
- 用计算机绘画教学反思,画图_《画图》教学反思
- Python加载失败
- matplotlib 设置中文字体
- 智慧家居·万物互联:我的智能花盆DIY之旅(ESP32)
- 李永乐线性代数辅导讲义第四章学霸小结
- 为什么计算机三分技术七分管理,如何理解“七分管理,三分技术,运作贯穿始终”?...
- 匿名邮件爆迅雷看看丑闻
- 从架构设计角度分析AAC源码-Room注解使用大全(基于2.4版本源码解析)(一)
- oracle 判断重复次数,sql 查询 某字段 重复次数 最多的记录
- 上传文件时,文件名中文乱码
- Linux的markdown笔记软件,3款免费好用的Markdown笔记应用,可以替代印象笔记
- Codeforces Round #509 (Div. 2) F. Ray in the tube(思维)