聊聊storm的stream的分流与合并
序
本文主要研究一下storm的stream的分流与合并
实例
@Testpublic void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("sentence-spout", new SentenceSpout());// SentenceSpout --> SplitStreamBoltbuilder.setBolt("split-bolt", new SplitStreamBolt()).shuffleGrouping("sentence-spout");// SplitStreamBolt split two stream --> WordCountBolt//NOTE 这里要指定上游的bolt以及要处理的streamIdbuilder.setBolt("long-word-count-bolt", new CountStreamBolt(),5).shuffleGrouping("split-bolt","longWordStream");builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5).shuffleGrouping("split-bolt","shortWordStream");// WordCountBolt join --> ReportBoltbuilder.setBolt("report-bolt", new ReportBolt()).shuffleGrouping("long-word-count-bolt").shuffleGrouping("short-word-count-bolt");submitRemote(builder);}
- 这里在SplitStreamBolt里头将stream分为两个,之后有两个CountStreamBolt分别处理两个stream的数据,最后归到同一个stream由ReportBolt消费tuple
SplitStreamBolt
public class SplitStreamBolt extends BaseRichBolt {private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);private OutputCollector collector;public void prepare(Map config, TopologyContext context, OutputCollector collector) {this.collector = collector;}//NOTE 这里要自己ackpublic void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){// NOTE 这里指定发送给指定streamIdif(word.length() > 4){this.collector.emit("longWordStream",new Values(word));}else{this.collector.emit("shortWordStream",new Values(word));}}this.collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));//NOTE 这里通过declareStream声明direct stream,并指定streamIddeclarer.declareStream("longWordStream",true,new Fields("word"));declarer.declareStream("shortWordStream",true,new Fields("word"));}
}
- 这里额外声明了两个stream,一个是longWordStream,一个是shortWordStream
- 对于word长度大于4的发送到longWordStream,小于等于4的发送到longWordStream
CountStreamBolt
public class CountStreamBolt extends BaseBasicBolt{private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);Map<String, Integer> longWordCounts = new HashMap<String, Integer>();Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String sourceStreamId = input.getSourceStreamId();String word = input.getString(0);if(sourceStreamId.equals("longWordStream")){Integer count = longWordCounts.get(word);if (count == null) count = 0;count++;longWordCounts.put(word, count);LOGGER.info("long word:{} -> {}",word,count);collector.emit(new Values(word, count));return ;}if(sourceStreamId.equals("shortWordStream")){Integer count = shortWordCounts.get(word);if (count == null) count = 0;count++;shortWordCounts.put(word, count);LOGGER.info("short word:{} -> {}",word,count);collector.emit(new Values(word, count));return ;}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}
- 这里为了展示sourceStreamId的区别,所以两个stream共用了同一个bolt,但是topology那里是两个实例
- 实际也可以是两个不同的bolt类来处理两个stream的数据
小结
- OutputFieldsDeclarer可以通过declareStream方法声明多个streamId
- OutputCollector可以通过emit(String streamId, List<Object> tuple)方法来选择性将tuple发送到指定的streamId
- OutputCollector也有emit方法参数没有streamId,其内部默认是使用Utils.DEFAULT_STREAM_ID(
default
)作为实际的streamId
doc
- How to split one stream or join multiple stream
聊聊storm的stream的分流与合并相关推荐
- 聊聊storm的AggregateProcessor的execute及finishBatch方法
序 本文主要研究一下storm的AggregateProcessor的execute及finishBatch方法 实例 TridentTopology topology = new TridentTo ...
- 聊聊storm的LoggingClusterMetricsConsumer
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下storm的LoggingClusterMetricsConsumer LoggingClusterMetrics ...
- 聊聊storm TridentBoltExecutor的finishBatch方法
序 本文主要研究一下storm TridentBoltExecutor的finishBatch方法 MasterBatchCoordinator.nextTuple storm-core-1.2.2- ...
- 聊聊storm的PartialKeyGrouping
序 本文主要研究一下storm的PartialKeyGrouping 实例 @Testpublic void testPartialKeyGrouping() throws InvalidTopolo ...
- Java8 Stream应用:Map合并、过滤、遍历、values int求和等
1. Java多个Map合并 // 多个Map<Long,Integer>, 根据key相同的,value累积求和: public static Map mapCombine(List&l ...
- 聊聊storm的direct grouping
序 本文主要研究一下storm的direct grouping direct grouping direct grouping是一种特殊的grouping,它是由上游的producer直接指定下游哪个 ...
- java 两个stream合并_Java Stream 流如何进行合并操作
1. 前言 Java Stream Api 提供了很多有用的 Api 让我们很方便将集合或者多个同类型的元素转换为流进行操作.今天我们来看看如何合并 Stream 流. 2. Stream 流的合并 ...
- mapper文件cant resolve param_Nodejs 中基于 Stream 的多文件合并实现
本文先从一个 Stream 的基本示例开始,有个初步认识,中间会讲在 Stream 中什么时候会出现内存泄漏,及如何避免最后基于 Nodejs 中的 Stream 实现一个多文件合并为一个文件的例子. ...
- 聊聊storm的CheckpointSpout
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下storm的CheckpointSpout TopologyBuilder storm-2.0.0/storm-c ...
最新文章
- 网络学习:VLAN和独臂路由
- 好系统U盘启动来说一说win10系统有必要更新到最新版本吗?
- VMware Fusion下的虚拟机绑定地址
- 【数字信号处理】基本序列 ( 基本序列列举 | 单位脉冲序列 | 单位脉冲函数 | 离散单位脉冲函数 | 单位脉冲函数 与 离散单位脉冲函数的区别 )
- python将图像转换为8位单通道_【图像处理】OpenCV系列三十五--- equalizeHist函数详解...
- 在Debian 上安装php zip扩展
- io.js 1.0.x发布
- 由Google Protocol Buffer的小例子引起的g++编译问题
- 什么是创新?如何创新?
- composer php 打包图片,composer 打包到 packagist
- ES6 async函数(超级详细、易懂)
- Claris FileMaker Pro更新至19.2.1.14中文版
- 一篇文章教你用 java爬虫 下载全站视频
- 计算机函数公式发生额总计,16个Excel函数公式,解决会计工作中80﹪的难题!
- SPSS 27 发布了!我为什么要在两个月前买SPSS?为什么?为什么?为什么?
- 差分探头和隔离探头有什么区别
- html渐变编织背景,CSS hover背景/文字渐变效果
- 机器人 陆梅东_第十八届全国中小学组电脑制作活动上海赛区.DOC
- c#(WinForm)绘制两个圆的内公切线
- 这些选择器你都知道吗?