A quick hands-on experiment with Storm's StreamID feature (version 1.0.2); leaving a note here for later reference.
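
For quick reference, these are the StreamID-related calls the full example below exercises. This is a minimal sketch and not runnable on its own; the names myStream, MySpout and MyBolt are placeholders rather than part of the demo.

// In declareOutputFields() of a spout or bolt: declare a named stream.
declarer.declareStream("myStream", new Fields("json"));

// In nextTuple()/execute(): pass the stream ID as the first argument when emitting.
collector.emit("myStream", new Values(data));

// When building the topology: subscribe to a specific upstream component and stream ID.
builder.setBolt("MyBolt", new MyBolt()).shuffleGrouping("MySpout", "myStream");

// In the receiving bolt's execute(): check which stream the tuple arrived on.
String streamId = tuple.getSourceStreamId();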

==Sample Data==

{"Address": "小桥镇小桥中学对面","CityCode": "511300","CountyCode": "511322","EnterpriseCode": "YUNDA","MailNo": "667748320345","Mobile": "183****5451","Name": "王***","ProvCode": "510000","Weight": "39"
}
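
Each bolt below parses one of these records with fastjson and counts on a single field. Here is a minimal, self-contained sketch of that parsing step, assuming (as the bolt code and the result log suggest) that the keys in the Constants class are simply the JSON field names:

import com.alibaba.fastjson.JSONObject;

public class ParseSample {
    public static void main(String[] args) {
        // The three fields the counting bolts care about, taken from the sample record above.
        String json = "{\"EnterpriseCode\":\"YUNDA\",\"ProvCode\":\"510000\",\"CityCode\":\"511300\"}";
        JSONObject record = JSONObject.parseObject(json);
        System.out.println(record.getString("EnterpriseCode")); // YUNDA  -> counted by CountBolt1
        System.out.println(record.getString("ProvCode"));       // 510000 -> counted by CountBolt2
        System.out.println(record.getString("CityCode"));       // 511300 -> counted by CountBolt3
    }
}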

==Topology Structure==

Spout1 declares two output streams, Stream1 and Stream2, and emits every record on both. CountBolt1 subscribes to Stream1 and counts records per courier company; CountBolt2 and CountBolt3 both subscribe to Stream2 and count per province and per city, respectively. Each counting bolt forwards its running totals on a named stream of its own (Stream3 from CountBolt1, Stream4 from CountBolt2 and CountBolt3), and TopBolt subscribes to all three and prints the source StreamID together with the tuple values.

==Source Code==

<Spout1>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import common.simulate.DataRandom;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class Spout1 extends BaseRichSpout {

    private SpoutOutputCollector _collector = null;
    private DataRandom _dataRandom = null;
    private int _timeInterval = 1000;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare two named streams; downstream bolts subscribe to them by stream ID.
        declarer.declareStream("Stream1", new Fields("json"));
        declarer.declareStream("Stream2", new Fields("json"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _dataRandom = DataRandom.getInstance();
        if (conf.containsKey(Constants.SpoutInterval)) {
            _timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));
        }
    }

    @Override
    public void nextTuple() {
        try {
            Thread.sleep(_timeInterval);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = _dataRandom.getRandomExpressData();
        System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");
        // Emit the same record on both streams.
        _collector.emit("Stream1", new Values(jsonObject.toJSONString()));
        _collector.emit("Stream2", new Values(jsonObject.toJSONString()));
    }
}

<CountBolt1>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt1 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("Stream3", new Fields("company", "count"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        taskId = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String company = jsonObject.getString(Constants.EnterpriseCode);
        int count = 0;
        if (_map.containsKey(company)) {
            count = _map.get(company);
        }
        count++;
        _map.put(company, count);
        _collector.emit("Stream3", new Values(company, count));
        System.out.print("[---CountBolt1---]"
                + "taskId=" + taskId + ", company=" + company + ", count=" + count + "\n");
    }
}

<CountBolt2>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CountBolt2 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String prov = jsonObject.getString(Constants.ProvCode);
        int count = 0;
        if (_map.containsKey(prov)) {
            count = _map.get(prov);
        }
        count++;
        _map.put(prov, count);
        // Count per province; the extra UUID field shows that a stream can carry its own schema.
        _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
        System.out.print("[---CountBolt2---]"
                + "taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
    }
}

<CountBolt3>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt3 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String city = jsonObject.getString(Constants.CityCode);
        int count = 0;
        if (_map.containsKey(city)) {
            count = _map.get(city);
        }
        count++;
        _map.put(city, count);
        _collector.emit("Stream4", new Values(city, count));
        System.out.print("[---CountBolt3---]"
                + "taskId=" + _taskId + ", city=" + city + ", count=" + count + "\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
    }
}

<TopBolt>

package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.List;
import java.util.Map;

public class TopBolt extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    @Override
    public void execute(Tuple tuple) {
        // getSourceStreamId() reports which declared stream the tuple arrived on.
        System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");
        List<Object> values = tuple.getValues();
        for (Object value : values) {
            System.out.print("[---TopBolt---]value=" + value + "\n");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
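
One thing worth noting: CountBolt2 and CountBolt3 both name their output stream "Stream4", so the stream ID alone does not tell TopBolt which bolt a tuple came from (the result log below shows "Stream4" arriving once with three values and once with two). If the two needed different handling, execute() could branch on getSourceComponent() together with getSourceStreamId(). A hedged sketch of such a variant of TopBolt.execute(), not part of the original post:

    @Override
    public void execute(Tuple tuple) {
        String stream = tuple.getSourceStreamId();   // "Stream3" or "Stream4"
        String source = tuple.getSourceComponent();  // "Count1", "Count2" or "Count3"
        if ("Stream3".equals(stream)) {
            System.out.print("[---TopBolt---]company=" + tuple.getStringByField("company")
                    + ", count=" + tuple.getIntegerByField("count") + "\n");
        } else if ("Count2".equals(source)) {
            System.out.print("[---TopBolt---]prov=" + tuple.getStringByField("prov")
                    + ", count=" + tuple.getIntegerByField("count") + "\n");
        } else {
            System.out.print("[---TopBolt---]city=" + tuple.getStringByField("city")
                    + ", count=" + tuple.getIntegerByField("count") + "\n");
        }
    }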

<TestTopology>

package test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TestTopology {

    // args[0]: "true" submits to a cluster, anything else runs in a LocalCluster.
    // args[1]: spout emit interval in milliseconds.
    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout1", new Spout1());
        // Each grouping names both the upstream component and the stream ID it subscribes to.
        builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
        builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Top", new TopBolt())
                .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                .fieldsGrouping("Count3", "Stream4", new Fields("city"));

        Config config = new Config();
        config.setNumWorkers(1);
        config.put(common.constants.Constants.SpoutInterval, args[1]);

        if (Boolean.valueOf(args[0])) {
            StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("TestTopology1", config, builder.createTopology());
        }
    }
}

==Result Log==

[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小桥镇小桥中学对面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
[---CountBolt1---]taskId=1, company=YUNDA, count=1
[---CountBolt3---]taskId=3, city=511300, count=1
[---CountBolt2---]taskId=2, prov=510000, count=1
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=510000
[---TopBolt---]value=1
[---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=511300
[---TopBolt---]value=1
[---TopBolt---]StreamID=Stream3
[---TopBolt---]value=YUNDA
[---TopBolt---]value=1

Reposted from: https://www.cnblogs.com/quchunhui/p/8302192.html
