spout介绍

一个spout是由流组成的数据源在storm的拓扑里,通常情况下会读取外部的数据源 
然后emit(发射)到拓扑里面,比如是kafka,MySQL或者redis等等,Spout有两种实现一种是可靠的消息实现,如果发送失败则会重试,另外一种是不可靠的消息实现可能会出现消息丢失,spout可以一次声明多个数据流通过OutputFieldsDeclarer类的declareStream方法,当然前提是你的SpoutOutputCollector里的emit也是多个流

Spout里面主要的方法是nextTuple,它里面可以发射新的tuple到拓扑,或者当没有消息的时候就return,需要注意,这个方法里面不能阻塞,因为storm调用spout方法是单线程的,其他的主要方法是ack和fail,如果使用了可靠的spout,可以使用ack和fail来确定消息发送状态

相关扩展: 
IRichSpout:spout类必须实现的接口 
BaseRichSpout :可靠的spout有ack确保 
BaseBasicSpout :不可靠的spout

1.Spout组件:创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源

WordCountSpout:
package storm;import lombok.SneakyThrows;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 5:14* @Version: 1.0* @Description:*/
public class WordCountSpout implements IRichSpout {private SpoutOutputCollector spoutOutputCollector;private String [] lines = {"I love you","hello world","hello kitty","nihao beijing"};private Random random = new Random();@Overridepublic void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector = spoutOutputCollector;}@Overridepublic void close() {}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@SneakyThrows@Overridepublic void nextTuple() {int nextInt = random.nextInt(lines.length);spoutOutputCollector.emit(new Values(lines[nextInt]));TimeUnit.SECONDS.sleep(1);}@Overridepublic void ack(Object o) {}@Overridepublic void fail(Object o) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("line"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}
}

bolt介绍

Bolts 业务处理单元 
所有的拓扑处理都会在bolt中进行,bolt里面可以做任何etl,比如过滤,函数,聚合,连接,写入数据库系统或缓存等,一个bolt可以做简单的事件流转换,如果是复杂的流转化,往往需要多个bolt参与,这就是流计算,每个bolt都进行一个业务逻辑处理,bolt也可以emit多个流到下游,通过declareStream方法声明输出的schema。

Bolt里面主要的方法是execute方法,每次处理一个输入的tuple,bolt里面也可以发射新的tuple使用OutputCollector类,bolt里面每处理一个tuple必须调用ack方法以便于storm知道某个tuple何时处理完成。Strom里面的IBasicBolt接口可以自动 
调用ack。

相关拓展: 
IRichBolt:bolts的通用接口 
IBasicBolt:扩展的bolt接口,可以自动处理ack 
OutputCollector:bolt发射tuple到下游bolt里面

2.Bolt组件1:创建Bolt(WordCountSplitBolt)组件进行分词操作

SplitBolt:
package storm;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;
import java.util.stream.Stream;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 5:41* @Version: 1.0* @Description:*/
public class SplitBolt implements IRichBolt {private TopologyContext topologyContext;private OutputCollector outputCollector;@Overridepublic void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {this.topologyContext = topologyContext;this.outputCollector = outputCollector;}@Overridepublic void execute(Tuple tuple) {String line = tuple.getStringByField("line");String[] splits = line.split(" ");Stream.of(splits).forEach(word -> {outputCollector.emit(new Values(word,1));});}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word","count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}
}

3.Bolt组件2:创建Bolt(WordCountTotalBolt)组件进行单词统计操作

TotalBolt:
package storm;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 5:57* @Version: 1.0* @Description:*/
public class TotalBolt implements IRichBolt {private TopologyContext topologyContext;private OutputCollector outputCollector;private Map<String,Integer> map;@Overridepublic void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {this.topologyContext = topologyContext;this.outputCollector = outputCollector;this.map = new HashMap<>();}@Overridepublic void execute(Tuple tuple) {String word = tuple.getStringByField("word");Integer count = tuple.getIntegerByField("count");if (!map.containsKey(word)){map.put(word,count);}else{map.put(word,map.get(word) + count);}map.forEach((key, value) -> {System.out.println(key + ": " + value);});}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word","total"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}
}

4.Topology主程序:

WordCountTopology:
package storm;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 6:06* @Version: 1.0* @Description:*/
public class WordCountTopology {public static void main(String[] args) throws Exception {TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("wcSpout",new WordCountSpout(),1);topologyBuilder.setBolt("wcSplitBolt",new SplitBolt(),1).shuffleGrouping("wcSpout");topologyBuilder.setBolt("wcTotalBolt",new TotalBolt(),1).fieldsGrouping("wcSplitBolt",new Fields("word"));//创建任务StormTopology job = topologyBuilder.createTopology();Config config = new Config();//运行任务有两种模式//1 本地模式   2 集群模式//1、本地模式LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("MyWordCount",config,job);}
}

Storm的WordCount案例(spout bolt详细总结)相关推荐

  1. BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略

    BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...

  2. Storm精华问答 | 如何理解spout/bolt的生命周期?

    戳蓝字"CSDN云计算"关注我们哦! Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop.随着越来越多的场景对Hadoop的MapReduce高 ...

  3. MapReduce入门(一)—— MapReduce概述 + WordCount案例实操

    MapReduce入门(一)-- MapReduce概述 文章目录 MapReduce入门(一)-- MapReduce概述 1.1 MapReduce 定义 1.2 MapReduce 优缺点 1. ...

  4. Spark快速上手-WordCount案例

    在此之前,我已经用MapReduce 框架实现了WordCount案例,接下来,我开始学习数据处理的另外一个非常重要的方法:Spark.首先,使用WordCount案例实现Spark快速上手. 创建M ...

  5. WordCount案例

    WordCount案例 需求 1. 需求说明 2. 文件 案例分析 1.需求分析 2.输入数据 3.期望输出数据 4.Mapper类 5. Reducer类 6. Driver类 代码实现 1. 编写 ...

  6. Database之SQL:SQL之over partition by开窗函数的简介、使用方法(求各班级内各自排名/求各班级内第一名/求各班级内分数递增和等案例解析)之详细攻略

    Database之SQL:SQL之over partition by开窗函数的简介.使用方法(求各班级内各自排名/求各班级内第一名/求各班级内分数递增和等案例解析)之详细攻略 目录 over part ...

  7. ML之ME/LF:机器学习之风控业务中常用模型评估指标PSI(人群偏移度指标)的的简介、使用方法、案例应用之详细攻略

    ML之ME/LF:机器学习之风控业务中常用模型评估指标PSI(人群偏移度指标)的的简介.使用方法.案例应用之详细攻略 目录 PSI(稳定度指标)的简介 1.如何计算PSI? (1).PSI计算过程

  8. Dataset:Big Mart Sales数据集的简介、下载、案例应用之详细攻略

    Dataset:Big Mart Sales数据集的简介.下载.案例应用之详细攻略 目录 Big Mart Sales数据集的简介 1.特征解释 Big Mart Sales数据集的下载 Big Ma ...

  9. Python语言学习之lambda:lambda函数的简介、使用方法、案例大全之详细攻略

    Python语言学习之lambda:lambda函数的简介.使用方法.案例大全之详细攻略 目录 lambda函数的简介 1.lambda匿名函数的格式 2.lambda函数特点 3.lambda函数与 ...

最新文章

  1. GO 从零开始的语法学习二
  2. 中央处理器属于计算机外部设备吗,不属于计算机外部设备的是
  3. 2015-2020年各类国际会议与期刊基于图像的三维对象重建论文综述(7)——Datasets
  4. 惊呆!这辆悍马自己在跑跑跑跑跑跑跑!
  5. 详解 ManualResetEvent
  6. 学英语查单词:快乐英语,简单生活,why not Bing EngKoo!?
  7. c语言网页版在线编译器_梦幻西游网页版在线玩 梦幻西游网页版礼包兑换码_梦幻西游网页版...
  8. ppt讲解中的过渡_PPT「过渡页」怎么设计才好看?
  9. 导向滤波-Guided Image Filtering
  10. High-Resolution Net(HRNet) 论文笔记
  11. 鸡兔同笼问题的python解法
  12. 记一次面试(被骗)经历
  13. 这些喜闻乐见的Java面试知识点,你都掌握了吗?
  14. ogg initial同步
  15. 【python】纸箱抽奖
  16. 去百度还是去创新工厂?信开复还是信彦宏?
  17. 淘宝数据魔方技术架构解析读后感
  18. MySQL操作之视图
  19. 骚年,这简历一看就是包装过了
  20. 使用agg方法聚合数据

热门文章

  1. PB中关于GetChild的用法
  2. Retrofit中关于CallAdapter使用的设计模式分析
  3. LCD 3LCD DLP LED投影仪成像原理
  4. Windows资源监控工具大全
  5. 李永乐复习全书概率论与数理统计 第一、二章 随机事件和概率、随机变量及其概率分布
  6. 计算机硬盘空间不足怎么删,电脑d盘的空间不足怎么办_电脑怎么清空d盘-win7之家...
  7. 计算机二级链表,你知道吗?计算机二级考试公共基础知识冲刺复习笔记:线性链表、双向链表与循环链表...
  8. 鸿业负荷计算9.0打不开闪退完美解决方法
  9. 微信小程序--实时语音识别
  10. 长安大学计算机考研难度如何,长安大学考研难吗?一般要什么水平才可以进入?...