Storm的WordCount案例(spout bolt详细总结)
spout介绍
一个spout是由流组成的数据源在storm的拓扑里,通常情况下会读取外部的数据源
然后emit(发射)到拓扑里面,比如是kafka,MySQL或者redis等等,Spout有两种实现一种是可靠的消息实现,如果发送失败则会重试,另外一种是不可靠的消息实现可能会出现消息丢失,spout可以一次声明多个数据流通过OutputFieldsDeclarer类的declareStream方法,当然前提是你的SpoutOutputCollector里的emit也是多个流
Spout里面主要的方法是nextTuple,它里面可以发射新的tuple到拓扑,或者当没有消息的时候就return,需要注意,这个方法里面不能阻塞,因为storm调用spout方法是单线程的,其他的主要方法是ack和fail,如果使用了可靠的spout,可以使用ack和fail来确定消息发送状态
相关扩展:
IRichSpout:spout类必须实现的接口
BaseRichSpout :可靠的spout有ack确保
BaseBasicSpout :不可靠的spout
1.Spout组件:创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源
WordCountSpout:
package storm;import lombok.SneakyThrows;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 5:14* @Version: 1.0* @Description:*/
public class WordCountSpout implements IRichSpout {private SpoutOutputCollector spoutOutputCollector;private String [] lines = {"I love you","hello world","hello kitty","nihao beijing"};private Random random = new Random();@Overridepublic void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector = spoutOutputCollector;}@Overridepublic void close() {}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@SneakyThrows@Overridepublic void nextTuple() {int nextInt = random.nextInt(lines.length);spoutOutputCollector.emit(new Values(lines[nextInt]));TimeUnit.SECONDS.sleep(1);}@Overridepublic void ack(Object o) {}@Overridepublic void fail(Object o) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("line"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}
}
bolt介绍
Bolts 业务处理单元
所有的拓扑处理都会在bolt中进行,bolt里面可以做任何etl,比如过滤,函数,聚合,连接,写入数据库系统或缓存等,一个bolt可以做简单的事件流转换,如果是复杂的流转化,往往需要多个bolt参与,这就是流计算,每个bolt都进行一个业务逻辑处理,bolt也可以emit多个流到下游,通过declareStream方法声明输出的schema。
Bolt里面主要的方法是execute方法,每次处理一个输入的tuple,bolt里面也可以发射新的tuple使用OutputCollector类,bolt里面每处理一个tuple必须调用ack方法以便于storm知道某个tuple何时处理完成。Strom里面的IBasicBolt接口可以自动
调用ack。
相关拓展:
IRichBolt:bolts的通用接口
IBasicBolt:扩展的bolt接口,可以自动处理ack
OutputCollector:bolt发射tuple到下游bolt里面
2.Bolt组件1:创建Bolt(WordCountSplitBolt)组件进行分词操作
SplitBolt:
package storm;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;
import java.util.stream.Stream;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 5:41* @Version: 1.0* @Description:*/
public class SplitBolt implements IRichBolt {private TopologyContext topologyContext;private OutputCollector outputCollector;@Overridepublic void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {this.topologyContext = topologyContext;this.outputCollector = outputCollector;}@Overridepublic void execute(Tuple tuple) {String line = tuple.getStringByField("line");String[] splits = line.split(" ");Stream.of(splits).forEach(word -> {outputCollector.emit(new Values(word,1));});}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word","count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}
}
3.Bolt组件2:创建Bolt(WordCountTotalBolt)组件进行单词统计操作
TotalBolt:
package storm;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 5:57* @Version: 1.0* @Description:*/
public class TotalBolt implements IRichBolt {private TopologyContext topologyContext;private OutputCollector outputCollector;private Map<String,Integer> map;@Overridepublic void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {this.topologyContext = topologyContext;this.outputCollector = outputCollector;this.map = new HashMap<>();}@Overridepublic void execute(Tuple tuple) {String word = tuple.getStringByField("word");Integer count = tuple.getIntegerByField("count");if (!map.containsKey(word)){map.put(word,count);}else{map.put(word,map.get(word) + count);}map.forEach((key, value) -> {System.out.println(key + ": " + value);});}@Overridepublic void cleanup() {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word","total"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}
}
4.Topology主程序:
WordCountTopology:
package storm;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;/*** @author: wtl* @License: (C) Copyright 2020, wtl Corporation Limited.* @Contact: 1050100468@qq.com* @Date: 2020/11/27 6:06* @Version: 1.0* @Description:*/
public class WordCountTopology {public static void main(String[] args) throws Exception {TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("wcSpout",new WordCountSpout(),1);topologyBuilder.setBolt("wcSplitBolt",new SplitBolt(),1).shuffleGrouping("wcSpout");topologyBuilder.setBolt("wcTotalBolt",new TotalBolt(),1).fieldsGrouping("wcSplitBolt",new Fields("word"));//创建任务StormTopology job = topologyBuilder.createTopology();Config config = new Config();//运行任务有两种模式//1 本地模式 2 集群模式//1、本地模式LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("MyWordCount",config,job);}
}
Storm的WordCount案例(spout bolt详细总结)相关推荐
- BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略
BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...
- Storm精华问答 | 如何理解spout/bolt的生命周期?
戳蓝字"CSDN云计算"关注我们哦! Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop.随着越来越多的场景对Hadoop的MapReduce高 ...
- MapReduce入门(一)—— MapReduce概述 + WordCount案例实操
MapReduce入门(一)-- MapReduce概述 文章目录 MapReduce入门(一)-- MapReduce概述 1.1 MapReduce 定义 1.2 MapReduce 优缺点 1. ...
- Spark快速上手-WordCount案例
在此之前,我已经用MapReduce 框架实现了WordCount案例,接下来,我开始学习数据处理的另外一个非常重要的方法:Spark.首先,使用WordCount案例实现Spark快速上手. 创建M ...
- WordCount案例
WordCount案例 需求 1. 需求说明 2. 文件 案例分析 1.需求分析 2.输入数据 3.期望输出数据 4.Mapper类 5. Reducer类 6. Driver类 代码实现 1. 编写 ...
- Database之SQL:SQL之over partition by开窗函数的简介、使用方法(求各班级内各自排名/求各班级内第一名/求各班级内分数递增和等案例解析)之详细攻略
Database之SQL:SQL之over partition by开窗函数的简介.使用方法(求各班级内各自排名/求各班级内第一名/求各班级内分数递增和等案例解析)之详细攻略 目录 over part ...
- ML之ME/LF:机器学习之风控业务中常用模型评估指标PSI(人群偏移度指标)的的简介、使用方法、案例应用之详细攻略
ML之ME/LF:机器学习之风控业务中常用模型评估指标PSI(人群偏移度指标)的的简介.使用方法.案例应用之详细攻略 目录 PSI(稳定度指标)的简介 1.如何计算PSI? (1).PSI计算过程
- Dataset:Big Mart Sales数据集的简介、下载、案例应用之详细攻略
Dataset:Big Mart Sales数据集的简介.下载.案例应用之详细攻略 目录 Big Mart Sales数据集的简介 1.特征解释 Big Mart Sales数据集的下载 Big Ma ...
- Python语言学习之lambda:lambda函数的简介、使用方法、案例大全之详细攻略
Python语言学习之lambda:lambda函数的简介.使用方法.案例大全之详细攻略 目录 lambda函数的简介 1.lambda匿名函数的格式 2.lambda函数特点 3.lambda函数与 ...
最新文章
- GO 从零开始的语法学习二
- 中央处理器属于计算机外部设备吗,不属于计算机外部设备的是
- 2015-2020年各类国际会议与期刊基于图像的三维对象重建论文综述(7)——Datasets
- 惊呆!这辆悍马自己在跑跑跑跑跑跑跑!
- 详解 ManualResetEvent
- 学英语查单词:快乐英语,简单生活,why not Bing EngKoo!?
- c语言网页版在线编译器_梦幻西游网页版在线玩 梦幻西游网页版礼包兑换码_梦幻西游网页版...
- ppt讲解中的过渡_PPT「过渡页」怎么设计才好看?
- 导向滤波-Guided Image Filtering
- High-Resolution Net(HRNet) 论文笔记
- 鸡兔同笼问题的python解法
- 记一次面试(被骗)经历
- 这些喜闻乐见的Java面试知识点,你都掌握了吗?
- ogg initial同步
- 【python】纸箱抽奖
- 去百度还是去创新工厂?信开复还是信彦宏?
- 淘宝数据魔方技术架构解析读后感
- MySQL操作之视图
- 骚年,这简历一看就是包装过了
- 使用agg方法聚合数据
热门文章
- PB中关于GetChild的用法
- Retrofit中关于CallAdapter使用的设计模式分析
- LCD 3LCD DLP LED投影仪成像原理
- Windows资源监控工具大全
- 李永乐复习全书概率论与数理统计 第一、二章 随机事件和概率、随机变量及其概率分布
- 计算机硬盘空间不足怎么删,电脑d盘的空间不足怎么办_电脑怎么清空d盘-win7之家...
- 计算机二级链表,你知道吗?计算机二级考试公共基础知识冲刺复习笔记:线性链表、双向链表与循环链表...
- 鸿业负荷计算9.0打不开闪退完美解决方法
- 微信小程序--实时语音识别
- 长安大学计算机考研难度如何,长安大学考研难吗?一般要什么水平才可以进入?...