实例分析lifeCycle:

RandomWordSpout

  1.  1 package cn.itcast.storm.spout;
     2 import java.util.Map;
     3 import java.util.Random;
     4 import org.apache.commons.logging.Log;
     5 import org.apache.commons.logging.LogFactory;
     6 import backtype.storm.spout.SpoutOutputCollector;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.OutputFieldsDeclarer;
     9 import backtype.storm.topology.base.BaseRichSpout;
    10 import backtype.storm.tuple.Fields;
    11 import backtype.storm.tuple.Values;
    12 import backtype.storm.utils.Utils;
    13 public class RandomWordSpout extends BaseRichSpout {
    14     private static final long serialVersionUID = -4287209449750623371L;
    15
    16     private static final Log log = LogFactory.getLog(RandomWordSpout.class);
    17     private SpoutOutputCollector collector;
    18
    19     private String[] words = new String[]{"storm", "hadoop", "hive", "flume"};
    20
    21     private Random random = new Random();
    22
    23     public RandomWordSpout() {
    24         log.warn("RandomWordSpout constructor method invoked");
    25     }
    26     @Override
    27     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    28         log.warn("RandomWordSpout open() method invoked");
    29         this.collector = collector;
    30     }
    31     @Override
    32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    33         log.warn("RandomWordSpout declareOutputFields() method invoked");
    34         declarer.declare(new Fields("str"));
    35     }
    36     @Override
    37     public void nextTuple() {
    38         log.warn("RandomWordSpout nextTuple() method invoked");
    39         Utils.sleep(500);
    40         String str = words[random.nextInt(words.length)];
    41         collector.emit(new Values(str));
    42     }
    43     @Override
    44     public void activate() {
    45         log.warn("RandomWordSpout activate() method invoked");
    46     }
    47     @Override
    48     public void deactivate() {
    49         log.warn("RandomWordSpout deactivate() method invoked");
    50     }
    51 }

TransferBolt

  1.  1 package cn.itcast.storm.bolt;
     2 import java.util.Map;
     3 import org.apache.commons.logging.Log;
     4 import org.apache.commons.logging.LogFactory;
     5 import backtype.storm.task.TopologyContext;
     6 import backtype.storm.topology.BasicOutputCollector;
     7 import backtype.storm.topology.OutputFieldsDeclarer;
     8 import backtype.storm.topology.base.BaseBasicBolt;
     9 import backtype.storm.tuple.Fields;
    10 import backtype.storm.tuple.Tuple;
    11 import backtype.storm.tuple.Values;
    12 public class TransferBolt extends BaseBasicBolt {
    13     private static final long serialVersionUID = 4223708336037089125L;
    14     private static final Log log = LogFactory.getLog(TransferBolt.class);
    15
    16     public TransferBolt() {
    17         log.warn("TransferBolt constructor method invoked");
    18     }
    19
    20     @Override
    21     public void prepare(Map stormConf, TopologyContext context) {
    22         log.warn("TransferBolt prepare() method invoked");
    23     }
    24     @Override
    25     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    26         log.warn("TransferBolt declareOutputFields() method invoked");
    27         declarer.declare(new Fields("word"));
    28     }
    29     @Override
    30     public void execute(Tuple input, BasicOutputCollector collector) {
    31         log.warn("TransferBolt execute() method invoked");
    32         String word = input.getStringByField("str");
    33         collector.emit(new Values(word));
    34     }
    35 }


WriterBolt

  1.  1 package cn.itcast.storm.bolt;
     2 import java.io.FileWriter;
     3 import java.io.IOException;
     4 import java.util.Map;
     5 import org.apache.commons.logging.Log;
     6 import org.apache.commons.logging.LogFactory;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.BasicOutputCollector;
     9 import backtype.storm.topology.OutputFieldsDeclarer;
    10 import backtype.storm.topology.base.BaseBasicBolt;
    11 import backtype.storm.tuple.Tuple;
    12 public class WriterBolt extends BaseBasicBolt {
    13     private static final long serialVersionUID = -6586283337287975719L;
    14
    15     private static final Log log = LogFactory.getLog(WriterBolt.class);
    16
    17     private FileWriter writer = null;
    18
    19     public WriterBolt() {
    20         log.warn("WriterBolt constructor method invoked");
    21     }
    22     @Override
    23     public void prepare(Map stormConf, TopologyContext context) {
    24         log.warn("WriterBolt prepare() method invoked");
    25         try {
    26             writer = new FileWriter("/home/" + this);
    27         } catch (IOException e) {
    28             log.error(e);
    29             throw new RuntimeException(e);
    30         }
    31     }
    32     @Override
    33     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    34         log.warn("WriterBolt declareOutputFields() method invoked");
    35     }
    36
    37     @Override
    38     public void execute(Tuple input, BasicOutputCollector collector) {
    39         log.warn("WriterBolt execute() method invoked");
    40         String s = input.getString(0);
    41         try {
    42             writer.write(s);
    43             writer.write("\n");
    44             writer.flush();
    45         } catch (IOException e) {
    46             log.error(e);
    47             throw new RuntimeException(e);
    48         }
    49     }
    50 }

TopoMain
  1. package cn.itcast.storm.topology;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import cn.itcast.storm.bolt.TransferBolt;
    import cn.itcast.storm.bolt.WriterBolt;
    import cn.itcast.storm.spout.RandomWordSpout;
    public class TopoMain {private static final Log log = LogFactory.getLog(TopoMain.class);public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("random", new RandomWordSpout(), 2);builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));Config conf = new Config();conf.setNumWorkers(2);conf.setDebug(true);log.warn("submitting topology...");StormSubmitter.submitTopology("life-cycle", conf, builder.createTopology());log.warn("topology submitted !");}
    }


方法执行顺序:
  • Spout方法调用顺势
  1. declareOutputFields()(调用一次)
  2. open() (调用一次)
  3. activate() (调用一次)
  4. nextTuple()    (循环调用 )
  5. deactivate() (手动调用)
  • Bolt方法调用顺序
  1. declareOutputFields() (调用一次)
  2. prepare() (调用一次)
  3. execute()     (循环执行)

执行日志:

  1. [root@master work]# storm jar lifeCycle1.jar cn.itcast.storm.topology.TopoMain
  2. Running:/usr/local/jdk/bin/java -client -Dstorm.options=-Dstorm.home=/usr/local/apache-storm-0.9.4-Dstorm.log.dir=/usr/local/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file=-cp /usr/local/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:....-Dstorm.jar=lifeCycle1.jar cn.itcast.storm.topology.TopoMain
  3. 464[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout constructor method invoked #初始化对象,执行构造方法
  4. 490[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt constructor method invoked
  5. 505[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt constructor method invoked
  6. 515[main] WARN cn.itcast.storm.topology.TopoMain- submitting topology...
  7. 516[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt declareOutputFields()method invoked
  8. 906[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt declareOutputFields() method invoked
  9. 909[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout declareOutputFields() method invoked
  10. 1106[main] INFO backtype.storm.StormSubmitter-Jar not uploaded to master yet.Submitting jar...
  11. 1117[main] INFO backtype.storm.StormSubmitter-Uploading topology jar lifeCycle1.jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
  12. 1361[main] INFO backtype.storm.StormSubmitter-Successfully uploaded topology jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
  13. 1362[main] INFO backtype.storm.StormSubmitter-Submitting topology life-cycle in distributed mode with conf {"topology.workers":2,"topology.debug":true}
  14. 1568[main] INFO backtype.storm.StormSubmitter-Finished submitting topology: life-cycle
  15. 1568[main] WARN cn.itcast.storm.topology.TopoMain- topology submitted !
worker日志

  1. 2015-05-16T17:57:18.295+0800 b.s.d.worker [INFO]Worker6ae03c97-dac4-4ef3-9f10-227de1219b16for storm life-cycle-4-1431770222 on 1360b011-2e64-4964
  2. -9f6c-d849db954ff2:6703 has finished loading
  3. 2015-05-16T17:57:18.797+0800 b.s.d.executor [INFO]Preparing bolt transfer:(5)
  4. 2015-05-16T17:57:18.798+0800 b.s.d.executor [INFO]Preparing bolt writer:(11)
  5. 2015-05-16T17:57:18.812+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
  6. 2015-05-16T17:57:18.813+0800 b.s.d.executor [INFO]Prepared bolt writer:(11)
  7. 2015-05-16T17:57:18.820+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
  8. 2015-05-16T17:57:18.821+0800 b.s.d.executor [INFO]Prepared bolt transfer:(5)
  9. 2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt __system:(-1)
  10. 2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt transfer:(7)
  11. 2015-05-16T17:57:18.839+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
  12. 2015-05-16T17:57:18.839+0800 b.s.d.executor [INFO]Prepared bolt transfer:(7)
  13. 2015-05-16T17:57:18.840+0800 b.s.d.executor [INFO]Preparing bolt __acker:(1)
  14. 2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Opening spout random:(3)
  15. 2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Preparing bolt writer:(9)
  16. 2015-05-16T17:57:18.842+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
  17. 2015-05-16T17:57:18.842+0800 b.s.d.executor [INFO]Prepared bolt writer:(9)
  18. 2015-05-16T17:57:18.846+0800 b.s.d.executor [INFO]Prepared bolt __acker:(1)
  19. 2015-05-16T17:57:18.848+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout open() method invoked
  20. 2015-05-16T17:57:18.854+0800 b.s.d.executor [INFO]Opened spout random:(3)
  21. 2015-05-16T17:57:18.867+0800 b.s.d.executor [INFO]Prepared bolt __system:(-1)
  22. 2015-05-16T17:57:18.873+0800 b.s.d.executor [INFO]Activating spout random:(3)
  23. 2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout activate() method invoked
  24. 2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout nextTuple() method invoked
  25. 2015-05-16T17:57:19.159+0800 b.s.d.executor [INFO]Processing received message source: random:4, stream: default, id:{},[hadoop]
  26. 2015-05-16T17:57:19.160+0800 c.i.s.b.TransferBolt[WARN]TransferBolt execute() method invoked
  27. 2015-05-16T17:57:19.161+0800 b.s.d.task [INFO]Emitting: transfer default [hadoop]
  28. 2015-05-16T17:57:19.162+0800 b.s.d.executor [INFO]Processing received message source: transfer:7, stream: default, id:{},[hadoop]
  29. 2015-05-16T17:57:19.162+0800 c.i.s.b.WriterBolt[WARN]WriterBolt execute() method invoked

转载于:https://www.cnblogs.com/51runsky/p/4572823.html

3. Storm编程框架相关推荐

  1. Storm编程模型总结

    目录 前言: 1.Storm编程模型 2.对应的的WordCount案例 总结: 目录 前言: 对于Storm的编程模型有必要做一个详细的介绍(配合WC案例来介绍) 1.Storm编程模型 上图中组件 ...

  2. 从0开始搭建编程框架——主框架和源码

    一个良好的结构是"对修改关闭,对扩展开放"的.(转载请指明出于breaksoftware的csdn博客) 这个过程就像搭建积木.框架本身需要有足够的向内扩展能力以使自身有进化能力, ...

  3. 从0开始搭建编程框架——思考

    需求来源于问题.(转载请指明出于breaksoftware的csdn博客) 之前有个人做前端开发的同学在群里问"C语言能做什么?能写网页么?",然后大家就开始基于这个问题展开争辩. ...

  4. 网络与服务器编程框架库 acl_3.0.13 发布

    2019独角兽企业重金招聘Python工程师标准>>> acl 3.0.13 版本 (项目主页:https://sourceforge.net/projects/acl/,技术文章主 ...

  5. python3 socketserver模块 网络服务编程框架

    socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层 API进行封装,Python的封装就是--socketserver模块.它是网络服务编 ...

  6. Win32汇编基本编程框架

    Win32汇编编程框架如下: .386 .model flat,stdcall option casemap:none <一些include语句> .stack [堆栈段的大小] .dat ...

  7. 第五章-分布式并行编程框架MapReduce

    第五章-分布式并行编程框架MapReduce 文章目录 第五章-分布式并行编程框架MapReduce MapReduce概述 分布式并行编程 MapReduce模型和函数 MapReduce体系结构 ...

  8. 网络与服务器编程框架库 acl_3.0.12 发布

    2019独角兽企业重金招聘Python工程师标准>>> acl 3.0.12 版本 (项目主页:https://sourceforge.net/projects/acl/,  技术文 ...

  9. 吴恩达《优化深度神经网络》精炼笔记(3)-- 超参数调试、Batch正则化和编程框架...

    AI有道 不可错过的AI技术公众号 关注 重要通知 本公众号原名"红色石头的机器学习之路"已经改名为"AI有道",请大家留意并继续关注本公众号!谢谢! 上节课我 ...

最新文章

  1. 基于java多线程来实现生产者和消费者的实例
  2. MySQL 调优基础:Linux内存管理 Linux文件系统 Linux 磁盘IO Linux网络
  3. intellij idea 1314 插件推荐及快速上手建议 (已更新!)
  4. ym——Android之ListView性能优化
  5. sklearn自学指南(part49)--字典学习
  6. Python——assert(断言)主进程级的终止判断语句
  7. Andorid Binder进程间通信---Binder本地对象,实体对象,引用对象,代理对象的引用计数...
  8. 面向对象(Python):学习笔记之多态
  9. 用计算机绘画教学反思,画图_《画图》教学反思
  10. Python加载失败
  11. matplotlib 设置中文字体
  12. 智慧家居·万物互联:我的智能花盆DIY之旅(ESP32)
  13. 李永乐线性代数辅导讲义第四章学霸小结
  14. 为什么计算机三分技术七分管理,如何理解“七分管理,三分技术,运作贯穿始终”?...
  15. 匿名邮件爆迅雷看看丑闻
  16. 从架构设计角度分析AAC源码-Room注解使用大全(基于2.4版本源码解析)(一)
  17. oracle 判断重复次数,sql 查询 某字段 重复次数 最多的记录
  18. 上传文件时,文件名中文乱码
  19. Linux的markdown笔记软件,3款免费好用的Markdown笔记应用,可以替代印象笔记
  20. Codeforces Round #509 (Div. 2) F. Ray in the tube(思维)

热门文章

  1. Python-Matplotlib可视化(4)——添加注释让统计图通俗易懂
  2. Ubuntu18.04系统中python3.7安装MultiNEAT库
  3. linux ping 命令_Linux ping命令示例
  4. Android RadioButton,使用Kotlin的RadioGroup
  5. UITableView上的iOS UIRefreshControl
  6. Linux下C语言的调试器 Gdb
  7. 网络人工智能研究方向有哪些?
  8. 产品经理该如何做竞品分析
  9. C++基础教程之数组
  10. Java基础:什么是List接口,如何去运用?