Storm的StreamID使用样例(版本1.0.2)
随手尝试了一下StreamID的的用法。留个笔记。
==数据样例==
{"Address": "小桥镇小桥中学对面","CityCode": "511300","CountyCode": "511322","EnterpriseCode": "YUNDA","MailNo": "667748320345","Mobile": "183****5451","Name": "王***","ProvCode": "510000","Weight": "39" }
==拓扑结构==
==程序源码==
<Spout1>
package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import common.simulate.DataRandom; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;import java.util.Map;public class Spout1 extends BaseRichSpout {private SpoutOutputCollector _collector = null;private DataRandom _dataRandom = null;private int _timeInterval = 1000;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("Stream1", new Fields("json"));declarer.declareStream("Stream2", new Fields("json"));}@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_dataRandom = DataRandom.getInstance();if (conf.containsKey(Constants.SpoutInterval)) {_timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));}}@Overridepublic void nextTuple() {try {Thread.sleep(_timeInterval);} catch (InterruptedException e) {e.printStackTrace();}JSONObject jsonObject = _dataRandom.getRandomExpressData();System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");_collector.emit("Stream1", new Values(jsonObject.toJSONString()));_collector.emit("Stream2", new Values(jsonObject.toJSONString()));} }
<CountBolt1>
package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map;public class CountBolt1 extends BaseRichBolt {private OutputCollector _collector = null;private int taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("Stream3", new Fields("company", "count"));}@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {_collector = collector;taskId = context.getThisTaskId();}@Overridepublic void execute(Tuple input) {String str = input.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String company = jsonObject.getString(Constants.EnterpriseCode);int count = 0;if (_map.containsKey(company)) {count = _map.get(company);}count++;_map.put(company, count);_collector.emit("Stream3", new Values(company, count));System.out.print("[---CountBolt1---]" +"taskId=" + taskId + ", company=" + company + ", count=" + count + "\n");} }
<CountBolt2>
package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map; import java.util.UUID;public class CountBolt2 extends BaseRichBolt {private OutputCollector _collector = null;private int _taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {_collector = outputCollector;_taskId = topologyContext.getThisTaskId();}@Overridepublic void execute(Tuple tuple) {String str = tuple.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String prov = jsonObject.getString(Constants.ProvCode);int count = 0;if (_map.containsKey(prov)) {count = _map.get(prov);}count++;_map.put(prov, count);_collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));System.out.print("[---CountBolt2---]" +"taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\n");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));} }
<CountBolt3>
package test;import com.alibaba.fastjson.JSONObject; import common.constants.Constants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.HashMap; import java.util.Map; import java.util.UUID;public class CountBolt3 extends BaseRichBolt {private OutputCollector _collector = null;private int _taskId = 0;private Map<String, Integer> _map = new HashMap<>();@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {_collector = outputCollector;_taskId = topologyContext.getThisTaskId();}@Overridepublic void execute(Tuple tuple) {String str = tuple.getStringByField("json");JSONObject jsonObject = JSONObject.parseObject(str);String city = jsonObject.getString(Constants.CityCode);int count = 0;if (_map.containsKey(city)) {count = _map.get(city);}count++;_map.put(city, count);_collector.emit("Stream4", new Values(city, count));System.out.print("[---CountBolt3---]" +"taskId=" + _taskId + ", city=" + city + ", count=" + count + "\n");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));} }
<TopBolt>
package test;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple;import java.util.List; import java.util.Map;public class TopBolt extends BaseRichBolt {@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {}@Overridepublic void execute(Tuple tuple) {System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");List<Object> values = tuple.getValues();for(Object value : values) {System.out.print("[---TopBolt---]value=" + value + "\n");}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {} }
<TestTopology>
package test;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class TestTopology {public static void main(String[] args)throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("Spout1", new Spout1());builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");builder.setBolt("Top", new TopBolt()).fieldsGrouping("Count1", "Stream3", new Fields("company")).fieldsGrouping("Count2", "Stream4", new Fields("prov")).fieldsGrouping("Count3", "Stream4", new Fields("city"));Config config = new Config();config.setNumWorkers(1);config.put(common.constants.Constants.SpoutInterval, args[1]);if (Boolean.valueOf(args[0])) {StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());} else {LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("TestTopology1", config, builder.createTopology());}} }
==结果日志==
[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小桥镇小桥中学对面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"} [---CountBolt1---]taskId=1, company=YUNDA, count=1 [---CountBolt3---]taskId=3, city=511300, count=1 [---CountBolt2---]taskId=2, prov=510000, count=1 [---TopBolt---]StreamID=Stream4 [---TopBolt---]value=510000 [---TopBolt---]value=1 [---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616 [---TopBolt---]StreamID=Stream4 [---TopBolt---]value=511300 [---TopBolt---]value=1 [---TopBolt---]StreamID=Stream3 [---TopBolt---]value=YUNDA [---TopBolt---]value=1
转载于:https://www.cnblogs.com/quchunhui/p/8302192.html
Storm的StreamID使用样例(版本1.0.2)相关推荐
- EasyX图形库安装,以及使用样例(vc6.0,vs2013,其他类同)
①官网下载 ②解压安装 (由于自己电脑安装了vc6.0 和vs2013以该两个为例,其他都是一样的安装方法) ③图形库测试 利用图形库画星空 (l编译器vs 2013) #include<std ...
- spark mllib lda 中文分词、主题聚合基本样例
github https://github.com/cclient/spark-lda-example spark mllib lda example 官方示例较为精简 在官方lda示例的基础上,给合 ...
- PTA | 实验二 | PTA综合实验参考样例 | C++/C语言OJ练习题
为鼓励居民节约用水,自来水公司采取按用水量阶梯式计价的办法,居民应交水费y(元)与月用水量x(吨)的关系如下,请编写程序实现水费的计算. 输入格式: 输入在一行中给出实数x. 输出格式: 按照以下的格 ...
- 微软发布了Visual Stduio 2010 RTM版本的虚拟机vhd文件,包含样例和动手实验(免费)...
原文: hthttp://www.almnetworks.net/zh-CN/post/2010/06/30/Now-Available-Visual-Studio-2010-RTM-Virtual- ...
- VOT-toolkit Python 版本使用教程--官方样例版
1.下载与安装 不成功的话,用以下方案: 附:如果你以这种方式,可以查看./build/lib/vot/utilities/cli.py下面的输入参数的含义. 这是vot-toolkit定义输入参数的 ...
- YOLOv4 资源环境配置和测试样例效果
YOLOv4 资源环境配置和测试样例效果 基本环境:cuda=10.0,cudnn>=7.0, opencv>=2.4 一.下载yolov4 git clone https://githu ...
- 【ZooKeeper Notes 3】ZooKeeper Java API 使用样例
查看PDF版本 转载请注明:@ni掌柜 nileader@gmail.com ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务框架,包含一组简单的原语集合.通过这些原语言的组合使用, ...
- Android NDK基础样例
Android NDK基础样例 NDK(Native Development Kit),用C/C++封装一些东西?好像就这么理解好了== 一.环境准备 这个好讨厌==!因为我环境都已经搭了很久了. 已 ...
- HLS-Demo IOS 视屏直播样例
from: https://github.com/yangchao0033/HLS-Demo/blob/master/README.md demo简介:如果觉得文章有用的话,请读者在github上点个 ...
最新文章
- [摘录]第8章 与非美国人谈判的技巧
- Nginx Rewrite详解
- 生鲜在卖场中的六大类别
- android 清屏函数,浅谈android截屏问题
- cmake打包ICONV库
- Python读写txt
- Java中堆内存和栈内存详解(转)
- Hibernate--Criteria Query and DetachedCriteria
- java 泛型(generics)使用总结
- 2012.4.13总结(一)
- 定位CPU异常抖动---tomcat热部署的坑[转载]
- WorkerMan源码分析 - 实现最简单的原型
- 腾讯X5 内核 的导入
- 石油化工行业的MES系统解决方案
- 携程线上测评测试题目,答案解析
- one class classification
- 王宝强代言计算机学校,《破晓屠龙》王宝强代言传奇游戏电脑手机都能玩
- 解决SQL Server报错:229、262、5123
- 【开源】司马编译器结构
- 电商秒杀系统设计分析
热门文章
- 敏捷测试2015新看点
- linux病毒sfewfesfs
- 【教程】Cubieboard变苹果无线airplay音响
- [转]RDLC报表-参数传递及主从报表
- Swift中的一致性哈希算法(补充)
- Jmeter 命令行选项目录
- Hibernate的各种保存方式的区别
- 【GitLab】gitlab上配置webhook后,点击测试报错:Requests to the local network are not allowed...
- 《Spring Cloud与Docker微服务架构实战》配套代码
- 对OCR文字识别软件进行自动分析和识别设置的教程