打成jar包放在主节点上去运行.

  1 import java.util.Map;
  2
  3 import backtype.storm.Config;
  4 import backtype.storm.StormSubmitter;
  5 import backtype.storm.generated.AlreadyAliveException;
  6 import backtype.storm.generated.InvalidTopologyException;
  7 import backtype.storm.spout.SpoutOutputCollector;
  8 import backtype.storm.task.OutputCollector;
  9 import backtype.storm.task.TopologyContext;
 10 import backtype.storm.topology.OutputFieldsDeclarer;
 11 import backtype.storm.topology.TopologyBuilder;
 12 import backtype.storm.topology.base.BaseRichBolt;
 13 import backtype.storm.topology.base.BaseRichSpout;
 14 import backtype.storm.tuple.Fields;
 15 import backtype.storm.tuple.Tuple;
 16 import backtype.storm.tuple.Values;
 17 import backtype.storm.utils.Utils;
 18
 19 /**
 20  * 在集群运行
 21  * 数字累加求和
 22  * 先添加storm依赖
 23  *
 24  * @author Administrator
 25  *
 26  */
 27 public class StormTopologySum {
 28
 29
 30     /**
 31      * spout需要继承baserichspout,实现未实现的方法
 32      * @author Administrator
 33      *
 34      */
 35     public static class MySpout extends BaseRichSpout{
 36         private Map conf;
 37         private TopologyContext context;
 38         private SpoutOutputCollector collector;
 39
 40         /**
 41          * 初始化方法,只会执行一次
 42          * 在这里面可以写一个初始化的代码
 43          * Map conf:其实里面保存的是topology的一些配置信息
 44          * TopologyContext context:topology的上下文,类似于servletcontext
 45          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
 46          */
 47         @Override
 48         public void open(Map conf, TopologyContext context,
 49                 SpoutOutputCollector collector) {
 50             this.conf = conf;
 51             this.context = context;
 52             this.collector = collector;
 53         }
 54
 55         int num = 1;
 56         /**
 57          * 这个方法是spout中最重要的方法,
 58          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
 59          * 每调用一次,会向外发射一条数据
 60          */
 61         @Override
 62         public void nextTuple() {
 63             System.out.println("spout发射:"+num);
 64             //把数据封装到values中,称为一个tuple,发射出去
 65             this.collector.emit(new Values(num++));
 66             Utils.sleep(1000);
 67         }
 68
 69         /**
 70          * 声明输出字段
 71          */
 72         @Override
 73         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 74             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
 75             //fields中定义的参数和values中传递的数值是一一对应的
 76             declarer.declare(new Fields("num"));
 77         }
 78
 79     }
 80
 81
 82     /**
 83      * 自定义bolt需要实现baserichbolt
 84      * @author Administrator
 85      *
 86      */
 87     public static class MyBolt extends BaseRichBolt{
 88         private Map stormConf;
 89         private TopologyContext context;
 90         private OutputCollector collector;
 91
 92         /**
 93          * 和spout中的open方法意义一样
 94          */
 95         @Override
 96         public void prepare(Map stormConf, TopologyContext context,
 97                 OutputCollector collector) {
 98             this.stormConf = stormConf;
 99             this.context = context;
100             this.collector = collector;
101         }
102
103         int sum = 0;
104         /**
105          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
106          */
107         @Override
108         public void execute(Tuple input) {
109             //input.getInteger(0);//也可以根据角标获取tuple中的数据
110             Integer value = input.getIntegerByField("num");
111             sum+=value;
112             System.out.println("和:"+sum);
113         }
114
115         /**
116          * 声明输出字段
117          */
118         @Override
119         public void declareOutputFields(OutputFieldsDeclarer declarer) {
120             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
121             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
122         }
123
124     }
125     /**
126      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
127      * @param args
128      */
129     public static void main(String[] args) {
130         //组装topology
131         TopologyBuilder topologyBuilder = new TopologyBuilder();
132         topologyBuilder.setSpout("spout1", new MySpout());
133         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
134         topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1");
135
136         //创建本地storm集群
137         /*LocalCluster localCluster = new LocalCluster();
138         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/
139
140         //在集群运行
141         String simpleName = StormTopologySum.class.getSimpleName();
142         try {
143             Config stormConf = new Config();
144             //stormConf.setNumWorkers(2);
145             StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology());
146         } catch (AlreadyAliveException e) {
147             e.printStackTrace();
148         } catch (InvalidTopologyException e) {
149             e.printStackTrace();
150         }
151     }
152 }

转载于:https://www.cnblogs.com/DreamDrive/p/5786198.html

Storm累计求和Demo并且在集群上运行相关推荐

  1. 在生产集群上运行topology

    2019独角兽企业重金招聘Python工程师标准>>> 在生产集群上运行topology 博客分类: 分布式计算 在生产集群上运行topology跟本地模式差不多.下面是步骤: 1) ...

  2. pythonspark集群模式运行_有关python numpy pandas scipy 等 能在YARN集群上 运行PySpark

    有关这个问题,似乎这个在某些时候,用python写好,且spark没有响应的算法支持, 能否能在YARN集群上 运行PySpark方式, 将python分析程序提交上去? Spark Applicat ...

  3. 从认证到调度,K8s 集群上运行的小程序到底经历了什么?

    作者 | 声东  阿里云售后技术专家 导读:不知道大家有没有意识到一个现实:大部分时候,我们已经不像以前一样,通过命令行,或者可视窗口来使用一个系统了. 前言 现在我们上微博.或者网购,操作的其实不是 ...

  4. 在local模式下的spark程序打包到集群上运行

    一.前期准备 前期的环境准备,在Linux系统下要有Hadoop系统,spark伪分布式或者分布式,具体的教程可以查阅我的这两篇博客: Hadoop2.0伪分布式平台环境搭建 Spark2.4.0伪分 ...

  5. spark在集群上运行

    1.spark在集群上运行应用的详细过程 (1)用户通过spark-submit脚本提交应用 (2)spark-submit脚本启动驱动器程序,调用用户定义的main()方法 (3)驱动器程序与集群管 ...

  6. anaconda3环境整体打包放在Spark集群上运行

    一.将虚拟Python环境打包 创建好环境后,进入到环境所在的文件夹,例如环境是/home/hadoop/anaconda3/envs, cd到envs下,使用打包命令将当前目录下的文件打成zip包: ...

  7. 如何在集群上运行Shark

    如何在集群上运行Shark 本文介绍在计算机集群上如何启动和运行Shark.如果对Amazon EC2上运行Shark感兴趣,请点击这里查看如何使用EC2脚本快速启动预先配置好的集群. 依赖: 注意: ...

  8. 小白学习Spark03-在集群上运行Spark

    03 在集群上运行Spark 3.1 Spark运行架构 3.1.1 驱动器节点 3.1.2 执行器节点 3.1.3 集群管理器 3.1.4 启动Spark程序 3.1.5 小结 3.2 使用spar ...

  9. MapReduce作业在Hadoop完全分布式集群上运行的问题与思考(持续更新)

    1.集群已搭建好且通过了WordCount测试,但是在eclipse上开发的程序却仍然是只在namenode上运行 不知道是不是没有配置好eclipse上的Map/Reduce Locations,个 ...

最新文章

  1. 8)排序②排序算法之选择排序[1]直接选择排序
  2. Java重写equals()和hashCode()
  3. 虚拟机在Hyper-V和Citrix Xenserver上的区别
  4. 201612-5 卡牌游戏
  5. Linux C中发现无法连接到math.h中的数学函数解决办法
  6. java valueof_Java Short类valueOf()方法及示例
  7. 网页设计个人主页源码_WebSSH - 网页上的SSH终端
  8. 微软将 Teams 移动应用纳入漏洞奖励计划,最高奖金3万美元
  9. 图神经网络GNN论文2019-2020顶会列表
  10. mysql在windows配置多节点_Windows环境配置MySQL集群
  11. win10每次开机桌面计算机就没有,怎样解决Win10开机需要按F1才能进入桌面
  12. vue3.0之-watch全面解析
  13. RT-Thread:W25Q128虚拟U盘并搭载文件系统
  14. 人工智能及其应用——第一章学习笔记
  15. Java 11 – ChaCha20-Poly1305加密示例
  16. matlab使用CVX求解优化问题时,如果变量搜索空间过大,导致求解的数值解相当不准确,通过变量替换,缩小搜索空间
  17. Java之-springboot
  18. Python 函数嵌套
  19. html-Basics
  20. 投掷硬币实matlab,Matlab扔硬币程序设计

热门文章

  1. php页面栏目访问权限,PHPCMS 栏目和内容浏览权限的解决方法
  2. cpu居高不下 linux,linux cpu居高不下 调试
  3. unicode_literals导致的UnicodeEncodeError
  4. oracle获取 表名,Oracle获取当前数据库的所有表名字段名和注释
  5. 转结构体_golang处理gb2312转utf8编码的问题
  6. int类型存小数 mysql_2020年最新版MySQL面试题(一)
  7. 线性表的应用之多项式的表示与相加
  8. qt中初始化界面的几种方法
  9. ie php脚本引擎,使用php重新实现PHP脚本引擎内置函数
  10. php $_post 报错,关于php输入$_post[‘’]报错的原因