文章目录

  • 一、Spout类
  • 二、Bolt类
  • 三、源码实现
    • 3.1 MySpout 类
  • 3.2 MyBolt 类
  • 3.3 测试类

一、Spout类

Spout类extends BaseRichSpout,BaseRichSpout extends BaseComponent implements IRichSpout,IRichSpout extends ISpout;分析ISpout几个方法:

1、void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
任务调用的时候,在一个worker上初始化;提供了集群拓扑作业的配置信息、当前作业的任务信息、collector用来发送封装的tuples单元
2、void nextTuple();
strom要求spout发送数据给output collector,非阻塞式方法,如果没有数据发送,该方法就会return;

二、Bolt类

Bolt类extends BaseRichBolt,BaseRichBolt实现IRichBolt,IRichBolt继承IBolt,接下来分析IBolt几个方法:

1、 void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
根据注释,当任务来了的时候,这个会在集群中的某个worker节点被初始化,他提供了Bolt的运行环境。3个参数:stormConf给当前Bolt准备的配置对象;context可以获取任务的位置信息,包括任务id和组件id、输入输出流的信息;collector发送数据

2、 void execute(Tuple input);
处理一个单一的tuple输入流,元组对象包含了metadata元数据信息(封装了发送的数据的来自于哪个组件、哪个流、哪个任务),value值可以被获取到Tuple对象的getValue方法

当然,它们都有一个共同的方法-declareOutputFields,负责给所有的流声明了output输出策略。

三、源码实现

3.1 MySpout 类

在这个类中,我们首先要在open方法中初始化,然后在nextTuple方法中,不停的采集数据、向后发送数据。在调用collector的emit方法向后发射数据的时候,要对后面的Bolt声明发送数据的字段名称。类似于Android中使用Intent、SP传值时定义的类型、名称。

public class MySpout extends BaseRichSpout{Map map;TopologyContext context;SpoutOutputCollector collector;int i=0;/*** 配置初始化spout类,提升作用域* */@Overridepublic void open(Map map, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.map=map;this.context=context;this.collector=collector;}/*** 采集,向后发送数据* */@Overridepublic void nextTuple() {i++;List num = new Values(i);//看他的实现,这个Values就是一个可变数组,里面在不断的循环this.collector.emit(num);//把Values发出去就行了
//      this.collector.emit(num,2);//把Values发出去就行了System.err.println("spuot---------"+i);Utils.sleep(1000);}/*** 向接收数据的逻辑处理单元发送数据的字段名称* */@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("num"));//可以声明多个,取决于我向后发送了几个。类似于intent和sp的方式}}

看Values()的实现,它其实就是一个可变数组,里面在不停的循环:

public Values(Object... vals) {super(vals.length);for(Object o: vals) {add(o);}}

3.2 MyBolt 类

这个类的作用就是接受上一个Spout发送过来的数据,并求和累加。过程还是首先在prepare方法中进行初始化,然后在execute方法中根据Spout定义的发送数据字段名称,来获取到传递过来的数据。很显然这一步就能满足需求,无需再继续向后发射数据了。

public class MyBolt extends BaseRichBolt{Map stormConf;TopologyContext context;OutputCollector collector;int sum=0;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.stormConf=stormConf;this.context=context;this.collector=collector;}/*** 获取数据,有必要的话,向后继续发送数据* */@Overridepublic void execute(Tuple input) {//1.获取到数据int i = input.getIntegerByField("num");//因为我发过来的就是int类型,和intent、sp方式极其类似//2.求和累加sum+=i;System.err.println("sum:=============================="+sum);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}}

3.3 测试类

首先要构建拓扑结构,并设置Spout、Bolt,指定分发策略。这里才用的是shuffleGrouping的分发策略。最后创建本地化集群,将我们的作业提交到集群运行即可。

public class Test {/*** 构建拓扑结构,放入集群运行* @param args命令行参数*/public static void main(String[] args) {//构建storm拓扑结构TopologyBuilder tb = new TopologyBuilder();tb.setSpout("wsspout", new MySpout());//通过shuffleGrouping将spout和bolt联系起来,指定分发策略。并行度3,写不写都行tb.setBolt("wsbolt", new MyBolt(),3).shuffleGrouping("wsspout");//创建本地storm集群LocalCluster lc = new LocalCluster();Config config = new Config();//提交作业到本地集群,拓扑作业name、作业配置信息、拓扑创建lc.submitTopology("wordsum", config, tb.createTopology());}
}

使用Storm实现WordSum相关推荐

  1. 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度

    大数据技术之_17_Storm学习 一 Storm 概述 1.1 离线计算是什么? 1.2 流式计算是什么? 1.3 Storm 是什么? 1.4 Storm 与 Hadoop 的区别 1.5 Sto ...

  2. 【Storm】Spout的storm-starter及Grouping策略、并发度讲解、网站浏览量和用户数统计

    maven先安装好. 以下讲storm-starter的使用. 1.从github下载官方的storm-starter例子包,是maven工程, 地址 https://github.com/natha ...

  3. Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)

    https://bigdata.163.com/product/article/5 Apache 流框架 Flink,Spark Streaming,Storm对比分析(一) 转载于:https:// ...

  4. a prefect storm歌词_Storm s Perfect Storm歌词

    Storm s Perfect Storm歌词 添加日期:2003-05-05 时长:02分18秒 歌手:X Men Artist: Sara Evans Album: Sara Evans : Re ...

  5. 聊聊storm的stream的分流与合并

    序 本文主要研究一下storm的stream的分流与合并 实例 @Testpublic void testStreamSplitJoin() throws InvalidTopologyExcepti ...

  6. 流式大数据处理的三种框架:Storm,Spark和Samza

    2019独角兽企业重金招聘Python工程师标准>>> 许多分布式计算系统都可以实时或接近实时地处理大数据流.本文将对三种Apache框架分别进行简单介绍,然后尝试快速.高度概述其异 ...

  7. storm入门教程 第一章 前言[转]

    1.1   实时流计算 互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率.正因为大家对信息实时响应.实时交互的需求,软件行业除了个人操作系统之外,数据库 ...

  8. 【大数据实时计算框架】Storm框架

    一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...

  9. Storm 0.9安装指南

    Storm 0.9.2安装指南 0 Storm0.9的亮点 引用网上的描写叙述: "Storm 0.9.0.1版本号的第一亮点是引入了netty transport.Storm网络传输机制实 ...

最新文章

  1. 【算法编程】斐波那契数列
  2. python大作业思路_python大作业
  3. DCMTK:测试文件是否使用DICOM Part 10格式
  4. solaris配置php,Solaris下安装Oracle_启动Oracle及监听
  5. org.apache.maven.archiver.MavenArchiver.getManifest
  6. Linux 切mms数据流,libmms MMSH Server响应解析缓冲区溢出漏洞
  7. 去除内联元素之间的间距
  8. Linux操作系统进程模型分析
  9. Golang sha256 加密,PHP hash_hmac(‘sha256‘, $string, $key)加密,Js CryptoJS.HmacSHA256(string, key) 加密
  10. PDFlib+PDI图像和超文本元素提供了许多有用的功能
  11. python期权价格计算器_使用Python构建内在价值计算器
  12. 局域网sip服务器搭建:opensips
  13. LTE帧结构----符号长度
  14. bzoj 1853: [Scoi2010]幸运数字 容斥
  15. 2021CCPC网络预选赛(重赛)
  16. 弹指间,网页灰飞烟灭——Google灭霸彩蛋实现
  17. 中科院基于gpt的学术优化网站搭建教程
  18. 很久以前的一篇对初学Oracle建议的文章
  19. 【Java位运算】n1和n>>1含义
  20. 尚硅谷MySQL笔记

热门文章

  1. 制备pdms膜的方法_四川大学杨伟团队JMCC:实现柔性压阻式压力传感器的规模化制备...
  2. 一台计算机有64,在同一台计算机上使用带有32位和64位Altium设计软件的数据库元件库...
  3. UrlUtils工具类,Java URL工具类,Java URL链接工具类
  4. Python 学习笔记 - Memcached
  5. Windows搭建golang开发平台
  6. Python之数据聚合与分组运算
  7. 等概率随机函数的实现
  8. error BK1506
  9. 艾伟也谈项目管理,在团队中如何推行一项新的实践
  10. [转]arm汇编相关链接