本文主要研究一下storm的stream的分流与合并

实例

    @Testpublic void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("sentence-spout", new SentenceSpout());// SentenceSpout --> SplitStreamBoltbuilder.setBolt("split-bolt", new SplitStreamBolt()).shuffleGrouping("sentence-spout");// SplitStreamBolt split two stream --> WordCountBolt//NOTE 这里要指定上游的bolt以及要处理的streamIdbuilder.setBolt("long-word-count-bolt", new CountStreamBolt(),5).shuffleGrouping("split-bolt","longWordStream");builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5).shuffleGrouping("split-bolt","shortWordStream");// WordCountBolt join --> ReportBoltbuilder.setBolt("report-bolt", new ReportBolt()).shuffleGrouping("long-word-count-bolt").shuffleGrouping("short-word-count-bolt");submitRemote(builder);}
  • 这里在SplitStreamBolt里头将stream分为两个,之后有两个CountStreamBolt分别处理两个stream的数据,最后归到同一个stream由ReportBolt消费tuple

SplitStreamBolt

public class SplitStreamBolt extends BaseRichBolt {private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);private OutputCollector collector;public void prepare(Map config, TopologyContext context, OutputCollector collector) {this.collector = collector;}//NOTE 这里要自己ackpublic void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){// NOTE 这里指定发送给指定streamIdif(word.length() > 4){this.collector.emit("longWordStream",new Values(word));}else{this.collector.emit("shortWordStream",new Values(word));}}this.collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));//NOTE 这里通过declareStream声明direct stream,并指定streamIddeclarer.declareStream("longWordStream",true,new Fields("word"));declarer.declareStream("shortWordStream",true,new Fields("word"));}
}
  • 这里额外声明了两个stream,一个是longWordStream,一个是shortWordStream
  • 对于word长度大于4的发送到longWordStream,小于等于4的发送到longWordStream

CountStreamBolt

public class CountStreamBolt extends BaseBasicBolt{private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);Map<String, Integer> longWordCounts = new HashMap<String, Integer>();Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String sourceStreamId = input.getSourceStreamId();String word = input.getString(0);if(sourceStreamId.equals("longWordStream")){Integer count = longWordCounts.get(word);if (count == null) count = 0;count++;longWordCounts.put(word, count);LOGGER.info("long word:{} -> {}",word,count);collector.emit(new Values(word, count));return ;}if(sourceStreamId.equals("shortWordStream")){Integer count = shortWordCounts.get(word);if (count == null) count = 0;count++;shortWordCounts.put(word, count);LOGGER.info("short word:{} -> {}",word,count);collector.emit(new Values(word, count));return ;}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}
  • 这里为了展示sourceStreamId的区别,所以两个stream共用了同一个bolt,但是topology那里是两个实例
  • 实际也可以是两个不同的bolt类来处理两个stream的数据

小结

  • OutputFieldsDeclarer可以通过declareStream方法声明多个streamId
  • OutputCollector可以通过emit(String streamId, List<Object> tuple)方法来选择性将tuple发送到指定的streamId
  • OutputCollector也有emit方法参数没有streamId,其内部默认是使用Utils.DEFAULT_STREAM_ID(default)作为实际的streamId

doc

  • How to split one stream or join multiple stream

聊聊storm的stream的分流与合并相关推荐

  1. 聊聊storm的AggregateProcessor的execute及finishBatch方法

    序 本文主要研究一下storm的AggregateProcessor的execute及finishBatch方法 实例 TridentTopology topology = new TridentTo ...

  2. 聊聊storm的LoggingClusterMetricsConsumer

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下storm的LoggingClusterMetricsConsumer LoggingClusterMetrics ...

  3. 聊聊storm TridentBoltExecutor的finishBatch方法

    序 本文主要研究一下storm TridentBoltExecutor的finishBatch方法 MasterBatchCoordinator.nextTuple storm-core-1.2.2- ...

  4. 聊聊storm的PartialKeyGrouping

    序 本文主要研究一下storm的PartialKeyGrouping 实例 @Testpublic void testPartialKeyGrouping() throws InvalidTopolo ...

  5. Java8 Stream应用:Map合并、过滤、遍历、values int求和等

    1. Java多个Map合并 // 多个Map<Long,Integer>, 根据key相同的,value累积求和: public static Map mapCombine(List&l ...

  6. 聊聊storm的direct grouping

    序 本文主要研究一下storm的direct grouping direct grouping direct grouping是一种特殊的grouping,它是由上游的producer直接指定下游哪个 ...

  7. java 两个stream合并_Java Stream 流如何进行合并操作

    1. 前言 Java Stream Api 提供了很多有用的 Api 让我们很方便将集合或者多个同类型的元素转换为流进行操作.今天我们来看看如何合并 Stream 流. 2. Stream 流的合并 ...

  8. mapper文件cant resolve param_Nodejs 中基于 Stream 的多文件合并实现

    本文先从一个 Stream 的基本示例开始,有个初步认识,中间会讲在 Stream 中什么时候会出现内存泄漏,及如何避免最后基于 Nodejs 中的 Stream 实现一个多文件合并为一个文件的例子. ...

  9. 聊聊storm的CheckpointSpout

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下storm的CheckpointSpout TopologyBuilder storm-2.0.0/storm-c ...

最新文章

  1. 网络学习:VLAN和独臂路由
  2. 好系统U盘启动来说一说win10系统有必要更新到最新版本吗?
  3. VMware Fusion下的虚拟机绑定地址
  4. 【数字信号处理】基本序列 ( 基本序列列举 | 单位脉冲序列 | 单位脉冲函数 | 离散单位脉冲函数 | 单位脉冲函数 与 离散单位脉冲函数的区别 )
  5. python将图像转换为8位单通道_【图像处理】OpenCV系列三十五--- equalizeHist函数详解...
  6. 在Debian 上安装php zip扩展
  7. io.js 1.0.x发布
  8. 由Google Protocol Buffer的小例子引起的g++编译问题
  9. 什么是创新?如何创新?
  10. composer php 打包图片,composer 打包到 packagist
  11. ES6 async函数(超级详细、易懂)
  12. Claris FileMaker Pro更新至19.2.1.14中文版
  13. 一篇文章教你用 java爬虫 下载全站视频
  14. 计算机函数公式发生额总计,16个Excel函数公式,解决会计工作中80﹪的难题!
  15. SPSS 27 发布了!我为什么要在两个月前买SPSS?为什么?为什么?为什么?
  16. 差分探头和隔离探头有什么区别
  17. html渐变编织背景,CSS hover背景/文字渐变效果
  18. 机器人 陆梅东_第十八届全国中小学组电脑制作活动上海赛区.DOC
  19. c#(WinForm)绘制两个圆的内公切线
  20. 这些选择器你都知道吗?

热门文章

  1. 【转载】Linux下安装、配置、启动Apache
  2. 匹配3位或4位区号的电话号码,其中区号可以用小括号括起来,也可以不用,区号与本地号间可以用连字号或空格间隔,也可以没有间隔...
  3. JavaScript Document
  4. 如何在Terminal命令行模式下运行Objective-C
  5. Apache反向代理设置【转载】
  6. web集群时session同步的3种方法
  7. C#创建和调用DLL
  8. Python3中typing模块介绍
  9. C++中局部类的使用
  10. shell python优势_python的优势